GraphPro

简介: GraphPro

GraphPro: Graph Pre-training and Prompt Learning for Recommendation

北京B区 / 032机->优先用

北京B区 / 224机

数据集介绍

本文使用了三个公开的数据集来进行实验和评估GraphPro框架的性能。这些数据集分别代表了不同的商业场景和用户交互模式,具有丰富的动态交互数据,非常适合用来评估推荐系统在处理时间变化数据时的有效性。以下是对这些数据集的详细介绍:

1. Taobao数据集

  • 来源:Taobao是中国最大的电子商务平台之一,数据集收集了用户在该平台上的隐式反馈数据。
  • 时间跨度:数据集覆盖了10天的时间段。
  • 数据特点:包含了大量的用户点击和购买行为,这些行为被用作推荐系统的训练和测试数据。
  • 规模:具有大量的用户和物品,以及相应的交互记录。

2. Koubei数据集

  • 来源:Koubei是支付宝平台上的一个本地生活服务平台,数据集记录了用户与附近商家的互动。
  • 时间跨度:数据集涵盖了9周的时间段。
  • 数据特点:包含了用户对商家的评分、评论和交易等信息,反映了用户对本地服务的偏好和选择。
  • 规模:包含了一定数量的用户和商家,以及用户对商家的各种互动。

3. Amazon数据集

  • 来源:Amazon是全球知名的电子商务平台,数据集包含了用户对产品的评价信息。
  • 时间跨度:数据集收集了13周内的用户评价数据。
  • 数据特点:包含了用户对产品的评分和评论,这些数据对于理解用户偏好和产品质量非常重要。
  • 规模:具有大量的用户和产品,以及丰富的评价信息。

这些数据集的共同特点是都包含了随时间变化的用户-物品交互信息,这对于评估推荐系统的动态适应性至关重要。通过在这些数据集上进行实验,作者能够验证GraphPro框架在不同场景和时间尺度下的推荐性能,以及其在处理动态数据变化时的有效性和鲁棒性。此外,这些数据集的公开可用性也使得其他研究人员可以复现和比较GraphPro与其他推荐系统模型的性能。

对论文中的pretrain.txt数据做分析:

<!--
  从0之后的8个数字(16198, 39895, ..., 81015)代表用户0与特定物品的交互。
  后面的数字(1453161600, 1453161600, ..., 1453161600)是对应的时间戳,表示这些交互发生的时间。由于时间戳重复,这意味着在这些特定时间点,用户0与多个物品进行了交互。后续类似
--> 
0 16198 39895 49560 67677 68881 75476 81015 1453161600 1453161600 1453161600 1453161600 1453161600 1453161600 1453161600
1 30158 52823 76295 1453766400 1453766400 1453766400
2 1391 10625 11537  1452729600 1452729600 1452729600
3 89514 1452211200
4 5573 92552  1452988800 1452988800
5 10444 22618 1452729600 1452729600
6 67102 75422 13393 73402 1452902400 1453507200 1453680000 1453680000
7 17988 1452902400

Pre-training 预训练

要从头开始预训练模型,预训练的主文件是 pretrain.py 。您可以通过以下命令在数据集上运行图预训练:

# Taobao
python pretrain.py --data_path dataset/taobao --exp_name pretrain --phase pretrain --log 1 --device cuda:0 --model GraphPro --lr 1e-3 --edge_dropout 0.5 
# Koubei
python pretrain.py --data_path dataset/koubei --exp_name pretrain --phase pretrain --log 1 --device cuda:0 --model GraphPro --lr 1e-3 --edge_dropout 0.2 --hour_interval_pre 24 
# Amazon
python pretrain.py --data_path dataset/amazon --exp_name pretrain --phase pretrain --log 1 --device cuda:0 --model GraphPro --lr 1e-3 --edge_dropout 0.2 --hour_interval_pre 24

数据集的预处理 DataLoader

dataloader.py

