MapReduce合并器实现
以下示例提供了有关合并器的理论思想。假设我们有以下MapReduce的名为input.txt的输入文本文件。
What do you mean by Object
What do you know about Java
What is Java Virtual Machine
How Java enabled High Performance
下面讨论了带有合并器的MapReduce程序的重要阶段。
记录读取器
这是MapReduce的第一阶段,记录读取器从输入文本文件中以文本形式读取每一行,并以键值对的形式输出输出。
- 输入 - 输入文件中的逐行文本。
- 输出 - 形成键值对。以下是一组预期的键值对。
<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>
Map阶段
Map阶段从记录读取器获取输入,进行处理,然后将输出作为另一组键值对产生。
输入-以下键值对是从记录读取器获取的输入。
<1, What do you mean by Object>
<2, What do you know about Java>
<3, What is Java Virtual Machine>
<4, How Java enabled High Performance>
Map阶段读取每个键值对,使用StringTokenizer将每个单词从值中除,将每个单词视为键,并将该单词的计数视为值。以下代码段显示了Mapper类和map函数。
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens())
{
word.set(itr.nextToken());
context.write(word, one);
}
}
}
输出-预期输出如下-
<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>
Combiner 阶段
Combiner阶段从Map阶段获取每个键值对,对其进行处理,然后将输出作为键值集合对产生。
输入-以下键值对是从Map阶段获取的输入。
<What,1> <do,1> <you,1> <mean,1> <by,1> <Object,1>
<What,1> <do,1> <you,1> <know,1> <about,1> <Java,1>
<What,1> <is,1> <Java,1> <Virtual,1> <Machine,1>
<How,1> <Java,1> <enabled,1> <High,1> <Performance,1>
合并器(Combiner)阶段读取每个键值对,将常见单词组合为键,将值组合为集合。通常,合并器的代码和操作与简化器的代码和操作相似。以下是Mapper,Combiner和Reducer类声明的代码段。
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
输出-预期输出如下-
<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>
Reducer 阶段
Reducer阶段从Combiner阶段获取每个键值集合对,对其进行处理,然后将输出作为键值对传递。请注意,Combiner功能与Reducer相同。
输入-以下键值对是从Combiner阶段获取的输入。
<What,1,1,1> <do,1,1> <you,1,1> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,1,1,1>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>
Reducer阶段读取每个键值对。以下是Combiner的代码段。
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
输出-减速器(Reducer)阶段的预期输出如下-
<What,3> <do,2> <you,2> <mean,1> <by,1> <Object,1>
<know,1> <about,1> <Java,3>
<is,1> <Virtual,1> <Machine,1>
<How,1> <enabled,1> <High,1> <Performance,1>
记录写入
这是MapReduce的最后一个阶段,其中Record Writer写入Reducer阶段中的每个键值对,并将输出作为文本发送。
输入-Reducer阶段中的每个键值对以及输出格式。
输出-它以文本格式为您提供键-值对。以下是预期的输出。
What 3
do 2
you 2
mean 1
by 1
Object 1
know 1
about 1
Java 3
is 1
Virtual 1
Machine 1
How 1
enabled 1
High 1
Performance 1
范例程序
以下代码块计算程序中的单词数。
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens())
{
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
将上述程序另存为WordCount.java。该程序的编译和执行如下。
编译与执行
让我们假设我们位于Hadoop用户的主目录中(例如/home/hadoop)。
请按照下面给出的步骤来编译和执行上述程序。
步骤1-使用以下命令创建一个目录来存储已编译的java类。
让我们假设下载的文件夹是/home/hadoop/。
步骤3-使用以下命令来编译WordCount.java程序并为该程序创建一个jar。
$ javac -classpath hadoop-core-1.2.1.jar -d units WordCount.java
$ jar -cvf units.jar -C units/ .
步骤4-使用以下命令在HDFS中创建输入目录。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
步骤5-使用以下命令将名为input.txt的输入文件复制到HDFS的输入目录中。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/input.txt input_dir
步骤6-使用以下命令来验证输入目录中的文件。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
步骤7-使用以下命令通过从输入目录获取输入文件来运行字数统计应用程序。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
等待一段时间,直到文件被执行。执行后,输出包含许多输入拆分,映射任务和Reducer任务。
步骤8-使用以下命令来验证输出文件夹中的结果文件。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
步骤9-使用以下命令查看Part-00000文件中的输出。该文件由HDFS生成。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
以下是MapReduce程序生成的输出。
What 3
do 2
you 2
mean 1
by 1
Object 1
know 1
about 1
Java 3
is 1
Virtual 1
Machine 1
How 1
enabled 1
High 1
Performance 1