协同过滤(ALS)的原理及Python实现

简介: 提到ALS相信大家应该都不会觉得陌生(不陌生你点进来干嘛[捂脸]),它是协同过滤的一种,并被集成到Spark的Mllib库中。本文就ALS的基本原理进行讲解,并手把手、肩并肩地带您实现这一算法。原理篇我们用人话而不是大段的数学公式来讲讲ALS是怎么一回事。

提到ALS相信大家应该都不会觉得陌生(不陌生你点进来干嘛[捂脸]),它是协同过滤的一种,并被集成到Spark的Mllib库中。本文就ALS的基本原理进行讲解,并手把手、肩并肩地带您实现这一算法。

  1. 原理篇

我们用人话而不是大段的数学公式来讲讲ALS是怎么一回事。

1.1 你听说过推荐算法么

假如我是豆瓣的CEO,很多豆瓣的用户在豆瓣电影上都会对电影进行评分。那么根据这个评分数据,我们有可能知道这些用户除了自己评过分的电影之外还喜欢或讨厌哪些电影吗?这就是一个典型的推荐问题,解决这一类问题的算法被称为推荐算法。

1.2 什么是协同过滤

协同过滤的英文全称是Collaborative Filtering,简称CF。注意,这不是一款游戏!从字面上分析,协同就是寻找共同点,过滤就是筛选出优质的内容。

1.3 协同过滤的分类

一般来说,协同过滤推荐分为三种类型:

  1. 基于用户(user-based)的协同过滤,通过计算用户和用户的相似度找到跟用户A相似的用户B, C, D...再把这些用户喜欢的内容推荐给A;
  2. 基于物品(item-based)的协同过滤,通过计算物品和物品的相似度找到跟物品1相似的物品2, 3, 4...再把这些物品推荐给看过物品1的用户们;
  3. 基于模型(model based)的协同过滤。主流的方法可以分为:矩阵分解,关联算法,聚类算法,分类算法,回归算法,神经网络。

1.4 矩阵分解

矩阵分解 (decomposition, factorization)是将矩阵拆解为数个矩阵的乘积。比如豆瓣电影有m个用户,n个电影。那么用户对电影的评分可以形成一个m行n列的矩阵R,我们可以找到一个m行k列的矩阵U,和一个k行n列的矩阵I,通过U * I来得到矩阵R。

1.5 ALS

如果想通过矩阵分解的方法实现基于模型的协同过滤,ALS是一个不错的选择,其英文全称是Alternating Least Square,翻译过来是交替最小二乘法。假设用户为a,物品为b,评分矩阵为R(m, n),可分解为用户矩阵U(k, m)和物品矩阵I(k, n),其中m, n, k代表矩阵的维度。前方小段数学公式低能预警:

根据矩阵分解的定义,有
image
用MSE作为损失函数,为了方便化简,加法符号左侧的常数改为-1/2
image

  1. 对损失函数求U_a的一阶偏导数,那
    image
  2. 令一阶偏导数等于0
    image
  3. 同理,可证
    image

1.6 求解用户矩阵U和物品矩阵I

矩阵R是已知的,我们随机生成用户矩阵U,

  1. 利用1.5中的式5、R和U求出I
  2. 利用1.5中的式6、R和I求出U

如此交替地执行步骤1和步骤2,直到算法收敛或者迭代次数超过了最大限制,最终我们用RMSE来评价模型的好坏。

  1. 实现篇

本人用全宇宙最简单的编程语言——Python实现了ALS算法,没有依赖任何第三方库,便于学习和使用。简单说明一下实现过程,更详细的注释请参考本人github上的代码。

注:代码中用到的Matrix类是我写的一个矩阵类,可以取出矩阵的行或列,计算矩阵的乘法、转置和逆。

2.1 创建ALS类

初始化,存储用户ID、物品ID、用户ID与用户矩阵列号的对应关系、物品ID与物品矩阵列号的对应关系、用户已经看过哪些物品、评分矩阵的Shape以及RMSE。

