一. 切片与MapTask并行度决定机制
现有如下的问题: 1G的数据, 启动8个MapTask, 可以提高集群的并发处理能力. 那么1K的数据, 如果也启动8个MapTask, 会提高集群性能吗? MapTask并行任务是否是越多越好呢? 哪些因素影响了MapTask并行度?
MapTask并行度决定机制
首先需要区分两个概念:
1. 数据块: 数据块(Blocks)是HDFS物理上把数据分成不同的块. 数据块是HDFS的存储数据单位
2. 数据切片: 数据切片只是在逻辑上对输入进行分片, 并不会在磁盘上将其切分进行存储. 数据切片是MapRudece程序计算输入数据的单位, 一个切片会对应启动一个MapTask.
需要注意的是, 默认情况下, 切片大小=数据块大小. 这是由HDFS的"数据本地化优化"的特性决定的, 也即在存储输入数据的节点上运行map任务, 无需集群带宽资源, 便可获得最佳性能. 如果分片跨越2个数据块,对于任何一个HDFS节点(基本不可能同时存储这2个数据块), 分片中的另外一块数据就需要通过网络传输到map任务节点, 与使用本地数据运行map任务相比, 效率则更低. 同时, 对数据进行切片时不会考虑从数据整体, 而是会逐个针对每一个文件进行单独的切片.
二. Job提交流程源码和切片源码详解
Job提交流程源码
FileInputFormat切片源码解析(input.getSplits(job))
(1) 先找到数据存储的目录
(2) 遍历处理目录下的每一个文件, 每个文件中:
a) 获取文件大小fs.sizeOf(ss.txt)
b) 计算切片大小
> minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
getFormatMinSplitSize()方法返回一个long类型的1
getMinSplitSiez(JobContext job)方法返回SPLIT_MINSIZE常量. 这个常量参数由mapred-default.xml参数配置文件决定, 默认为0.
因此, minSize返回一个1L
> maxSize = getMaxSplitSize(job)
getMaxSplitSize(JobContext context) 方法返回SPLIT_MAXSIZE常量, 这个常量参数同样由mapred-default.xml参数配置文件决定. 但默认情况下没有配置此常量的参数, 因此在方法返回参数时, 由getLong()方法提供默认值: Long.MAX_VALUE.
c) 判断这个文件是否可以进行切割. 若可进行切割, 则获取块大小
long blockSize = file.getBlockSize();
本地模式下运行时, 块大小默认为32M
d) 计算切片大小
long splitSize = this.computeSplitSize(blockSIze, minSize, maxSize);
在computeSplitSize()方法中, 返回值如下:
return Math.max(minSize, Math.min(maxSize, blockSize));
在默认情况下, 切片大小 = 块大小
e) 进行切片, 每次完成切片后, 都需要判断剩下部分的大小是否大于块的1.1倍, 如果小于1.1倍, 则就将剩下的部分划为一块切片
while(((double) bytesRemaining)/splitSize > SPLIT_SLOP)
f) 将切片信息写入一个切片规划文件中, 暂时存储在.staging文件夹
(3) 提交切片规划文件到YARN上, YARN上的MrAppMaster根据切片规划文件计算需要开启MapTask的个数
**************************************************************************************************************
如何调整切片的大小?
如果需要调大切片, 则需要将minSize调大; 如果需要更小的切片, 则需要将maxSize调小
**************************************************************************************************************
获取切片信息API
// 获取切片信息API
String name = inputSplit.getPath().getName();
// 根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
二. TextInputFormat
在运行MapReduce程序时, 输入的文件格式多种多样, 包括: 基于行的日志文件, 二进制格式文件, 数据库表等等, 那么针对不同的数据类型, MapReduce也同样提供了很多不同种类的数据格式. FileInputFormat常见的接口实现类包括: TextInputFormat, KeyValueTextInputFormat, NLineInputFormat, CombineTextInputFormat, 以及自定义InputFormat等.
TextInputFormat
TextInputFormat是FileInputFormat的默认实现类, 按行读取每条记录, 其键值对是<LongWritable, Text>
示例:
Rich learning form
Intelligent learning engine
Learning more convenient
From the real demand for more close to the enterprise每条记录表示为以下键/值对:
(0,Rich learning form)
(20,Intelligent learning engine)
(49,Learning more convenient)
(74,From the real demand for more close to the enterprise)
CombineTextInputFormat
框架默认的TextInputFormat切片机制是对每个文件进行单独的切片规划, 不管文件多小, 都会产生一个单独的切片, 且都会交给一个MapTask. 如果一个任务中包含多个小文件, 那么就会产生大量的MapTask, 导致处理效率低下. 因此, 在这种情况下, 需要使用CombineTextInputFormat切片机制进行处理.
1) 虚拟存储切片最大值设置
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4M
注意: 虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置
2) 切片机制
三. MapReduce工作流程
> 关于环形缓冲区内的元数据(Meta data), 其中包含以下几类数据:
index: 记录数据开始的位置
partition: 记录数据存放于哪个分区
keystart: 记录数据的key值开始的索引
valuestart: 记录数据的value值开始的索引
> 环形缓冲区默认大小是100M, 如果数据占到磁盘空间总大小的80%时, 则会开始反向写入. 这么做的原因是: 可以在向文件溢写的过程中, 同时开启向环形缓冲区写入的线程. 若新数据写入的速度较快, 则会等待旧数据溢写完成后继续进行写入. 如果直到缓冲区写满才进行溢写, 则需要等待溢写完成之后才可以重新对缓冲区进行写入.
缓冲区的大小可以通过参数调整: mapreduce.task.is.sort.mb 默认100M
四. Shuffle机制(混洗)
Map方法之后, Reduce方法之前的数据处理过程称之为Shuffle.
Shuffle机制
具体Shuffle过程如下:
1) MapTask收集map()方法输出的kv对, 放到内存缓冲区中.
2) 从内存缓冲区不断溢写到本地磁盘文件, 可能会溢出多个文件
3) 多个溢出文件会被合并成大的溢出文件
4) 在溢出过程即合并的过程中, 都需要调用Partitioner进行分区和针对key的排序(按字典进行快速排序)
5) ReduceTask根据自己的分区号, 从各个MapTask机器上拉取相应的结果分区数据
6) ReduceTask会抓取到来自不同MapTask同一分区的结果文件, ReduceTask会将这些文件再进行合并(Merge Sort)
7) 合并成大文件后, Shuffle的过程也就结束了, 后面就要进入ReduceTask的逻辑运算过程
Partition分区
默认的Partitioner分区代码如下:
public class HashPartitioner<K, V> extends Partitioner<K, V>{
public int getPartition(K key, V value, int numReduceTasks){
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
默认分区是根据key的hashCode对ReudceTasks个数取模得到的. 用户无法指定key存储在哪个分区.
自定义Partitioner
1)自定义Partitioner方法需要重写getPartition()方法
public class CustomPartitioner exxtends Partitioner<TExt, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions)
// 控制分区代码逻辑
... ...
return partition;
}
}
2) 在Job驱动中, 设置自定义Partitioner
job.setPartitionerClass(CustomPartitioner.class);
3) 自定义Partition后, 要根据自定义Partition的逻辑设置相应数量的ReduceTask
job.setNumReduceTasks(5);
分区总结
1) 如果ReduceTask的数量 > getPartition的结果数, 则会多产生几个空的输出文件part-r-000xx;
2) 如果1 < ReduceTask的数量 < getPartition的结果数, 则有一部分分区数据无法存储, 会产生Exception;
3) 如果ReduceTask的数量 = 1, 则不管MapTask端输出多少个分区文件, 最终结果都会交给这一个ReduceTask, 最终也就只会产生一个结果文件part-r-00000. 因为在源代码中, 当用户把ReduceTask的个数设置为1时, 会进入程序本身已经定义好的getPartitioner匿名内部类;
if (this.partitions > 1) {
this.partitioner = (Partitioner)ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
this.partitioner = new Partitioner<K, V>() {
public int getPartition(K key, V value, int numPartitions) {
return NewOutputCollector.this.partitions - 1;
}
};
}
4) 分区号必须从零开始, 逐一累加