大数据入门--hadoop(三)--MR编程

MR相关内容

  • InputFormat(切片和为maptask读取数据)
  • 分区(与Reducer的执行数量,自定义分区)
  • 排序
  • 合并Combiner(快速排序后、第一次归并后、不能影响执行结果,输入kv与输出kv类型一致)
  • 分组(第二次归并后,Reducer输入前,自定义分组)

切片(InputFormat)

类型 切片方法getSplits kv方法createRecordReader 用途说明
TextInputFormat FIF的切片方法 LineRecordReader 按照块大小分片,按行行读取记录。
KeyValueTextInputFormat FIF的切片方法 KeyValueLineRecordReader 按照块大小分片,按行读取记录。
SequenceFileInputFormat FIF的切片方法 SequenceFileRecordReader 按照块大小分片,专用读取上一个任务使用SequenceFileOutputFormat输出的文件。
FixedLengthInputFormat FIF的切片方法 FixedLengthRecordReader 按照块大小分片,定长读取记录。(使用频率低)
NLineInputFormat 自定义,N行一片 LineRecordReader 通过指定行数进行分片,按行读取记录。
CombineFileInputFormat 自定义 LineRecordReader 合并小文件进行分片读取,按行读取记录。

RecordReader分类

类型 说明
LineRecordReader 按行读取。
key:LongWritable,内容偏移量
value:Text,一行数据
KeyValueLineRecordReader 按照指定分割符进行分割
key:Text,分割的前一部分
value:Text,分割的后一部分,可以通过configuration中的mapreduce.input.keyvaluelinerecordreader.key.value.separator属性指定,默认是\t
FixedLengthRecordReader 读取固定长度内容(byte)
key:LongWritable,记录偏移量
value:BytesWritable,二进制数据
SequenceFileRecordReader 主要串联job执行,读取上一个job的结果,作为当前job的输入,可以传递对象数据

自定义InputFormat

场景:
假设目前有一堆小文件,需要通过一个MR程序转换为一个SequenceFile文件,其中key:文件路径,value:文件内容
思路:

  1. 自定义一个Format类继承FileInputFormat,其中key:Text,value:BytesWritable(因为文件内容不一定是文本)
  2. 需要重写方法:public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context),此时我们需要定义一个自己的RecordReader
  3. 同时考虑分片问题,我们FileInputFormat默认是按照文件和块大小分片的,这里我们需要同一个文件不被切片即一个文件在一个分片内,需要重写方法protected boolean isSplitable(JobContext context, Path filename)

InputFormat.java

/**
 * 将一堆小文件,转换为一个SequenceFile文件,key:原文件路径,value:原文件内容
 * 这里的泛型即为MapTask的记录的输入类型,所以key:Text,value:BytesWritable(因为文件不一定是文本,所以用BytesWritable)
 */
public class PackageInputFormat extends FileInputFormat<Text, BytesWritable> {
    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new PackageSequenceRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
}

RecordReader.java

public class PackageSequenceRecordReader extends RecordReader<Text, BytesWritable> {
    //标识文件是否已经被读取过
    private boolean notRead = true;
    private FileSplit fs = null;
    private Text key = new Text();
    private BytesWritable val = new BytesWritable();
    private FSDataInputStream inputStream;
    //初始化只执行一次
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        fs = (FileSplit) split;
        Path path = fs.getPath();
        FileSystem fileSystem = path.getFileSystem(context.getConfiguration());
        inputStream =  fileSystem.open(path);
    }

    //是否还有下一个数据,返回true则标识还有数据,否则无数据
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        //判断是否已读取过,未读取过进行读取
        if(notRead){
            //设置key
            key.set(fs.getPath().getName());
            //设置val
            long fileLength = fs.getLength();
            System.out.println("fs.getlength():"+fileLength);
            byte[] buf = new byte[(int) fileLength];
            int length = inputStream.read(buf);
            val.set(buf,0,length);
            notRead = false;
            return true;
        }else{
            return false;
        }
    }

    //获取当前key
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    //获取当前value
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return val;
    }

    //去读进度
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return notRead ? 0 : 1;
    }

    //关闭资源
    @Override
    public void close() throws IOException {
        IOUtils.closeStream(inputStream);
    }
}

Driver.java

public class PackageDriver {
    public static void main(String[] args) throws Exception {
        //1. 创建一个Job
        Configuration conf = new Configuration();
        conf.set("mapred.reduce.child.java.opts", "-Xmx512m");
        Job job = Job.getInstance(conf);

        //2. 设置类路径
        job.setJarByClass(FlowDriver.class);

        //3. 设置Mapper和Reducer:不需要MR

        //4. 设置Mapper和Reducer的输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        //5. 设置输入输出数据
        FileInputFormat.setInputPaths(job,new Path("d://hadoop-study/inputformat/input"));
        FileOutputFormat.setOutputPath(job,new Path("d://hadoop-study/inputformat/output"));

        //设置InputFormat
        job.setInputFormatClass(PackageInputFormat.class);

        //设置OutputFormat
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        //6. 提交job
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}
上一篇:算法思想-三分


下一篇:伪分布式yarn上运行mr程序