User协同过滤(基于Spark实现)

项目地址:https://github.com/ChanKamShing/UserCF_Spark.git

推荐系统的作业流程:

召回/match(推荐引擎)-> 物品候选集 -> 过滤 -> 排序 -> 策略(保证结果多样性) -> 推荐list

协同过滤CF属于第一阶段,我们常常称之为“推荐引擎”。“推荐引擎”可以有多个基准,包括:基于相似用户、基于相似物品、基于特征搜索,以及基于热门等方式。通过不同的方式可以解决不同的问题,譬如冷启动问题,这里介绍的是基于相似用户的方式。

在本文中,不会详细介绍代码,主要从逻辑上讲述。

基本步骤:
1、找出当前用户的若干个相似用户,取出每个相似用户购买过的商品(或打分过的电影)集合;

2、基于当前用户的购买过的商品(或打分过的电影)集合,对其相似用户购买过的商品(或打分过的电影)集合进行过滤,得出存在相似用户,同时不存在当前用户的商品(或电影)集合;

3、基于当前用户与相似用户之间的相似度,以及用户对商品(或电影)的打分,进行排序取topN,得到物品候选集

具体实现:
u.data数据格式(user_id, item_id, rating, timestamp):

一、创建源数据

        这里采用数据源为hive,同样的,可以创建一张具有(user_id, item_id, rating, timestamp)字段的hive表,这样子就可以通过Spark的DF、SparkSQL等组建对数据进行处理。

二、计算用户相似度

        区别于上一篇的python实现,使用Spark实现,是使用向量的cosine定理。

cosine=a*b/|a|*|b|

每个用户相当于一个向量,他们各自购买过的商品是其对应向量的维度,维度值就是商品的分值。

1、计算分母

        通过向量的定义,那么在Spark操作hive数据当中,就显得特别容易,

  1. 对每一行数据的rating进行平方运算;
  2. 基于user_id进行聚合;
  3. 再对平方后的rating求和,再开根号;

经过上述步骤,可以将所有的向量的模都求出来。得到的DF数据结构为:(user_id, rating_sqrt_sum)

val userScoreSum = userDataDF.rdd.map(x=>(x(0).toString,x(2).toString))
            .groupByKey()
            .mapValues(x=>sqrt(x.toArray.map(rating=>pow(rating.toDouble,2)).sum))
            .toDF("user_id","rating_sqrt_sum")

2、计算分子

        分子部分是两个向量之间进行点乘,即向量之间的各个维度进行一一相乘,再相加。所以先基于原始DF,重新copy一份,作为相似用户的DF,然后基于item_id,对两张表进行聚合,构建了DF的数据结构为:(item_id, user_id, rating, user_v, rating_v)。有了这张表,就可以对rating和rating_v进行相乘,然后基于user_id、user_v做聚合操作,再将刚才rating和rating_v的乘积进行累加,就可以算出分子,计算得到的DF数据结构:(user_id, user_v, rating_dot)

//        倒排表(基于item的笛卡儿积)
        val vDataDF = userDataDF.selectExpr("user_id as user_v", "item_id", "rating as rating_v")
        val u_v_decare = userDataDF.join(vDataDF,"item_id")
            .filter("case(rating as long)<>case(rating_v as long)")
//        计算分子,维度值(rating)点乘,累加求和
        val df_product = u_v_decare.selectExpr("item_id","user_id","user_v","case(rating as double)*case(rating_v as double) as prod")
        val df_sim_group = df_product.groupBy("user_id","user_v")
            .agg("prod"->"sum")
            .withColumnRenamed("sum(prod)","rating_dot")

3、计算cosine

        构建一个新的DF数据结构,分别基于user_id,user_v,将步骤1、2的DF进行聚合,得到(user_id, user_v, rating_dot, rating_sqrt_sum, rating_sqrt_sum_v),接着对每一行数据直接套用cosine公式,最后选取需要的字段,构成新数据结构:(user_id, user_v)。

//计算整个cosine
val vScoreSum = userScoreSum.selectExpr("user_id as user_v","rating_sqrt_sum as rating_sqrt_sum_v")
val df_sim_cosine = df_sim_group
    .join(userScoreSum,"user_id")
    .join(vScoreSum,"user_v")
    .selectExpr("user_id","user_v","rating_dot/(rating_sqrt_sum*rating_sqrt_sum_v) as cosine_sim")

三、过滤商品,并对商品进行打分

1、过滤

        过滤商品之前,我们需要做得事情,首先获取topN个相似用户,然后取出这topN个相似用户所对应的物品集合,再进行过滤。

1.1、获取topN相似用户

        df_sim_cosine的结构里面是(user_id, user_v, cosine_sim),这个结构的数据已经是包括用户两两之间的相似度,换句话说,只要根据user_id做聚合,然后基于cosine做反向排序,slice切片,就可以取到user_id的topN个相似用户。

//使用slice取得topN个相似用户
val sim_user_topN = df_sim_cosine.rdd.map(row=>(row(0).toString,(row(1).toString,row(2).toString)))
    .groupByKey()
    .mapValues(_.toArray.sortWith((x,y)=>x._2>y._2).slice(0,10))    //列转行, RDD[(String, Array[(String, String)])]
    .flatMapValues(x=>x)    //行转列, RDD[(String, (String, String))]
    .toDF("user_id","user_v_sim")
    .selectExpr("user_id","user_v_sim._1 as user_v","user_v_sim._2 as cosine_sim")//将一个tuple的字段拆分成两个字段

