hadoop 使用map合并小文件到SequenceFile



1.1 继承RecordReader泛型,重写这个类。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; public class WholeFileRecordReader extends RecordReader<NullWritable,BytesWritable> {
private FileSplit fileSplit;
private Configuration conf;
private BytesWritable value = new BytesWritable();
private boolean processed = false;
* Called once at initialization.
* @param split the split that defines the range of records to read
* @param context the information about the task
* @throws IOException
* @throws InterruptedException
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
this.fileSplit = (FileSplit) split;
this.conf = context.getConfiguration();
} /**
* Read the next key, value pair.
* @return true if a key/value pair was read
* @throws IOException
* @throws InterruptedException
public boolean nextKeyValue() throws IOException, InterruptedException {
byte[] contents = new byte[(int)fileSplit.getLength()];
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try {
in = fs.open(file);
}finally {
processed = true;
return true;
} return false;
} /**
* Get the current key
* @return the current key or null if there is no current key
* @throws IOException
* @throws InterruptedException
public NullWritable getCurrentKey() throws IOException, InterruptedException {
return NullWritable.get();
} /**
* Get the current value.
* @return the object that was read
* @throws IOException
* @throws InterruptedException
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return value;
} /**
* The current progress of the record reader through its data.
* @return a number between 0.0 and 1.0 that is the fraction of the data read
* @throws IOException
* @throws InterruptedException
public float getProgress() throws IOException, InterruptedException {
return processed ? 1.0f:0.0f;
} /**
* Close the record reader.
public void close() throws IOException { }

1.2 继承FileInputFormat泛型,重写文件输入格式

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import java.io.IOException; public class WholeFileInputFormat extends FileInputFormat<NullWritable,BytesWritable> {
* Is the given filename splittable? Usually, true, but if the file is
* stream compressed, it will not be.
* <p>
* The default implementation in <code>FileInputFormat</code> always returns
* true. Implementations that may deal with non-splittable files <i>must</i>
* override this method.
* <p>
* <code>FileInputFormat</code> implementations can override this and return
* <code>false</code> to ensure that individual input files are never split-up
* so that process entire files.
* @param context the job context
* @param filename the file name to check
* @return is this file splitable?
protected boolean isSplitable(JobContext context, Path filename) {
return false;//文件不分片,为了整体读入
} /**
* Create a record reader for a given split. The framework will call
* {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
* the split is used.
* @param split the split to be read
* @param context the information about the task
* @return a new record reader
* @throws IOException
* @throws InterruptedException
public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
WholeFileRecordReader recordReader = new WholeFileRecordReader();
return recordReader;


public class SequenceFileMapper extends Mapper<NullWritable,BytesWritable,Text,BytesWritable> {
enum FileCounter {
private Text filenameKey;
* Called once at the beginning of the task.
* @param context
protected void setup(Context context) throws IOException, InterruptedException {
InputSplit split = context.getInputSplit();
Path path = ((FileSplit)split).getPath();
filenameKey = new Text(path.toString());
} /**
* Called once for each key/value pair in the input split. Most applications
* should override this, but the default is the identity function.
* @param key
* @param value
* @param context
protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; public class SmallFilesToSequenceFileConverter extends Configured implements Tool { /**
* Execute the command with the given arguments.
* @param args command specific arguments.
* @return exit code.
* @throws Exception
public int run(String[] args) throws Exception {
Configuration conf = getConf();
return -1;
} Path outPath = new Path(args[1]);
FileSystem fileSystem = outPath.getFileSystem(conf);
} Job job = Job.getInstance(conf,"SmallFilesToSequenceFile");
job.setJarByClass(SmallFilesToSequenceFileConverter.class); job.setMapperClass(SequenceFileMapper.class); job.setInputFormatClass(WholeFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class); job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class); FileInputFormat.addInputPath(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1])); return job.waitForCompletion(true) ? 0:1;
} public static void main(String[] args) throws Exception{
long startTime = System.currentTimeMillis(); int exitCode = ToolRunner.run(new SmallFilesToSequenceFileConverter(),args);
System.exit(exitCode); long endTime = System.currentTimeMillis();
long timeSpan = endTime - startTime;


[hadoop@bigdata-senior01 ~]$ hadoop jar SmallFilesToSequenceFileConverter.jar -D mapreduce.job.reduces=2 /demo /output3 ...
[hadoop@bigdata-senior01 ~]$ hadoop fs -ls /output3
Found 3 items
-rw-r--r--   1 hadoop supergroup          0 2019-02-18 16:17 /output3/_SUCCESS
-rw-r--r--   1 hadoop supergroup      60072 2019-02-18 16:17 /output3/part-r-00000
-rw-r--r--   1 hadoop supergroup      28520 2019-02-18 16:17 /output3/part-r-00001
