Running Hadoop MapReduce WordCount

Overview

When you install Hadoop, you get YARN (ResourceManager, NodeManager) alongside HDFS. The MapReduce framework (see the MapReduce paper) is a way to run computations in parallel on data stored in HDFS. All we need to do is write a Mapper and a Reducer and submit the job; YARN takes care of allocating resources and running it on the cluster. The WordCount example counts how many times each word occurs in a document, which makes it a good first example for learning MapReduce. The official site has a WordCount tutorial, but since there is no Korean version, I will walk through it in this post.
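
To make the Mapper/Reducer split concrete before touching the cluster, here is a minimal plain-Java sketch of the same map, shuffle, and reduce flow. It does not use Hadoop at all. The class and method names (LocalWordCountSketch, map, shuffle, reduce, wordCount) are made up for illustration, and the TreeMap only stands in for the sorting and grouping that the framework does between the two phases.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Plain-Java illustration of map -> shuffle -> reduce (no Hadoop involved).
// All names here are hypothetical and exist only for this sketch.
class LocalWordCountSketch {

  // "Map" phase: emit one (word, 1) pair per whitespace-separated token.
  static List<Map.Entry<String, Integer>> map(String line) {
    List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
    StringTokenizer itr = new StringTokenizer(line);
    while (itr.hasMoreTokens()) {
      pairs.add(new AbstractMap.SimpleEntry<>(itr.nextToken(), 1));
    }
    return pairs;
  }

  // "Shuffle" phase: group the values by key. The TreeMap keeps keys sorted.
  static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
    Map<String, List<Integer>> grouped = new TreeMap<>();
    for (Map.Entry<String, Integer> pair : pairs) {
      grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
    }
    return grouped;
  }

  // "Reduce" phase: sum all values that belong to one key.
  static int reduce(List<Integer> values) {
    int sum = 0;
    for (int value : values) {
      sum += value;
    }
    return sum;
  }

  // Runs the three phases in order over a list of input lines.
  static Map<String, Integer> wordCount(List<String> lines) {
    List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
    for (String line : lines) {
      pairs.addAll(map(line));
    }
    Map<String, Integer> counts = new TreeMap<>();
    for (Map.Entry<String, List<Integer>> group : shuffle(pairs).entrySet()) {
      counts.put(group.getKey(), reduce(group.getValue()));
    }
    return counts;
  }

  public static void main(String[] args) {
    List<String> lines = Arrays.asList("the map tasks", "the reduce tasks");
    System.out.println(wordCount(lines)); // prints {map=1, reduce=1, tasks=2, the=2}
  }
}
```

In the real job, the map and reduce steps run as separate tasks on the cluster and the grouping happens during the shuffle, but the data flow is the same.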

Preparing Environment Variables

Add $JAVA_HOME/bin to PATH and set HADOOP_CLASSPATH to $JAVA_HOME/lib/tools.jar as shown below. A JRE may not include lib/tools.jar, so make sure JAVA_HOME points to a JDK.

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export PATH=${JAVA_HOME}/bin:${PATH}
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
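
If you are not sure whether JAVA_HOME points to a JDK, a small check like the following can help. This is only a sketch: the class name ToolsJarCheck and the helper hasToolsJar are hypothetical, and it checks nothing more than whether lib/tools.jar exists under the given directory. Note that JDK 9 and later no longer ship lib/tools.jar, so the check is only meaningful for a Java 8 setup like the one in this post.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: verifies that JAVA_HOME contains lib/tools.jar.
class ToolsJarCheck {

  // Returns true if <javaHome>/lib/tools.jar exists as a regular file.
  static boolean hasToolsJar(Path javaHome) {
    return Files.isRegularFile(javaHome.resolve("lib").resolve("tools.jar"));
  }

  public static void main(String[] args) {
    String javaHome = System.getenv("JAVA_HOME");
    if (javaHome == null || javaHome.isEmpty()) {
      System.out.println("JAVA_HOME is not set");
      return;
    }
    if (hasToolsJar(Paths.get(javaHome))) {
      System.out.println("Found lib/tools.jar under " + javaHome);
    } else {
      System.out.println("No lib/tools.jar under " + javaHome + " (is this a JRE?)");
    }
  }
}
```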

Compiling WordCount.java

WordCount.java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
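
Note that main registers IntSumReducer as the combiner as well as the reducer. This works because summing is associative and commutative: partial sums computed on each mapper's output can be summed again in the reducer without changing the result. The framework may run a combiner zero, one, or several times, so it must be safe in all of those cases. The following plain-Java sketch (no Hadoop; CombinerSketch, combine, and reduce are hypothetical names) illustrates the idea with two made-up input splits.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical illustration of what a summing combiner does (no Hadoop involved).
class CombinerSketch {

  // Local aggregation of one mapper's output: one (word, partialCount) per distinct word.
  static Map<String, Integer> combine(String split) {
    Map<String, Integer> partial = new TreeMap<>();
    StringTokenizer itr = new StringTokenizer(split);
    while (itr.hasMoreTokens()) {
      partial.merge(itr.nextToken(), 1, Integer::sum);
    }
    return partial;
  }

  // Final reduce: sum the partial counts coming from all splits.
  static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
    Map<String, Integer> total = new TreeMap<>();
    for (Map<String, Integer> partial : partials) {
      partial.forEach((word, count) -> total.merge(word, count, Integer::sum));
    }
    return total;
  }

  public static void main(String[] args) {
    // Two made-up input splits, each handled by its own mapper.
    List<String> splits = Arrays.asList(
        "the input and the output",
        "the framework sorts the outputs");

    List<Map<String, Integer>> partials = new ArrayList<>();
    int combinedRecords = 0;
    for (String split : splits) {
      Map<String, Integer> partial = combine(split);
      partials.add(partial);
      combinedRecords += partial.size();
    }

    // 10 raw (word, 1) pairs shrink to 8 records after local combining.
    System.out.println("records sent to the reducer: " + combinedRecords);
    System.out.println(reduce(partials));
    // prints {and=1, framework=1, input=1, output=1, outputs=1, sorts=1, the=4}
  }
}
```

The same effect is visible in the job counters later in this post: Combine input records=161 shrinks to Combine output records=101 before the shuffle.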

Compile the WordCount.java file written above with the hadoop com.sun.tools.javac.Main WordCount.java command, then package it into a jar.

root@ubuntu01:~# hadoop com.sun.tools.javac.Main WordCount.java
root@ubuntu01:~# ls
    'WordCount$IntSumReducer.class'  'WordCount$TokenizerMapper.class'   WordCount.class   WordCount.java
root@ubuntu01:~# jar cf wc.jar WordCount*.class
root@ubuntu01:~# ls
    wc.jar  'WordCount$IntSumReducer.class'  'WordCount$TokenizerMapper.class'   WordCount.class   WordCount.java

Preparing the Text

Prepare a text file to run the word count on. I used a passage from the official website.

sample.txt
cat sample.txt
Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.

A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file-system. The framework takes care of scheduling tasks, monitoring them and re-executes the failed tasks.

Typically the compute nodes and the storage nodes are the same, that is, the MapReduce framework and the Hadoop Distributed File System (see HDFS Architecture Guide) are running on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the cluster.

Then create the input directory in HDFS and copy the local file into it with the commands below.

hdfs dfs -mkdir /tmp/input
hdfs dfs -put sample.txt /tmp/input

We are now ready to count the words in sample.txt, which we uploaded to HDFS, using the wc.jar we built earlier.

Running WordCount

Run the command hadoop jar wc.jar WordCount /tmp/input /tmp/output to submit the job. The output directory (/tmp/output here) must not already exist, or the job fails at submission.

root@ubuntu01:~# hadoop jar wc.jar WordCount /tmp/input /tmp/output
2022-11-20 14:57:56,435 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at ubuntu01/192.168.219.101:8040
2022-11-20 14:57:56,648 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2022-11-20 14:57:56,675 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1668956245532_0001
2022-11-20 14:57:56,827 INFO input.FileInputFormat: Total input files to process : 1
2022-11-20 14:57:56,902 INFO mapreduce.JobSubmitter: number of splits:1
2022-11-20 14:57:57,010 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1668956245532_0001
2022-11-20 14:57:57,011 INFO mapreduce.JobSubmitter: Executing with tokens: []
2022-11-20 14:57:57,134 INFO conf.Configuration: resource-types.xml not found
2022-11-20 14:57:57,134 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2022-11-20 14:57:57,456 INFO impl.YarnClientImpl: Submitted application application_1668956245532_0001
2022-11-20 14:57:57,483 INFO mapreduce.Job: The url to track the job: http://ubuntu01:8088/proxy/application_1668956245532_0001/
2022-11-20 14:57:57,484 INFO mapreduce.Job: Running job: job_1668956245532_0001
2022-11-20 14:58:02,539 INFO mapreduce.Job: Job job_1668956245532_0001 running in uber mode : false
2022-11-20 14:58:02,539 INFO mapreduce.Job:  map 0% reduce 0%
2022-11-20 14:58:07,605 INFO mapreduce.Job:  map 100% reduce 0%
2022-11-20 14:58:11,634 INFO mapreduce.Job:  map 100% reduce 100%
2022-11-20 14:58:12,652 INFO mapreduce.Job: Job job_1668956245532_0001 completed successfully
2022-11-20 14:58:12,716 INFO mapreduce.Job: Counters: 54
	File System Counters
		FILE: Number of bytes read=1358
		FILE: Number of bytes written=554613
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=1142
		HDFS: Number of bytes written=949
		HDFS: Number of read operations=8
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
		HDFS: Number of bytes read erasure-coded=0
	Job Counters
		Launched map tasks=1
		Launched reduce tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=2072
		Total time spent by all reduces in occupied slots (ms)=2033
		Total time spent by all map tasks (ms)=2072
		Total time spent by all reduce tasks (ms)=2033
		Total vcore-milliseconds taken by all map tasks=2072
		Total vcore-milliseconds taken by all reduce tasks=2033
		Total megabyte-milliseconds taken by all map tasks=2121728
		Total megabyte-milliseconds taken by all reduce tasks=2081792
	Map-Reduce Framework
		Map input records=5
		Map output records=161
		Map output bytes=1678
		Map output materialized bytes=1358
		Input split bytes=106
		Combine input records=161
		Combine output records=101
		Reduce input groups=101
		Reduce shuffle bytes=1358
		Reduce input records=101
		Reduce output records=101
		Spilled Records=202
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=77
		CPU time spent (ms)=790
		Physical memory (bytes) snapshot=583352320
		Virtual memory (bytes) snapshot=5094989824
		Total committed heap usage (bytes)=501219328
		Peak Map Physical memory (bytes)=337264640
		Peak Map Virtual memory (bytes)=2544484352
		Peak Reduce Physical memory (bytes)=246087680
		Peak Reduce Virtual memory (bytes)=2550505472
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=1036
	File Output Format Counters
		Bytes Written=949
root@ubuntu01:~# hdfs dfs -cat /tmp/output/part-r-00000
(multi-terabyte	1
(see	1
(thousands	1
A	1
Architecture	1
Distributed	1
File	1
Guide)	1
HDFS	1
Hadoop	2
MapReduce	3
System	1
The	2
This	1