基于Spark机器学习和实时流计算的智能推荐系统

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/46675501 概要:随着电子商务的高速发展和普及应用,个性化推荐的推荐系统已成为一个重要研究领域。
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/46675501

概要:

随着电子商务的高速发展和普及应用,个性化推荐的推荐系统已成为一个重要研究领域。
个性化推荐算法是推荐系统中最核心的技术,在很大程度上决定了电子商务推荐系统性能的优劣,决定着是否能够推荐用户真正感兴趣的信息,而面对用户的不断提升的需求,推荐系统不仅需要正确的推荐,还要实时地根据用户的行为进行分析并推荐最新的 结果。
实时推荐系统的任务就是为每个用户,不断地、精准地推送个性化的服务,甚至到达让用户体会到推荐系统比他们更了解自己的感觉。

本文主要研究的是基于模型的协同过滤算法—ALS以及实时推荐系统的可行性并详细讲解ALS(交替最小二乘法)的思想
然后在Spark Streaming框架上运用ALS算法进行测试,评估实时推荐中算法的可靠性
最后,在Spark Mllib和Streaming框架上构建了实时推荐引擎,将推荐数据保存在Hbase中,WebApp通过读取Hbase中的推荐数据来向用户展示推荐结果

关于其他类别的推荐算法就不细说了,网上有很多的资料查看,推荐几篇文章:
IBM-探索推荐引擎内部的秘密系列

以及向亮的《推荐系统实践》
下载地址

下面进入正文

基于矩阵分解的协同过滤算法–ALS:

基于模型的协同过滤推荐就是基于样本的用户喜好信息,训练一个推荐模型,然后根据实时的用户喜好的信息进行预测,计算推荐。

对于一个users-products-rating的评分数据集,ALS会建立一个user*product的m*n的矩阵(其中,m为users的数量,n为products的数量),如下图:

这里写图片描述

这个矩阵的每一行代表一个用户 (u1,u2,…,u9)、每一列代表一个产品 (v1,v2,…,v9)。用户隔天产品的打分在 1-9 之间。
但是在这个数据集中,并不是每个用户都对每个产品进行过评分,所以这个矩阵往往是稀疏的,用户i对产品j的评分往往是空的
ALS所做的事情就是将这个稀疏矩阵通过一定的规律填满,这样就可以从矩阵中得到任意一个user对任意一个product的评分,ALS填充的评分项也称为用户i对产品j的预测得分
所以说,ALS算法的核心就是通过什么样子的规律来填满(预测)这个稀疏矩阵
它是这么做的:
假设m*n的评分矩阵R,可以被近似分解成U*(V)T
U为m*d的用户特征向量矩阵
V为n*d的产品特征向量矩阵((V)T代表V的转置)
d为user/product的特征值的数量

关于d这个值的理解,大概可以是这样的:
对于每个产品,可以从d个角度进行评价,以电影为例,可以从主演,导演,特效,剧情4个角度来评价一部电影,那么d就等于4
可以认为,每部电影在这4个角度上都有一个固定的基准评分值
例如《末日崩塌》这部电影是一个产品,它的特征向量是由d个特征值组成的
d=4,有4个特征值,分别是主演,导演,特效,剧情
每个特征值的基准评分值分别为(满分为1.0):
主演:0.9
导演:0.7
特效:0.8
剧情:0.6
矩阵V由n个product*d个特征值组成

对于矩阵U,假设对于任意的用户A,该用户对一部电影的综合评分和电影的特征值存在一定的线性关系,即电影的综合评分=(a1*d1+a2*d2+a3*d3+a4*d4)
其中a1-4为用户A的特征值,d1-4为之前所说的电影的特征值

那么对于之前ALS算法的这个假设
m*n的评分矩阵R,可以被近似分解成U*(V)T
就是成立的,某个用户对某个产品的评分可以通过矩阵U某行和矩阵V(转置)的某列相乘得到

那么现在的问题是,如何确定用户和产品的特征值?(之前仅仅是举例子,实际中这两个都是未知的变量)
采用的是交替的最小二乘法
这里写图片描述

在上面的公式中,a表示评分数据集中用户i对产品j的真实评分,另外一部分表示用户i的特征向量(转置)*产品j的特征向量(这里可以得到预测的i对j的评分)
用真实评分减去预测评分然后求平方,对下一个用户,下一个产品进行相同的计算,将所有结果累加起来(其中,数据集构成的矩阵是存在大量的空打分,并没有实际的评分,解决的方法是就只看对已知打分的项)

但是这里之前问题还是存在,就是用户和产品的特征向量都是未知的,这个式子存在两个未知变量

解决的办法是交替的最小二乘法
首先对于上面的公式,以下面的形式显示:
这里写图片描述
为了防止过度拟合,加上正则化参数
这里写图片描述

这里写图片描述

