1.FileInputFormat切片源码
切片源码解析
- 程序先找到你数据存储的目录。
- 开始遍历处理(规划切片)录下的每个文件
- 遍历第一个文件ss.txt
- 获取文件大小fs.sizeOf(ss.txt)
- 计算切片大小
computeSplitSize(Math.max(minSize,Math .min(maxSize. blocksize))=blocksize=128M - 默认情况下,切大小=blocksize
- 开始切,形成第1个切片: ss.txt- -0:128M 第2个切片ss txt- 128:256M 第3个切片ss.txt- -256M: 300M
(每次切片时,都要判断切完剩下的部分是否大于块的1 1倍, 不大于1.1倍就划分-块切片) - 将切片信息写到一个切片规划文件中
- 整个切片的核心过程在getSp lit0方法中完成
- InputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表等。
- 提交切片规划文件到YARN.上,YARN. 上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数。
切片机制
- 简单地按照文件的内容长度进行切片
- 切片大小,默认等于Block大小
- 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
切片大小的参数配置
- 源码中计算切片大小的公式
- 切片大小的设置
maxsize (切片最大值) : 参数如果调得比blockSize小,则会让切片变小,而且就等于配置的这个参数的值
minsize (切片最小值) : 参数调的比blockSize大, 则可以让切片变得比blockSize还大。
- 获取切片信息API
//获取切片的文件名称
String name = inputSplit.getPath() . getName () ;
//根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit) context. getInputSplit() ;
2. TextInputFormat
思考:在运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce是如何读取这些数据的呢?FileInputFormat常见的接口实现类包括: TextInputFormat、 KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat 和自定义InputFormat等
TextInputFormat是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。
3. CombineTextInputFormat切片机制
框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
- 应用场景
CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。 - 虚拟存储切片最大值设置
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。 - 切片机制
生成切片过程包括:虚拟存储过程和切片过程二部分。
- 虚拟存储过程:
将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。
例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。 - 切片过程:
- 判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
- 如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
- 测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为:1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)
最终会形成3个切片,大小分别为:(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M
4. CombineTextInputFormat案例实操
需求:将输入的大量小文件合并成一个切片统一处理
在资料给的\inputcombinetextinputformat
文件夹下,有四个小文件案例
同样是之前的程序,Driver下的输入输出路径修改一下。看一下默认是多少个mapTask
//设置输入路径输出路径
FileInputFormat.setInputPaths(job, new Path("F:\\11_input\\inputcombinetextinputformat"));
FileOutputFormat.setOutputPath(job, new Path("F:\\hadoopDemo\\outputCombine1"));
运行得知,默认情况下reduceTask就是1
在源程序修改添加以下代码:
//如果不设置InputFormat,它默认使用的是TextInputFormat
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置为4m
CombineTextInputFormat.setMaxInputSplitSize(job,4194304);
运行后,观察日志情况:
number of splits:3
如果改成20m呢?
//如果不设置InputFormat,它默认使用的是TextInputFormat
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置为4m
CombineTextInputFormat.setMaxInputSplitSize(job,20971520);
按照计算规则,切片就会变成1片,1个maptask和1个reducetask
后面在处理小文件的时候经常使用,CombineTextInputFormat,经常用在生产环境下。
5.MapReduce工作流程
上面的流程是整个MapReduce最全工作流程,但是Shuffle过程只是从第7步开始到第16步结束,具体Shuffle过程详解,如下:
-
MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中
-
从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
-
多个溢出文件会被合并成大的溢出文件
-
在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序
-
ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据
-
ReduceTask会抓取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)
-
合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)
注意:
(1)Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。
(2)缓冲区的大小可以通过参数调整,参数:mapreduce.task.io.sort.mb默认100M。
6.Shuffle机制
Shuffle(混洗/洗牌)在map方法之后,reduce方法之前这段过程。
面试重点!
环形缓冲区100M,如果传过来128M,最多也就2次溢出?
但是环形缓冲区100M并不是只存储数据,它还有一半是元数据,所以100M并不是都用来存文件的。所以会出现溢出(溢写情况)
- Map1方法,进入到环形缓冲区
- 排序:快排,对key的索引按照字典顺序排序。
- Combiner:为可选流程,传输数据量小
- 第一次溢出:spilt.index索引文件和spilt.out真正的文件
- 第二次溢出,其实有第n次溢出(数据量很多)
- 然后对这些数据进行归并排序
- 归并完的数据还能进行第二次combiner,第一次第二次自动会帮你完成(如果有设置的话),在combiner之后,还能进行相应的压缩。
- 为什么要压缩?因为这是map阶段,如果压缩之后,文件小了,然后再网络上传输的内容少了,效率就高了。(一种优化手段)
- 之后的数据写到磁盘上,(按照分区写),等待reduce端来拉取。
- Map2的话也是同样上述步骤
- MapTask准备完毕后,就是ReduceTask拉取自己指定分区的数据。拉的话要注意,拉过来之后先尝试放到内存里面,如果内存不够的情况下,才会溢写到磁盘上。
- 无论是内存上的数据,还是磁盘上的数据,最终都要进行一次大排序,即归并。
- 然后按照相同的key分组
- key相同的进入到对应的reduce方法,每一次reduce方法都处理的是内部key是相同的。里面的存储结构是(key,v1,v2,v3)放到一个集合里面,然后对这个集合里的数据可以进行相应的加减乘除。
6.1 Partition分区
问题引出
要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)
默认Partitioner分区
public class HashPartitioner<K,V> extends Partitioner<K,V> {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
key.hashCode() & Integer.MAX_VALUE
案例
在第一个wordcount案例代码基础上加入job.setNumReduceTasks(2);
public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//获取job
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
//设置jar包路径
job.setJarByClass(WordCountDriver.class);
//关联mapper,reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//设置map输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置最终输出的map的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(2);
//设置输入路径输出路径
FileInputFormat.setInputPaths(job, new Path("F:\\11_input\\inputword"));
FileOutputFormat.setOutputPath(job, new Path("F:\\hadoopDemo\\outputPartitioner"));
//提交job
boolean completion = job.waitForCompletion(true);
System.exit(completion ? 0 : 1);
}
}
运行后发现
文件变成2个了。
里面的内容被分开,如果我希望将banzhang和cls在同一个分区应如何控制?
默认分区是根据key的hashCode对Reduce Tasks个数取模得到的。用户没法控制哪个key存储到哪个分区。
但是我们可以重写getPartition方法自己实现
那么如何自定义?
自定义Partitioner
-
自定义类继承Partitioner,重写getPartition()方法
public class Cus tomPartitioner extends Partitioner<Text, FlowBean> { @Override public int getPartition (Text key, FlowBean value,int numPartitions) { //控制分区代码逻辑 return partition; } }
-
在Job驱动中,设置自定义Partitioner .
job.setPartitionerClass(CustomPartitioner.class);
-
自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
job.setNumReduceTasks(5);
设置多少个,后面再讲。
分区案例
需求:将统计结果按照手机归属地不同省份输出到不同文件中(分区)
数据是上一次计算手机流量的数据文件
在完成案例前,我先总结一下Map,分区,Reduce之间K,V的关系
分区的Key,Value应该是Map的K,V
创建ProvincePartitioner类
代码如下:
public class ProvincePartitioner extends Partitioner<Text,FlowBean> {
@Override
public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
return 0;
}
}
完成分区代码
public class ProvincePartitioner extends Partitioner<Text,FlowBean> {
@Override
public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
//text手机号
String phone = text.toString();
//得到前3位
String prePhone = phone.substring(0, 3);
int partition;
if("136".equals(prePhone)){
partition = 0;
} else if ("137".equals(prePhone)) {
partition = 1;
}else if ("138".equals(prePhone)) {
partition = 2;
}else if ("139".equals(prePhone)) {
partition = 3;
}else{
partition = 4;
}
return partition;
}
}
第二步:回到Driver驱动类,设置分区类,设置任务数量job.setPartitionerClass(ProvincePartitioner.class); //在ProvincePartitioner里设置的就是5个 job.setNumReduceTasks(5);
/**
* 驱动代码
* @author:whd
* @createTime: 2021/11/12
*/
public class FlowDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//1.获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2.设置jar
job.setJarByClass(FlowDriver.class);
//3.关联mapper Reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReduce.class);
//4.设置mapper 输出key和value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//5.设置最终数据输出的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
job.setPartitionerClass(ProvincePartitioner.class);
//在ProvincePartitioner里设置的就是5个
job.setNumReduceTasks(5);
//6.设置数据的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path("F:\\01centos\\资料\\11_input\\inputflow"));
FileOutputFormat.setOutputPath(job, new Path("F:\\hadoopDemo\\outputPartitioner3"));
//7.提交Job,true打印的日志信息更多
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
输出结果,5个!
这就是自定义分区案例,可以输出多个文件。
job.setNumReduceTasks(5);
这里设置的是5,如果我不设置5.设置别的行不行?
假设设置4,会抛出异常Caused by: java. io. IOException Create breakpoint : IlLegal partition for 18271575951
剩下一个不知道往哪个分区写。
那如果设置1呢?job.setNumReduceTasks(1);
可以正常运行
底层是走了partition - 1 的默认分支,压根不走你的分区设置。
也就是说,设置1可以,设置2、3、4报IO异常,设置5正常。
设置6会怎么样?能够正常运行。
但是最后一个文件是个0kb的空文件。
总结:
分区总结
- 如果ReduceTask的数量 > getPartition的结果数, 则会多产生几个空的输出文件art-1-000xx;
- 如果1< ReduceTask的数量 < getPatition的结果数,则有一部分分区数据无处安放, 会Exception;
- 如果ReduceTask的数量 = 1,则不管MapTask端输出多少个分区文件,最终结果都交给这-一个ReduceTask,最终也就只会产生一个结果文件 pat-0000;
- 分区号必须从零开始,逐一 累加。
例如:假设自定义分区数为5,则
- job.setNumReduceTasks(1);会正常运行, 只不过会产生一个输出文件
- job setNumReduceTasks(2);会报错
- job. setNumReduceTasks(6);大于5,程序会正常运行,会产生空文件
6.2 WritableComparable排序
回忆MapReduce的过程
-
在环形缓冲区溢写之前,要对数据进行快排,会产生多次的溢写文件。多次的溢写文件,我们需要再次对它归并。
-
在整个MapReduce当中,MapTask阶段执行了2次排序。
-
分别是在环形缓冲区溢写之前,进行了一次快排(对KEY的索引排序)
-
对溢写文件再次进行了一次Merge合并操作(归并排序)
-
-
接下来到Reduce阶段
-
Map阶段结束之后,Reduce阶段主动的去拉取对应的数据,拉取自己分区的数据过来之后,它需要对拉取过来的数据进行一次归并排序。
-
总结:在map阶段进行了一次快排一次归并,在reduce进行了一次归并
MapTask和ReduceTask的三次排序
接下来我们就来研究一下这个排序
排序是MapReduce框架中最重要的操作之一。
MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序
- 不管你是MapTask还是ReduceTask,他们的key都必须是支持排序的。如果不支持排序Hadoop会报错。
-
为什么一定要对它进行排序呢?
- 如果你前面这个数据不排序的话,我们最终是把相同key的内容进入到reduce中,如果你在前面不进行一系列的排序操作。你每次把key放入reducer就得一个个判断是不是相同key(效率低下)
- 反观,现在已经把相同的key放在一起了,我们只需要跟下一个去比较当前的key是否一样即可,如果一样就是相同key,不一样,直接把到这的前面这一组拿走。(效率高)
不是进来一条数据,就对你进行排序,它是缓冲区到达一定阈值后,要往磁盘上溢写之前进行一次排序。
这个排序的过程是在内存当中完成的。
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘.上所有文件进行归并排序。
最终无论你的数据在内存还是在磁盘,都要进行一次统一的归并排序。因为我们要将相同的key传到reducer方法里面去。只有统一的全都排好序,效率才能是最高,否则的话,每一次获取key都要遍历所有。(如果这是10亿个key,每次要遍历10亿遍)
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成- -一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一-次合并后将数据溢写到磁盘 上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一-次归并排序。
排序的分类
-
**部分排序:**MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
比如说手机号,第一个分区文件,里面的手机号内容,137xxx后面是按一定顺序的。它只考虑单个分区文件内部是有序的。
使用场景:XX地区销售额前10
-
**全排序:**最终输出结果只有一个文件,且文件内部有序。实现方式是只设置-个ReduceTask。 但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
比如说:wordcount案例,只有一个文件。
这种在生产环境慎用,实际数据量非常大,都在一个reduce里是处理不了的。因此全排序用的可能性不大。
-
辅助排序 – GroupingComparator分组 , 用的较少。
-
**二次排序:**在自定义排序过程中,如果compareto(java里的一个可重写的方法)中的判断条件为两个即为二次排序。
比如说:手机总流量排序,如果总流量相同,按上行流量倒序,如果还相同,下行流量倒序。
自定义排序也叫二次排序
自定义排序
bean对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。
回忆,之前序列化实现的是writable接口
WritableComparable
接口,继承了Writable也继承了Comparable
自定义排序案例(全排序)
根据案例2.3序列化案例产生的结果再次对总流量进行倒序排序。