GeaFlow图计算快速上手之K-hop算法

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 随着年轻人的社交需求不断增长,各种社交软件应运而生,这些社交软件通常都会有好友推荐功能,根据六度分离理论,理想情况下,每个人通过6个人就可以跟所有人产生关联,因此K-hop算法(K跳算法)被用于实现好友推荐,现在让我们来尝试使用GeaFlow在5分钟内实现K-hop算法吧!

引言

随着年轻人的社交需求不断增长,各种社交软件应运而生,这些社交软件通常都会有好友推荐功能,根据六度分离理论,理想情况下,每个人通过6个人就可以跟所有人产生关联,因此K-hop算法(K跳算法)被用于实现好友推荐,现在让我们来尝试使用GeaFlow在5分钟内实现K-hop算法吧!

K-hop(K跳)算法介绍

K-hop算法是一种基于图论的算法,用于寻找一个起点通过K次以内跳跃能够到达的节点,也就是从起点出发,找出K层内与之关联的节点。K-hop算法广泛应用于好友推荐、影响力预测和关系发现等场景。
K-hop算法本质上是一种广度优先搜索(BFS)算法,通过从起点开始不断向外扩散的方式来计算每一个节点到起点的跳跃数。算法流程如下:

1.png

GeaFlow实现K-hop算法

首先需要通过实现AlgorithmUserFunction接口来编写K-hop算法的UDGA,K-hop算法的参考实现如下:

package com.antfin.rayag.myUDF;
import com.antgroup.geaflow.common.type.primitive.IntegerType;
import com.antgroup.geaflow.common.type.primitive.StringType;
import com.antgroup.geaflow.dsl.common.algo.AlgorithmRuntimeContext;
import com.antgroup.geaflow.dsl.common.algo.AlgorithmUserFunction;
import com.antgroup.geaflow.dsl.common.data.RowEdge;
import com.antgroup.geaflow.dsl.common.data.RowVertex;
import com.antgroup.geaflow.dsl.common.data.impl.ObjectRow;
import com.antgroup.geaflow.dsl.common.data.impl.types.IntVertex;
import com.antgroup.geaflow.dsl.common.function.Description;
import com.antgroup.geaflow.dsl.common.types.StructType;
import com.antgroup.geaflow.dsl.common.types.TableField;
import com.antgroup.geaflow.model.graph.edge.EdgeDirection;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@Description(name = "khop", description = "built-in udga for KHop")
public class KHop implements AlgorithmUserFunction<Object, Integer> {
    private AlgorithmRuntimeContext<Object, Integer> context;
    private int srcId = 1;
    private int k = 1;
    @Override
    public void init(AlgorithmRuntimeContext<Object, Integer> context, Object[] parameters) {
        this.context = context;
        if (parameters.length > 2) {
            throw new IllegalArgumentException(
                    "Only support zero or more arguments, false arguments "
                            + "usage: func([alpha, [convergence, [max_iteration]]])");
        }
        if (parameters.length > 0) {
            srcId = Integer.parseInt(String.valueOf(parameters[0]));
        }
        if (parameters.length > 1) {
            k = Integer.parseInt(String.valueOf(parameters[1]));
        }
    }
    @Override
    public void process(RowVertex vertex, Iterator<Integer> messages) {
        List<RowEdge> outEdges = new ArrayList<>(context.loadEdges(EdgeDirection.OUT));
        //第一轮迭代将所有顶点初始化,目标点的K值初始化为0,并向邻点发送消息,其他点的K值初始化为Integer.MAX_VALUE
        if (context.getCurrentIterationId() == 1L) {
            if(srcId == (int) vertex.getId()) {
                sendMessageToNeighbors(outEdges, 1);
                context.updateVertexValue(ObjectRow.create(0));
                context.take(ObjectRow.create(vertex.getId(), 0));
            }else{
                context.updateVertexValue(ObjectRow.create(Integer.MAX_VALUE));
            }
        } else if (context.getCurrentIterationId() <= k+1) {
            int currentK = (int) vertex.getValue().getField(0, IntegerType.INSTANCE);
            //如果当前顶点收到消息,并且K值为Integer.MAX_VALUE(没有被遍历到),则本轮应该修改K值,并向邻边发消息
            if(messages.hasNext() && currentK == Integer.MAX_VALUE){
                Integer currK = messages.next();
                //将当前顶点写出
                context.take(ObjectRow.create(vertex.getId(), currK));
                //更新当前顶点的K值
                context.updateVertexValue(ObjectRow.create(currK));
                //向邻点发消息
                sendMessageToNeighbors(outEdges, currK+1);
            }
        }
    }
    //设置输出类型
    @Override
    public StructType getOutputType() {
        return new StructType(
                new TableField("id", IntegerType.INSTANCE, false),
                new TableField("k", IntegerType.INSTANCE, false)
        );
    }
    private void sendMessageToNeighbors(List<RowEdge> outEdges, Integer message) {
        for (RowEdge rowEdge : outEdges) {
            context.sendMessage(rowEdge.getTargetId(), message);
        }
    }
}

