GeaFlow图计算快速上手之K-hop算法

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 随着年轻人的社交需求不断增长,各种社交软件应运而生,这些社交软件通常都会有好友推荐功能,根据六度分离理论,理想情况下,每个人通过6个人就可以跟所有人产生关联,因此K-hop算法(K跳算法)被用于实现好友推荐,现在让我们来尝试使用GeaFlow在5分钟内实现K-hop算法吧!

引言

随着年轻人的社交需求不断增长,各种社交软件应运而生,这些社交软件通常都会有好友推荐功能,根据六度分离理论,理想情况下,每个人通过6个人就可以跟所有人产生关联,因此K-hop算法(K跳算法)被用于实现好友推荐,现在让我们来尝试使用GeaFlow在5分钟内实现K-hop算法吧!

K-hop(K跳)算法介绍

K-hop算法是一种基于图论的算法,用于寻找一个起点通过K次以内跳跃能够到达的节点,也就是从起点出发,找出K层内与之关联的节点。K-hop算法广泛应用于好友推荐、影响力预测和关系发现等场景。
K-hop算法本质上是一种广度优先搜索(BFS)算法,通过从起点开始不断向外扩散的方式来计算每一个节点到起点的跳跃数。算法流程如下:

1.png

GeaFlow实现K-hop算法

首先需要通过实现AlgorithmUserFunction接口来编写K-hop算法的UDGA,K-hop算法的参考实现如下:

package com.antfin.rayag.myUDF;
import com.antgroup.geaflow.common.type.primitive.IntegerType;
import com.antgroup.geaflow.common.type.primitive.StringType;
import com.antgroup.geaflow.dsl.common.algo.AlgorithmRuntimeContext;
import com.antgroup.geaflow.dsl.common.algo.AlgorithmUserFunction;
import com.antgroup.geaflow.dsl.common.data.RowEdge;
import com.antgroup.geaflow.dsl.common.data.RowVertex;
import com.antgroup.geaflow.dsl.common.data.impl.ObjectRow;
import com.antgroup.geaflow.dsl.common.data.impl.types.IntVertex;
import com.antgroup.geaflow.dsl.common.function.Description;
import com.antgroup.geaflow.dsl.common.types.StructType;
import com.antgroup.geaflow.dsl.common.types.TableField;
import com.antgroup.geaflow.model.graph.edge.EdgeDirection;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@Description(name = "khop", description = "built-in udga for KHop")
public class KHop implements AlgorithmUserFunction<Object, Integer> {
    private AlgorithmRuntimeContext<Object, Integer> context;
    private int srcId = 1;
    private int k = 1;
    @Override
    public void init(AlgorithmRuntimeContext<Object, Integer> context, Object[] parameters) {
        this.context = context;
        if (parameters.length > 2) {
            throw new IllegalArgumentException(
                    "Only support zero or more arguments, false arguments "
                            + "usage: func([alpha, [convergence, [max_iteration]]])");
        }
        if (parameters.length > 0) {
            srcId = Integer.parseInt(String.valueOf(parameters[0]));
        }
        if (parameters.length > 1) {
            k = Integer.parseInt(String.valueOf(parameters[1]));
        }
    }
    @Override
    public void process(RowVertex vertex, Iterator<Integer> messages) {
        List<RowEdge> outEdges = new ArrayList<>(context.loadEdges(EdgeDirection.OUT));
        //第一轮迭代将所有顶点初始化,目标点的K值初始化为0,并向邻点发送消息,其他点的K值初始化为Integer.MAX_VALUE
        if (context.getCurrentIterationId() == 1L) {
            if(srcId == (int) vertex.getId()) {
                sendMessageToNeighbors(outEdges, 1);
                context.updateVertexValue(ObjectRow.create(0));
                context.take(ObjectRow.create(vertex.getId(), 0));
            }else{
                context.updateVertexValue(ObjectRow.create(Integer.MAX_VALUE));
            }
        } else if (context.getCurrentIterationId() <= k+1) {
            int currentK = (int) vertex.getValue().getField(0, IntegerType.INSTANCE);
            //如果当前顶点收到消息,并且K值为Integer.MAX_VALUE(没有被遍历到),则本轮应该修改K值,并向邻边发消息
            if(messages.hasNext() && currentK == Integer.MAX_VALUE){
                Integer currK = messages.next();
                //将当前顶点写出
                context.take(ObjectRow.create(vertex.getId(), currK));
                //更新当前顶点的K值
                context.updateVertexValue(ObjectRow.create(currK));
                //向邻点发消息
                sendMessageToNeighbors(outEdges, currK+1);
            }
        }
    }
    //设置输出类型
    @Override
    public StructType getOutputType() {
        return new StructType(
                new TableField("id", IntegerType.INSTANCE, false),
                new TableField("k", IntegerType.INSTANCE, false)
        );
    }
    private void sendMessageToNeighbors(List<RowEdge> outEdges, Integer message) {
        for (RowEdge rowEdge : outEdges) {
            context.sendMessage(rowEdge.getTargetId(), message);
        }
    }
}

