MR自定义OutputFormat

自定义OutputFormat使用场景:

控制文件最终的输出路径和格式。比如:一个MR程序要求根据不同的结果将数据输出到不同的目录中。

需求

过滤输入的log日志,将包含 baidu 的网站输出到 baidu.log,不包含 baidu 的网站输出到 other.log。

代码实现

1.自定义MyOutputformat类,继承FileOutputFormat类。
其中的泛型是Reduce端数据输出的k-v类型。

package com.aura.hadoop.outputformat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Output format that hands every output record to a {@link MyRecordWriter}.
 * The type parameters are the key/value types of the records this format
 * is asked to write.
 *
 * @author panghu
 * @create 2021-02-16-17:12
 */
public class MyOutputformat extends FileOutputFormat<LongWritable, Text> {

    /**
     * Creates the record writer used by a task attempt.
     *
     * @param job context of the task attempt requesting the writer
     * @return a new {@link MyRecordWriter} built from {@code job}
     * @throws IOException if the writer cannot open its output streams
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        // Parameterised return type (instead of the raw RecordWriter) keeps callers type-safe.
        return new MyRecordWriter(job);
    }
}

2.自定义MyRecordWriter继承RecordWriter。作用是把k-v键值对写出到对应的输出文件。

package com.aura.hadoop.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Record writer that splits the job output between two files in the job's
 * output directory: values containing {@code "baidu"} go to {@code baidu.log},
 * all other values go to {@code other.log}. Keys are not written.
 *
 * <p>NOTE(review): both files are created directly under the output directory
 * with fixed names, not in a per-task-attempt location. This presumably
 * requires a single writing task with no speculative execution; confirm
 * against how the job is configured.
 *
 * @author panghu
 * @create 2021-02-16-17:14
 */
public class MyRecordWriter extends RecordWriter<LongWritable, Text> {

    /** Substring that routes a record to {@code baidu.log}. */
    private static final String BAIDU_MARKER = "baidu";

    final FSDataOutputStream baidu;
    final FSDataOutputStream other;

    /**
     * Opens both output streams.
     *
     * @param job task attempt context; supplies the configuration and output directory
     * @throws IOException if the output directory is not configured or a file cannot be created
     */
    public MyRecordWriter(TaskAttemptContext job) throws IOException {
        Configuration conf = job.getConfiguration();

        // Resolve the configured output directory as a Path.
        Path outDir = FileOutputFormat.getOutputPath(job);
        if (outDir == null) {
            throw new IOException("Output directory is not set for the job");
        }

        // Use the filesystem that owns the output path, which may differ from the default one.
        FileSystem fs = outDir.getFileSystem(conf);

        FSDataOutputStream baiduStream = fs.create(new Path(outDir, "baidu.log"));
        FSDataOutputStream otherStream;
        try {
            otherStream = fs.create(new Path(outDir, "other.log"));
        } catch (IOException | RuntimeException e) {
            // Do not leak the first stream when the second one cannot be opened.
            IOUtils.closeStream(baiduStream);
            throw e;
        }

        baidu = baiduStream;
        other = otherStream;
    }

    /**
     * Writes one value followed by a newline to the file selected by its content.
     *
     * @param longWritable record key; ignored
     * @param text record value; written to {@code baidu.log} when it contains
     *             {@code "baidu"}, otherwise to {@code other.log}
     * @throws IOException if the underlying stream fails
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public void write(LongWritable longWritable, Text text) throws IOException, InterruptedException {
        FSDataOutputStream target = text.toString().contains(BAIDU_MARKER) ? baidu : other;

        // Text holds UTF-8 bytes; writing them directly avoids re-encoding with the
        // platform default charset. Only the first getLength() bytes are valid.
        target.write(text.getBytes(), 0, text.getLength());
        target.write('\n');
    }

    /**
     * Closes both output streams, propagating close failures so that a failed
     * flush is reported instead of being silently dropped.
     *
     * @param job task attempt context; unused
     * @throws IOException if closing either stream fails
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public void close(TaskAttemptContext job) throws IOException, InterruptedException {
        try {
            other.close();
        } finally {
            // Always attempt to close the second stream, even if the first close failed.
            baidu.close();
        }
    }
}

3.在此需求中,Mapper端和Reducer端可以不做任何处理(Driver中不设置Mapper和Reducer时,使用Hadoop默认实现,k-v原样输出),直接写出即可。创建Driver类。

package com.aura.hadoop.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver that submits the log-splitting job using {@link MyOutputformat}.
 * No mapper or reducer class is set, so the job runs with the framework defaults.
 *
 * <p>Usage: {@code OutputDriver [inputPath [outputPath]]}. Omitted arguments fall
 * back to the built-in default paths.
 *
 * @author panghu
 * @create 2021-02-16-17:30
 */
public class OutputDriver {

    /** Input path used when no first argument is given. */
    private static final String DEFAULT_INPUT = "D:\\data\\hadoopdata\\自定义输出流\\log.txt";

    /** Output directory used when no second argument is given. */
    private static final String DEFAULT_OUTPUT = "D:\\data\\out\\MyOutputformat_out";

    /**
     * Configures and runs the job, then exits with 0 on success and 1 on failure.
     *
     * @param args optional {@code [inputPath, outputPath]}
     * @throws IOException if the job cannot be created or submitted
     * @throws ClassNotFoundException propagated from {@link Job#waitForCompletion(boolean)}
     * @throws InterruptedException propagated from {@link Job#waitForCompletion(boolean)}
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String input = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT;
        String output = (args != null && args.length > 1) ? args[1] : DEFAULT_OUTPUT;

        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(OutputDriver.class);

        // Route the job output through the custom output format.
        job.setOutputFormatClass(MyOutputformat.class);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // Input and output locations.
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}

上一篇:02 Java IO 文件、管道、网络、字节 字符数组 java.io.IOException: Write end dead


下一篇:4.Java IO