class ALS(object):
    def __init__(self):
        self.user_ids = None
        self.item_ids = None
        self.user_ids_dict = None
        self.item_ids_dict = None
        self.user_matrix = None
        self.item_matrix = None
        self.user_items = None
        self.shape = None
        self.rmse = None

2.2 数据预处理

对训练数据进行处理,得到用户ID、物品ID、用户ID与用户矩阵列号的对应关系、物品ID与物品矩阵列号的对应关系、评分矩阵的Shape、评分矩阵及评分矩阵的转置。

def _process_data(self, X):
    self.user_ids = tuple((set(map(lambda x: x[0], X))))
    self.user_ids_dict = dict(map(lambda x: x[::-1],
                                    enumerate(self.user_ids)))

    self.item_ids = tuple((set(map(lambda x: x[1], X))))
    self.item_ids_dict = dict(map(lambda x: x[::-1],
                                    enumerate(self.item_ids)))

    self.shape = (len(self.user_ids), len(self.item_ids))

    ratings = defaultdict(lambda: defaultdict(int))
    ratings_T = defaultdict(lambda: defaultdict(int))
    for row in X:
        user_id, item_id, rating = row
        ratings[user_id][item_id] = rating
        ratings_T[item_id][user_id] = rating

    err_msg = "Length of user_ids %d and ratings %d not match!" % (
        len(self.user_ids), len(ratings))
    assert len(self.user_ids) == len(ratings), err_msg

    err_msg = "Length of item_ids %d and ratings_T %d not match!" % (
        len(self.item_ids), len(ratings_T))
    assert len(self.item_ids) == len(ratings_T), err_msg
    return ratings, ratings_T

2.3 用户矩阵乘以评分矩阵

实现稠密矩阵与稀疏矩阵的矩阵乘法,得到用户矩阵与评分矩阵的乘积。

def _users_mul_ratings(self, users, ratings_T):

    def f(users_row, item_id):
        user_ids = iter(ratings_T[item_id].keys())
        scores = iter(ratings_T[item_id].values())
        col_nos = map(lambda x: self.user_ids_dict[x], user_ids)
        _users_row = map(lambda x: users_row[x], col_nos)
        return sum(a * b for a, b in zip(_users_row, scores))

    ret = [[f(users_row, item_id) for item_id in self.item_ids]
            for users_row in users.data]
    return Matrix(ret)

2.4 物品矩阵乘以评分矩阵

实现稠密矩阵与稀疏矩阵的矩阵乘法,得到物品矩阵与评分矩阵的乘积。

def _items_mul_ratings(self, items, ratings):

    def f(items_row, user_id):
        item_ids = iter(ratings[user_id].keys())
        scores = iter(ratings[user_id].values())
        col_nos = map(lambda x: self.item_ids_dict[x], item_ids)
        _items_row = map(lambda x: items_row[x], col_nos)
        return sum(a * b for a, b in zip(_items_row, scores))

    ret = [[f(items_row, user_id) for user_id in self.user_ids]
            for items_row in items.data]
    return Matrix(ret)

2.5 生成随机矩阵

def _gen_random_matrix(self, n_rows, n_colums):
    data = [[random() for _ in range(n_colums)] for _ in range(n_rows)]
    return Matrix(data)

2.6 计算RMSE

def _get_rmse(self, ratings):
        m, n = self.shape
        mse = 0.0
        n_elements = sum(map(len, ratings.values()))
        for i in range(m):
            for j in range(n):
                user_id = self.user_ids[i]
                item_id = self.item_ids[j]
                rating = ratings[user_id][item_id]
                if rating > 0:
                    user_row = self.user_matrix.col(i).transpose
                    item_col = self.item_matrix.col(j)
                    rating_hat = user_row.mat_mul(item_col).data[0][0]
                    square_error = (rating - rating_hat) ** 2
                    mse += square_error / n_elements
        return mse ** 0.5

