1. MapReduce 的介绍:
MapReduce 是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。
MapReduce大体上分三个部分:
- MRAppMaster:MapReduce Application Master,分配任务,协调任务的运行
- MapTask:阶段并发任,负责 mapper 阶段的任务处理 YARNChild
- ReduceTask:阶段汇总任务,负责 reducer 阶段的任务处理 YARNChild
2.MapReduce编写代码的流程:
- 用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行 MR 程序的客户端)
- Mapper 的输入数据是 KV 对的形式(KV 的类型可自定义)
- Mapper 的输出数据是 KV 对的形式(KV 的类型可自定义)
- Mapper 中的业务逻辑写在 map()方法中
- map()方法(maptask 进程)对每一个<K,V>调用一次
- Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV 对的形式
- Reducer 的业务逻辑写在 reduce()方法中
- Reducetask 进程对每一组相同 k 的<K,V>组调用一次 reduce()方法
- 用户自定义的 Mapper 和 Reducer 都要继承各自的父类
- 整个程序需要一个 Drvier 来进行提交,提交的是一个描述了各种必要信息的 job 对象
3.WordCount 案例:
public class MyWordCount {
public static void main(String[] args) {
// 指定 hdfs 相关的参数
Configuration conf=new Configuration(true);
conf.set("fs.defaultFS","hdfs://hadoop01:9000");
System.setProperty("HADOOP_USER_NAME", "hadoop");
try {
// 新建一个 job 任务
Job job=Job.getInstance(conf);
// 设置 jar 包所在路径
job.setJarByClass(MyWordCount.class);
// 指定 mapper 类和 reducer 类
job.setMapperClass(Mapper.class);
job.setReducerClass(MyReduce.class);
// 指定 maptask 的输出类型,注意,如果maptask的输出类型与reducetask输出类型一样,mapTask可以不用设置
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 指定 reducetask 的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 指定该 mapreduce 程序数据的输入和输出路径
Path input=new Path("/data/input");
Path output =new Path("/data/output");
//一定要保证output不存在
if(output.getFileSystem(conf).exists(output)){
output.getFileSystem(conf).delete(output,true); //递归删除
}
FileInputFormat.addInputPath(job,input);
FileOutputFormat.setOutputPath(job,output);
// 最后提交任务
boolean success = job.waitForCompletion(true);
System.exit(success?0:-1);
} catch (Exception e) {
e.printStackTrace();
}
}
private class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
Text mk =new Text();
IntWritable mv=new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 计算任务代码:切割单词,输出每个单词计 1 的 key-value 对
String[] words = value.toString().split("\\s+");
for(String word:words){
mk.set(word);
context.write(mk,mv);
}
}
}
private class MyReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
IntWritable mv=new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum=0;
// 汇总计算代码:对每个 key 相同的一组 key-value 做汇总统计
for(IntWritable value:values){
sum+=value.get();
}
mv.set(sum);
context.write(key,mv);
}
}
}
4. MapReduce 程序的核心运行机制:
1)MapReduce 程序的运行流程:
- 一个 mr 程序启动的时候,最先启动的是 MRAppMaster,MRAppMaster 启动后根据本次job 的描述信息,计算出需要的 maptask 实例数量,然后向集群申请机器启动相应数量的maptask 进程
- maptask 进程启动之后,根据给定的数据切片(哪个文件的哪个偏移量范围)范围进行数据处理,主体流程为:
- 利用客户指定的 InputFormat 来获取 RecordReader 读取数据,形成输入 KV 对
- 将输入 KV 对传递给客户定义的 map()方法,做逻辑运算,并将 map()方法输出的 KV 对收集到缓存
- 将缓存中的 KV 对按照 K 分区排序后不断溢写到磁盘文件
- MRAppMaster 监控到所有 maptask 进程任务完成之后(真实情况是,某些 maptask 进程处理完成后,就会开始启动 reducetask 去已完成的 maptask 处 fetch 数据),会根据客户指定的参数启动相应数量的 reducetask 进程,并告知 reducetask 进程要处理的数据范围(数据分区)
- Reducetask 进程启动之后,根据 MRAppMaster 告知的待处理数据所在位置,从若干台maptask 运行所在机器上获取到若干个 maptask 输出结果文件,并在本地进行重新归并排序,然后按照相同 key 的 KV 为一个组,调用客户定义的 reduce()方法进行逻辑运算,并收集运算输出的结果 KV,然后调用客户指定的 OutputFormat 将结果数据输出到外部存储
2)MapTask 并行度决定机制:
一个 job 的 map 阶段并行度由客户端在提交 job 时决定,客户端对 map 阶段并行度的规划的基本逻辑为:将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个 split),然后每一个 split 分配一个 mapTask 并行实例处理。这段逻辑及形成的切片规划描述文件,是由FileInputFormat实现类的getSplits()方法完成的,小编后续会对MPjob提交过程的源码进行详细的分析。
决定map task的个数主要由这几个方面:
-文件的大小
- 文件的个数
- block的大小
- 逻辑切片的大小
总的来说就是,当对文件进行逻辑划分的时候,默认的划分规则就是一个split和一个block的大小一样,如果文件没有到一个block大小,也会被切分出来一个split,这里有调优点,就是如果处理的文件都是小文件的话,那么机会并行很多的maptask,导致大量的时间都浪费在了启动jvm上,此时可以通过合并小文件或者重用jvm的方式提高效率。
逻辑切片机制:
long splitSize = computeSplitSize(blockSize, minSize, maxSize)
blocksize:默认是 128M,可通过 dfs.blocksize 修改
minSize:默认是 1,可通过 mapreduce.input.fileinputformat.split.minsize 修改
maxsize:默认是 Long.MaxValue,可通过mapreduce.input.fileinputformat.split.maxsize 修改
因此,如果是想调小split的大小,那么就将 maxsize调整到比block小。
如果是想调大split的大小,那么就将minSize调整到比block大。3)ReduceTask 并行度决定机制:
reducetask 的并行度同样影响整个 job 的执行并发度和执行效率,但与 maptask 的并发数由切片数决定不同,Reducetask 数量的决定是可以直接手动设置:job.setNumReduceTasks(4);,默认是1个,如果设置为0个表示没有reduce阶段,当然也可以设置多个,根据需求,如果有些需要全局计数的操作,那么只能设置1个reduce,有些可以设置多个reducetask的,千万不要设置太多,最好设置的和分区的个数能一一对应,不然的会就会有一些reduceTask空跑,导致了不能处理业务而且还占用系统资源。