groupingComparator实现分组求取topN
求Top1:
GroupingComparator是mapreduce当中reduce端的一个功能组件,主要的作用是决定哪些数据作为一组,调用一次reduce的逻辑,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻辑,我们可以自定义GroupingComparator实现不同的key作为同一个组,调用一次reduce逻辑
需求如下: 求下列每一个订单中 交易金额最大的一个
分析:
将订单号和金额合并为OrderBean作为key,在map阶段按照key分区
在reduce阶段利用groupingComparator来聚合成组,取第一个值即为最大值
定义OderBean
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class OrderBean implements WritableComparable<OrderBean> {
private String orderId; //定义订单号
private Double price; //定义价格必须使用Double包装类型
/**
* 按照价格进行排序
* @param o
* @return
*/
@Override
public int compareTo(OrderBean o) {
//需要先比较我们的订单id,如果订单id相同的,我们再按照金额进行排序
//如果订单id不相同,没有可比性
int result = this.orderId.compareTo(o.orderId);
if(result ==0){
//如果订单id相同,继续比较价格,按照价格进行排序,
//如果订单id不相同没有可比性
//默认降序
result = this.price.compareTo(o.price);
return -result;
}
return result;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(orderId);
out.writeDouble(price);
}
@Override
public void readFields(DataInput in) throws IOException {
this.orderId= in.readUTF();
this.price = in.readDouble();
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
@Override
public String toString() {
return orderId+"\t"+price;
}
}
定义分区
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class GroupPartition extends Partitioner<OrderBean,NullWritable> {
@Override
public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numPartitions) {
return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
定义MyGroupComparator:
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class MyGroupComparator extends WritableComparator {
/**
* 注意调用compare方法的时候的参数,参数必须是WritableComparable类型,这个类型才可以进行比较
* @param a
* @param b
* @return
*/
public MyGroupComparator() {
super(OrderBean.class,true); //无参构造器
}
//将我们自定义的OrderBean注册到我们自定义的MyGroupIngCompactor当中来
//表示我们的分组器在分组的时候,对OrderBean这一种类型的数据进行分组
//传入作为key的bean的class类型,以及制定需要让框架做反射获取实例对象
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean first = (OrderBean) a; //类型强制转化
OrderBean second = (OrderBean) b;
return first.getOrderId().compareTo(second.getOrderId()); //根据订单号来进行对比
}
}
定义Mapper
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class GroupMapper extends Mapper<LongWritable,Text,OrderBean,NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("\t"); //数据切割
OrderBean orderBean = new OrderBean();
orderBean.setOrderId(split[0]);
orderBean.setPrice(Double.valueOf(split[2]));
context.write(orderBean,NullWritable.get());
}
}
定义Reducer
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class GroupReducer extends Reducer<OrderBean,NullWritable,OrderBean,NullWritable> {
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key,NullWritable.get()); //利用context写出去
}
}
定义main:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class GroupMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(super.getConf(), "group");
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///F:\\自定义groupingComparator\\input"));
job.setMapperClass(GroupMapper.class);
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setPartitionerClass(GroupPartition.class);
//设置我们自定义的分组类
job.setGroupingComparatorClass(MyGroupComparator.class);
job.setReducerClass(GroupReducer.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///F:\\自定义groupingComparator\\output_top1"));
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new GroupMain(), args);
System.exit(run);
}
}
输出结果:
求Top 2…N:
修改Mapper:
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class GroupMapper extends Mapper<LongWritable,Text,OrderBean,DoubleWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
OrderBean orderBean = new OrderBean();
orderBean.setOrderId(split[0]);
orderBean.setPrice(Double.valueOf(split[2]));
DoubleWritable doubleWritable = new DoubleWritable(Double.valueOf(split[2])); //
context.write(orderBean,doubleWritable);
}
}
修改reducer:
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class GroupReducer extends Reducer<OrderBean,DoubleWritable,OrderBean,DoubleWritable> {
@Override
protected void reduce(OrderBean key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
//context.write(key,NullWritable.get());
int i = 0;
for (DoubleWritable value : values) {
i++;
if(i <= 2){
context.write(key,value);
}else{
break;
}
}
}
}
输出结果:
其他参数以及调优
分区和分组的区别:
mapreduce当中的推测执行:一般直接关掉推测执行
如果推测执行打开了,那么如果一个maptask执行任务时间比较长,那么久再启动一个另外相同的maptask去执行同样的任务,
谁先执行完,就采用谁的执行结果
一个任务为什么会长时间的没有执行完成——有可能是因为数据的倾斜
只会造成集群资源的更加的紧张
一般直接关闭推测执行
yarn的资源调度管理:
yarn是我们hadoop2.x当中引进的一个新的模块,主要用于管理我们集群当中的资源
比如说:内存,cpu
yarn不光管理硬件资源,还管理运行的一些任务信息等等
yarn的调度可以分为两个层级来说
一级管理调度:
管理计算机的资源
运行的job任务的生命周
二级管理调度:
任务的计算模型
多样化的计算模型 storm spark
yarn集群当中各个组件的作用:
resourceManager:主节点,主要用于接收用户的请求,分配资源
nodeManager:从节点,主要用于处理任务的计算
ApplicationMaster:每提交一个任务,启动一个appmaster,
这个appmaster全权负责管理我们的任务的执行
主要职责:申请资源
分配资源(分配container)
监控任务执行的进度状况
回收资源
与resourceManager通信,报告任务的执行状况
自杀,
Container:资源分配的单位,所有的资源都是以container的形式来进行划分的,便于资源的分配和回收
jobHistory:历史完成的任务的日志信息
TimeLineServer: 2.4版本以后出来的新特性,查看正在执行的任务的信息
yarn当中的调度器:
调度器主要解决的是任务先后提交,如何保证任务最快执行的一种策略
研究的是任务之间如何一起执行的问题
队列 栈
队列是两端都开口 就跟我们火车头进隧道一样
栈 一端开口,一端封闭 先进去的后出去 弹夹类似
hadoop当中的调度器主要有三种
1:fifo 队列调度器,first in first out 没人用
第一个任务来了,先执行,第二个任务来了,等着
如果一个很大的计算任务先来,需要执行两个小时,再来一个小任务,需要执行两分钟
第二种:capacity scheduler 容量调度器 apache的hadoop版本默认使用的
容量调度器:将我们集群的资源,划分成好几个队列
30% 40% 30%
3 4 3
任务提交的时候,可以选择不同的队列来进行提交
根据提交的任务需要的资源大小不同,可以将我们的任务,划分到不同的队列下面去
第三种:fairScheduler 公平调度器 CDH版本的hadoop默认的调度规则
如果没有任务提交,第一个任务过来,将集群当中的所有的资源全部给第一个任务
第二个任务来了,将第一个任务的资源划分一点出来给第二个任务,保证第二个任务也可以窒息感
保证每一个任务都可以公平的一起执行
一般调度器不会去更改