from utils.parse_args import args
from os import path
from tqdm import tqdm
import numpy as np
import scipy.sparse as sp
import torch
import logging
import networkx as nx
from copy import deepcopy
from collections import defaultdict
import pandas as pd
# from torch_geometric.data import Data
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('train_logger')
logger.setLevel(logging.INFO)
class EdgeListData:
    def __init__(self, train_file, test_file, phase='pretrain', pre_dataset=None, user_hist_files=[], has_time=True):
        # 记录加载数据集的阶段
        logger.info(f"Loading dataset for {phase}...")
        self.phase = phase  # 训练或微调阶段
        self.has_time = has_time   # 数据集是否包含时间戳
        self.pre_dataset = pre_dataset  # 预训练数据集,如果有的话
        self.hour_interval = args.hour_interval_pre if phase == 'pretrain' else args.hour_interval_f
        self.edgelist = []
        self.edge_time = []
        self.num_users = 0
        self.num_items = 0
        self.num_edges = 0
        self.train_user_dict = {}
        self.test_user_dict = {}
        self._load_data(train_file, test_file, has_time)
        # 根据阶段不同,设置用户历史交互字典
        if phase == 'pretrain':
            self.user_hist_dict = self.train_user_dict  # 预训练阶段使用训练数据
        elif phase == 'finetune':
            self.user_hist_dict = deepcopy(self.train_user_dict)  # 微调阶段复制训练数据
            self._load_user_hist_from_files(user_hist_files)  # 加载额外的用户历史文件
        
        users_has_hist = set(list(self.user_hist_dict.keys()))  # 117338
        all_users = set(list(range(self.num_users)))
        users_no_hist = all_users - users_has_hist  # 112
        logger.info(f"Number of users from all users with no history: {len(users_no_hist)}")
        for u in users_no_hist:
            self.user_hist_dict[u] = []
    def _read_file(self, train_file, test_file, has_time=True):
        with open(train_file, 'r') as f:
            for line in f:
                line = line.strip().split('\t')  # 源节点    目标节点    交互的时间戳,以\t做切割
                if not has_time:
                    user, items = line[:2]
                    times = " ".join(["0"] * len(items.split(" ")))
                else:
                    user, items, times = line
                # 将用户-物品交互添加到边列表和时间列表
                for i in items.split(" "):
                    self.edgelist.append((int(user), int(i)))
                for i in times.split(" "):
                    self.edge_time.append(int(i))
                # 将用户和物品交互存储到字典中
                self.train_user_dict[int(user)] = [int(i) for i in items.split(" ")]  # 字典的形式存储{源节点1:[目标节点集合1],源节点2:[目标节点集合2],...}
        self.test_edge_num = 0
        with open(test_file, 'r') as f:
            for line in f:
                line = line.strip().split('\t')
                user, items = line[:2]
                self.test_user_dict[int(user)] = [int(i) for i in items.split(" ")]
                self.test_edge_num += len(self.test_user_dict[int(user)])
        logger.info('Number of test users: {}'.format(len(self.test_user_dict)))
    
    def _read_pd(self, train_pd, test_pd, has_time=True):
        for i in range(len(train_pd)):
            line = train_pd.iloc[i]
            if not has_time:
                user, items = line[0], line[1]
                times = " ".join(["0"] * len(items.split(" ")))
            else:
                user, items, times = line[0], line[1], line[2]
                
            for i in items.split(" "):
                self.edgelist.append((int(user), int(i)))
            for i in times.split(" "):
                self.edge_time.append(int(i))
            self.train_user_dict[int(user)] = [int(i) for i in items.split(" ")]
        self.test_edge_num = 0
        for i in range(len(test_pd)):
            line = test_pd.iloc[i]
            user, items = line[0], line[1]
            self.test_user_dict[int(user)] = [int(i) for i in items.split(" ")]
            self.test_edge_num += len(self.test_user_dict[int(user)])
        logger.info('Number of test users: {}'.format(len(self.test_user_dict)))
    # 定义加载数据的私有方法
    def _load_data(self, train_file, test_file, has_time=True):
        if isinstance(train_file, pd.DataFrame):
            self._read_pd(train_file, test_file, has_time)
        else:
            self._read_file(train_file, test_file, has_time)
        self.edgelist = np.array(self.edgelist, dtype=np.int32)
        # refine timestamp to predefined time steps at intervals
        # 0 as padding for self-loop
        self.edge_time = 1 + self.timestamp_to_time_step(np.array(self.edge_time, dtype=np.int32))  # 范围:1-120
        self.num_edges = len(self.edgelist)
        if self.pre_dataset is not None:
            self.num_users = self.pre_dataset.num_users
            self.num_items = self.pre_dataset.num_items
        else:
            self.num_users = max([np.max(self.edgelist[:, 0]) + 1, np.max(list(self.test_user_dict.keys())) + 1])
            self.num_items = max([np.max(self.edgelist[:, 1]) + 1, np.max([np.max(self.test_user_dict[u]) for u in self.test_user_dict.keys()]) + 1])
        logger.info('Number of users: {}'.format(self.num_users))
        logger.info('Number of items: {}'.format(self.num_items))
        logger.info('Number of edges: {}'.format(self.num_edges))
        # 创建了一个稀疏矩阵来表示用户和物品之间的交互图
        self.graph = sp.coo_matrix((np.ones(self.num_edges), (self.edgelist[:, 0], self.edgelist[:, 1])), shape=(self.num_users, self.num_items))
        if self.has_time:
            self.edge_time_dict = defaultdict(dict)
            for i in range(len(self.edgelist)):
                self.edge_time_dict[self.edgelist[i][0]][self.edgelist[i][1]+self.num_users] = self.edge_time[i]
                self.edge_time_dict[self.edgelist[i][1]+self.num_users][self.edgelist[i][0]] = self.edge_time[i]
                # self.edge_time_dict[self.edgelist[i][0]][self.edgelist[i][0]] = 0
                # self.edge_time_dict[self.edgelist[i][1]][self.edgelist[i][1]] = 0
        # homogenous edges generation
        # if args.ab in ["homo", "full"]:
        #     self.ii_adj = self.graph.T.dot(self.graph).tocoo()
        #     # sort the values in the sparse matrix and get top x% values and corresponding edges
        #     percentage_ii = 0.01
        #     tmp_data = sorted(self.ii_adj.data, reverse=True)
        #     tmp_data_xpercent, tmp_data_xpercent_len = tmp_data[int(len(tmp_data) * percentage_ii)], int(len(tmp_data) * percentage_ii)
        #     self.ii_adj.data = np.where(self.ii_adj.data > tmp_data_xpercent, self.ii_adj.data, 0)
        #     self.ii_adj.eliminate_zeros()
        #     logger.info(f"Sampled {len(self.ii_adj.data)} i-i edges from all {len(tmp_data)} edges.")
        #     self.uu_adj = self.graph.dot(self.graph.T).tocoo()
        #     # same filtering for uu_adj
        #     percentage_uu = 0.01
        #     tmp_data = sorted(self.uu_adj.data, reverse=True)
        #     tmp_data_xpercent, tmp_data_xpercent_len = tmp_data[int(len(tmp_data) * percentage_uu)], int(len(tmp_data) * percentage_uu)
        #     self.uu_adj.data = np.where(self.uu_adj.data > tmp_data_xpercent, self.uu_adj.data, 0)
        #     self.uu_adj.eliminate_zeros()
        #     logger.info(f"Sampled {len(self.uu_adj.data)} u-u edges from all {len(tmp_data)} edges.")
        # self.graph = nx.Graph()
        # for i in range(len(self.edgelist)):
        #     self.graph.add_edge(self.edgelist[i][0], self.edgelist[i][1], time=self.edge_time[i])
        # print(self.graph.number_of_nodes(), self.graph.number_of_edges())
        # self.ui_adj = nx.adjacency_matrix(self.graph, weight=None)[:self.num_users, self.num_users:].tocoo()
        # self.ii_adj = self.ui_adj.T.dot(self.ui_adj).tocoo()
        # self.uu_adj = self.ui_adj.dot(self.ui_adj.T).tocoo()
        # self.graph = Data(torch.zeros(self.num_users + self.num_items, 1), torch.tensor(self.edgelist).t(), torch.tensor(self.edge_time))
    
    def _load_user_hist_from_files(self, user_hist_files):
        for file in user_hist_files:
            with open(file, 'r') as f:
                for line in f:
                    line = line.strip().split('\t')
                    user, items = int(line[0]), [int(i) for i in line[1].split(" ")]
                    try:
                        self.user_hist_dict[user].extend(items)
                    except KeyError:
                        self.user_hist_dict[user] = items
    def sample_subgraph(self):
        pass
  '''
    get_train_batch 方法首先定义了一个内部函数 negative_sampling,用于为每个用户生成负样本物品。负采样是一种常见的技术,用于生成推荐系统中的负样本,即用户未交互的物品。在训练过程中,模型需要学习区分用户喜欢的物品(正样本)和不喜欢的物品(负样本)。然后,该方法从 edgelist 中提取指定范围的正样本用户-物品对,并使用 negative_sampling 函数为这些正样本生成相应的负样本。最后,它将用户、正样本物品和负样本物品转换为张量,并移动到指定的计算设备(如GPU),以便在训练过程中使用。 
    这种方法对于训练基于图的推荐系统模型非常重要,因为它提供了模型学习的基础数据结构。通过这种方式,模型可以更好地理解用户的偏好,并提高推荐的准确性。
  '''
    def get_train_batch(self, start, end):
    # 定义内部函数 negative_sampling,用于进行负采样(通用的模块)
        def negative_sampling(user_item, train_user_set, n=1):
            neg_items = []  # 初始化一个空列表,用于存储负样本物品
            for user, _ in user_item:  # 遍历用户-物品对
                user = int(user)   # 将用户转换为整数
                for i in range(n):    # 对于每个用户,生成n个负样本
                    while True: # 循环直到找到有效的负样本物品
                        neg_item = np.random.randint(low=0, high=self.num_items, size=1)[0]  # 随机生成一个物品索引,确保它在物品的数量范围内
                        if neg_item not in train_user_set[user]:  # 如果该物品不是用户的已知交互物品
                            break
                    neg_items.append(neg_item)  # 将负样本物品添加到列表中
            return neg_items  # 返回负样本物品列表
    # 从边列表中提取[start, end)范围内的用户-物品对作为正样本
        ui_pairs = self.edgelist[start:end]
        users = torch.LongTensor(ui_pairs[:, 0]).to(args.device)    # 创建一个包含所有用户的张量,并移动到指定的设备
        pos_items = torch.LongTensor(ui_pairs[:, 1]).to(args.device) # 创建一个包含所有正样本物品的张量,并移动到指定的设备
        # 根据模型类型选择负采样的数量
        if args.model == "MixGCF":
            neg_items = negative_sampling(ui_pairs, self.train_user_dict, args.n_negs)
        else:
            neg_items = negative_sampling(ui_pairs, self.train_user_dict, 1)
        # 将负样本物品转换为张量,并移动到指定的设备
        neg_items = torch.LongTensor(neg_items).to(args.device)
        # 返回用户张量、正样本物品张量和负样本物品张量
        return users, pos_items, neg_items
  def shuffle(self):
      # 使用 NumPy 库中的 random.permutation 函数生成一个从 0 到 num_edges-1 的随机索引排列
      random_idx = np.random.permutation(self.num_edges)
    
      # 使用生成的随机索引对边列表进行重排序
      # 这样每条边在列表中的位置都是随机的,实现了洗牌的效果
      self.edgelist = self.edgelist[random_idx]
    
      # 同样地,使用随机索引对边的时间戳列表进行重排序
      # 保证边的时间戳与其在边列表中的新位置相对应
      self.edge_time = self.edge_time[random_idx]
        
    def _generate_binorm_adj(self, edgelist):
        adj = sp.coo_matrix((np.ones(len(edgelist)), (edgelist[:, 0], edgelist[:, 1])),
                            shape=(self.num_users, self.num_items), dtype=np.float32)
        
        a = sp.csr_matrix((self.num_users, self.num_users))
        b = sp.csr_matrix((self.num_items, self.num_items))
        adj = sp.vstack([sp.hstack([a, adj]), sp.hstack([adj.transpose(), b])])
        adj = (adj != 0) * 1.0
        degree = np.array(adj.sum(axis=-1))
        d_inv_sqrt = np.reshape(np.power(degree, -0.5), [-1])
        d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0.0
        d_inv_sqrt_mat = sp.diags(d_inv_sqrt)
        adj = adj.dot(d_inv_sqrt_mat).transpose().dot(d_inv_sqrt_mat).tocoo()
        ui_adj = adj.tocsr()[:self.num_users, self.num_users:].tocoo()
        return adj
    '''
        将原始的时间戳转换为时间步,这是处理时间序列数据时常见的一种方法。
        @param timestamp_arr: 原始的时间戳数组
        方法步骤:
            1、interval_hour: 从类的实例变量中获取时间间隔,这个时间间隔定义了每个时间步代表的小时数。
            2、如果 least_time 未提供,方法将计算 timestamp_arr 中的最小值作为 least_time。这个最小值将用作后续转换的基准时间。
            3、打印出时间戳数组中的最小值、排序后的第二小的值、第三小的值和最大值。这有助于了解数据集中时间戳的分布情况。
            4、从每个时间戳中减去 least_time,这样可以将所有时间戳转换为相对于最小时间戳的偏移量。
            5、将偏移量除以 interval_hour 和 3600(一小时的秒数),这样就将时间戳转换为了时间步。这个操作基于整数除法,结果将向下取整。
            6、返回转换后的时间步数组。        
        原始的时间戳被标准化为一系列整数,每个整数代表一个时间步。这样,模型就可以使用这些时间步作为输入特征,而不需要直接处理原始的时间戳。
        这种方法在处理具有时间动态性的数据时非常有用,例如在推荐系统中捕捉用户行为随时间的变化。
    '''
    def timestamp_to_time_step(self, timestamp_arr, least_time=None):
        interval_hour = self.hour_interval
        if least_time is None:
            least_time = np.min(timestamp_arr)
            print("Least time: ", least_time)
            print("2nd least time: ", np.sort(timestamp_arr)[1])  # 倒数第2小
            print("3rd least time: ", np.sort(timestamp_arr)[2])  # 倒数第3小
            print("Max time: ", np.max(timestamp_arr))
        timestamp_arr = timestamp_arr - least_time
        timestamp_arr = timestamp_arr // (interval_hour * 3600)
        return timestamp_arr