2.7 训练模型

数据预处理

变量k合法性检查

生成随机矩阵U

交替计算矩阵U和矩阵I,并打印RMSE信息,直到迭代次数达到max_iter

保存最终的RMSE

def fit(self, X, k, max_iter=10):
    ratings, ratings_T = self._process_data(X)
    self.user_items = {k: set(v.keys()) for k, v in ratings.items()}
    m, n = self.shape

    error_msg = "Parameter k must be less than the rank of original matrix"
    assert k < min(m, n), error_msg

    self.user_matrix = self._gen_random_matrix(k, m)

    for i in range(max_iter):
        if i % 2:
            items = self.item_matrix
            self.user_matrix = self._items_mul_ratings(
                items.mat_mul(items.transpose).inverse.mat_mul(items),
                ratings
            )
        else:
            users = self.user_matrix
            self.item_matrix = self._users_mul_ratings(
                users.mat_mul(users.transpose).inverse.mat_mul(users),
                ratings_T
            )
        rmse = self._get_rmse(ratings)
        print("Iterations: %d, RMSE: %.6f" % (i + 1, rmse))

    self.rmse = rmse

2.8 预测一个用户

预测一个用户感兴趣的内容,剔除用户已看过的内容。然后按感兴趣分值排序,取出前n_items个内容。

def _predict(self, user_id, n_items):
    users_col = self.user_matrix.col(self.user_ids_dict[user_id])
    users_col = users_col.transpose

    items_col = enumerate(users_col.mat_mul(self.item_matrix).data[0])
    items_scores = map(lambda x: (self.item_ids[x[0]], x[1]), items_col)
    viewed_items = self.user_items[user_id]
    items_scores = filter(lambda x: x[0] not in viewed_items, items_scores)

    return sorted(items_scores, key=lambda x: x[1], reverse=True)[:n_items]

2.9 预测多个用户

循环调用2.8,预测多个用户感兴趣的内容。

def predict(self, user_ids, n_items=10):
    return [self._predict(user_id, n_items) for user_id in user_ids]

3 效果评估

3.1 main函数

使用电影评分数据集,训练模型并统计RMSE。

@run_time
def main():
    print("Tesing the accuracy of ALS...")

    X = load_movie_ratings()

    model = ALS()
    model.fit(X, k=3, max_iter=5)
    print()

    print("Showing the predictions of users...")

    user_ids = range(1, 5)
    predictions = model.predict(user_ids, n_items=2)
    for user_id, prediction in zip(user_ids, predictions):
        _prediction = [format_prediction(item_id, score)
                       for item_id, score in prediction]
        print("User id:%d recommedation: %s" % (user_id, _prediction))

3.2 效果展示

设置k=3,迭代5次,并展示了前4个用户的推荐内容,最终RMSE为0.370,运行时间46.5秒,效果还算不错~

image

3.3 工具函数

本人自定义了一些工具函数,可以在github上查看:

https://github.com/tushushu/imylu/tree/master/imylu/utils

  1. run_time - 测试函数运行时间
  2. load_movie_ratings - 加载电影评分数据

总结

ALS的原理:鸡生蛋、蛋生鸡

ALS的实现:基本上就是矩阵乘法

原文发布时间为:2019-1-2
本文作者:李小文
本文来自云栖社区合作伙伴“ Python爱好者社区”,了解相关信息可以关注“python_shequ”微信公众号

