# 示例(每行为 用户:好友列表)
A:C,F,H,M,E
B:F,H,E,X,C
C:B,F,D,E
D:M,H,B,D
第一阶段(第一个MapReduce作业):把好友关系倒排,求出每个好友被哪些用户共同拥有
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com._51doit.AnLie.FlowMapper;
import com._51doit.AnLie.FlowReducer;
import com.sun.jersey.api.ConflictException;
/**
 * Stage 1 of the "common friends" job chain.
 *
 * Input lines look like {@code A:C,F,H,M,E} (user A's friend list). The mapper
 * inverts the relation, emitting (friend, user) pairs; the reducer then knows,
 * for each friend F, every user who has F in their list, and emits one line per
 * user pair: {@code X和Y的共同好友是:F}. Stage 2 ({@code Friends2}) merges these
 * per-pair lines.
 */
public class Friends {

    /** Inverts "user:friendList" lines into (friend, user) pairs. */
    static class FriendsMapper extends Mapper<LongWritable, Text, Text, Text> {
        private final Text k = new Text();
        private final Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            // Guard against blank or malformed lines so a stray newline does not
            // abort the whole task with an ArrayIndexOutOfBoundsException.
            if (line.isEmpty()) {
                return;
            }
            String[] sp = line.split(":");
            if (sp.length < 2) {
                return;
            }
            v.set(sp[0]); // value = the user who owns the friend list
            for (String friend : sp[1].split(",")) {
                k.set(friend); // key = one friend from that list
                context.write(k, v); // e.g. (C, A), (F, A), ...
            }
        }
    }

    /**
     * For each friend, receives all users who have that friend and emits every
     * unordered user pair. Sorting first makes pairs deterministic (A和B, never B和A).
     */
    static class FriendsReducer extends Reducer<Text, Text, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> itera, Context context)
                throws IOException, InterruptedException {
            // Hadoop reuses the same Text instance across the iteration, so the
            // content must be copied out via toString() before collecting.
            List<String> users = new ArrayList<>();
            for (Text text : itera) {
                users.add(text.toString());
            }
            Collections.sort(users);
            for (int i = 0; i < users.size() - 1; i++) {
                for (int j = i + 1; j < users.size(); j++) {
                    String pair = users.get(i) + "和" + users.get(j) + "的共同好友是:" + key.toString();
                    context.write(new Text(pair), NullWritable.get());
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Required so the job jar is shipped when run on a real cluster.
        job.setJarByClass(Friends.class);
        job.setMapperClass(FriendsMapper.class);
        job.setReducerClass(FriendsReducer.class);
        // Mapper output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer (final) output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Input directory (friend lists); output directory (pair lines).
        // CLI args override the original hard-coded local paths.
        FileInputFormat.setInputPaths(job, new Path(args.length > 0 ? args[0] : "D:\\www"));
        FileOutputFormat.setOutputPath(job, new Path(args.length > 1 ? args[1] : "D:\\rr"));
        // Propagate job success/failure to the shell.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
第一阶段的结果:
C和D的共同好友是:B A和B的共同好友是:C C和D的共同好友是:D A和B的共同好友是:E A和C的共同好友是:E
B和C的共同好友是:E A和B的共同好友是:F A和C的共同好友是:F B和C的共同好友是:F A和B的共同好友是:H
A和D的共同好友是:H B和D的共同好友是:H A和D的共同好友是:M
第二阶段(第二个MapReduce作业):把同一对用户的所有共同好友合并到一行
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Stage 2 of the "common friends" job chain.
 *
 * Consumes stage-1 output lines of the form {@code X和Y的共同好友是:F} (one friend
 * per line). The mapper keys by the pair description (text before the colon) with
 * the friend as the value; the reducer concatenates all friends of a pair into a
 * single output line, e.g. {@code A和B的共同好友是 FCHE}.
 */
public class Friends2 {

    /** Splits each stage-1 line at the colon into (pair-description, friend). */
    static class FriendsMapper extends Mapper<LongWritable, Text, Text, Text> {
        private final Text k = new Text();
        private final Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            // Skip blank/malformed lines instead of failing on sp[1].
            if (line.isEmpty()) {
                return;
            }
            String[] sp = line.split(":");
            if (sp.length < 2) {
                return;
            }
            k.set(sp[0]);
            v.set(sp[1]);
            context.write(k, v);
        }
    }

    /** Concatenates every common friend of one user pair into a single string. */
    static class FriendsReducer extends Reducer<Text, Text, Text, Text> {
        private final Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> itera, Context context)
                throws IOException, InterruptedException {
            StringBuilder b = new StringBuilder();
            for (Text text : itera) {
                // Text is reused by the framework; toString() copies the content.
                // (Original appended + "" and trim()ed the result — both no-ops.)
                b.append(text.toString());
            }
            v.set(b.toString());
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Required so the job jar is shipped when run on a real cluster.
        job.setJarByClass(Friends2.class);
        job.setMapperClass(FriendsMapper.class);
        job.setReducerClass(FriendsReducer.class);
        // Mapper output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Reducer (final) output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input directory (stage-1 output); output directory (final result).
        // CLI args override the original hard-coded local paths.
        FileInputFormat.setInputPaths(job, new Path(args.length > 0 ? args[0] : "D:\\rr"));
        FileOutputFormat.setOutputPath(job, new Path(args.length > 1 ? args[1] : "D:\\vvv"));
        // Propagate job success/failure to the shell.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
第二阶段结果:
A和B的共同好友是 FCHE A和C的共同好友是 EF A和D的共同好友是 MH B和C的共同好友是 EF
B和D的共同好友是 H C和D的共同好友是 DB
MapReduce的两个阶段体现了分而治之的思想,奠定了大数据离线批处理的核心思想。有兴趣的,可以找一下Google关于离线处理大数据的那3篇论文(GFS、MapReduce、BigTable)。