Geaflow运行K-hop算法实战

1. 将KHop类打包成UDGA

新建一个maven工程,将KHop类放/src/main/java目录下,pom文件中需要添加如下依赖:

<dependency>
            <groupId>com.antgroup.tugraph</groupId>
            <artifactId>geaflow-dsl-common</artifactId>
            <version>0.1</version>
</dependency>

参考https://github.com/TuGraph-family/tugraph-analytics/blob/master/docs/docs-cn/application-development/dsl/overview.md

2. 将UDGA上传至geaflow-console平台

2.png

3. 注册khop函数,并在DSL中使用

set geaflow.dsl.window.size = -1;
set geaflow.dsl.ignore.exception = true;
CREATE GRAPH IF NOT EXISTS g (
  Vertex v (
    vid int ID,
    vvalue int
  ),
  Edge e (
    srcId int SOURCE ID,
    targetId int DESTINATION ID
  )
) WITH (
  storeType='rocksdb',
  shardCount = 1
);
CREATE TABLE IF NOT EXISTS v_source (
    v_id int,
    v_value int
) WITH (
  type='file',
  //vertex文件中保存了点的信息,文件放在与KHop类目录下的resources目录下,此处可以换成其他数据源
  geaflow.dsl.file.path = 'resource:///input/vertex'
);
CREATE TABLE IF NOT EXISTS e_source (
    src_id int,
    dst_id int
) WITH (
  type='file',
    //edge文件中保存了边的信息,文件放在与KHop类目录下的resources目录下,此处可以换成其他数据源
  geaflow.dsl.file.path = 'resource:///input/edge'
);
//定义结果表
CREATE TABLE IF NOT EXISTS tbl_result (
  v_id int,
  k_value int
) WITH (
  type='file',
   geaflow.dsl.file.path = '/tmp/result'
);
USE GRAPH g;
INSERT INTO g.v(vid, vvalue)
SELECT
v_id, v_value
FROM v_source;
INSERT INTO g.e(srcId, targetId)
SELECT
 src_id, dst_id
FROM e_source;
//注册khop函数
CREATE Function khop AS 'com.antfin.rayag.myUDF.KHop';
INSERT INTO tbl_result(v_id, k_value)
//调用khop函数,并返回结果
CALL khop(1,2) YIELD (vid, kValue)
RETURN vid, kValue
;

4. 运行结果

4.1. 输入数据如下

//vertex文件内容:
1,1
2,1
3,1
4,1
5,1
6,1
//edge文件内容:
1,3
1,5
1,6
2,3
3,4
4,1
4,6
5,4
5,6

4.2. 在container的/tmp/result文件中可以得到结果如下

1,0
3,1
5,1
6,1
4,2

至此,我们就成功使用Geaflow实现并运行了K-hop算法了!是不是超简单!快来试一试吧!

GeaFlow(品牌名TuGraph-Analytics) 已正式开源,欢迎大家关注!!!

