Hadoop学习笔记: MapReduce二次排序

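本文给出一个用 MapReduce 实现二次排序的例子：输入文件每行包含两个以制表符（Tab）分隔的整数，作业输出时先按第一列升序排列，第一列相同时再按第二列降序排列。排序规则由自定义键 SortComparable 的 compareTo 方法实现。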

package SortTest;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

/**
 * Composite map-output key for the secondary-sort example: a pair of ints ordered by
 * {@code first} ascending and, when {@code first} is equal, by {@code second} descending.
 *
 * <p>Serialized as two consecutive {@code int}s via {@link DataOutput#writeInt}.
 *
 * <p>{@link #hashCode()} is overridden because Hadoop's default hash partitioner uses it to
 * route keys to reducers. Without it, identity hashes would send equal keys to different
 * reducers whenever more than one reducer runs.
 */
public class SortComparable implements WritableComparable<SortComparable> {

    private int first;
    private int second;

    /**
     * No-arg constructor. Hadoop instantiates keys reflectively and then calls
     * {@link #readFields(DataInput)}; both fields start at 0.
     */
    public SortComparable() {
    }

    /**
     * Creates a key for the given pair.
     *
     * @param first  primary sort field (ascending); must not be null
     * @param second secondary sort field (descending); must not be null
     * @throws NullPointerException if either argument is null
     */
    public SortComparable(Integer first, Integer second) {
        // Auto-unboxing fails fast on null here instead of later inside write().
        this.first = first;
        this.second = second;
    }

    /** @return the primary sort field */
    public Integer getFirst() {
        return first;
    }

    /** @return the secondary sort field */
    public Integer getSecond() {
        return second;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readInt();
        this.second = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.first);
        out.writeInt(this.second);
    }

    /**
     * Orders by {@code first} ascending, then by {@code second} descending.
     *
     * <p>Uses {@link Integer#compare} rather than subtraction, which overflows and returns the
     * wrong sign for operands far apart (e.g. {@code Integer.MIN_VALUE} vs {@code 1}).
     */
    @Override
    public int compareTo(SortComparable o) {
        int byFirst = Integer.compare(this.first, o.first);
        if (byFirst != 0) {
            return byFirst;
        }
        // Arguments swapped on purpose: descending order on the secondary field.
        return Integer.compare(o.second, this.second);
    }

    /** Two keys are equal exactly when {@link #compareTo} returns 0. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof SortComparable)) {
            return false;
        }
        SortComparable other = (SortComparable) obj;
        return first == other.first && second == other.second;
    }

    /**
     * Hashes on {@code first} only.
     *
     * <p>This is consistent with {@link #equals}. It also keeps every record sharing the same
     * primary value in one reducer, so each group is fully ordered by {@code second} within a
     * single output file.
     */
    @Override
    public int hashCode() {
        return first;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }
}
package SortTest;

import java.io.IOException;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*; public class SortComparableTest extends Configured implements Tool { public static class MapperTest extends Mapper<LongWritable, Text, SortComparable, IntWritable> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
Integer first = Integer.parseInt(split[0]);
Integer second = Integer.parseInt(split[1]);
SortComparable sc = new SortComparable(first, second);
context.write(sc, new IntWritable(1));
}
} public static class ReducerTest extends Reducer<SortComparable, IntWritable, IntWritable, IntWritable> {
public void reduce(SortComparable key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
context.write(new IntWritable(key.getFirst()),new IntWritable(key.getSecond()));
}
} public static void main(String[] args) {
try {
int returnCode = ToolRunner.run(new SortComparableTest(), args);
System.exit(returnCode);
} catch (Exception e) {
e.printStackTrace();
}
} static final String INPUT = "/home/sortInput";
static final String OUTPUT = "/home/sortOutput"; @Override
public int run(String[] arg0) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://localhost:9001");
Job job = Job.getInstance(conf, "SortTest"); FileInputFormat.addInputPath(job, new Path(INPUT));
FileOutputFormat.setOutputPath(job, new Path(OUTPUT)); job.setJarByClass(SortComparableTest.class);
job.setMapperClass(MapperTest.class);
job.setReducerClass(ReducerTest.class); job.setMapOutputKeyClass(SortComparable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class); job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
} }
上一篇:hibernate 的SessionFactory的getCurrentSession 与 openSession() 的区别


下一篇:【hadoop】mapreduce原理总结