基于Spark机器学习和实时流计算的智能推荐系统

概要:

随着电子商务的高速发展和普及应用,个性化推荐的推荐系统已成为一个重要研究领域。

个性化推荐算法是推荐系统中最核心的技术,在很大程度上决定了电子商务推荐系统性能的优劣,决定着是否能够推荐用户真正感兴趣的信息,而面对用户的不断提升的需求,推荐系统不仅需要正确的推荐,还要实时地根据用户的行为进行分析并推荐最新的 结果。

实时推荐系统的任务就是为每个用户,不断地、精准地推送个性化的服务,甚至到达让用户体会到推荐系统比他们更了解自己的感觉。

本文主要研究的是基于模型的协同过滤算法—ALS以及实时推荐系统的可行性并详细讲解ALS(交替最小二乘法)的思想

然后在Spark Streaming框架上运用ALS算法进行测试,评估实时推荐中算法的可靠性

最后,在Spark Mllib和Streaming框架上构建了实时推荐引擎,将推荐数据保存在Hbase中,WebApp通过读取Hbase中的推荐数据来向用户展示推荐结果

关于其他类别的推荐算法就不细说了,网上有很多的资料查看,推荐几篇文章:

IBM-探索推荐引擎内部的秘密系列

以及向亮的《推荐系统实践》

下载地址

下面进入正文

基于矩阵分解的协同过滤算法–ALS:

基于模型的协同过滤推荐就是基于样本的用户喜好信息,训练一个推荐模型,然后根据实时的用户喜好的信息进行预测,计算推荐。

对于一个users-products-rating的评分数据集,ALS会建立一个user*product的m*n的矩阵(其中,m为users的数量,n为products的数量),如下图:

基于Spark机器学习和实时流计算的智能推荐系统

这个矩阵的每一行代表一个用户 (u1,u2,…,u9)、每一列代表一个产品 (v1,v2,…,v9)。用户隔天产品的打分在 1-9 之间。

但是在这个数据集中,并不是每个用户都对每个产品进行过评分,所以这个矩阵往往是稀疏的,用户i对产品j的评分往往是空的

ALS所做的事情就是将这个稀疏矩阵通过一定的规律填满,这样就可以从矩阵中得到任意一个user对任意一个product的评分,ALS填充的评分项也称为用户i对产品j的预测得分

所以说,ALS算法的核心就是通过什么样子的规律来填满(预测)这个稀疏矩阵

它是这么做的:

假设m*n的评分矩阵R,可以被近似分解成U*(V)T

U为m*d的用户特征向量矩阵

V为n*d的产品特征向量矩阵((V)T代表V的转置)

d为user/product的特征值的数量

关于d这个值的理解,大概可以是这样的:

对于每个产品,可以从d个角度进行评价,以电影为例,可以从主演,导演,特效,剧情4个角度来评价一部电影,那么d就等于4

可以认为,每部电影在这4个角度上都有一个固定的基准评分值

例如《末日崩塌》这部电影是一个产品,它的特征向量是由d个特征值组成的

d=4,有4个特征值,分别是主演,导演,特效,剧情

每个特征值的基准评分值分别为(满分为1.0):

主演:0.9

导演:0.7

特效:0.8

剧情:0.6

矩阵V由n个product*d个特征值组成

对于矩阵U,假设对于任意的用户A,该用户对一部电影的综合评分和电影的特征值存在一定的线性关系,即电影的综合评分=(a1*d1+a2*d2+a3*d3+a4*d4)

其中a1-4为用户A的特征值,d1-4为之前所说的电影的特征值

那么对于之前ALS算法的这个假设

m*n的评分矩阵R,可以被近似分解成U*(V)T

就是成立的,某个用户对某个产品的评分可以通过矩阵U某行和矩阵V(转置)的某列相乘得到

那么现在的问题是,如何确定用户和产品的特征值?(之前仅仅是举例子,实际中这两个都是未知的变量)

采用的是交替的最小二乘法

基于Spark机器学习和实时流计算的智能推荐系统

在上面的公式中,a表示评分数据集中用户i对产品j的真实评分,另外一部分表示用户i的特征向量(转置)*产品j的特征向量(这里可以得到预测的i对j的评分)

用真实评分减去预测评分然后求平方,对下一个用户,下一个产品进行相同的计算,将所有结果累加起来(其中,数据集构成的矩阵是存在大量的空打分,并没有实际的评分,解决的方法是就只看对已知打分的项)

但是这里之前问题还是存在,就是用户和产品的特征向量都是未知的,这个式子存在两个未知变量

解决的办法是交替的最小二乘法

首先对于上面的公式,以下面的形式显示:

基于Spark机器学习和实时流计算的智能推荐系统

为了防止过度拟合,加上正则化参数

基于Spark机器学习和实时流计算的智能推荐系统

基于Spark机器学习和实时流计算的智能推荐系统

首先用一个小于1的随机数初始化V

根据公式(4)求U

此时就可以得到初始的UV矩阵了,计算上面说过的差平方和

根据计算得到的U和公式(5),重新计算并覆盖V,计算差平方和