1.2、获取user_id和其相似用户物品列表

        经过上一步可以获取topN个相似用户的数据,只要分别基于user_id,user_v进行关联,就可以将商品列表给关联上,而且关联后的数据,附带了商品的打分,便于后面给候选商品列表打分。

        val df_user_items = userDataDF.rdd.map(row=>(row(0).toString,row(1).toString+"_"+row(2).toString))
            .groupByKey()
            .mapValues(_.toArray)
            .toDF("user_id","item_rating_arr")
 
        val df_user_items_v = df_user_items.selectExpr("user_id as user_id_v", "item_rating_arr as item_rating_arr_v")
        //依次基于user_id、user_v聚合
        val df_gen_item = sim_user_topN
            .join(df_user_items,"user_id")
            .join(df_user_items_v,"user_v")

要知道,userDataDF的数据结构是(user_id, item_id, rating),所以,要获取“列表”,则必须对user_id进行聚合,这里做了一个格式处理,将item和rating用“_”连接,合并成一个数据处理。

1.3、过滤商品

        由于上一步对item和rating的数据结构进行处理,所以这一步需要定义一个UDF来对商品进行过滤。

//        用一个udf从user_v的商品集合中,将与user_id具有相同的商品过滤掉,得到候选集
        import org.apache.spark.sql.functions._
        val filter_udf = udf{(items:Seq[String],items_v:Seq[String])=>
            val fMap = items.map{x=>
                val l = x.split("_")
                (l(0),l(1))
            }.toMap
            //返回items_v,过滤商品
            items_v.filter{x=>
                val l = x.split("_")
                fMap.getOrElse(l(0),-1) == -1
            }
        }

items参数是user_id的商品集,items_v是user_v的商品集,使用该UDF后,会得到一个在user_v商品集基础上过滤掉user_id商品集的、全新的商品集,然后选取需要的列构建新的DF。

//过滤掉user_id商品的DF数据(user_id, consine_sim, item_rating)
        val df_filter_item = df_gen_item.withColumn("filtered_item", filter_udf(df_gen_item("item_rating_arr"),df_gen_item("item_rating_arr_v")))
            .select("user_id","cosine_sim", "filtered_item")

2、给候选商品进行打分

(物品分数=用户相似度*相似用户对电影(物品)的打分)

        经过过滤操作,我们得到一个数据结构(user_id, cosine_sim, filtered_item)的DF,现在显而易见,需要的参数已经有了,剩下的就是直接套用公式。但是不要忘记,filtered_item的数据是一个Array类型,是一个商品的集合,所以可以定义一个UDF,作用是遍历商品集合,分别乘以对应的cosine_sim。

        val simRatingUDF = udf{(sim:Double,items:Seq[String])=>
            items.map{item_rating=>
                val l = item_rating.split("_")
                l(0)+"_"+l(1).toDouble*sim
            }
        }

得到的仍然是一个Array类型数据,即topN里的每个相似用户对应的物品的集合,我们最终要的是topN相似用户的商品集合组成的总的商品集合,再取topN个商品,所以,必须将Array拆开,可以使用explode。

//DF:(user_id,item_prod)
        val itemSimRating = df_filter_item.withColumn("item_prod",simRatingUDF(df_filter_item("cosine_sim"),df_filter_item("filtered_item")))
            .select("user_id","item_prod")
        //行转列Array[item_prod],并分割item_pro。
        // 注意:得出的数据结果,会出现多个相同的user_id->item,因为同一个user_id的不同相似用户,可能会有同一样商品,分割后,就出现这情况
        val userItemScore = itemSimRating.select(itemSimRating("user_id"),explode(itemSimRating("item_prod")))
            .selectExpr("user_id","split('item_prod','_')[0] as item_id","case(split('item_prod','_')[1] as double) as score")//将一个字符串的字段拆分成两个字段

        这里又会出现一个问题,user_id有topN个相似用户,他们对应得到的商品集合里面,很大可能存在相同的item,那么就需要基于user_id和item_id做一个聚合,然后将相同item_id的打分进行累加,这才候选商品最后的打分。(区别于上一篇的python实现,它是基于杰卡尔德计算用户相似度,使用的是商品数量,相当于商品的原始权重都为1,并没有区分出一些具有代表性意义的商品,所以它还要对商品进行log降权处理;但是cosine计算相似度,直接使用rating值,rating值已经是对商品标上不同的权重)

//DF:(user_id,item_prod)
        val itemSimRating = df_filter_item.withColumn("item_prod",simRatingUDF(df_filter_item("cosine_sim"),df_filter_item("filtered_item")))
            .select("user_id","item_prod")
        //行转列Array[item_prod],并分割item_pro。
        // 注意:得出的数据结果,会出现多个相同的user_id->item,因为同一个user_id的不同相似用户,可能会有同一样商品,分割后,就出现这情况
        val userItemScore = itemSimRating.select(itemSimRating("user_id"),explode(itemSimRating("item_prod")))
            .selectExpr("user_id","split('item_prod','_')[0] as item_id","case(split('item_prod','_')[1] as double) as score")//将一个字符串的字段拆分成两个字段
        //基于user_id和item_id做聚合
        val userItemScoreSum = userItemScore.groupBy("user_id","item_id")
            .agg("score"->"sum")
            .withColumnRenamed("sum(score)","last_score")

四、取topN商品

        //排序取topN商品
        val df_rec = userItemScoreSum.rdd.map(row=>(row(0),(row(1).toString,row(2).toString)))
            .groupByKey()
            .mapValues(_.toArray.sortWith((x,y)=>x._2>y._2).slice(0,10))
            .flatMapValues(x=>x)
            .toDF("user_id","item_sim")
            .selectExpr("user_id","item_sim._1 as item_id", "item_sim._2 as score")

 

上一篇:技术问答-18


下一篇:线程工具CyclicBarrier