Tuesday, June 7, 2016

Apache Hadoop MapReduce - Detailed word count example from scratch

In this post we will look at how to create and run a word count program in Apache Hadoop. To make it easy for beginners, we will cover most of the setup steps as well. Please note that this blog entry is for Linux-based environments; I am running Ubuntu 14.04 LTS on my machine. For Windows users the steps might be a little different; information on running Hadoop on Windows is available at Build and Install Hadoop 2.x or newer on Windows.

Prerequisites

1. Java needs to be installed (preferably a newer version such as 1.7 or 1.8).

Download Oracle JDK 8 from http://www.oracle.com/technetwork/java/javase/downloads/index.html and extract the archive to a folder named jdk1.8.0. Then set the following environment variables (you can set them in the .bashrc file):
JAVA_HOME=<path to the extracted jdk1.8.0 folder>
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME PATH

2. SSH. If you do not have ssh installed on your machine, use the following commands to install ssh and rsync, which is also needed:

  $ sudo apt-get install ssh
  $ sudo apt-get install rsync 

3. Download and extract the latest Hadoop binary onto your machine. The latest Hadoop binaries are available at http://hadoop.apache.org/releases.html. The following commands will download and extract Hadoop version 2.7.2.

wget http://www-eu.apache.org/dist/hadoop/common/hadoop-2.7.2/hadoop-2.7.2.tar.gz
tar -xzvf hadoop-2.7.2.tar.gz

Word Count
[Image: the MapReduce word count flow. Image reference - http://blog.enablecloud.com/2012/06/what-lies-at-core-of-hadoop.html]



Word Count is a simple and easy-to-understand algorithm that can easily be implemented as a MapReduce application. Given a set of text documents, the program counts the number of occurrences of each word. The program consists of three main sections:


  1. Main Program
  2. Mapper
  3. Reducer

Writing the Mapper Class

The WordCountMapper class is created by extending the org.apache.hadoop.mapreduce.Mapper class, and the map function is implemented by overriding the map method. The map function takes a key-value pair as input and emits key-value pairs as output (the output is written through the Context object). The input key-value pair and the output key-value pairs need not be of the same types; in the declaration Mapper<Object, Text, Text, IntWritable>, the first two type parameters are the input key and value types and the last two are the output key and value types.

For instance, in the WordCountMapper the input to the map method is a key-value pair where the key is the position of the line within the file (with the default TextInputFormat this is the byte offset of the line, not the line number) and the value is the text of that line. For each word it reads in the line, it outputs the pair (word, 1).

The pseudo code for the map function is below.

    void map(key, text) {
        foreach word in text.split() {
            output(word, 1);
        }
    }
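
To make this concrete, consider the line "Hello World Bye World" (one of the test inputs used later in this post). The map function emits one pair per token:

    (Hello, 1)
    (World, 1)
    (Bye, 1)
    (World, 1)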


The actual Java code for the map function is below.

public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Split the line into whitespace-separated tokens and emit (word, 1) for each.
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
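
If you want to sanity-check the mapper without running a full Hadoop job, the Apache MRUnit project provides test drivers for the MapReduce API. The following is only a minimal sketch and not part of the original example; it assumes the org.apache.mrunit:mrunit dependency (hadoop2 classifier) is on the classpath, and that WordCountMapper is the static nested class of WordCount shown in the complete listing further below.

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mrunit.mapreduce.MapDriver;

    public class WordCountMapperTest {

        public static void main(String[] args) throws IOException {
            // Feed a single input record to the mapper and assert the exact pairs it emits.
            MapDriver<Object, Text, Text, IntWritable> driver =
                    MapDriver.newMapDriver(new WordCount.WordCountMapper());
            driver.withInput(new LongWritable(0), new Text("Hello World Bye World"))
                  .withOutput(new Text("Hello"), new IntWritable(1))
                  .withOutput(new Text("World"), new IntWritable(1))
                  .withOutput(new Text("Bye"), new IntWritable(1))
                  .withOutput(new Text("World"), new IntWritable(1))
                  .runTest(); // fails if the mapper's actual output differs
        }
    }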

Writing the Reducer Class

The WordCountReducer class is created by extending the org.apache.hadoop.mapreduce.Reducer class, and the reduce function is implemented by overriding the reduce method from the Reducer class. For each word, the framework collects all the intermediate (word, 1) pairs generated by the map functions; the reduce function then sums up all the occurrences of that word and outputs a key-value pair of the form (word, total count). The detailed implementation of the WordCountReducer is below.

The pseudo code for the reduce function is below.

    void reduce(word, list(count)) {
        output(word, sum(count));
    }
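
With the sample input used later in this post, the grouped calls and outputs of the reduce function would be:

    reduce(Hello, [1, 1])  ->  (Hello, 2)
    reduce(World, [1, 1])  ->  (World, 2)
    reduce(Bye, [1])       ->  (Bye, 1)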

The actual Java code for the reduce function is below.


public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Sum all the counts received for this word and emit (word, total).
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
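
As with the mapper, the reducer can be checked locally with MRUnit's ReduceDriver. Again, this is only a hedged sketch under the same mrunit dependency assumption as above, not part of the original example.

    import java.io.IOException;
    import java.util.Arrays;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;

    public class WordCountReducerTest {

        public static void main(String[] args) throws IOException {
            // "World" occurs twice in the sample input, so [1, 1] should collapse to 2.
            ReduceDriver<Text, IntWritable, Text, IntWritable> driver =
                    ReduceDriver.newReduceDriver(new WordCount.WordCountReducer());
            driver.withInput(new Text("World"), Arrays.asList(new IntWritable(1), new IntWritable(1)))
                  .withOutput(new Text("World"), new IntWritable(2))
                  .runTest(); // fails if the reducer's actual output differs
        }
    }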


Writing the Main Method

The main method sets up all the necessary configuration and runs the MapReduce job. It configures the following:

    1. Job name: the name of this job; here, "word count".
    2. Executable (jar) class: the main executable class; here, WordCount.
    3. Mapper class: the class that overrides the map function; here, WordCountMapper.
    4. Reducer class: the class that overrides the reduce function; here, WordCountReducer.
    5. Output key type: the type of the output key; here, Text.
    6. Output value type: the type of the output value; here, IntWritable.
    7. File input path
    8. File output path


public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

Notice that the main method also sets WordCountReducer as the combiner. A combiner performs a reducer-like local aggregation on each map node before data is sent over the network; this is safe here because word counts can be summed in partial steps. The complete code for the Word Count example is below.


    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCount {

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(WordCountMapper.class);
            job.setCombinerClass(WordCountReducer.class);
            job.setReducerClass(WordCountReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

        public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {

            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(Object key, Text value, Context context) throws IOException,
                    InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }

        public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();

            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }

    }



Compiling the Code

In order to compile the code we need to create a .java file that contains it. Execute the following commands and paste the code listed above into the file created by the vim command.


    cd ~/software/hadoop-2.7.2
    vim WordCount.java

There are several ways to generate a jar file from WordCount.java. The following is a very simple and straightforward method. Move to the Hadoop directory and execute the following commands; the WordCount.java file also needs to be in the Hadoop directory.

    cd ~/software/hadoop-2.7.2
    export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
    ./bin/hadoop com.sun.tools.javac.Main WordCount.java
    jar cf wc.jar WordCount*.class

This will create a jar file that contains the compiled classes needed to run the program on Hadoop.

If you are familiar with Maven, another, cleaner method is to create a Maven project for the WordCount example and simply run "mvn clean install", which will produce the jar file. To match the Hadoop version used in this post (and the Job.getInstance API used in the code), add the following dependency to the pom.xml:

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.2</version>
    </dependency>


Running the Code

Next we will run the example on a local standalone Hadoop node. Before we run the example we need to create a set of input files that will be given to the program.

First, create a directory to put all the input files in; the program will read all the files that are in this folder. Use the following commands to create the directories and files:

    mkdir -p ~/hadoop/wordcount/input
    cd ~/hadoop/wordcount/input
    vim 1.txt
    vim 2.txt

Create two text files, 1.txt and 2.txt, under the folder with the following contents:

1.txt - Hello World Bye World

2.txt - Hello Hadoop Goodbye Hadoop

To run the MapReduce job, execute the following command from the Hadoop directory. Note that the output directory (~/hadoop/wordcount/output here) must not already exist; Hadoop refuses to overwrite an existing output directory and will fail the job.


  cd ~/software/hadoop-2.7.2
  ./bin/hadoop jar wc.jar WordCount ~/hadoop/wordcount/input ~/hadoop/wordcount/output

After the job has completed, execute the following command and check the output that was generated. The output files are named part-r-NNNNN, one per reducer; since this job ran with the single default reducer, all the results are in part-r-00000.

    cat ~/hadoop/wordcount/output/part-r-00000
    Bye 1
    Goodbye 1
    Hadoop 2
    Hello 2
    World 2

I hope that this blog post helped you better understand how Hadoop MapReduce works. Let me know if you think there are improvements I can make to this blog, or if I got something wrong.

pulasthi
