ReduceJoin 案例 Reduce| 学习笔记

开发者学堂课程【Hadoop 分布式计算框架 MapReduc:ReduceJoin 案例Reduce学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/94/detail/1548


ReduceJoin 案例 Reduce

 

代码示例

package com.liun.mr.reducejoin;

 

import java.io.IOException;

import java.lang.reflect.InvocationTargetException;

import java.util.ArrayList;

 

import org.apache.commons.beanutils.BeanUtils;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class TableReducer<E> extends Reducer<Text, TableBean, TableBean, NullWritable> {

 

@Override

protected void reduce(Text key, Iterable<TableBean> values, Context context)

throws IOException, InterruptedException {

 

// 存储所有订单集合

ArrayList<TableBean> orderBeans = new ArrayList<>();

// 存储产品信息

TableBean pdBean = new TableBean();

 

for (TableBean tableBean : values) {

 

if ("order".equals(tableBean.getFlag())) {// 订单表

 

TableBean tmpBean = new TableBean();

 

try {

// 拷贝传递过来的每条订单数据到集合中

BeanUtils.copyProperties(tmpBean, tableBean);

 

orderBeans.add(tmpBean);

 

} catch (IllegalAccessException e) {

e.printStackTrace();

} catch (InvocationTargetException e) {

e.printStackTrace();

}

} else {// 产品表

try {

// 拷贝传递过来的产品表到内存中

BeanUtils.copyProperties(pdBean, tableBean);

 

} catch (IllegalAccessException e) {

e.printStackTrace();

} catch (InvocationTargetException e) {

e.printStackTrace();

}

}

}

 

for (TableBean tableBean : orderBeans) {

tableBean.setPname(pdBean.getPname());

 

// 写出

context.write(tableBean, NullWritable.get());

}

}

}


上一篇:ReduceJoin 案例 Mapper| 学习笔记


下一篇:NLineInputFormat 案例实现| 学习笔记