Geaflow运行K-hop算法实战

1. 将KHop类打包成UDGA

新建一个maven工程,将KHop类放/src/main/java目录下,pom文件中需要添加如下依赖:

<dependency>
            <groupId>com.antgroup.tugraph</groupId>
            <artifactId>geaflow-dsl-common</artifactId>
            <version>0.1</version>
</dependency>

参考https://github.com/TuGraph-family/tugraph-analytics/blob/master/docs/docs-cn/application-development/dsl/overview.md

2. 将UDGA上传至geaflow-console平台

2.png

3. 注册khop函数,并在DSL中使用

set geaflow.dsl.window.size = -1;
set geaflow.dsl.ignore.exception = true;
CREATE GRAPH IF NOT EXISTS g (
  Vertex v (
    vid int ID,
    vvalue int
  ),
  Edge e (
    srcId int SOURCE ID,
    targetId int DESTINATION ID
  )
) WITH (
  storeType='rocksdb',
  shardCount = 1
);
CREATE TABLE IF NOT EXISTS v_source (
    v_id int,
    v_value int
) WITH (
  type='file',
  //vertex文件中保存了点的信息,文件放在与KHop类目录下的resources目录下,此处可以换成其他数据源
  geaflow.dsl.file.path = 'resource:///input/vertex'
);
CREATE TABLE IF NOT EXISTS e_source (
    src_id int,
    dst_id int
) WITH (
  type='file',
    //edge文件中保存了边的信息,文件放在与KHop类目录下的resources目录下,此处可以换成其他数据源
  geaflow.dsl.file.path = 'resource:///input/edge'
);
//定义结果表
CREATE TABLE IF NOT EXISTS tbl_result (
  v_id int,
  k_value int
) WITH (
  type='file',
   geaflow.dsl.file.path = '/tmp/result'
);
USE GRAPH g;
INSERT INTO g.v(vid, vvalue)
SELECT
v_id, v_value
FROM v_source;
INSERT INTO g.e(srcId, targetId)
SELECT
 src_id, dst_id
FROM e_source;
//注册khop函数
CREATE Function khop AS 'com.antfin.rayag.myUDF.KHop';
INSERT INTO tbl_result(v_id, k_value)
//调用khop函数,并返回结果
CALL khop(1,2) YIELD (vid, kValue)
RETURN vid, kValue
;

4. 运行结果

4.1. 输入数据如下

//vertex文件内容:
1,1
2,1
3,1
4,1
5,1
6,1
//edge文件内容:
1,3
1,5
1,6
2,3
3,4
4,1
4,6
5,4
5,6

4.2. 在container的/tmp/result文件中可以得到结果如下

1,0
3,1
5,1
6,1
4,2

至此,我们就成功使用Geaflow实现并运行了K-hop算法了!是不是超简单!快来试一试吧!

GeaFlow(品牌名TuGraph-Analytics) 已正式开源,欢迎大家关注!!!

欢迎给我们 Star 哦! GitHub👉 https://github.com/TuGraph-family/tugraph-analytics

