06.Mapreduce实例——Reduce端join
实验原理
在Reudce端进行Join连接是MapReduce框架进行表之间Join操作最为常见的模式。
1.Reduce端Join实现原理
(1)Map端的主要工作,为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
(2)Reduce端的主要工作,在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行笛卡尔只就ok了。
2.Reduce端Join的使用场景
Reduce端连接比Map端连接更为普遍,因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中,但是Reduce端连接效率比较低,因为所有数据都必须经过Shuffle过程。
3.本实验的Reduce端Join代码执行流程:
(1)Map端读取所有的文件,并在输出的内容里加上标识,代表数据是从哪个文件里来的。
(2)在Reduce处理函数中,按照标识对数据进行处理。
(3)然后将相同的key值进行Join连接操作,求出结果并直接输出。
实验步骤
1. 建两个文本文档,用逗号分隔开,数据如下
orders1表
订单ID 订单号 用户ID 下单日期
52304 111215052630 176474 2011-12-15 04:58:21
52303 111215052629 178350 2011-12-15 04:45:31
52302 111215052628 172296 2011-12-15 03:12:23
52301 111215052627 178348 2011-12-15 02:37:32
52300 111215052626 174893 2011-12-15 02:18:56
52299 111215052625 169471 2011-12-15 01:33:46
52298 111215052624 178345 2011-12-15 01:04:41
52297 111215052623 176369 2011-12-15 01:02:20
52296 111215052622 178343 2011-12-15 00:38:02
52295 111215052621 178342 2011-12-15 00:18:43
52294 111215052620 178341 2011-12-15 00:14:37
52293 111215052619 178338 2011-12-15 00:13:07
order_items1表
明细ID 订单ID 商品ID
252578 52293 1016840
252579 52293 1014040
252580 52294 1014200
252581 52294 1001012
252582 52294 1022245
252583 52294 1014724
252584 52294 1010731
252586 52295 1023399
252587 52295 1016840
252592 52296 1021134
252593 52296 1021133
252585 52295 1021840
252588 52295 1014040
252589 52296 1014040
252590 52296 1019043
2. 虚拟机中启动Hadoop
3. 本地新建/data/mapreduce6目录。
mkdir -p /data/mapreduce6
4. 将两个表上传到虚拟机中
5. 上传并解压hadoop2lib文件
6. 在HDFS上新建/mymapreduce6/in目录,然后将Linux本地/data/mapreduce6目录下的orders1和order_items1文件导入到HDFS的/mymapreduce6/in目录中。
hadoop fs -mkdir -p /mymapreduce6/in
hadoop fs -put /data/mapreduce6/orders1 /mymapreduce6/in
hadoop fs -put /data/mapreduce6/order_items1 /mymapreduce6/in
7. IDEA中编写Java代码
8. package mapreduce6;
import java.io.IOException;
import java.util.Iterator;
import java.util.Vector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ReduceJoin {
public static class mymapper extends Mapper<Object, Text, Text, Text>{
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String filePath =
((FileSplit)context.getInputSplit()).getPath().toString();
if (filePath.contains("orders1")) {
String line = value.toString();
String[] arr = line.split(",");
context.write(new Text(arr[0]), new Text( "1+" + arr[2]+"\t"+arr[3]));
//System.out.println(arr[0] + "_1+"
+ arr[2]+"\t"+arr[3]);
}else if(filePath.contains("order_items1")) {
String line =
value.toString();
String[] arr = line.split(",");
context.write(new Text(arr[1]), new Text("2+" + arr[2]));
//System.out.println(arr[1] + "_2+"
+ arr[2]);
}
}
}
public static class myreducer extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable values, Context context)
throws IOException, InterruptedException {
Vector
left = new Vector();
Vector right = new Vector();
for (Text val : values) {
String str =
val.toString();
if (str.startsWith("1+")) {
left.add(str.substring(2));
}
else if (str.startsWith("2+")) {
right.add(str.substring(2));
}
}
int sizeL = left.size();
int sizeR = right.size();
//System.out.println(key +
"left:"+left);
//System.out.println(key +
"right:"+right);
for (int i = 0; i < sizeL; i++) {
for (int j = 0; j < sizeR; j++) {
context.write( key, new Text(
left.get(i) + "\t" + right.get(j) ) );
//System.out.println(key + " \t"
+ left.get(i) + "\t" + right.get(j));
}
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance();
job.setJobName("reducejoin");
job.setJarByClass(ReduceJoin.class);
job.setMapperClass(mymapper.class);
job.setReducerClass(myreducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Path left = new Path("hdfs://localhost:9000/mymapreduce6/in/orders1");
Path right = new Path("hdfs://localhost:9000/mymapreduce6/in/order_items1");
Path out = new Path("hdfs://localhost:9000/mymapreduce6/out");
FileInputFormat.addInputPath(job, left);
FileInputFormat.addInputPath(job, right);
FileOutputFormat.setOutputPath(job, out);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
9. 将hadoop2lib目录中的jar包,拷贝到hadoop2lib目录下。
10. 拷贝log4j.properties文件
11. 运行结果