if __name__ == '__main__':
    edgelist_dataset = EdgeListData("dataset/yelp_small/train.txt", "dataset/yelp_small/test.txt")
    # edgelist_dataset = EdgeListData("D:\\技术学习资料\\综合技术学习笔记\\图神经网络_推荐\\图神经网络\\文献\\文献\\动态异质图推荐\\GraphPro(必看,很可能将来论文就是在这篇做改进)\\GraphPro-master\\dataset\\taobao\\pretrain.txt", "D:\\技术学习资料\\综合技术学习笔记\\图神经网络_推荐\\图神经网络\\文献\\文献\\动态异质图推荐\\GraphPro(必看,很可能将来论文就是在这篇做改进)\\GraphPro-master\\dataset\\taobao\\test_1.txt")

GraphPro模型构建代码

GraphPro.py

import torch
import torch.nn as nn
from modules.base_model import BaseModel
from utils.parse_args import args
import torch.nn.functional as F
from modules.utils import EdgelistDrop
import logging
from modules.utils import scatter_add, scatter_sum
import torch_scatter
# 初始化权重的函数
init = nn.init.xavier_uniform_
logger = logging.getLogger('train_logger')  # 获取日志记录器实例
class GraphPro(BaseModel):
    def __init__(self, dataset, pretrained_model=None, phase='pretrain'):
        super().__init__(dataset)  # 调用父类的构造函数
        # 创建双归一化的邻接矩阵
        self.adj = self._make_binorm_adj(dataset.graph)
        # 获取邻接矩阵的索引和值
        self.edges = self.adj._indices().t()
        self.edge_norm = self.adj._values()
        # 获取边的时间信息
        self.edge_times = [dataset.edge_time_dict[e[0]][e[1]] for e in self.edges.cpu().tolist()]
        self.edge_times = torch.LongTensor(self.edge_times).to(args.device)
        self.phase = phase
        # 随机生成门控权重
        self.emb_gate = self.random_gate
        # 预训练或为了微调的特定层的嵌入初始化
        if self.phase == 'pretrain' or self.phase == 'for_tune':
            self.user_embedding = nn.Parameter(init(torch.empty(self.num_users, self.emb_size)))
            self.item_embedding = nn.Parameter(init(torch.empty(self.num_items, self.emb_size)))
            if self.phase == 'for_tune':
                self.emb_gate = self.random_gate
        # load gating weights from pretrained model
        # 微调阶段加载预训练模型的嵌入和门控权重
        elif self.phase == 'finetune':
            pre_user_emb, pre_item_emb = pretrained_model.generate()
            self.user_embedding = nn.Parameter(pre_user_emb).requires_grad_(True)
            self.item_embedding = nn.Parameter(pre_item_emb).requires_grad_(True)
            self.gating_weight = nn.Parameter(init(torch.empty(args.emb_size, args.emb_size)))
            self.gating_bias = nn.Parameter(init(torch.empty(1, args.emb_size)))
            self.emb_dropout = nn.Dropout(args.emb_dropout)
            # 定义门控机制
            self.emb_gate = lambda x: self.emb_dropout(torch.mul(x, torch.sigmoid(torch.matmul(x, self.gating_weight) + self.gating_bias)))
        # 定义边的dropout机制
        self.edge_dropout = EdgelistDrop()
        # 记录最大时间步长
        logger.info(f"Max Time Step: {self.edge_times.max()}")
    
    def random_gate(self, x):
        # 随机生成并归一化门控权重和偏置
        # 使用 F.normalize 函数对生成的权重参数进行归一化,确保它们的范数为 1。这有助于防止梯度消失或爆炸,并保持数值稳定性
        gating_weight = F.normalize(torch.randn((args.emb_size, args.emb_size)).to(args.device))
        gating_bias = F.normalize(torch.randn((1, args.emb_size)).to(args.device))
        # 计算门控机制的输出。
        # 首先,通过矩阵乘法将输入嵌入与权重相乘。
        # 然后,加上偏置。
        # 使用 sigmoid 函数将结果压缩到 0 和 1 之间,这样权重和偏置就控制了输入嵌入的激活程度。
        gate = torch.sigmoid(torch.matmul(x, gating_weight) + gating_bias)
        # 应用门控机制
        # 将输入嵌入向量 x 与门控信号 gate 进行逐元素乘积。
        # 这样,输入嵌入向量中的每个维度都将根据门控信号的相应值进行放大或缩小。
        return torch.mul(x, gate)
    def _agg(self, all_emb, edges, edge_norm):
        # 聚合函数,用于GNN中的信息聚合
        # all_emb: 包含所有节点(用户和物品)嵌入的张量
        # edges: 一个二维数组,其中每一行代表一条边,列分别代表边的源节点和目标节点
        # edge_norm: 边的归一化值,用于在聚合过程中进行加权
        src_emb = all_emb[edges[:, 0]]  # 源节点的嵌入
        # 双归一化
        # 提取所有嵌入中的源节点嵌入
        # 对源节点嵌入进行双归一化处理
        # 通过将源节点嵌入与对应的边的归一化值相乘来实现
        # 这里使用unsqueeze(1)来给归一化值增加一个维度,使其可以与嵌入进行逐元素乘法
        src_emb = src_emb * edge_norm.unsqueeze(1)  # all_emb中索引为edges第一列值的行
        # 使用scatter_sum函数进行聚合操作
        # scatter_sum是一种在GNN中常用的操作,它将src_emb中的值根据edges中的索引分散到目标位置
        # edges[:, 1]包含目标节点的索引
        # dim=0表示沿着第一个维度(即行)进行scatter操作
        # dim_size=self.num_users+self.num_items表示整个数据的维度大小,包括用户和物品
        # dst_emb是聚合后的目标节点嵌入
        # conv 通过scatter_sum函数进行聚合
        dst_emb = scatter_sum(src_emb, edges[:, 1], dim=0, dim_size=self.num_users+self.num_items)
        return dst_emb  # 返回更新后的目标节点嵌入
    
    def _edge_binorm(self, edges):
        # 计算边的双归一化系数
        # 使用 scatter_add 函数计算每个用户的度(即与该用户相连的边的数量)。
        # torch.ones_like(edges[:, 0]) 创建一个与 edges[:, 0] 形状相同的全1张量。
        # 将这个全1张量在第一个维度(dim=0)上累加,即对所有边的源节点索引进行累加。
        # dim_size=self.num_users 指定了累加操作的维度大小,即用户的数量。
        user_degs = scatter_add(torch.ones_like(edges[:, 0]), edges[:, 0], dim=0, dim_size=self.num_users)
        # 从计算出的度张量中取出对应于边的源节点的度。
        user_degs = user_degs[edges[:, 0]]
        # 同样地,计算每个物品的度。
        item_degs = scatter_add(torch.ones_like(edges[:, 1]), edges[:, 1], dim=0, dim_size=self.num_items)
        # 从计算出的度张量中取出对应于边的目标节点的物品的度。
        item_degs = item_degs[edges[:, 1]]
        # 计算双归一化系数。
        # 对每个用户的度取负0.5次方,对每个物品的度也取负0.5次方。
        # 然后,将这两个结果相乘,得到每条边的双归一化系数。
        norm = torch.pow(user_degs, -0.5) * torch.pow(item_degs, -0.5)
        # 返回计算出的双归一化系数。
        return norm
    # 相对时间编码函数
    # 该方法接受三个参数:edges(边的索引),edge_times(边的时间戳),max_step(可选,用于归一化的最大时间步长)
    def _relative_edge_time_encoding(self, edges, edge_times, max_step=None):
        # 相对时间编码函数,用于处理边的时间信息
        # for each node, normalize edge_times according to its neighbors
        # edge_times: [E]
        # rescal to 0-1
        edge_times = edge_times.float()  # 转换为浮点数
        # 如果没有提供max_step,则找出所有时间戳中的最大值作为max_step
        if max_step is None:
            max_step = edge_times.max()  # 获取最大时间步长
        # 将时间戳归一化到0和1之间,这样所有的时间戳都会被映射到一个统一的尺度上
        # 这有助于模型更好地处理时间信息,避免时间戳的绝对值对模型造成影响
        edge_times = (edge_times - edge_times.min()) / (max_step - edge_times.min())   # 归一化到0-1之间
        # edge_times = torch.exp(edge_times)
        # edge_times = torch.sigmoid(edge_times)
        # 获取每条边的目标节点索引
        dst_nodes = edges[:, 1]  # 目标节点
        # 使用torch_scatter库中的scatter_softmax函数计算每个目标节点的时间归一化值
        # 这个函数会对每个目标节点对应的所有时间戳执行softmax操作,生成一个时间归一化向量
        # dim_size参数指定了整个数据的维度大小,包括用户和物品
        # 这行代码使用了 torch_scatter 库中的 scatter_softmax 函数来计算时间归一化(time_norm),这是在图神经网络(GNN)中处理时间信息的一种方法。这个操作的目的是为图中的每条边(连接用户和物品的交互)生成一个时间相关的权重,这个权重反映了交互发生的时间对推荐任务的重要性。
        time_norm = torch_scatter.scatter_softmax(edge_times, dst_nodes, dim_size=self.num_users+self.num_items)   # 计算时间的softmax归一化
        # time_norm = time_norm[dst_nodes]
        # 返回计算出的时间归一化向量
        return time_norm
    def forward(self, edges, edge_norm, edge_times, max_time_step=None):
        time_norm = self._relative_edge_time_encoding(edges, edge_times, max_step=max_time_step)  # 调用_relative_edge_time_encoding方法对边的时间戳进行相对时间编码。   # 如果提供了max_time_step,则使用该值作为最大时间步长,否则使用edge_times中的最大值。
        edge_norm = edge_norm * 1/2 + time_norm * 1/2   # 合并时间归一化和边的归一化
        all_emb = torch.cat([self.user_embedding, self.item_embedding], dim=0) # 拼接用户和物品的嵌入
        all_emb = self.emb_gate(all_emb)   # 应用门控机制
        res_emb = [all_emb]   # 存储每层的嵌入
        for l in range(args.num_layers):  # 遍历所有层
            all_emb = self._agg(res_emb[-1], edges, edge_norm)  # 聚合信息
            res_emb.append(all_emb)  # 存储当前层的嵌入
        res_emb = sum(res_emb)  # 合并所有层的嵌入
        user_res_emb, item_res_emb = res_emb.split([self.num_users, self.num_items], dim=0)  # 分离用户和物品的嵌入
        return user_res_emb, item_res_emb
    
    def cal_loss(self, batch_data):
        # 应用边的dropout机制,以防止过拟合,并获取对应的掩码
        # dropout_mask是一个布尔张量,表示哪些边应该被保留
        edges, dropout_mask = self.edge_dropout(self.edges, 1-args.edge_dropout, return_mask=True)
        # 从原始的边归一化值中选择被保留的边对应的值
        edge_norm = self.edge_norm[dropout_mask]
        # 从时间戳数组中选择被保留的边对应的时间戳
        edge_times = self.edge_times[dropout_mask]
        # forward # 获取批次数据中的用户、正样本和负样本
        users, pos_items, neg_items = batch_data
        # 执行模型的前向传播,获取用户和物品的嵌入表示
        # 这里的edges是经过dropout处理后的边的索引
        # edge_norm是经过dropout处理后的边的归一化系数
        # edge_times是经过dropout处理后的边的时间戳
        user_emb, item_emb = self.forward(edges, edge_norm, edge_times)
        # 从嵌入表示中提取批次数据中用户和正样本的嵌入
        batch_user_emb = user_emb[users]
        pos_item_emb = item_emb[pos_items]
        # 从嵌入表示中提取批次数据中负样本的嵌入
        neg_item_emb = item_emb[neg_items]
        # 计算推荐损失,这里使用_bpr_loss方法,它可能是一个基于BPR(Bayesian Personalized Ranking)的损失函数
        rec_loss = self._bpr_loss(batch_user_emb, pos_item_emb, neg_item_emb)
        # 计算正则化损失,以防止模型过拟合
        # weight_decay是一个超参数,用于控制正则化的强度
        reg_loss = args.weight_decay * self._reg_loss(users, pos_items, neg_items)
        # 总损失是推荐损失和正则化损失的和
        loss = rec_loss + reg_loss
        # 创建一个字典,存储不同类型损失的值
        loss_dict = {
            "rec_loss": rec_loss.item(),  # 推荐损失的值
            "reg_loss": reg_loss.item(),  # 正则化损失的值
        }
        # 返回总损失和损失字典
        return loss, loss_dict
    
    @torch.no_grad()
    def generate(self, max_time_step=None):
        # 生成用户和物品的嵌入表示,不计算梯度
        # max_time_step参数用于指定时间步长的最大值,如果提供了该参数
        return self.forward(self.edges, self.edge_norm, self.edge_times, max_time_step=max_time_step)
    
    @torch.no_grad()
    def rating(self, user_emb, item_emb):
        # 计算用户和物品之间的评分预测
        return torch.matmul(user_emb, item_emb.t())
    
    def _reg_loss(self, users, pos_items, neg_items):
        # 计算正则化损失
        # 用于防止模型过拟合,通过惩罚大的嵌入表示
        u_emb = self.user_embedding[users]  # 获取用户的嵌入表示
        pos_i_emb = self.item_embedding[pos_items]  # 获取正样本物品的嵌入表示
        neg_i_emb = self.item_embedding[neg_items]  # 获取负样本物品的嵌入表示
        # 计算正则化损失,这里是L2范数的平方
        reg_loss = (1/2)*(u_emb.norm(2).pow(2) +
                          pos_i_emb.norm(2).pow(2) +
                          neg_i_emb.norm(2).pow(2))/float(len(users))
        return reg_loss