更多精彩内容,关注我们的博客 https://geaflow.github.io/

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
2月前
|
算法 机器人
基于SOA海鸥优化算法的PID控制器最优控制参数计算matlab仿真
本课题研究基于海鸥优化算法(SOA)优化PID控制器参数的方法,通过MATLAB仿真对比传统PID控制效果。利用SOA算法优化PID的kp、ki、kd参数,以积分绝对误差(IAE)为适应度函数,提升系统响应速度与稳定性。仿真结果表明,SOA优化的PID控制器在阶跃响应和误差控制方面均优于传统方法,具有更快的收敛速度和更强的全局寻优能力,适用于复杂系统的参数整定。
|
6月前
|
算法 JavaScript 数据安全/隐私保护
基于GA遗传优化的最优阈值计算认知异构网络(CHN)能量检测算法matlab仿真
本内容介绍了一种基于GA遗传优化的阈值计算方法在认知异构网络(CHN)中的应用。通过Matlab2022a实现算法,完整代码含中文注释与操作视频。能量检测算法用于感知主用户信号,其性能依赖检测阈值。传统固定阈值方法易受噪声影响,而GA算法通过模拟生物进化,在复杂环境中自动优化阈值,提高频谱感知准确性,增强CHN的通信效率与资源利用率。预览效果无水印,核心程序部分展示,适合研究频谱感知与优化算法的学者参考。
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
241 0
|
8月前
|
算法 数据安全/隐私保护
基于Big-Bang-Big-Crunch(BBBC)算法的目标函数最小值计算matlab仿真
该程序基于Big-Bang-Big-Crunch (BBBC)算法,在MATLAB2022A中实现目标函数最小值的计算与仿真。通过模拟宇宙大爆炸和大收缩过程,算法在解空间中搜索最优解。程序初始化随机解集,经过扩张和收缩阶段逐步逼近全局最优解,并记录每次迭代的最佳适应度。最终输出最佳解及其对应的目标函数最小值,并绘制收敛曲线展示优化过程。 核心代码实现了主循环、粒子位置更新、适应度评估及最优解更新等功能。程序运行后无水印,提供清晰的结果展示。
210 14
|
JSON 算法 数据可视化
测试专项笔记(一): 通过算法能力接口返回的检测结果完成相关指标的计算(目标检测)
这篇文章是关于如何通过算法接口返回的目标检测结果来计算性能指标的笔记。它涵盖了任务描述、指标分析(包括TP、FP、FN、TN、精准率和召回率),接口处理,数据集处理,以及如何使用实用工具进行文件操作和数据可视化。文章还提供了一些Python代码示例,用于处理图像文件、转换数据格式以及计算目标检测的性能指标。
310 0
测试专项笔记(一): 通过算法能力接口返回的检测结果完成相关指标的计算(目标检测)
|
算法 数据可视化 数据安全/隐私保护
基于LK光流提取算法的图像序列晃动程度计算matlab仿真
该算法基于Lucas-Kanade光流方法,用于计算图像序列的晃动程度。通过计算相邻帧间的光流场并定义晃动程度指标(如RMS),可量化图像晃动。此版本适用于Matlab 2022a,提供详细中文注释与操作视频。完整代码无水印。
|
算法 C++
如何精确计算出一个算法的CPU运行时间?
如何精确计算出一个算法的CPU运行时间?
|
1月前
|
机器学习/深度学习 算法 机器人
【水下图像增强融合算法】基于融合的水下图像与视频增强研究(Matlab代码实现)
【水下图像增强融合算法】基于融合的水下图像与视频增强研究(Matlab代码实现)
202 0
|
1月前
|
数据采集 分布式计算 并行计算
mRMR算法实现特征选择-MATLAB
mRMR算法实现特征选择-MATLAB
150 2
|
2月前
|
传感器 机器学习/深度学习 编解码
MATLAB|主动噪声和振动控制算法——对较大的次级路径变化具有鲁棒性
MATLAB|主动噪声和振动控制算法——对较大的次级路径变化具有鲁棒性
203 3

热门文章

最新文章