Building on the previous secondary sort example, the task is to find the maximum value of each group:
20 21 // take
50 51
50 52
50 53
50 54 // take
60 51
60 52
60 53
60 56
60 57
60 61 // take
70 54
70 55
70 56
70 57
70 58
70 58 // take
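Both the comparator and the job below reuse the SecondarySortWritable key type from the previous secondary sort post, which is not repeated here. For reference, a minimal sketch of what that class is assumed to look like (two int fields, the first field serialized first, both compared ascending; the exact sort direction may differ in the original post):

package MR;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class SecondarySortWritable implements WritableComparable<SecondarySortWritable> {
    private int first;
    private int second;
    public int getFirst() { return first; }
    public void setFirst(int first) { this.first = first; }
    public int getSecond() { return second; }
    public void setSecond(int second) { this.second = second; }
    @Override
    public void write(DataOutput out) throws IOException {
        // first is written first, so a raw comparator can group on the leading 4 bytes
        out.writeInt(first);
        out.writeInt(second);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        first = in.readInt();
        second = in.readInt();
    }
    @Override
    public int compareTo(SecondarySortWritable o) {
        // sort by first, then by second (both ascending in this sketch)
        if (first != o.first) {
            return first - o.first;
        }
        return second - o.second;
    }
    @Override
    public int hashCode() {
        // hash on first only, so the default HashPartitioner keeps each group
        // in a single partition; a dedicated Partitioner is the usual alternative
        return first;
    }
    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof SecondarySortWritable)) {
            return false;
        }
        SecondarySortWritable o = (SecondarySortWritable) obj;
        return first == o.first && second == o.second;
    }
    @Override
    public String toString() {
        return first + " " + second;
    }
}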
The input has to be grouped so that all records sharing the same first field reach a single reduce call. This is done with a custom grouping comparator:
MyGroupCompartor.java
package MR;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;
/**
 * Custom grouping comparator:
 * 1. It must implement RawComparator.
 * 2. It must provide two compare methods: an object-level compare and a byte-level compare.
 * 3. In the byte-level compare, the number of bytes compared must match the type of the
 *    corresponding field in the key class: long = 8 bytes, int = 4 bytes.
 * 4. If the key is an object, grouping here uses the first field of the object.
 * @author lyd
 */
public class MyGroupCompartor implements RawComparator<SecondarySortWritable> {
    /**
     * Byte-level comparison: first is an int and is serialized first,
     * so compare only the leading 4 bytes of each raw key.
     */
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return WritableComparator.compareBytes(b1, s1, 4, b2, s2, 4);
    }
    /**
     * Object-level comparison: group by the first field only.
     */
    @Override
    public int compare(SecondarySortWritable o1, SecondarySortWritable o2) {
        return o1.getFirst() - o2.getFirst();
    }
}
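Implementing RawComparator directly means writing the byte-level compare by hand. The more common Hadoop idiom is to extend WritableComparator and pass true to its constructor, which makes the base class deserialize both keys and delegate to the object-level compare. A sketch for comparison (the class name MyGroupComparator2 is only for illustration; either class can be passed to setGroupingComparatorClass):

package MR;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class MyGroupComparator2 extends WritableComparator {
    public MyGroupComparator2() {
        // true: create key instances, so the object-level compare below is used
        super(SecondarySortWritable.class, true);
    }
    @Override
    @SuppressWarnings("rawtypes")
    public int compare(WritableComparable a, WritableComparable b) {
        SecondarySortWritable x = (SecondarySortWritable) a;
        SecondarySortWritable y = (SecondarySortWritable) b;
        // equal first fields => same group
        return x.getFirst() - y.getFirst();
    }
}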
The previous secondary sort job needs only minor changes:
package MR;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
 * Maximum value per group
 */
public class SortSecondaryMaxDemo implements Tool {
    /**
     * Map phase
     * @author lyd
     */
    public static class MyMapper extends Mapper<LongWritable, Text, SecondarySortWritable, IntWritable> {
        SecondarySortWritable ss = new SecondarySortWritable();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] dig = line.split(" ");
            // dig[0] is the group field, dig[1] is the value field
            ss.setFirst(Integer.parseInt(dig[0]));
            ss.setSecond(Integer.parseInt(dig[1]));
            context.write(ss, new IntWritable(Integer.parseInt(dig[1])));
        }
    }
    /**
     * Reduce phase
     * @author lyd
     */
    public static class MyReducer extends Reducer<SecondarySortWritable, IntWritable, IntWritable, IntWritable> {
        private IntWritable v = new IntWritable(0);
        private static int tmp = 0; // used only by Method 1 below
        @Override
        protected void reduce(SecondarySortWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            /*
            // Method 1: without a grouping comparator, emit once whenever the
            // group field changes; this relies on the secondary sort ordering
            // the second field so that the first record of each group is the maximum.
            if (key.getFirst() != tmp) {
                context.write(new IntWritable(key.getFirst()), v);
                tmp = key.getFirst();
            }
            */
            // With the grouping comparator set, a single reduce() call receives
            // every record that shares the same first field, so the group maximum
            // can be found by scanning the values once, regardless of sort direction.
            int max = Integer.MIN_VALUE;
            for (IntWritable val : values) {
                max = Math.max(max, val.get());
            }
            v.set(max);
            context.write(new IntWritable(key.getFirst()), v);
        }
    }
    private Configuration conf;
    public void setConf(Configuration conf) {
        this.conf = conf;
    }
    public Configuration getConf() {
        return conf == null ? new Configuration() : conf;
    }
    /**
     * Driver method
     */
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "ssjob");
        job.setJarByClass(SortSecondaryMaxDemo.class);
        // set input and output paths
        setInputAndOutput(job, conf, args);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(SecondarySortWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // set the grouping comparator
        job.setGroupingComparatorClass(MyGroupCompartor.class);
        // submit the job
        return job.waitForCompletion(true) ? 0 : 1;
    }
    // main method
    public static void main(String[] args) throws Exception {
        int isok = ToolRunner.run(new Configuration(), new SortSecondaryMaxDemo(), args);
        System.exit(isok);
    }
    /**
     * Handles the input and output path arguments
     * @param job
     * @param conf
     * @param args
     */
    public static void setInputAndOutput(Job job, Configuration conf, String[] args) {
        if (args.length != 2) {
            System.out.println("usage:yarn jar /*.jar package.classname /inputpath /outputpath");
            return;
        }
        // process the input and output paths
        try {
            FileInputFormat.addInputPath(job, new Path(args[0]));
            //FileSystem fs = FileSystem.get(conf);
            Path outputPath = new Path(args[1]);
            /*if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }*/
            FileOutputFormat.setOutputPath(job, outputPath);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
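To run the job (the jar name and HDFS paths below are placeholders, following the usage string in setInputAndOutput):

yarn jar ss.jar MR.SortSecondaryMaxDemo /inputpath /outputpath

With the sample input at the top of this section, the maximum of each group is the record marked // take, so the expected output (group, then max, tab-separated by the default TextOutputFormat) is:

20	21
50	54
60	61
70	58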