1、Sorting in MapReduce
Both MapTask and ReduceTask sort their data by key. This is Hadoop's default behavior: the data of every application gets sorted, whether or not the application logic actually needs it. The default ordering is lexicographic (dictionary) order (for Text keys, which compare byte by byte, "10" sorts before "9"), and the sort is implemented with quicksort.
- MapTask: map output is first held in an in-memory ring buffer. When buffer usage reaches a threshold, the buffered data is quick-sorted once and spilled to disk as a sorted file; after the last spill, MapTask merge-sorts all the spill files on disk.
- ReduceTask: once all map outputs have been copied over, ReduceTask performs a single merge sort over all the data held in memory and on disk.
- Partial sort: MapReduce sorts the dataset by the keys of the input records and guarantees that each output file is internally sorted.
- Total sort: the final output is a single file that is sorted as a whole. The usual way to get this is to configure only one ReduceTask, but for large inputs this is extremely inefficient, because a single machine processes everything and the parallelism MapReduce provides is lost.
- Auxiliary sort (GroupingComparator grouping): group keys on the Reduce side. It is used when the key is a bean object and you want keys that agree on one or a few fields (but not on all fields) to enter the same reduce() call.
- Secondary sort: in a custom sort, if compareTo checks two conditions, it is a secondary sort; a minimal sketch follows below.
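As a minimal sketch (using a hypothetical ExampleBean, not a class from this article), a two-condition compareTo for a secondary sort might look like this: the order id is the primary condition, and the amount (descending) decides the order only when the ids are equal.

public class ExampleBean implements Comparable<ExampleBean> {
    private String orderId;
    private double amount;

    @Override
    public int compareTo(ExampleBean o) {
        // First condition: compare by order id
        int byOrderId = this.orderId.compareTo(o.orderId);
        if (byOrderId != 0) {
            return byOrderId;
        }
        // Second condition: compare by amount, descending
        return -Double.compare(this.amount, o.amount);
    }
}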
2、WritableComparable
When a bean object is used as the map output key, it must implement the WritableComparable interface and override compareTo to define the sort order.
2.1 Total sort
Data
01 a00df6s kar 120.196.100.99 384 33 200
log id  device id  vendor id  ip  self-owned content duration (s)  third-party content duration (s)  network status code
Requirement
Sort the devices by total play duration (self-owned duration + third-party duration) in descending order.
bean
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class SpeakCompareBean implements WritableComparable<SpeakCompareBean> {
private String deviceId;
private long selfDuration; // self-owned play duration
private long thirdPartDuration; // third-party play duration
private long sumDuration; // total play duration
public SpeakCompareBean() {
}
public SpeakCompareBean(String deviceId, long selfDuration, long thirdPartDuration) {
this.deviceId = deviceId;
this.selfDuration = selfDuration;
this.thirdPartDuration = thirdPartDuration;
this.sumDuration = selfDuration + thirdPartDuration;
}
@Override
public int compareTo(SpeakCompareBean o) {
    // Descending order by total duration: the larger sumDuration sorts first
    return Long.compare(o.sumDuration, this.sumDuration);
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(deviceId);
dataOutput.writeLong(selfDuration);
dataOutput.writeLong(thirdPartDuration);
dataOutput.writeLong(sumDuration);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.deviceId= dataInput.readUTF();
this.selfDuration = dataInput.readLong();
this.thirdPartDuration = dataInput.readLong();
this.sumDuration = dataInput.readLong();
}
@Override
public String toString() {
return deviceId + '\t' +
selfDuration + "\t" +
thirdPartDuration + "\t" +
sumDuration ;
}
}
map
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class SpeakCompareBeanMapper extends Mapper<LongWritable, Text,SpeakCompareBean, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Parse the log line: field 1 is the device id, fields 4 and 5 are the two durations
String[] fields = value.toString().trim().split("\t");
String deviceId = fields[1];
long selfDuration = Long.parseLong(fields[4]);
long thirdPartDuration = Long.parseLong(fields[5]);
SpeakCompareBean speakCompareBean = new SpeakCompareBean(deviceId,selfDuration,thirdPartDuration);
NullWritable e = NullWritable.get();
context.write(speakCompareBean, e);
}
}
reduce
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class SpeakCompareBeanReducer extends Reducer<SpeakCompareBean, NullWritable, NullWritable,SpeakCompareBean> {
@Override
protected void reduce(SpeakCompareBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
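// Beans with equal sumDuration compare as equal and arrive in the same reduce group;
// iterating over the values writes out every one of those records (the framework
// refreshes the key object for each value).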
for(NullWritable value : values){
context.write(value,key);
}
}
}
main
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class SpeakCompareBeanDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SpeakCompareBeanDriver.class);
job.setMapperClass(SpeakCompareBeanMapper.class);
job.setReducerClass(SpeakCompareBeanReducer.class);
job.setMapOutputKeyClass(SpeakCompareBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(SpeakCompareBean.class);
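// No setNumReduceTasks call here: the default single ReduceTask yields one fully sorted output file (a total sort)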
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
boolean result =job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
Run results
2.2 GroupingComparator
GroupingComparator is a Reduce-side component of MapReduce. Its job is to decide which records belong to one group, that is, which records share a single call to the reduce() logic. By default every distinct key forms its own group and gets its own reduce() call; with a custom GroupingComparator, different keys can be treated as one group and handled by a single reduce() call.
Data
Requirement
For each order, find the single transaction with the largest amount.
Approach
- Mapper: read one line of text and split out the fields; wrap the order id and the amount into a bean. The bean's sort rule is: order by orderId first, and when the orderIds are equal, order by amount in descending order. map() emits the bean as the key and NullWritable.get() as the value.
- Shuffle: set a custom partitioner so that records with the same order id go to the same partition; set a GroupingComparator whose grouping rule is that keys with equal orderId belong to the same group.
- Reduce: each reduce() call writes out only the first key of its group.
bean
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class OrderBean implements WritableComparable<OrderBean> {
private String orderId;
private Double price;
public OrderBean(){}
public OrderBean(String orderId,Double price){
this.orderId = orderId;
this.price = price;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
@Override
public int compareTo(OrderBean o) {
    // Primary condition: orderId; secondary condition: price, descending
    int i = orderId.compareTo(o.getOrderId());
    if (i == 0) {
        // Same order id: the larger amount sorts first
        return -price.compareTo(o.getPrice());
    }
    return -i;
}
@Override
public String toString() {
return
"orderId='" + orderId + '\'' +
", price=" + price ;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(orderId);
dataOutput.writeDouble(price);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.orderId = dataInput.readUTF();
this.price = dataInput.readDouble();
}
}
partitioner
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {
@Override
public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numPartitions) {
    // Custom partitioning: send records with the same order id to the same reduce task.
    // Masking with Integer.MAX_VALUE keeps the hash non-negative before taking the modulus.
    return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
GroupingComparator
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class OrderGroupingComparator extends WritableComparator {
// Register OrderBean with this grouping comparator so the framework groups
// records of this key type. Passing true tells WritableComparator to create
// OrderBean instances via reflection so compare() can work on deserialized keys.
public OrderGroupingComparator() {
super(OrderBean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean first = (OrderBean) a;
OrderBean second = (OrderBean) b;
final int i = first.getOrderId().compareTo(second.getOrderId());
if (i == 0) {
System.out.println(first.getOrderId() + "----" +
second.getOrderId());
}
return i;
}
}
map
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class OrderMapper extends Mapper<LongWritable, Text,OrderBean, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Parse the line: field 0 is the order id, field 2 is the transaction amount
String[] fields = value.toString().trim().split("\t");
String orderId = fields[0];
Double price = Double.parseDouble(fields[2]);
context.write(new OrderBean(orderId,price),NullWritable.get());
}
}
reduce
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class OrderReducer extends Reducer<OrderBean, NullWritable,OrderBean, NullWritable> {
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
//Within each orderId group the keys arrive sorted by price in descending order,
//so the first key is already the maximum-amount transaction; write it out once.
context.write(key, NullWritable.get());
}
}
main
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class OrderDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1. Get the configuration and the job instance
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2. Specify the jar that contains this driver class
job.setJarByClass(OrderDriver.class);
// 3. Set the Mapper and Reducer classes used by this job
job.setMapperClass(OrderMapper.class);
job.setReducerClass(OrderReducer.class);
// 4. Set the key/value types of the map output
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
// 5. Set the key/value types of the final output
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
// 6. Set the job's input file and output directory
FileInputFormat.setInputPaths(job, new Path("E:\\hdfs_test_dir\\groupingComparator.txt"));
FileOutputFormat.setOutputPath(job, new Path("E:\\hdfs_test_dir\\order_output"));
// 7. Set the partitioner, the grouping comparator, and the number of reduce tasks
job.setPartitionerClass(OrderPartitioner.class);
job.setGroupingComparatorClass(OrderGroupingComparator.class);
job.setNumReduceTasks(2);
// 8. Submit the job, together with its configuration and jar, to the cluster and wait for completion
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
Run results