训练器Trainer的代码

from utils.parse_args import args
import torch
import torch.optim as optim
from tqdm import tqdm
from utils.metrics import Metric
from os import path
import numpy as np
from utils.logger import log_exceptions
import time
class Trainer(object):
    def __init__(self, dataset, logger, pre_dataset=None):
        self.dataloader = dataset
        self.pre_dataloader = pre_dataset
        self.metric = Metric()
        self.logger = logger
        self.best_perform = {'recall': [0.], 'ndcg': [0.]}
    def create_optimizer(self, model):
        self.optimizer = optim.Adam(model.parameters(
        ), lr=args.lr)
    def train_epoch(self, model, epoch_idx):
        self.dataloader.shuffle()
        # for recording loss
        s = 0
        loss_log_dict = {}
        ep_loss = 0
        time_1 = time.time()
        # start this epoch
        model.train()
        pbar = tqdm(total=self.dataloader.num_edges // args.batch_size + 1)
        while s + args.batch_size <= self.dataloader.num_edges:
            batch_data = self.dataloader.get_train_batch(s, s+args.batch_size)  # users, pos_items, neg_items
            self.optimizer.zero_grad()
            loss, loss_dict = model.cal_loss(batch_data)
            loss.backward()
            self.optimizer.step()
            ep_loss += loss.item()
            # print(loss.item())
            # record loss
            for loss_name in loss_dict:
                _loss_val = float(
                    loss_dict[loss_name]) / (self.dataloader.num_edges // args.batch_size + 1)
                if loss_name not in loss_log_dict:
                    loss_log_dict[loss_name] = _loss_val
                else:
                    loss_log_dict[loss_name] += _loss_val
            # print(loss.item())
            s += args.batch_size
            pbar.update(1)
            if self.stop_flag:
                break
        time_2 = time.time()
        loss_log_dict['train_time'] = round(time_2 - time_1, 2)
        # log
        self.logger.log_loss(epoch_idx, loss_log_dict)
    @log_exceptions
    def train(self, model):
        # self.output_emb(model)
        # self.evaluate(model, 0, self.dataloader)
        self.create_optimizer(model)
        self.best_perform = {'recall': [0.], 'ndcg': [0.]}
        self.stop_counter = 0
        self.stop_flag = False
        # self.evaluate(model, 0)
        for epoch_idx in range(args.num_epochs):
            # train
            self.train_epoch(model, epoch_idx)
            # evaluate
            self.evaluate(model, epoch_idx, self.dataloader)
            if self.stop_flag:
                break
        return self.best_perform
    @log_exceptions
    def train_finetune(self, model, pre_model):
        # verify pretrain model performances
        pre_model.eval()
        # self.logger.log("Testing Pretrained Model on Pretrain Dataset...")
        # self.logger.log_eval(self.metric.eval(pre_model, self.pre_dataloader), self.metric.k)
        # self.logger.log("Testing Pretrained Model on Test Dataset...")
        # self.logger.log_eval(self.metric.eval(pre_model, self.dataloader), self.metric.k)
        self.create_optimizer(model)
        self.best_perform = {'recall': [0.], 'ndcg': [0.]}
        self.stop_counter = 0
        self.stop_flag = False
        # self.evaluate(model, 0)
        for epoch_idx in range(args.num_epochs):
            # train
            self.train_epoch(model, epoch_idx)
            # evaluate
            self.evaluate(model, epoch_idx, self.dataloader)
            if self.stop_flag:
                break
        return self.best_perform
    def evaluate(self, model, epoch_idx, dataloader):
        model.eval()
        eval_result = self.metric.eval(model, dataloader)
        self.logger.log_eval(eval_result, self.metric.k)
        perform = eval_result['recall'][0]
        if perform > self.best_perform['recall'][0]:
            self.best_perform = eval_result
            self.logger.log('Find better model at epoch: {}: recall={}'.format(
                epoch_idx, self.best_perform['recall'][0]))
            # self.output_emb(model)
            # self.logger.log('Embedding saved!')
            if args.log:
                self.save_model(model)
                self.logger.log('Model saved!')
            self.stop_counter = 0
        else:
            self.stop_counter += 1
            if self.stop_counter >= args.early_stop_patience:
                self.logger.log('Early stop!')
                self.logger.log(f"Best performance: recall={self.best_perform['recall'][0]}, ndcg={self.best_perform['ndcg'][0]}")
                self.stop_flag = True
        model.train()
    def save_model(self, model):
        self.save_path = path.join(args.save_dir, f'saved_model_{args.exp_time}.pt')
        torch.save(model.state_dict(), self.save_path)
        pass

Dropout机制

utils.py

class EdgelistDrop(nn.Module):
    def __init__(self):
        # 构造函数,初始化EdgelistDrop模块
        super(EdgelistDrop, self).__init__()
    def forward(self, edgeList, keep_rate, return_mask=False):
        # 前向传播函数,接收三个参数:
        # edgeList - 一个二维张量,表示图中的边,每行包含一条边的两个节点索引
        # keep_rate - 保留边的概率,即dropout的比例
        # return_mask - 一个布尔值,指示是否返回一个掩码张量,该掩码指示哪些边被保留
        # 如果保留概率为1.0,则不进行dropout操作,直接返回原始的边列表
        # 并且返回一个全为True的掩码张量,表示所有边都被保留
        if keep_rate == 1.0:
            return edgeList, torch.ones(edgeList.size(0)).type(torch.bool)
        # 计算边的数量
        edgeNum = edgeList.size(0)
        # 生成一个随机张量,每个元素的值在[0, 1)之间
        # 然后将这个随机张量与保留概率相加
        # 使用floor函数向下取整,这样每个元素的值会变成0或1
        # 值为1的表示边被保留,值为0的表示边被丢弃
        mask = (torch.rand(edgeNum) + keep_rate).floor().type(torch.bool)
        # 根据掩码选择保留的边,形成新的边列表
        newEdgeList = edgeList[mask, :]
        # 如果return_mask为True,则返回新的边列表和掩码张量(用于指示哪些边被保留)
        # 否则,只返回新的边列表
        if return_mask:
            return newEdgeList, mask
        else:
            return newEdgeList

目前有点疑惑的就是双归一化矩阵的生成步骤问题!

scatter_sum()函数的理解,它是图神经网络中常用的操作之一,用于将源张量(src)中的值根据索引(index)散布到目标张量(out)的指定维度(dim)上。这个操作通常用于实现图卷积网络中的聚合步骤,其中节点的特征需要根据其邻居节点的信息进行更新。

def scatter_sum(src: torch.Tensor, index: torch.Tensor, dim: int = -1,
                out: Optional[torch.Tensor] = None,
                dim_size: Optional[int] = None) -> torch.Tensor:
    # 广播索引张量,使其与源张量的形状相匹配
    index = broadcast(index, src, dim)
    
    # 如果没有提供输出张量out,则需要创建一个新的张量
    if out is None:
        # 获取源张量的原始大小
        size = list(src.size())
        # 如果提供了dim_size,则使用它设置目标维度的大小
        if dim_size is not None:
            size[dim] = dim_size
        # 如果索引张量为空,则目标维度的大小为0
        elif index.numel() == 0:
            size[dim] = 0
        # 否则,目标维度的大小为索引的最大值加1
        else:
            size[dim] = int(index.max()) + 1
        # 创建一个新的全零张量作为输出张量
        out = torch.zeros(size, dtype=src.dtype, device=src.device)
        # 使用scatter_add_函数填充输出张量
        return out.scatter_add_(dim, index, src)
    else:
        # 如果提供了输出张量out,则直接使用它进行scatter_add_操作
        return out.scatter_add_(dim, index, src)

broadcast.py

'''
  这段代码定义了一个名为 broadcast 的函数,它用于将一个张量(src)扩展到与另一个张量(other)相同的形状,以便可以进行广播操作。在深度学习中,广播机制允许不同形状的张量在特定条件下进行逐元素操作,这在处理图数据时尤其有用,因为图的节点和边可能具有不同的维度。
'''
def broadcast(src: torch.Tensor, other: torch.Tensor, dim: int):
    # 如果提供的维度索引dim是负数,则将其转换为对应的正数索引
    if dim < 0:
        dim = other.dim() + dim  # 计算dim对应的正数维度索引
    # 如果源张量src是一维的(即一个向量),则在最前面添加维度以匹配目标张量的维度
    if src.dim() == 1:
        for _ in range(0, dim):
            src = src.unsqueeze(0)  # 在最前面添加维度
    # 继续添加维度,直到源张量src的维度与目标张量other相同
    for _ in range(src.dim(), other.dim()):
        src = src.unsqueeze(-1)  # 在最后面添加维度
    # 扩展源张量src的形状,使其与目标张量other的形状完全相同
    src = src.expand(other.size())
    # 返回扩展后的源张量src
    return src
PYTHON 复制 全屏

broadcast 函数通过 unsqueeze 方法在张量的指定位置添加维度,然后使用 expand 方法将张量扩展到与目标张量相同的形状。这样,即使输入张量的形状不匹配,也能保证它们可以在后续的操作中进行逐元素的计算。

在图神经网络中,这种操作特别重要,因为它允许模型处理具有不同节点和边数量的图。例如,在聚合节点的特征时,可能需要将节点的特征向量广播到与边的索引相匹配的形状,以便可以正确地聚合来自邻居节点的信息。通过这种方式,模型可以更有效地学习和推理图中的结构信息。

相关文章
|
3月前
|
缓存 NoSQL 应用服务中间件
Redis实战篇
Redis实战篇
|
7月前
|
机器学习/深度学习 人工智能 自然语言处理
关于Sora面世,你有哪些畅想?
2024年春节假期技术圈最大的“瓜”非Sora面世莫属,因为OpenAI最近发布的文生视频模型——Sora,为视频内容创作领域带来了重大突破,这是首个能够根据文字描述生成高质量视频的文生视频模型。据宣传消息显示,Sora能够准确理解用户指令,并生成画面生动、细节丰富、时长可达一分钟的视频大片,这一突破性的技术正引起技术圈的非常广泛的关注。那么本文就来一起探讨一下Sora如何改变视频内容创作领域,并展望未来AI在内容创作领域可能带来的新突破。
134 2
关于Sora面世,你有哪些畅想?
|
5天前
小猫咪抽奖系统1.11(有卡密功能)
一个可以用于抽奖的源码,用于公司年会,节日活动抽奖,支持自定义奖品中奖概率和奖品数量,支持设置单个账号抽奖次数,支持限制抽奖邮箱类型(如限制只能使用qq邮箱等),支持邮箱验证抽奖账户,支持自定义公告
33 12
|
3月前
|
算法
12_前K个高频元素
12_前K个高频元素
|
3月前
|
机器学习/深度学习 自然语言处理 搜索推荐
基于图神经网络的电商购买预测
基于图神经网络的电商购买预测
|
3月前
|
机器学习/深度学习 自然语言处理
机器学习查漏补缺
机器学习查漏补缺
|
3月前
|
机器学习/深度学习 搜索推荐 图计算
图基础知识
图基础知识
|
3月前
|
存储 自然语言处理 算法
【算法精讲系列】MGTE系列模型,RAG实施中的重要模型
检索增强生成(RAG)结合检索与生成技术,利用外部知识库提升大模型的回答准确性与丰富性。RAG的关键组件包括文本表示模型和排序模型,前者计算文本向量表示,后者进行精细排序。阿里巴巴通义实验室推出的GTE-Multilingual系列模型,具备高性能、长文档支持、多语言处理及弹性向量表示等特性,显著提升了RAG系统的检索与排序效果。该系列模型已在多个数据集上展示出优越性能,并支持多语言和长文本处理,适用于各种复杂应用场景。
|
3月前
|
算法
01_二叉树的递归遍历
01_二叉树的递归遍历
|
4月前
|
机器学习/深度学习 监控 API
基于云计算的机器学习模型部署与优化
【8月更文第17天】随着云计算技术的发展,越来越多的数据科学家和工程师开始使用云平台来部署和优化机器学习模型。本文将介绍如何在主要的云计算平台上部署机器学习模型,并讨论模型优化策略,如模型压缩、超参数调优以及分布式训练。
814 2