MR相关内容
- InputFormat(切片和为maptask读取数据)
- 分区(与Reducer的执行数量,自定义分区)
- 排序
- 合并Combiner(快速排序后、第一次归并后、不能影响执行结果,输入kv与输出kv类型一致)
- 分组(第二次归并后,Reducer输入前,自定义分组)
切片(InputFormat)
类型 | 切片方法getSplits
|
kv方法createRecordReader
|
用途说明 |
---|---|---|---|
TextInputFormat |
FIF的切片方法 | LineRecordReader |
按照块大小分片,按行行读取记录。 |
KeyValueTextInputFormat |
FIF的切片方法 | KeyValueLineRecordReader |
按照块大小分片,按行读取记录。 |
SequenceFileInputFormat |
FIF的切片方法 | SequenceFileRecordReader |
按照块大小分片,专用读取上一个任务使用SequenceFileOutputFormat 输出的文件。 |
FixedLengthInputFormat |
FIF的切片方法 | FixedLengthRecordReader |
按照块大小分片,定长读取记录。(使用频率低) |
NLineInputFormat |
自定义,N行一片 | LineRecordReader |
通过指定行数进行分片,按行读取记录。 |
CombineFileInputFormat |
自定义 | LineRecordReader |
合并小文件进行分片读取,按行读取记录。 |
RecordReader分类
类型 | 说明 |
---|---|
LineRecordReader |
按行读取。 key: LongWritable ,内容偏移量value: Text ,一行数据 |
KeyValueLineRecordReader |
按照指定分割符进行分割 key: Text ,分割的前一部分value: Text ,分割的后一部分,可以通过configuration中的mapreduce.input.keyvaluelinerecordreader.key.value.separator 属性指定,默认是\t
|
FixedLengthRecordReader |
读取固定长度内容(byte) key: LongWritable ,记录偏移量value: BytesWritable ,二进制数据 |
SequenceFileRecordReader |
主要串联job执行,读取上一个job的结果,作为当前job的输入,可以传递对象数据 |
自定义InputFormat
场景:
假设目前有一堆小文件,需要通过一个MR程序转换为一个SequenceFile文件,其中key:文件路径,value:文件内容
思路:
- 自定义一个Format类继承
FileInputFormat
,其中key:Text,value:BytesWritable
(因为文件内容不一定是文本)- 需要重写方法:
public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
,此时我们需要定义一个自己的RecordReader- 同时考虑分片问题,我们FileInputFormat默认是按照文件和块大小分片的,这里我们需要同一个文件不被切片即一个文件在一个分片内,需要重写方法
protected boolean isSplitable(JobContext context, Path filename)
InputFormat.java
/**
* 将一堆小文件,转换为一个SequenceFile文件,key:原文件路径,value:原文件内容
* 这里的泛型即为MapTask的记录的输入类型,所以key:Text,value:BytesWritable(因为文件不一定是文本,所以用BytesWritable)
*/
public class PackageInputFormat extends FileInputFormat<Text, BytesWritable> {
@Override
public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
return new PackageSequenceRecordReader();
}
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
}
RecordReader.java
public class PackageSequenceRecordReader extends RecordReader<Text, BytesWritable> {
//标识文件是否已经被读取过
private boolean notRead = true;
private FileSplit fs = null;
private Text key = new Text();
private BytesWritable val = new BytesWritable();
private FSDataInputStream inputStream;
//初始化只执行一次
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
fs = (FileSplit) split;
Path path = fs.getPath();
FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
inputStream = fileSystem.open(path);
}
//是否还有下一个数据,返回true则标识还有数据,否则无数据
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
//判断是否已读取过,未读取过进行读取
if(notRead){
//设置key
key.set(fs.getPath().getName());
//设置val
long fileLength = fs.getLength();
System.out.println("fs.getlength():"+fileLength);
byte[] buf = new byte[(int) fileLength];
int length = inputStream.read(buf);
val.set(buf,0,length);
notRead = false;
return true;
}else{
return false;
}
}
//获取当前key
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
//获取当前value
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return val;
}
//去读进度
@Override
public float getProgress() throws IOException, InterruptedException {
return notRead ? 0 : 1;
}
//关闭资源
@Override
public void close() throws IOException {
IOUtils.closeStream(inputStream);
}
}
Driver.java
public class PackageDriver {
public static void main(String[] args) throws Exception {
//1. 创建一个Job
Configuration conf = new Configuration();
conf.set("mapred.reduce.child.java.opts", "-Xmx512m");
Job job = Job.getInstance(conf);
//2. 设置类路径
job.setJarByClass(FlowDriver.class);
//3. 设置Mapper和Reducer:不需要MR
//4. 设置Mapper和Reducer的输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
//5. 设置输入输出数据
FileInputFormat.setInputPaths(job,new Path("d://hadoop-study/inputformat/input"));
FileOutputFormat.setOutputPath(job,new Path("d://hadoop-study/inputformat/output"));
//设置InputFormat
job.setInputFormatClass(PackageInputFormat.class);
//设置OutputFormat
job.setOutputFormatClass(SequenceFileOutputFormat.class);
//6. 提交job
boolean b = job.waitForCompletion(true);
System.exit(b?0:1);
}
}