MapReduce实例——好友推荐

1. 实例介绍

好友推荐算法在实际的社交环境中应用较多,比如qq软件中的“你可能认识的好友”或者是Facebook中的好友推介。好友推荐功能简单的说是这样一个需求,预测某两个人是否认识,并推荐为好友,并且某两个非好友的用户,他们的共同好友越多,那么他们越可能认识。

2. 数据流程

MapReduce实例——好友推荐

3. 具体实现

3.1 上传数据qq.txt

tom		cat	hadoop	hello
hello	mr	tom	world	hive
cat	tom	hive
hive	cat	hadoop	world	hello	mr
mr		hive	hello
hadoop	tom	hive	world
world	hadoop	hive	hello

每个名字之间用”\t”分隔符隔开,第一个是用户,之后是好友列表。

3.2 代码编写

3.2.1 类型定义

由于A1:A2与A2:A1是同一个潜在好友列表,为了能够方便的统计,故统一按照字典排序,输出A1:A2格式。

public class FoF extends Text {
	
	public  FoF() {
		super();
	}
	
	public FoF(String friend01,String friend02) {
		set(getof(friend01,friend02));
	}

	private String getof(String friend01, String friend02) {
		
		int c = friend01.compareTo(friend02);
		if (c>0) {
			return friend02+"\t"+friend01;
		} 
		return friend01+"\t"+friend02;
	}
}

3.2.2 定义Map01

/**
 * map函数,统计好友之间的FOF关系列表(FOF关系:潜在好友关系)
 * @author JankoWilliam
 *
 */
public class Map01 extends Mapper<LongWritable, Text, FoF, IntWritable> {
	
	@Override
	protected void map(LongWritable key, Text value,Context context)
			throws IOException, InterruptedException {
		String lines = value.toString(); // 用户所有的好友列表
		String friends[] = StringUtils.split(lines, '\t');
		//好友之间的FOF关系矩阵
		for (int i = 1; i < friends.length; i++) {
			String friend = friends[i];
			context.write(new FoF(friends[0],friend),new IntWritable(0));//输出好友列表,值为0。方便在reduce阶段去除已经是好友的FOF关系。
			
			for (int j = i+1; j < friends.length; j++) {
				String friend2 = friends[j];
				context.write(new FoF(friend, friend2), new IntWritable(1));//输出好友之间的FOF关系列表,值为1,方便reduce阶段累加
			}
		}		
	}	
}

3.2.2 定义Reduce01

/**
 * reduce函数,统计全部的FOF关系列表的系数
 * @author JankoWilliam
 *
 */
public class Reduce01 extends Reducer<FoF, IntWritable,Text, NullWritable> {
	
	@Override
	protected void reduce(FoF key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
	
		int sum = 0;
		boolean f = true;
		for(IntWritable i : values){
			if (0==i.get()) { //已经是好友关系
				f=false;
				break;
			}
			sum+=i.get();	//累计,统计FOF的系数
		}
		System.out.println("******************Reduce01*******************");
		if (f) {
			String msg = StringUtils.split(key.toString(), '\t')[0]+" "+StringUtils.split(key.toString(), '\t')[1]+" "+sum;
			System.out.println(msg);
			context.write(new Text(msg), NullWritable.get()); //输出key为潜在好友对,值为出现的次数
		}
	}
}

3.2.3 定义 Job类

	public static void main(String[] args) {
		Boolean flag = jobOne();
	}
	// MapReduce01
	private static Boolean jobOne() {
		Configuration config = new Configuration();
		config.set("fs.defaultFS", "hdfs://node01:8020");
		config.set("yarn.resourcemanager.hostname", "node02:8088");
		
		boolean flag = false;
		try{
			Job job = Job.getInstance(config);
			
			job.setJarByClass(JobFriends.class);
			job.setJobName("fof one job");
			
			job.setMapperClass(Map01.class);
			job.setReducerClass(Reduce01.class);
			
			job.setOutputKeyClass(FoF.class);
			job.setOutputValueClass(IntWritable.class);
			
			FileInputFormat.addInputPath(job,new Path("/friend/input/qq.txt"));
			
			Path output = new Path("/friend/output/01");
			FileSystem fs = FileSystem.get(config);
			
			if (fs.exists(output)) {
				fs.delete(output,true);
			}
			FileOutputFormat.setOutputPath(job, output);
			
			flag = job.waitForCompletion(true);
			if (flag) {
				System.out.println("job1 success...");
			}
		} catch(Exception e){
			e.printStackTrace();
		} 
		
		return flag;
	}

输出结果:

cat hadoop 2
cat hello 2
cat mr 1
cat world 1
hadoop hello 3
hadoop mr 1
hive tom 3
mr tom 1
mr world 2
tom world 2

3.2.4 好友推介计算,类型定义

由于在MapReduce中,key值自动能够排序,而value值往往不可以。所以为了根据每一个用户与其他用户的共同好友个数从高到低排序,不仅需要将用户名作为key,还需要将该用户与推介用户的共同好友个数作为key的一部分,所以需要重新定义一个类。

/**
 *  FriendSort作为key和value,FriendSort:用户名+FOF系数  
 *  用户名一致,FOF系数从大到小排序。 很容易得到一个用户的好友推介的列表
 * @author JankoWilliam
 *
 */
public class FriendSort implements WritableComparable<FriendSort>{
	
	private String friend;
	private int hot;
	
	public String getFriend() {
		return friend;
	}

	public void setFriend(String friend) {
		this.friend = friend;
	}

	public int getHot() {
		return hot;
	}

	public void setHot(int hot) {
		this.hot = hot;
	}
	