反复进行以上两步的计算,直到差平方和小于一个预设的数,或者迭代次数满足要求则停止

取得最新的UV矩阵

则原本的稀疏矩阵R就可以用R=U(V)T来表示了

ALS算法的核心就是将稀疏评分矩阵分解为用户特征向量矩阵和产品特征向量矩阵的乘积

交替使用最小二乘法逐步计算用户/产品特征向量,使得差平方和最小

通过用户/产品特征向量的矩阵来预测某个用户对某个产品的评分

算法原理讲述完毕,接下来进行算法测试

算法测试:

算法测试分为两部分:

一、测试最佳的参数,如:隐性因子个数,正则式等

二、测试在Streaming框架上算法的可用性

测试数据集来自MovieLens

测试一:

将整个数据集上传至HDFS中

在spark程序中读取ratings.dat文件,并随机划出80%作为训练数据集,20%作为测试数据集

设置隐性因子、正则式参数列表(由于物理机配置不好,集群能够支持的最大迭代次数只有7次,在多就会内存溢出,所以这里直接将迭代次数设置为7)

对参数列表的全排列分别进行模型训练,并计算MSE、RMSE

结果如下图:

基于Spark机器学习和实时流计算的智能推荐系统

比较得出最佳的参数组合,以后的模型训练参数都使用这个参数组合

测试二:

将原本的数据划分为三部分

trainingData-10k

testData-10k

剩下的为streamData,作为流数据实时发送

首先将trainingData、testData上传到HDFS/data目录下

在spark程序中读取,并转化为RDD[Rating]类型

使用Streaming框架接受流数据,并进行在线模型训练

每训练一次就计算一次MSE和RMSE

对比模型的精准性有没有提高

使用Scala读取本地的streamData,通过Socket发送到spark程序中

结果如下图:

随着数据的不断增加,模型的精准度在不断的提高,所以实时的更新推荐模型是可行的

推荐系统整合:

整体流程图:

基于Spark机器学习和实时流计算的智能推荐系统

首先用程序生成用户和图书数据,并随机模拟用户行为数据,保存在Hbase中

在Hbase数据库中包含了用户表(4000个用户),图书表(5060本图书)以及评分表(用户对图书的百万条数据)

由于对个人来说无法得到真实的商业性数据,故评分数据都是程序 模拟随机生成的,包括实时发送的流数据,所以这可能会对整个系统的推荐结果带来影响

另外,除了WebUI部分,其余的程序都是运行在Linux的Spark集群上

原始数据通过一个程序不断地向Hbase的评分表中写入数据

模拟用户在网站上的评分行为

运行截图:

基于Spark机器学习和实时流计算的智能推荐系统

其中,前300个用户的行为偏向于前600本图书(计算机相关)

实时流数据将通过另外一个程序发送Socket数据,模拟用户当前在网站上的实时评分行为

在最后使用用户进行观察测试时,程序将会只模拟这个用户的评分行为以便观察推荐系统的实时性

首先推荐引擎会读取Hbase中的评分数据

并使用算法测试时得到的最佳参数组合来对其进行训练

得到初始的模型

使用这个模型对Hbase中所有用户进行图书推荐(取 top10)

并将推荐结果保存在Hbase中

以上阶段为系统初始化阶段

运行截图:

基于Spark机器学习和实时流计算的智能推荐系统

基于Spark机器学习和实时流计算的智能推荐系统

基于Spark机器学习和实时流计算的智能推荐系统

在系统初始化完成之后,开启实时推荐引擎

接收不断生成的用户行为数据,并和Hbase中的原始数据混合,训练出新的模型,产生推荐结果保存

不断地进行流数据的读取、训练和保存推荐结果,直至系统关闭或者无流数据产生

推荐引擎运行如下图:

基于Spark机器学习和实时流计算的智能推荐系统

WebUI部分:

WebUI是由ASP.NET开发的一个简单的B/S应用,通过Thrift和Linux中的Hbase交互

选择使用一个用户观察系统的实时推荐性,此时流数据模拟程序只产生这个用户的评分行为

不同时刻,在该用户有行为数据产生的情况下,推荐的内容(细节没有仔细处理,比如有的图片找不到路径等。。。):

当前记录

基于Spark机器学习和实时流计算的智能推荐系统

基于Spark机器学习和实时流计算的智能推荐系统

新的行为数据产生的记录

基于Spark机器学习和实时流计算的智能推荐系统

基于Spark机器学习和实时流计算的智能推荐系统

总结:

前前后后大概花了两个礼拜多一点的时间(毕竟还要顾着上课,基本也就是晚上才有时间)

其中遇到了许多坑,上网找过,请人问过,也上过知乎啥的让大牛指导过

总之一句话,没有真正动手做过是不会知道其中的艰苦,当然我早就变态的把它当乐趣来看了

原本的设想是使用联合聚类+ALS矩阵分解来做的,但是试了一下,联合聚类貌似不想k-means啥的那么简单,以自己的水平来说暂时无法实现(还是要怪自己基础不好咯~),遂放弃之~