首先用一个小于1的随机数初始化V
根据公式(4)求U
此时就可以得到初始的UV矩阵了,计算上面说过的差平方和
根据计算得到的U和公式(5),重新计算并覆盖V,计算差平方和
反复进行以上两步的计算,直到差平方和小于一个预设的数,或者迭代次数满足要求则停止
取得最新的UV矩阵
则原本的稀疏矩阵R就可以用R=U(V)T来表示了

ALS算法的核心就是将稀疏评分矩阵分解为用户特征向量矩阵和产品特征向量矩阵的乘积
交替使用最小二乘法逐步计算用户/产品特征向量,使得差平方和最小
通过用户/产品特征向量的矩阵来预测某个用户对某个产品的评分

算法原理讲述完毕,接下来进行算法测试

算法测试:

算法测试分为两部分:
一、测试最佳的参数,如:隐性因子个数,正则式等
二、测试在Streaming框架上算法的可用性

测试数据集来自MovieLens

测试一:
将整个数据集上传至HDFS中
在spark程序中读取ratings.dat文件,并随机划出80%作为训练数据集,20%作为测试数据集

设置隐性因子、正则式参数列表(由于物理机配置不好,集群能够支持的最大迭代次数只有7次,在多就会内存溢出,所以这里直接将迭代次数设置为7)

对参数列表的全排列分别进行模型训练,并计算MSE、RMSE

结果如下图:
这里写图片描述

比较得出最佳的参数组合,以后的模型训练参数都使用这个参数组合

测试二:

将原本的数据划分为三部分
trainingData-10k
testData-10k
剩下的为streamData,作为流数据实时发送
首先将trainingData、testData上传到HDFS/data目录下
在spark程序中读取,并转化为RDD[Rating]类型
使用Streaming框架接受流数据,并进行在线模型训练
每训练一次就计算一次MSE和RMSE
对比模型的精准性有没有提高

使用Scala读取本地的streamData,通过Socket发送到spark程序中

结果如下图:

随着数据的不断增加,模型的精准度在不断的提高,所以实时的更新推荐模型是可行的

推荐系统整合:

整体流程图:

这里写图片描述

首先用程序生成用户和图书数据,并随机模拟用户行为数据,保存在Hbase中

在Hbase数据库中包含了用户表(4000个用户),图书表(5060本图书)以及评分表(用户对图书的百万条数据)
由于对个人来说无法得到真实的商业性数据,故评分数据都是程序 模拟随机生成的,包括实时发送的流数据,所以这可能会对整个系统的推荐结果带来影响

另外,除了WebUI部分,其余的程序都是运行在Linux的Spark集群上

原始数据通过一个程序不断地向Hbase的评分表中写入数据
模拟用户在网站上的评分行为
运行截图:

这里写图片描述

其中,前300个用户的行为偏向于前600本图书(计算机相关)
实时流数据将通过另外一个程序发送Socket数据,模拟用户当前在网站上的实时评分行为
在最后使用用户进行观察测试时,程序将会只模拟这个用户的评分行为以便观察推荐系统的实时性

首先推荐引擎会读取Hbase中的评分数据
并使用算法测试时得到的最佳参数组合来对其进行训练
得到初始的模型
使用这个模型对Hbase中所有用户进行图书推荐(取 top10)
并将推荐结果保存在Hbase中
以上阶段为系统初始化阶段
运行截图:

这里写图片描述
这里写图片描述

这里写图片描述

在系统初始化完成之后,开启实时推荐引擎
接收不断生成的用户行为数据,并和Hbase中的原始数据混合,训练出新的模型,产生推荐结果保存
不断地进行流数据的读取、训练和保存推荐结果,直至系统关闭或者无流数据产生
推荐引擎运行如下图:

这里写图片描述

WebUI部分:

WebUI是由ASP.NET开发的一个简单的B/S应用,通过Thrift和Linux中的Hbase交互
选择使用一个用户观察系统的实时推荐性,此时流数据模拟程序只产生这个用户的评分行为
不同时刻,在该用户有行为数据产生的情况下,推荐的内容(细节没有仔细处理,比如有的图片找不到路径等。。。):
当前记录

这里写图片描述
这里写图片描述

新的行为数据产生的记录

这里写图片描述
这里写图片描述

总结:

前前后后大概花了两个礼拜多一点的时间(毕竟还要顾着上课,基本也就是晚上才有时间)
其中遇到了许多坑,上网找过,请人问过,也上过知乎啥的让大牛指导过

总之一句话,没有真正动手做过是不会知道其中的艰苦,当然我早就变态的把它当乐趣来看了

原本的设想是使用联合聚类+ALS矩阵分解来做的,但是试了一下,联合聚类貌似不想k-means啥的那么简单,以自己的水平来说暂时无法实现(还是要怪自己基础不好咯~),遂放弃之~