	public FriendSort() {
		super();
	}
	
	public FriendSort(String friend, int hot) {
		this.friend = friend;
		this.hot = hot;
	}
	//反序列化
	@Override
	public void readFields(DataInput in) throws IOException {
		this.friend=in.readUTF();
		this.hot=in.readInt();
	}
	//序列化
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(friend);
		out.writeInt(hot);
	}
	//判断是否为同一用户,并通过hot值排序
	@Override
	public int compareTo(FriendSort newFriend) {
		System.out.println(friend+"-------"+newFriend.getFriend());

		int c = friend.compareTo(newFriend.getFriend());
		int e = -Integer.compare(hot, newFriend.getHot());
		if (c==0) {
			return e;
		}
		return c;
	}
}	

3.2.5 定义Map02

/**
 * map函数,每个用户的推介好友列表,并按推介指数从大到小排序
 * @author JankoWilliam
 *
 */
public class Map02 extends Mapper<LongWritable, Text, FriendSort, Text > {
	
	@Override
	protected void map(LongWritable key, Text value,Context context)
			throws IOException, InterruptedException {
		
		String lines = value.toString();	
		String friend01 = StringUtils.split(lines,' ')[0];
		String friend02 = StringUtils.split(lines,' ')[1];	//推介的好友
		int hot = Integer.parseInt(StringUtils.split(lines,' ')[2]);	// 该推介好友的推介系数
		
		System.out.println("**************Map02******************");
		System.out.println(friend01+"  "+friend02+"  "+hot);
		System.out.println(friend02+"  "+friend01+"  "+hot);
		context.write(new FriendSort(friend01,hot),new Text(friend02+":"+hot));	 //mapkey输出用户和好友推介系数
		context.write(new FriendSort(friend02,hot),new Text(friend01+":"+hot));	//好友关系是相互的
		
	}
}

3.2.6 定义sort类

/**
 * 将key根据用户名和次数排序
 * @author JankoWilliam
 *  
 */
public class NumSort extends WritableComparator{
	public NumSort(){
		super(FriendSort.class,true);
	}
	
	public int compare(WritableComparable a, WritableComparable b) {
		FriendSort o1 =(FriendSort) a;
		FriendSort o2 =(FriendSort) b;
		
		int r =o1.getFriend().compareTo(o2.getFriend());
		if(r==0){
			return -Integer.compare(o1.getHot(), o2.getHot());
		}
		return r;
	}
}

3.2.7 定义group类

	/**
	 *  将同一个用户作为一个Group,同时被reduce处理
	 * @author JankoWilliam
	 *
	 */
	public class UserGroup extends WritableComparator{
		public UserGroup(){
			super(FriendSort.class,true);
		}
		
		public int compare(WritableComparable a, WritableComparable b) {
			FriendSort o1 =(FriendSort) a;
			FriendSort o2 =(FriendSort) b;
			return o1.getFriend().compareTo(o2.getFriend());
		}
	}	

3.2.8 定义Reduce02

/**
 * reduce函数
 * @author JankoWilliam
 *
 */
public class Reduce02 extends Reducer<FriendSort, Text, Text, Text> {
	
	@Override
	protected void reduce(FriendSort user, Iterable<Text> friends,Context context)
			throws IOException, InterruptedException {
		
		String msg = "";
		//拼接推荐好友
		for(Text friend :friends){
			System.out.println("***************Reduce02*****************");
			msg += friend.toString()+",";
			System.out.println(msg);
		}
		context.write(new Text(user.getFriend()), new Text(msg));	
	}	
}

3.2.9 定义 Job类

public class JobFriends {
	
	public static void main(String[] args) {
		Boolean flag = jobOne();
		if (flag) {
			jobTwo();
		}
	}
	
	private static Boolean jobTwo() {  
		 Configuration config =new Configuration();
		 config.set("fs.defaultFS", "hdfs://node01:8020");
		 config.set("yarn.resourcemanager.hostname", "node02:8088");
		 
		 Boolean flag = false;
		 try {
			 Job job = Job.getInstance(config);
			 
			 job.setJarByClass(JobFriends.class);
			 job.setJobName("fof two job");
			 
			 job.setMapperClass(Map02.class);
			 job.setReducerClass(Reduce02.class);
			 job.setSortComparatorClass(NumSort.class); //sort类
			 job.setGroupingComparatorClass(UserGroup.class); //group类
			 job.setMapOutputKeyClass(FriendSort.class);
			 job.setMapOutputValueClass(Text.class);
			 
			 
			 FileInputFormat.addInputPath(job, new Path("/friend/output/01/"));
			 
			 Path output = new Path("/friend/output/02/");
			 
			 FileSystem fs = FileSystem.get(config);
			 if (fs.exists(output)) {
				 fs.delete(output, true);
			 }
			 
			 FileOutputFormat.setOutputPath(job, output);
			 
			 flag = job.waitForCompletion(true);
			 if (flag) {
				 System.out.println("job2 success...");
			 }
			 
		 } catch (Exception e) {
			 e.printStackTrace();
		 };
		 return flag;
	 }
}

输出结果,完成好友推荐列表:

cat		hello:2,hadoop:2,mr:1,world:1,
hadoop	hello:3,cat:2,mr:1,
hello	hadoop:3,cat:2,
hive	tom:3,
mr		world:2,hadoop:1,tom:1,cat:1,
tom		hive:3,world:2,mr:1,
world	mr:2,tom:2,cat:1,
上一篇:【BZOJ2732】【HNOI2012】射箭 二分+半平面交


下一篇:547. Friend Circles