MapReduce分区程序实现
为了方便起见,让我们假设我们有一个名为Employee的小表,其中包含以下数据。我们将使用此样本数据作为输入数据集来演示分区程序的工作方式。
Id |
Name |
Age |
Gender |
Salary |
1201 |
gopal |
45 |
Male |
50,000 |
1202 |
manisha |
40 |
Female |
50,000 |
1203 |
khalil |
34 |
Male |
30,000 |
1204 |
prasanth |
30 |
Male |
30,000 |
1205 |
kiran |
20 |
Male |
40,000 |
1206 |
laxmi |
25 |
Female |
35,000 |
1207 |
bhavya |
20 |
Female |
15,000 |
1208 |
reshma |
19 |
Female |
15,000 |
1209 |
kranthi |
22 |
Male |
22,000 |
1210 |
Satish |
24 |
Male |
25,000 |
1211 |
Krishna |
25 |
Male |
25,000 |
1212 |
Arshad |
28 |
Male |
20,000 |
1213 |
lavanya |
18 |
Female |
8,000 |
我们必须编写一个应用程序来处理输入数据集,以找到不同年龄段(例如,低于20岁,21至30岁,高于30岁)中按性别划分的薪水最高的员工。
输入数据
以上数据另存为input.txt在“/home/hadoop/hadoopPartitioner”目录中,并作为输入提供。
1201 gopal 45 Male 50000
1202 manisha 40 Female 51000
1203 khaleel 34 Male 30000
1204 prasanth 30 Male 31000
1205 kiran 20 Male 40000
1206 laxmi 25 Female 35000
1207 bhavya 20 Female 15000
1208 reshma 19 Female 14000
1209 kranthi 22 Male 22000
1210 Satish 24 Male 25000
1211 Krishna 25 Male 26000
1212 Arshad 28 Male 20000
1213 lavanya 18 Female 8000
根据给定的输入,以下是该程序的算法说明。
Map任务
当我们将文本数据保存在文本文件中时,map任务接受键值对作为输入。此地图任务的输入如下-
- 输入- 键将是一种模式,例如“任何特殊键+文件名+行号”(示例:key = @input1),值将是该行中的数据(示例:value = 1201\tgopal\t45\t男性\t50000)。
- 方法- 此映射任务的操作如下-
- 从字符串中的参数列表中读取作为输入值的值(记录数据)。
- 使用split函数,将性别分开并存储在字符串变量中。
String[] str = value.toString().split("\t", -3);
String gender=str[3];
- 将性别信息和记录数据值作为输出键值对从映射任务发送到分区任务。
context.write(new Text(gender), new Text(value));
- 对文本文件中的所有记录重复上述所有步骤。
- 输出-您将获得性别数据和记录数据值作为键值对。
分区任务
分区器任务接受来自映射(Map)任务的键-值对作为其输入。分区意味着将数据划分为多个段。根据给定的分区条件条件,可以将输入的键值对数据根据年龄条件分为三部分。
- 输入 - 键值对集合中的整个数据。键 = 记录中的性别字段值。值= 该性别的整个记录数据值。
- 方法-分区逻辑的过程如下运行。
Reducer任务
分区程序任务的数量等于简化(Reducer)程序任务的数量。在这里,我们有三个分区程序任务,因此有三个要执行的Reducer任务。
- 输入-Reducer将使用不同的键值对集合执行三次。键=记录中的性别字段值。值=该性别的整个记录数据。
- 方法-以下逻辑将应用于每个集合。
- 读取每个记录的薪金字段值。
String [] str = val.toString().split("\t", -3);
Note: str[4] have the salary field value.
- 用max变量检查薪水。如果str[4]是最高工资,则将str [4]分配给max,否则跳过该步骤。
if(Integer.parseInt(str[4])>max)
{
max=Integer.parseInt(str[4]);
}
- 对每个key集合重复步骤1和2(Male 和 Female是key集合)。执行完这三个步骤后,您将从“Female”钥匙集合中找到一个最高薪水,从“Male”钥匙集合中找到一个最高薪水。
context.write(new Text(key), new IntWritable(max));
- 输出-最后,您将在三个不同年龄组的集合中获得一组键值对数据。它分别包含每个年龄组中男性收入的最高薪水和女性收入的最高薪水。
执行Map,Partitioner和Reduce任务后,键-值对数据的三个集合存储在三个不同的文件中作为输出。
所有这三个任务都被视为MapReduce作业。这些作业的以下要求和规格应在“配置”中指定-
- 工作名称
- 键和值的输入和输出格式
- Map,Reduce和Partitioner任务的各个类
Configuration conf = getConf();
//Create Job
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
// File Input and Output paths
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
//Set Mapper class and Output format for key-value pair.
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
//Set Reducer class and Input/Output format for key-value pair.
job.setReducerClass(ReduceClass.class);
//Number of Reducer tasks.
job.setNumReduceTasks(3);
//Input and Output format for data
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
范例程序
以下程序显示了如何在MapReduce程序中为给定条件实现分区程序。
package partitionerexample;
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;
public class PartitionerExample extends Configured implements Tool
{
//Map class
public static class MapClass extends Mapper<LongWritable,Text,Text,Text>
{
public void map(LongWritable key, Text value, Context context)
{
try{
String[] str = value.toString().split("\t", -3);
String gender=str[3];
context.write(new Text(gender), new Text(value));
}
catch(Exception e)
{
System.out.println(e.getMessage());
}
}
}
//Reducer class
public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable>
{
public int max = -1;
public void reduce(Text key, Iterable <Text> values, Context context) throws IOException, InterruptedException
{
max = -1;
for (Text val : values)
{
String [] str = val.toString().split("\t", -3);
if(Integer.parseInt(str[4])>max)
max=Integer.parseInt(str[4]);
}
context.write(new Text(key), new IntWritable(max));
}
}
//Partitioner class
public static class CaderPartitioner extends
Partitioner < Text, Text >
{
@Override
public int getPartition(Text key, Text value, int numReduceTasks)
{
String[] str = value.toString().split("\t");
int age = Integer.parseInt(str[2]);
if(numReduceTasks == 0)
{
return 0;
}
if(age<=20)
{
return 0;
}
else if(age>20 && age<=30)
{
return 1 % numReduceTasks;
}
else
{
return 2 % numReduceTasks;
}
}
}
@Override
public int run(String[] arg) throws Exception
{
Configuration conf = getConf();
Job job = new Job(conf, "topsal");
job.setJarByClass(PartitionerExample.class);
FileInputFormat.setInputPaths(job, new Path(arg[0]));
FileOutputFormat.setOutputPath(job,new Path(arg[1]));
job.setMapperClass(MapClass.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//set partitioner statement
job.setPartitionerClass(CaderPartitioner.class);
job.setReducerClass(ReduceClass.class);
job.setNumReduceTasks(3);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true)? 0 : 1);
return 0;
}
public static void main(String ar[]) throws Exception
{
int res = ToolRunner.run(new Configuration(), new PartitionerExample(),ar);
System.exit(0);
}
}
将以上代码另存为“ /home/hadoop/hadoopPartitioner”中的PartitionerExample.java。该程序的编译和执行如下。
编译与执行
让我们假设我们位于Hadoop用户的主目录中(例如/home/hadoop)。
请按照下面给出的步骤来编译和执行上述程序。
让我们假设下载的文件夹是“/home/hadoop/hadoopPartitioner”
步骤2-以下命令用于编译程序PartitionerExample.java并为该程序创建一个jar。
$ javac -classpath hadoop-core-1.2.1.jar -d ProcessUnits.java
$ jar -cvf PartitionerExample.jar -C .
步骤3 - 使用以下命令在HDFS中创建输入目录。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
步骤4-使用以下命令将名为input.txt的输入文件复制到HDFS的输入目录中。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/hadoopPartitioner/input.txt input_dir
步骤5-使用以下命令来验证输入目录中的文件。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
步骤6-使用以下命令通过从输入目录获取输入文件来运行最高薪水应用程序。
$HADOOP_HOME/bin/hadoop jar PartitionerExample.jar partitionerexample.PartitionerExample input_dir/input.txt output_dir
等待一段时间,直到文件被执行。执行后,输出包含许多输入拆分,Mapper任务和Reducer任务。
15/02/04 15:19:51 INFO mapreduce.Job: Job job_1423027269044_0021 completed successfully
15/02/04 15:19:52 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=467
FILE: Number of bytes written=426777
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=480
HDFS: Number of bytes written=72
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=6
Job Counters
Launched map tasks=1
Launched reduce tasks=3
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=8212
Total time spent by all reduces in occupied slots (ms)=59858
Total time spent by all map tasks (ms)=8212
Total time spent by all reduce tasks (ms)=59858
Total vcore-seconds taken by all map tasks=8212
Total vcore-seconds taken by all reduce tasks=59858
Total megabyte-seconds taken by all map tasks=8409088
Total megabyte-seconds taken by all reduce tasks=61294592
Map-Reduce Framework
Map input records=13
Map output records=13
Map output bytes=423
Map output materialized bytes=467
Input split bytes=119
Combine input records=0
Combine output records=0
Reduce input groups=6
Reduce shuffle bytes=467
Reduce input records=13
Reduce output records=6
Spilled Records=26
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=224
CPU time spent (ms)=3690
Physical memory (bytes) snapshot=553816064
Virtual memory (bytes) snapshot=3441266688
Total committed heap usage (bytes)=334102528
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=361
File Output Format Counters
Bytes Written=72
步骤7-使用以下命令来验证输出文件夹中的结果文件。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
您将在三个文件中找到输出,因为您在程序中使用了三个分区程序和三个Reducer。
步骤8-使用以下命令查看Part-00000文件中的输出。该文件由HDFS生成。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
part-00000 输出
使用以下命令查看Part-00001文件中的输出。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00001
在part-00001中输出
使用以下命令查看Part-00002文件中的输出。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00002
在part-00002中输出