When to use a custom OutputFormat:
To control the final output path and format of a job's results. For example: an MR program must write its records to different directories depending on their content.
Requirement
Filter the input log file: lines that contain "baidu" go to baidu.log, and lines that do not contain "baidu" go to other.log.
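For example, given a hypothetical input file with one URL per line (illustrative data, not from the original), the expected routing would be:

http://www.baidu.com   -> baidu.log
http://www.google.com  -> other.log
http://tieba.baidu.com -> baidu.log
http://www.sohu.com    -> other.log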
Implementation
1. Define a MyOutputFormat class that extends FileOutputFormat.
The generic parameters are the k-v types that the Reduce side outputs.
package com.aura.hadoop.outputformat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @author panghu
* @description Custom OutputFormat that routes log lines to baidu.log or other.log
* @create 2021-02-16-17:12
*/
public class MyOutputFormat extends FileOutputFormat<LongWritable, Text> {
    @Override
    public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        return new MyRecordWriter(job);
    }
}
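Note that because the class extends FileOutputFormat instead of implementing OutputFormat from scratch, it inherits checkOutputSpecs(): the job still fails fast if the output directory already exists, and FileOutputFormat.setOutputPath keeps working unchanged in the Driver below.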
2. Define a MyRecordWriter class that extends RecordWriter. Its job is to turn each k-v pair into output data and write it to the right file.
package com.aura.hadoop.outputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @author panghu
* @description RecordWriter that writes each incoming line to baidu.log or other.log
* @create 2021-02-16-17:14
*/
public class MyRecordWriter extends RecordWriter<LongWritable, Text> {
    // One output stream per target file
    private FSDataOutputStream baidu;
    private FSDataOutputStream other;
    /**
     * Open one output stream per target file.
     *
     * @param job the TaskAttemptContext for this task
     */
public MyRecordWriter(TaskAttemptContext job) throws IOException {
        Configuration conf = job.getConfiguration();
        // Resolve the job's configured output directory
        String outDir = conf.get(FileOutputFormat.OUTDIR);
        // Open one stream per target file under that directory
        FileSystem fs = FileSystem.get(conf);
        baidu = fs.create(new Path(outDir + "/baidu.log"));
        other = fs.create(new Path(outDir + "/other.log"));
}
    /**
     * Routing logic: lines containing "baidu" go to baidu.log,
     * everything else goes to other.log.
     *
     * @param longWritable the line offset from the input (unused)
     * @param text         one line of the log file
     * @throws IOException
     * @throws InterruptedException
     */
@Override
    public void write(LongWritable longWritable, Text text) throws IOException, InterruptedException {
        // Append a newline so every record lands on its own line
        String line = text.toString() + "\n";
        if (line.contains("baidu")) {
            baidu.write(line.getBytes());
        } else {
            other.write(line.getBytes());
        }
    }
    /**
     * Close both streams when the task finishes.
     *
     * @param job the TaskAttemptContext for this task
     * @throws IOException
     * @throws InterruptedException
     */
@Override
public void close(TaskAttemptContext job) throws IOException, InterruptedException {
IOUtils.closeStream(other);
IOUtils.closeStream(baidu);
}
}
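One clarification before the Driver: when no Mapper or Reducer class is registered, Hadoop falls back to its built-in identity Mapper and Reducer, which forward every k-v pair unchanged. Purely for illustration (the class name LogMapper is hypothetical, and the Driver below never registers it), the equivalent explicit Mapper would look like this:

package com.aura.hadoop.outputformat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

// Equivalent of the default identity Mapper: it forwards the line
// offset and line text untouched, which is exactly what Hadoop does
// when job.setMapperClass(...) is never called.
public class LogMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}

The identity Reducer behaves the same way, so omitting both classes lets the data flow straight from TextInputFormat through the shuffle into MyRecordWriter.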
3. For this requirement the Mapper and Reducer need no custom logic; the default identity classes (see the sketch above) pass every record straight through to the output. Create the Driver class.
package com.aura.hadoop.outputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* @author panghu
* @description Driver for the custom OutputFormat job
* @create 2021-02-16-17:30
*/
public class OutputDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(OutputDriver.class);
        // Register the custom OutputFormat
        job.setOutputFormatClass(MyOutputFormat.class);
        // Output k-v types for the map and reduce side (identity pass-through)
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        // Set the input and output paths
FileInputFormat.setInputPaths(job, new Path("D:\\data\\hadoopdata\\自定义输出流\\log.txt"));
FileOutputFormat.setOutputPath(job, new Path("D:\\data\\out\\MyOutputformat_out"));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
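If the job succeeds, the output directory should contain something like the following (a sketch; baidu.log and other.log come from MyRecordWriter, and _SUCCESS from the FileOutputCommitter inherited via FileOutputFormat):

D:\data\out\MyOutputformat_out\
    _SUCCESS
    baidu.log
    other.log

Two caveats with this pattern: the writer creates fixed file names directly under the output directory, so it assumes the default single reduce task; with more reducers, the tasks would overwrite each other's files. It also bypasses the committer's _temporary staging, so a failed and retried task attempt can leave partially written files behind.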