欢迎给我们 Star 哦! GitHub👉 https://github.com/TuGraph-family/tugraph-analytics

更多精彩内容,关注我们的博客 https://geaflow.github.io/

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1天前
|
机器学习/深度学习 算法
递归算法题练习(数的计算、带备忘录的递归、计算函数值)
递归算法题练习(数的计算、带备忘录的递归、计算函数值)
|
1天前
|
算法 Java
算法:Java计算二叉树从根节点到叶子结点的最大路径和
算法:Java计算二叉树从根节点到叶子结点的最大路径和
|
1天前
|
资源调度 算法 块存储
m基于遗传优化的LDPC码OMS译码算法最优偏移参数计算和误码率matlab仿真
MATLAB2022a仿真实现了遗传优化的LDPC码OSD译码算法,通过自动搜索最佳偏移参数ΔΔ以提升纠错性能。该算法结合了低密度奇偶校验码和有序统计译码理论,利用遗传算法进行全局优化,避免手动调整,提高译码效率。核心程序包括编码、调制、AWGN信道模拟及软输入软输出译码等步骤,通过仿真曲线展示了不同SNR下的误码率性能。
9 1
|
1天前
|
算法 Serverless
m基于遗传优化的LDPC码NMS译码算法最优归一化参数计算和误码率matlab仿真
MATLAB 2022a仿真实现了遗传优化的归一化最小和(NMS)译码算法,应用于低密度奇偶校验(LDPC)码。结果显示了遗传优化的迭代过程和误码率对比。遗传算法通过选择、交叉和变异操作寻找最佳归一化因子,以提升NMS译码性能。核心程序包括迭代优化、目标函数计算及性能绘图。最终,展示了SNR与误码率的关系,并保存了关键数据。
15 1
|
1天前
|
算法 TensorFlow 算法框架/工具
基于直方图的图像阈值计算和分割算法FPGA实现,包含tb测试文件和MATLAB辅助验证
这是一个关于图像处理的算法实现摘要,主要包括四部分:展示了四张算法运行的效果图;提到了使用的软件版本为VIVADO 2019.2和matlab 2022a;介绍了算法理论,即基于直方图的图像阈值分割,通过灰度直方图分布选取阈值来区分图像区域;并提供了部分Verilog代码,该代码读取图像数据,进行处理,并输出结果到&quot;result.txt&quot;以供MATLAB显示图像分割效果。
|
1天前
|
存储 算法
图的深度优先算法
图的深度优先算法
19 0
|
1天前
|
算法 数据可视化 大数据
圆堆图circle packing算法可视化分析电商平台网红零食销量采集数据
圆堆图circle packing算法可视化分析电商平台网红零食销量采集数据
|
1天前
|
算法
bellman_ford算法与dijkstra为什么dijkstra算法不能计算带有负权边图
bellman_ford算法与dijkstra为什么dijkstra算法不能计算带有负权边图
23 0
|
1天前
|
算法 数据安全/隐私保护 计算机视觉
基于二维CS-SCHT变换和LABS方法的水印嵌入和提取算法matlab仿真
该内容包括一个算法的运行展示和详细步骤,使用了MATLAB2022a。算法涉及水印嵌入和提取,利用LAB色彩空间可能用于隐藏水印。水印通过二维CS-SCHT变换、低频系数处理和特定解码策略来提取。代码段展示了水印置乱、图像处理(如噪声、旋转、剪切等攻击)以及水印的逆置乱和提取过程。最后,计算并保存了比特率,用于评估水印的稳健性。
|
1天前
|
算法 计算机视觉
基于高斯混合模型的视频背景提取和人员跟踪算法matlab仿真
该内容是关于使用MATLAB2013B实现基于高斯混合模型(GMM)的视频背景提取和人员跟踪算法。算法通过GMM建立背景模型,新帧与模型比较,提取前景并进行人员跟踪。文章附有程序代码示例,展示从读取视频到结果显示的流程。最后,结果保存在Result.mat文件中。