之后又有一个美好的想法,通过ItemCF、UserCF、关联规则、ALS等算法组合起来,形成一个混合的模型,毕竟这种模式才是比较接近商业化的构架,但是在Spark上面调用Mahout算法的时候又出现了各种各样的问题,有时候甚至编译都不通过。。。

在推荐算法性能测试的时候,自己实现了召回率,准确率,覆盖率,多样性,新颖度等指标的计算方式,但是实际测试时总是飙出莫名其妙的数据。。。

另外,使用ALS进行实时训练模型的时候,每次都要重新训练,感觉这是一个优化点,可否修改成接受到新数据之后不重复训练,只计算新来的数据(水平有限,暂时只是想法)

期末考又临近了,只好先放下这些不成器的东西以后再研究

最后的最后,无奈之下只能实现了一个最简单的推荐系统

最后附上所有源代码和简要记录的开发日志

源代码已打包上传:
下载地址
(代码有些凌乱,没来得及重构,仅仅做了基本的注释,有需要的童鞋不要介意。。。)

开发日志:

6-9:准备book数据到hbase中。上传到hdfs中文乱码(docker中),读取hdfs数据到hbase中出异常(原因:数据格式不对,内容太多超出一行,仔细看日志;scala输入hbase异常)

6-10:完成t_users,t_books,t_ratings的数据导入

6-12:scala操作scan hbase表
坑位:
1:resultScaner不能直接for循环
2:spark上操作hbase
第一次简单测试(按照之前的过程)
offset (0) + length (4) exceed the capacity of the array: 2 使用String
3:Streaming接收socket数据测试
4:Streaming执行内容测试

6-13:实时推荐测试
问题记录:不能同时运行两个sparkcontext
解决:使用sparkContext来创建StreamingContext
Streaming的处理方式
socketTextFile无法接受数据—logger缺少换行符
foreachRDD理解
完成实时更新模型

6-14:namenode经常莫名挂掉,重新配置虚拟机
ubuntu下hostname默认为ubuntu所以一直无法正确启动–修改/etc/hostname 重启

6-15:SparkStreaming实时读取更新模型老是抛异常
解决:allData.cache(没有缓存的话之前的流数据丢失无法找到)

Unable to reconnect to ZooKeeper service, session 0x14df6b4bcdb0009 has expired, closing socket connection/
Socket connection established to localhost/127.0.0.1:2181, initiating sessio
解决:在代码中设置hbase的zk,配置文件中无效

6-16:解决15鈤的问题
allData.repartition(3).cache
更新模型时连接到zk异常

WARN [sparkDriver-akka.actor.default-dispatcher-46] storage.BlockManagerMasterActor (Logging.scala:logWarning(71)) - Removing BlockManager BlockManagerId(4, cloud1, 56133) with no recent heart beats: 125833ms exceeds 120000ms
原因:由于网络差或者数据量太大,worker节点在一定的时间内(默认45s)没有给master信号,master以为它挂了。
解决办法:修改运行命令或者sprak-env.sh,添加参数 -Dspark.storage.blockManagerHeartBeatMs=6000000(以ms为单位,即6分钟)。
修改:在此配置中无效,要在代码中通过SparkConf设置
Spark1.4中直接通过spark.network.timeout一个配置全部

6-17:完成基础推荐引擎搭建和测试;c#连接hbase环境搭建

6-18:spark批量写hbase性能优化
myTable.setAutoFlush(false, false)//关键点1
myTable.setWriteBufferSize(3*1024*1024)//关键点2
myTable.flushCommits()//关键点3
关键点1_:将自动提交关闭,如果不关闭,每写一条数据都会进行提交,是导入数据较慢的做主要因素。
关键点2:设置缓存大小,当缓存大于设置值时,hbase会自动提交。此处可自己尝试大小,一般对大数据量,设置为5M即可,本文设置为3M。
关键点3:每一个分片结束后都进行flushCommits(),如果不执行,当hbase最后缓存小于上面设定值时,不会进行提交,导致数据丢失。

注:此外如果想提高Spark写数据如Hbase速度,可以增加Spark可用核数量。

修改:实际测试中,以上优化并没有起作用,反而会使一下数据丢失,没有继续深入测试

完成webapp的基本搭建

6-23:完成算法测试部分,评测指标RMSE,MSE,==》(平均值,取不同的n推荐列表画曲线)召回率,准确率,覆盖率,多样性,新颖度
使用spark1.4 的新api来推荐物品提升效率
在spark-env和default里面的配置无效,在代码中配置
System.setProperty(“spark.akka.frameSize”, “2000”)

6-24:系统原型完成

6-25:完善系统原型

6-26:论文初稿

