MapReduce Cross Example
package com.bsr.cross;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * First MR pass: for each person, find the people who list that person
 * as a friend.
 */
public class Cross {

    // Input:  A:B,C,D,F,E,O
    // Output: B->A  C->A  D->A  F->A  E->A  O->A
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split "A:B,C,D,..." into the person and their friend list
            String[] parts = value.toString().split(":");
            String[] friends = parts[1].split(",");
            for (String friend : friends) {
                // Emit <friend, person>: "friend" is on "person"'s friend list
                context.write(new Text(friend), new Text(parts[0]));
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        // Input:  <B->A> <B->E> <B->F> ...
        // Output: B  A,E,F,J
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text text : values) {
                if (sb.length() > 0) {
                    sb.append(",");
                }
                sb.append(text);
            }
            context.write(key, new Text(sb.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        // Reads and parses all xxx-site.xml configuration files on the classpath
        Configuration conf = new Configuration();

        // Delete the output directory if it already exists,
        // otherwise the job will fail on startup
        FileSystem fs = FileSystem.get(conf);
        Path outputPath = new Path("/wc/output3");
        fs.delete(outputPath, true);

        Job job = Job.getInstance(conf);

        // Locate the jar containing this job's code via the main class's loader
        job.setJarByClass(Cross.class);

        // Mapper and reducer classes used by this job
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // Key/value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Key/value types of the reducer output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Input and output paths for this job
        FileInputFormat.setInputPaths(job, new Path("/wc/data/"));
        FileOutputFormat.setOutputPath(job, outputPath);

        // Submit the job to the Hadoop cluster and wait for completion
        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}
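To make the data flow concrete, here is a small illustrative run. The file name and data are made up for the example, and the order of the comma-joined values in each output line is not guaranteed by MapReduce:

Input (/wc/data/friends.txt):
A:B,C,D,F,E,O
B:A,C,E,K

Output (/wc/output3/part-r-00000):
A	B
B	A
C	A,B
D	A
E	A,B
F	A
K	B
O	A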
This first pass performs the logical inversion: instead of "A's friends are B, C, D, ...", each output line now says which people count a given person as a friend, which is the shape of input a second pass needs.
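The class comment calls this the first MR pass, but the post stops there. Below is a minimal sketch of what a second pass could look like, assuming the goal is the classic common-friends pattern: read the first job's tab-separated output ("B	A,E,F,J"), emit every sorted pair of people who share that friend, and let the reducer collect each pair's common friends. The class name Cross2 and the paths /wc/output3 and /wc/output4 are assumptions for illustration, not from the original post.

package com.bsr.cross;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Hypothetical second MR pass: turns "friend -> people who have that friend"
 * into "pair of people -> their common friends".
 */
public class Cross2 {

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input line from the first job: "B\tA,E,F,J"
            String[] parts = value.toString().split("\t");
            String friend = parts[0];
            String[] persons = parts[1].split(",");
            // Sort so that the pairs A-E and E-A collapse to the same key
            Arrays.sort(persons);
            for (int i = 0; i < persons.length - 1; i++) {
                for (int j = i + 1; j < persons.length; j++) {
                    // Emit <pair, shared friend>
                    context.write(new Text(persons[i] + "-" + persons[j]),
                                  new Text(friend));
                }
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        // Input:  <A-E, B> <A-E, C> ...   Output: A-E  B,C
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder sb = new StringBuilder();
            for (Text t : values) {
                if (sb.length() > 0) {
                    sb.append(",");
                }
                sb.append(t);
            }
            context.write(key, new Text(sb.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Clear the output directory from any previous run
        FileSystem fs = FileSystem.get(conf);
        Path outputPath = new Path("/wc/output4");
        fs.delete(outputPath, true);

        Job job = Job.getInstance(conf);
        job.setJarByClass(Cross2.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Read the first job's output as this job's input
        FileInputFormat.setInputPaths(job, new Path("/wc/output3"));
        FileOutputFormat.setOutputPath(job, outputPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Sorting each person list before pairing is the key design choice: without it, the same pair could reach two different reducer keys (A-E and E-A) and its common friends would be split across two output lines.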