相关文章
|
2月前
|
机器学习/深度学习 Python
堆叠集成策略的原理、实现方法及Python应用。堆叠通过多层模型组合,先用不同基础模型生成预测,再用元学习器整合这些预测,提升模型性能
本文深入探讨了堆叠集成策略的原理、实现方法及Python应用。堆叠通过多层模型组合,先用不同基础模型生成预测,再用元学习器整合这些预测,提升模型性能。文章详细介绍了堆叠的实现步骤,包括数据准备、基础模型训练、新训练集构建及元学习器训练,并讨论了其优缺点。
64 3
|
2月前
|
机器学习/深度学习 算法 数据挖掘
线性回归模型的原理、实现及应用,特别是在 Python 中的实践
本文深入探讨了线性回归模型的原理、实现及应用,特别是在 Python 中的实践。线性回归假设因变量与自变量间存在线性关系,通过建立线性方程预测未知数据。文章介绍了模型的基本原理、实现步骤、Python 常用库(如 Scikit-learn 和 Statsmodels)、参数解释、优缺点及扩展应用,强调了其在数据分析中的重要性和局限性。
65 3
|
5天前
|
算法 数据处理 Python
高精度保形滤波器Savitzky-Golay的数学原理、Python实现与工程应用
Savitzky-Golay滤波器是一种基于局部多项式回归的数字滤波器,广泛应用于信号处理领域。它通过线性最小二乘法拟合低阶多项式到滑动窗口中的数据点,在降噪的同时保持信号的关键特征,如峰值和谷值。本文介绍了该滤波器的原理、实现及应用,展示了其在Python中的具体实现,并分析了不同参数对滤波效果的影响。适合需要保持信号特征的应用场景。
46 11
高精度保形滤波器Savitzky-Golay的数学原理、Python实现与工程应用
|
18天前
|
缓存 数据安全/隐私保护 Python
python装饰器底层原理
Python装饰器是一个强大的工具,可以在不修改原始函数代码的情况下,动态地增加功能。理解装饰器的底层原理,包括函数是对象、闭包和高阶函数,可以帮助我们更好地使用和编写装饰器。无论是用于日志记录、权限验证还是缓存,装饰器都可以显著提高代码的可维护性和复用性。
31 5
|
30天前
|
缓存 开发者 Python
深入探索Python中的装饰器:原理、应用与最佳实践####
本文作为技术性深度解析文章,旨在揭开Python装饰器背后的神秘面纱,通过剖析其工作原理、多样化的应用场景及实践中的最佳策略,为中高级Python开发者提供一份详尽的指南。不同于常规摘要的概括性介绍,本文摘要将直接以一段精炼的代码示例开篇,随后简要阐述文章的核心价值与读者预期收获,引领读者快速进入装饰器的世界。 ```python # 示例:一个简单的日志记录装饰器 def log_decorator(func): def wrapper(*args, **kwargs): print(f"Calling {func.__name__} with args: {a
40 2
|
2月前
|
机器学习/深度学习 人工智能 算法
强化学习在游戏AI中的应用,从基本原理、优势、应用场景到具体实现方法,以及Python在其中的作用
本文探讨了强化学习在游戏AI中的应用,从基本原理、优势、应用场景到具体实现方法,以及Python在其中的作用,通过案例分析展示了其潜力,并讨论了面临的挑战及未来发展趋势。强化学习正为游戏AI带来新的可能性。
119 4
|
2月前
|
搜索推荐 Python
快速排序的 Python 实践:从原理到优化,打造你的排序利器!
本文介绍了 Python 中的快速排序算法,从基本原理、实现代码到优化方法进行了详细探讨。快速排序采用分治策略,通过选择基准元素将数组分为两部分,递归排序。文章还对比了快速排序与冒泡排序的性能,展示了优化前后快速排序的差异。通过这些分析,帮助读者理解快速排序的优势及优化的重要性,从而在实际应用中选择合适的排序算法和优化策略,提升程序性能。
49 1
|
3月前
|
数据采集 调度 Python
Python编程异步爬虫——协程的基本原理(一)
Python编程异步爬虫——协程的基本原理(一)
25 0
|
3月前
|
数据采集 Python
Python编程异步爬虫——协程的基本原理(二)
Python编程异步爬虫——协程的基本原理(二)
27 0
|
3月前
|
Java C语言 Python
解析Python中的全局解释器锁(GIL):影响、工作原理及解决方案
解析Python中的全局解释器锁(GIL):影响、工作原理及解决方案
62 0