摘要:mapreduce中执行reduce(KEYIN key, Iterable<VALUEIN> values, Context context),调用一次reduce方法,迭代value集合时,发现key的值也是在不断变化的,这是因为key的地址在内部会随着value的迭代而不断变化。
序:我们知道reduce方法每执行一次,里面我们会通过for循环迭代value的迭代器。如果key是bean的时候,for循环里面value值变化的同时我们的bean值也是会跟随着变化,调用reduce方法时传参数就传了一次key的值,但是在方法内部迭代的时候,key值在变化,那他怎么变动的?
误区:在map处理完成之后,将所有kv对缓存起来,进行分组,然后传递一个组<key,valus{}>,调用一次reduce方法传入的key和value的迭代器如<hello,{1,1,1,1,1,1.....}>。
给一个需求来观察现象
对日志数据中的上下行流量信息汇总,并输出按照总流量倒序排序的结果,且该需求日志中手机号是不会重复的——即不会存在多条数据,手机号相同,且流量不同,还需要进行多条数据的汇总。
数据如下:
13888888801,1,9,10
13888888802,5,5,10
13888888803,2,7,9
13888888804,4,6,10
13888888805,6,4,10
13888888806,1,0,1
分析
基本思路:实现自定义的bean来封装流量信息,并将bean作为map输出的key来传输。
MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key,所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable,然后重写key的compareTo方法。
package cn.intsmaze.flowsum.SortBean; public class FlowBeanOne implements WritableComparable<FlowBeanOne> { private long upFlow; private long dFlow; private long sumFlow; private long phone; // 序列化框架在反序列化操作创建对象实例时会调用无参构造 public FlowBeanOne() { } // 序列化方法 @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(dFlow); out.writeLong(sumFlow); out.writeLong(phone); } // 反序列化方法,注意: 字段的反序列化顺序与序列化时的顺序保持一致 @Override public void readFields(DataInput in) throws IOException { this.upFlow = in.readLong(); this.dFlow = in.readLong(); this.sumFlow = in.readLong(); this.phone = in.readLong(); } public void set(long phone,long upFlow, long dFlow) { this.phone=phone; this.upFlow = upFlow; this.dFlow = dFlow; this.sumFlow = upFlow + dFlow; } @Override public String toString() { return upFlow + "\t" + dFlow + "\t" + sumFlow+ "\t" + phone; }//自定义倒序比较规则,总流量相同视为同一个key. @Override public int compareTo(FlowBeanOne o) { return (int)(o.getSumFlow() - this.sumFlow); } get,set...... }
代码实现如下:
package cn.intsmaze.flowsum.SortBean;
/** * 实现流量汇总并且按照流量大小倒序排序 * 前提:处理的数据是已经汇总过的结果文件,然后再次对该文件进行排序 * @author */ public class FlowSumSort { public static class FlowSumSortMapperOne extends Mapper<LongWritable, Text, FlowBeanOne, Text> { FlowBeanOne k = new FlowBeanOne(); Text v = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split(","); long phoneNbr = Long.parseLong(fields[0]); long upFlowSum = Long.parseLong(fields[1]); long dFlowSum = Long.parseLong(fields[2]); k.set(phoneNbr,upFlowSum, dFlowSum);//这里对bean作为key。 context.write(k, v); } } public static class FlowSumSortReducerOne extends Reducer<FlowBeanOne, Text, Text, FlowBeanOne> {
@Override protected void reduce(FlowBeanOne bean, Iterable<Text> phoneNbrs, Context context) throws IOException, InterruptedException { System.out.println("-------------------"); for (Text text : phoneNbrs) { System.out.println(bean); context.write(text, bean); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FlowSumSort.class); // 告诉框架,我们的程序所用的mapper类和reducer类 job.setMapperClass(FlowSumSortMapperOne.class); job.setReducerClass(FlowSumSortReducerOne.class); job.setMapOutputKeyClass(FlowBeanOne.class); job.setMapOutputValueClass(Text.class); // 告诉框架,我们的mapperreducer输出的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBeanOne.class); // 告诉框架,我们要处理的文件在哪个路径下 FileInputFormat.setInputPaths(job, new Path("d:/intsmaze/input/")); // 告诉框架,我们的处理结果要输出到哪里去 FileOutputFormat.setOutputPath(job, new Path("d:/intsmaze/output/")); boolean res = job.waitForCompletion(true); System.exit(res ? 0 : 1); } }
这里要注意,因为是汇总排序,所以reduce的并行度必须为1,。除了使用框架的组件外,我们还可以通过使用reduce的cleanup方法,自己在reduce端对收集到的数据进行汇总排序。
6 4 10 13888888805
4 6 10 13888888804
5 5 10 13888888802
1 9 10 13888888801
2 7 9 13888888803
1 0 1 13888888806
-------------------
6 4 10 13888888805
4 6 10 13888888804
5 5 10 13888888802
1 9 10 13888888801
-------------------
2 7 9 13888888803
-------------------
1 0 1 13888888806
灵异现象
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context ) throws IOException, InterruptedException { for(VALUEIN value: values) { context.write((KEYOUT) key, (VALUEOUT) value); } }
来看看hadoop2.6.4源码解析吧:
因为这个问题是一年前遇到的,看完源码搞明白后,并没有时间去整理,所以再次解析有所不足。
Reducer源码解析
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { public abstract class Context implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> { } /** * 这个方法我们不需要管,因为我们实现的类重写了该方法。 */ protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context ) throws IOException, InterruptedException { for(VALUEIN value: values) { context.write((KEYOUT) key, (VALUEOUT) value); } } //通过debug我们可以看到,数据在结束map任务执行reduce任务的时候,reduce端会先调用这个方法,而调用这个 //方法的类是我们实现的reduce类,通过继承调用该方法,然后在该方法里面调用我们实现类重写的reduce方法。 public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKey()) {//这个地方调用ReduceContextImpl的方法进行判断 reduce(context.getCurrentKey(), context.getValues(), context);//这个地方调用我们的实现类的reduce方法走我们的逻辑代码了 // If a back up store is used, reset it Iterator<VALUEIN> iter = context.getValues().iterator(); if(iter instanceof ReduceContext.ValueIterator) { ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore(); } } } finally { cleanup(context); } } }
ReduceContextImpl源码解析
(由于代码太多,我只截取了部分主要的代码)
public class ReduceContextImpl { private RawKeyValueIterator input;//这个迭代器里面存储的key-value对元素。 private KEYIN key; // current key private VALUEIN value; // current value private boolean firstValue = false; // first value in key private boolean nextKeyIsSame = false; // more w/ this key private boolean hasMore; // more in file private ValueIterable iterable = new ValueIterable();//访问自己的内部类 public ReduceContextImpl() throws InterruptedException, IOException{ hasMore = input.next();//对象创建的时候,就先判断reduce接收的key-value迭代器是否有元素,并获取下一个元素 } /** 创建完成就调用该方法 ,开始处理下一个唯一的key*/ public boolean nextKey() throws IOException,InterruptedException { while (hasMore && nextKeyIsSame) { //判断迭代器是否还有下一个元素已经下一个元素是否和上一个已经遍历出来的key-value元素的key是不是一样 nextKeyValue(); } if (hasMore) { if (inputKeyCounter != null) { inputKeyCounter.increment(1); } return nextKeyValue(); } else { return false; } } /** * Advance to the next key/value pair. */ @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (!hasMore) { key = null; value = null; return false; } firstValue = !nextKeyIsSame; //获取迭代器下一个元素的key DataInputBuffer nextKey = input.getKey(); //设置当前key的坐标 currentRawKey.set(nextKey.getData(), nextKey.getPosition(), nextKey.getLength() - nextKey.getPosition()); buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength()); //反序列化得到当前key对象 key = keyDeserializer.deserialize(key); //获取迭代器下一个元素的value DataInputBuffer nextVal = input.getValue(); buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength() - nextVal.getPosition()); //反序列化value value = valueDeserializer.deserialize(value); currentKeyLength = nextKey.getLength() - nextKey.getPosition(); currentValueLength = nextVal.getLength() - nextVal.getPosition(); if (isMarked) { //存储下一个key和value backupStore.write(nextKey, nextVal); } //迭代器向下迭代一次 hasMore = input.next(); //如果还有元素,则进行比较,判断key是否相同 if (hasMore) { nextKey = input.getKey(); //这个地方也是比较关键的: nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, currentRawKey.getLength(), nextKey.getData(), nextKey.getPosition(), nextKey.getLength() - nextKey.getPosition() ) == 0; } else { nextKeyIsSame = false; } inputValueCounter.increment(1); return true; } //一个迭代器模式的内部类 protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> { private boolean inReset = false; private boolean clearMarkFlag = false; @Override//它并不仅仅是判断迭代器是否还有下一个元素,而且还要判断下一个元素和上一个元素是不是相同的key public boolean hasNext() { if (inReset && backupStore.hasNext()) { return true; } return firstValue || nextKeyIsSame; } @Override //这个地方要注意了,其实在获取下一个元素的时候主要调用的是nextKeyValue(); public VALUEIN next() { if (inReset) { if (backupStore.hasNext()) { backupStore.next(); DataInputBuffer next = backupStore.nextValue(); buffer.reset(next.getData(), next.getPosition(), next.getLength() - next.getPosition()); value = valueDeserializer.deserialize(value); return value; } else { inReset = false; backupStore.exitResetMode(); if (clearMarkFlag) { clearMarkFlag = false; isMarked = false; } } } // if this is the first record, we don't need to advance if (firstValue) { firstValue = false; return value; } // otherwise, go to the next key/value pair nextKeyValue();//该方法就是获取下一个key,value对,key值的变化也就在这里表现出来了。 return value; } } //内部类,实现迭代器,具备迭代器功能 protected class ValueIterable implements Iterable<VALUEIN> { private ValueIterator iterator = new ValueIterator(); @Override public Iterator<VALUEIN> iterator() { return iterator; } } public Iterable<VALUEIN> getValues() throws IOException, InterruptedException { return iterable; } }
简单一句话总结就是:ReduceContextImpl类的RawKeyValueIterator input迭代器对象里面存储中着key-value对的元素, 以及一个只存储value的迭代器,然后每调一次我们实现的reduce方法,就是传入ValueIterable迭代器对象和当前的key。但是我们在方法里面调用迭代器的next方法时,其实调用了nextKeyValue,来获取下一个key和value,并判断下一个key是否和 上一个key是否相同,然后决定hashNext方法是否结束,同时对key进行了一次重新赋值。
这个方法获取KV的迭代器的下一个KV值,然后把K值和V值放到之前传入我们自己写的Reduce类的方法中哪个输入参数的地址上,白话说:框架调用我们写的reduce方法时,传入了三个参数,然后我们方法内部调用phoneNbrs.hashNext方法就是调用的ReduceContextImpl的内部类ValueIterator的hashNext方法,这个方法里面调用了ReduceContextImpl内的nextKeyValue方法,该方法内部又清除了之前调用用户自定义reduce方法时传入的k,v参数的内存地址的数据,然后获取了RawKeyValueIterator input迭代器的下一个KV值,然后把k值和V值放入该数据。这就是原因了。
再看我们的reduce实现类
public static class FlowSumSortReducerOne extends Reducer<FlowBeanOne, Text, Text, FlowBeanOne> { @Override protected void reduce(FlowBeanOne bean, Iterable<Text> phoneNbrs, Context context) throws IOException, InterruptedException { System.out.println("-------------------"); for (Text text : phoneNbrs) {//这里就是迭代器,相当于调用ValueIterable.hashNext System.out.println(bean); context.write(text, bean); } } }
最近实在是不知道学点什么了呦,就把hadoop回顾一下,当初学时,为了快速上手,都是记各种理论以及结论,没有时间去看源码验证,也不知道人家说的结论是否正确,这次回滚就是看源码验证当初结论的正确性。这也快一年没有用了,最近一直从事分布式实时计算的研究。