参考资料:
Spark 下操作 HBase(1.0.0 新 API)
【C#】通过Thrift操作HBase系列(1)
ALS 在 Spark MLlib 中的实现
基于矩阵分解的协同过滤算法

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
  相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情: https://cn.aliyun.com/product/hbase   ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
3月前
|
机器学习/深度学习 数据采集 数据可视化
基于爬虫和机器学习的招聘数据分析与可视化系统,python django框架,前端bootstrap,机器学习有八种带有可视化大屏和后台
本文介绍了一个基于Python Django框架和Bootstrap前端技术,集成了机器学习算法和数据可视化的招聘数据分析与可视化系统,该系统通过爬虫技术获取职位信息,并使用多种机器学习模型进行薪资预测、职位匹配和趋势分析,提供了一个直观的可视化大屏和后台管理系统,以优化招聘策略并提升决策质量。
167 4
|
1天前
|
机器学习/深度学习 自然语言处理 Linux
Linux 中的机器学习:Whisper——自动语音识别系统
本文介绍了先进的自动语音识别系统 Whisper 在 Linux 环境中的应用。Whisper 基于深度学习和神经网络技术,支持多语言识别,具有高准确性和实时处理能力。文章详细讲解了在 Linux 中安装、配置和使用 Whisper 的步骤,以及其在语音助手、语音识别软件等领域的应用场景。
10 5
|
25天前
|
机器学习/深度学习 API 计算机视觉
基于Python_opencv人脸录入、识别系统(应用dlib机器学习库)(下)
基于Python_opencv人脸录入、识别系统(应用dlib机器学习库)(下)
19 2
|
25天前
|
机器学习/深度学习 存储 算法
基于Python_opencv人脸录入、识别系统(应用dlib机器学习库)(上)
基于Python_opencv人脸录入、识别系统(应用dlib机器学习库)(上)
26 1
|
2月前
|
机器学习/深度学习 存储 人工智能
文本情感识别分析系统Python+SVM分类算法+机器学习人工智能+计算机毕业设计
使用Python作为开发语言,基于文本数据集(一个积极的xls文本格式和一个消极的xls文本格式文件),使用Word2vec对文本进行处理。通过支持向量机SVM算法训练情绪分类模型。实现对文本消极情感和文本积极情感的识别。并基于Django框架开发网页平台实现对用户的可视化操作和数据存储。
46 0
文本情感识别分析系统Python+SVM分类算法+机器学习人工智能+计算机毕业设计
|
3月前
|
机器学习/深度学习 数据采集 数据可视化
基于python 机器学习算法的二手房房价可视化和预测系统
文章介绍了一个基于Python机器学习算法的二手房房价可视化和预测系统,涵盖了爬虫数据采集、数据处理分析、机器学习预测以及Flask Web部署等模块。
102 2
基于python 机器学习算法的二手房房价可视化和预测系统
|
3月前
|
C# 机器学习/深度学习 搜索推荐
WPF与机器学习的完美邂逅:手把手教你打造一个具有智能推荐功能的现代桌面应用——从理论到实践的全方位指南,让你的应用瞬间变得高大上且智能无比
【8月更文挑战第31天】本文详细介绍如何在Windows Presentation Foundation(WPF)应用中集成机器学习功能,以开发具备智能化特性的桌面应用。通过使用Microsoft的ML.NET框架,本文演示了从安装NuGet包、准备数据集、训练推荐系统模型到最终将模型集成到WPF应用中的全过程。具体示例代码展示了如何基于用户行为数据训练模型,并实现实时推荐功能。这为WPF开发者提供了宝贵的实践指导。
41 0
|
3月前
|
机器学习/深度学习 数据可视化 数据处理
Python vs R:机器学习项目中的实用性与生态系统比较
【8月更文第6天】Python 和 R 是数据科学和机器学习领域中最受欢迎的两种编程语言。两者都有各自的优点和适用场景,选择哪种语言取决于项目的具体需求、团队的技能水平以及个人偏好。本文将从实用性和生态系统两个方面进行比较,并提供代码示例来展示这两种语言在典型机器学习任务中的应用。
80 1
|
3月前
|
机器学习/深度学习 算法
【Deepin 20系统】机器学习分类算法模型xgboost、lightgbm、catboost安装及使用
介绍了在Deepin 20系统上使用pip命令通过清华大学镜像源安装xgboost、lightgbm和catboost三个机器学习分类算法库的过程。
46 4
|
4月前
|
机器学习/深度学习 数据采集 运维
智能化运维:利用机器学习优化系统性能
在当今快速发展的信息技术时代,传统的运维方式已难以满足日益增长的业务需求和复杂性。本文将探讨如何通过机器学习技术来提升运维效率,确保系统的高可用性和性能优化。我们将深入分析机器学习模型在预测系统负载、自动故障检测与响应以及资源分配中的应用,并讨论实施这些策略时可能遇到的挑战和解决思路。
下一篇
无影云桌面