之后又有一个美好的想法,通过ItemCF、UserCF、关联规则、ALS等算法组合起来,形成一个混合的模型,毕竟这种模式才是比较接近商业化的构架,但是在Spark上面调用Mahout算法的时候又出现了各种各样的问题,有时候甚至编译都不通过。。。

在推荐算法性能测试的时候,自己实现了召回率,准确率,覆盖率,多样性,新颖度等指标的计算方式,但是实际测试时总是飙出莫名其妙的数据。。。

另外,使用ALS进行实时训练模型的时候,每次都要重新训练,感觉这是一个优化点,可否修改成接受到新数据之后不重复训练,只计算新来的数据(水平有限,暂时只是想法)

期末考又临近了,只好先放下这些不成器的东西以后再研究

最后的最后,无奈之下只能实现了一个最简单的推荐系统

最后附上所有源代码和简要记录的开发日志

源代码已打包上传:

下载地址

(代码有些凌乱,没来得及重构,仅仅做了基本的注释,有需要的童鞋不要介意。。。)

开发日志:

6-9:准备book数据到hbase中。上传到hdfs中文乱码(docker中),读取hdfs数据到hbase中出异常(原因:数据格式不对,内容太多超出一行,仔细看日志;scala输入hbase异常)

6-10:完成t_users,t_books,t_ratings的数据导入

6-12:scala操作scan hbase表

坑位:

1:resultScaner不能直接for循环

2:spark上操作hbase

第一次简单测试(按照之前的过程)

offset (0) + length (4) exceed the capacity of the array: 2 使用String

3:Streaming接收socket数据测试

4:Streaming执行内容测试

6-13:实时推荐测试

问题记录:不能同时运行两个sparkcontext

解决:使用sparkContext来创建StreamingContext

Streaming的处理方式

socketTextFile无法接受数据—logger缺少换行符

foreachRDD理解

完成实时更新模型

6-14:namenode经常莫名挂掉,重新配置虚拟机

ubuntu下hostname默认为ubuntu所以一直无法正确启动–修改/etc/hostname 重启

6-15:SparkStreaming实时读取更新模型老是抛异常

解决:allData.cache(没有缓存的话之前的流数据丢失无法找到)

Unable to reconnect to ZooKeeper service, session 0x14df6b4bcdb0009 has expired, closing socket connection/

Socket connection established to localhost/127.0.0.1:2181, initiating sessio

解决:在代码中设置hbase的zk,配置文件中无效

6-16:解决15鈤的问题

allData.repartition(3).cache

更新模型时连接到zk异常

WARN [sparkDriver-akka.actor.default-dispatcher-46] storage.BlockManagerMasterActor (Logging.scala:logWarning(71)) - Removing BlockManager BlockManagerId(4, cloud1, 56133) with no recent heart beats: 125833ms exceeds 120000ms

原因:由于网络差或者数据量太大,worker节点在一定的时间内(默认45s)没有给master信号,master以为它挂了。

解决办法:修改运行命令或者sprak-env.sh,添加参数 -Dspark.storage.blockManagerHeartBeatMs=6000000(以ms为单位,即6分钟)。

修改:在此配置中无效,要在代码中通过SparkConf设置

Spark1.4中直接通过spark.network.timeout一个配置全部

6-17:完成基础推荐引擎搭建和测试;c#连接hbase环境搭建

6-18:spark批量写hbase性能优化

myTable.setAutoFlush(false, false)//关键点1

myTable.setWriteBufferSize(3*1024*1024)//关键点2

myTable.flushCommits()//关键点3

关键点1_:将自动提交关闭,如果不关闭,每写一条数据都会进行提交,是导入数据较慢的做主要因素。

关键点2:设置缓存大小,当缓存大于设置值时,hbase会自动提交。此处可自己尝试大小,一般对大数据量,设置为5M即可,本文设置为3M。

关键点3:每一个分片结束后都进行flushCommits(),如果不执行,当hbase最后缓存小于上面设定值时,不会进行提交,导致数据丢失。

注:此外如果想提高Spark写数据如Hbase速度,可以增加Spark可用核数量。

修改:实际测试中,以上优化并没有起作用,反而会使一下数据丢失,没有继续深入测试

完成webapp的基本搭建

6-23:完成算法测试部分,评测指标RMSE,MSE,==》(平均值,取不同的n推荐列表画曲线)召回率,准确率,覆盖率,多样性,新颖度

使用spark1.4 的新api来推荐物品提升效率

在spark-env和default里面的配置无效,在代码中配置

System.setProperty(“spark.akka.frameSize”, “2000”)

6-24:系统原型完成

6-25:完善系统原型

6-26:论文初稿

参考资料:

Spark 下操作 HBase(1.0.0 新 API)

【C#】通过Thrift操作HBase系列(1)

ALS 在 Spark MLlib 中的实现

基于矩阵分解的协同过滤算法

上一篇:轻松进行WPF界面开发,DevExpress WPF的全新Badge控件


下一篇:ActiveMQ与RabbitMQ的区别