【外连接】是在【内连接】的基础上稍微修改即可。具体HQL语句详见Hive查询Join
package join.map; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class JoinOn { public static void main(String[] args) throws Exception { //临时配置windows的环境变量
System.setProperty("hadoop.home.dir", "D:\\workspace\\hadoop-2.2.0"); Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(JoinOn.class); job.setMapperClass(JOMapper.class);
job.setReducerClass(JOReducer.class); job.setMapOutputKeyClass(VLongWritable.class);
job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true)? 0:1); } public static class JOMapper extends Mapper<LongWritable, Text, VLongWritable, Text>{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException { //获取当前分片所对应的文件名
String name = ((FileSplit)context.getInputSplit()).getPath().getName(); String[] splited = value.toString().split("\t"); if(name.endsWith("sales")){ //sales表
//<key,value> --> <id, things+':'+name+'\t'+id>
context.write(new VLongWritable(Long.parseLong(splited[1])), new Text(name+":"+value.toString()));
}else if(name.endsWith("things")) {
//<key,value> --> <id, sales+':'+id+'\t'+name>
context.write(new VLongWritable(Long.parseLong(splited[0])), new Text(name+":"+value.toString()));
}
}
} public static class JOReducer extends Reducer<VLongWritable, Text, Text, Text>{
@Override
protected void reduce(VLongWritable key, Iterable<Text> v2s, Context context)
throws IOException, InterruptedException { //分别存储sales和things两表的name
List<String> sales=new ArrayList<String>();
List<String> things=new ArrayList<String>(); for(Text text : v2s){
String[] splited = text.toString().split(":"); //sales表中的数据
if(splited[0].endsWith("sales")){ //加入集合
sales.add(splited[1]);
}
//things表中数据
else if(splited[0].endsWith("things")){
things.add(splited[1]);
}
}
//笛卡尔积
/**
* 左外连接:只要求左表中有数据即可
*/
if(sales.size()!=0 /*&& things.size()!=0*/){
for(String sale : sales){
//如果右表中没有数据,则使用 NULL 代替
if(things.size()==0){
context.write(new Text(sale), new Text("NULL"+"\t"+"NILL"));
}else {//如果右表中有数据,则直接输出
for(String thing : things){
context.write(new Text(sale), new Text(thing));
}
}
}
}
}
}
}
总结:
1).左外连接:左表全部显示,右表不匹配的部分以NULL替代。
2).代码实现即要求左表不为空即可,右表为空则以NULL输出,右表不为空则直接输出。