GeaFlow任务能力增强:通过API定制流图计算逻辑

简介: GeaFlow API是对高阶用户提供的开发接口,用户可以直接通过编写java代码来编写计算作业,相比于DSL,API的方式开发更加灵活,也能实现更丰富的功能和更复杂的计算逻辑。

GeaFlow(品牌名TuGraph-Analytics) 已正式开源,欢迎大家关注!!! 欢迎给我们 Star 哦! GitHub👉https://github.com/TuGraph-family/tugraph-analytics
更多精彩内容,关注我们的博客 https://geaflow.github.io/


GeaFlow API介绍

GeaFlow API是对高阶用户提供的开发接口,用户可以直接通过编写java代码来编写计算作业,相比于DSL,API的方式开发更加灵活,也能实现更丰富的功能和更复杂的计算逻辑。
在GeaFlow中,API支持Graph API和Stream API两种类型:

  • Graph API:Graph是GeaFlow框架的一等公民,当前GeaFlow框架提供了一套基于GraphView的图计算编程接口,包含图构建、图计算及遍历。在GeaFlow中支持Static Graph和Dynamic Graph两种类型。
    • Static Graph API:静态图计算API,基于该类API可以进行全量的图计算或图遍历。
    • Dynamic Graph API:动态图计算API,GeaFlow中GraphView是动态图的数据抽象,基于GraphView
      之上,可以进行动态图计算或图遍历。同时支持对Graphview生成Snapshot快照,基于Snapshot可以提供和Static Graph API一样的接口能力。

001.jpeg

  • Stream API:GeaFlow提供了一套通用计算的编程接口,包括source构建、流批计算及sink输出。在GeaFlow中支持Batch和Stream两种类型。
    • Batch API:批计算API,基于该类API可以进行批量计算。
    • Stream API:流计算API,GeaFlow中StreamView是动态流的数据抽象,基于StreamView之上,可以进行流计算。

更多API的介绍可参考 https://github.com/TuGraph-family/tugraph-analytics/blob/master/docs/docs-cn/application-development/api/overview.md

PageRank算法示例

本例子是从文件中读取点边进行构图,执行pageRank算法后,将每个点的pageRank值进行打印。
其中,用户需要实现AbstractVcFunc,在compute方法中进行每一轮迭代的计算逻辑。
在本例子中,只计算了两轮迭代的结果。在第一轮中,每个点都会向邻居点发送当前点的value值,而在第二轮中,每个点收到邻居点发送的消息,将其value值进行累加,并更新为自己的value值,即为最后的PageRank值。

public class PageRank {
   
   

    private static final Logger LOGGER = LoggerFactory.getLogger(PageRank.class);

    public static final String RESULT_FILE_PATH = "./target/tmp/data/result/pagerank";

    private static final double alpha = 0.85;

    public static void main(String[] args) {
   
   
        Environment environment = EnvironmentUtil.loadEnvironment(args);
        IPipelineResult result = PageRank.submit(environment);
        PipelineResultCollect.get(result);
        environment.shutdown();
    }

    public static IPipelineResult submit(Environment environment) {
   
   
        Pipeline pipeline = PipelineFactory.buildPipeline(environment);
        Configuration envConfig = environment.getEnvironmentContext().getConfig();
        envConfig.put(FileSink.OUTPUT_DIR, RESULT_FILE_PATH);
        ResultValidator.cleanResult(RESULT_FILE_PATH);

        pipeline.submit((PipelineTask) pipelineTaskCxt -> {
   
   
            Configuration conf = pipelineTaskCxt.getConfig();
            PWindowSource<IVertex<Integer, Double>> prVertices =
                pipelineTaskCxt.buildSource(new FileSource<>("email_vertex",
                        line -> {
   
   
                            String[] fields = line.split(",");
                            IVertex<Integer, Double> vertex = new ValueVertex<>(
                                Integer.valueOf(fields[0]), Double.valueOf(fields[1]));
                            return Collections.singletonList(vertex);
                        }), AllWindow.getInstance())
                    .withParallelism(conf.getInteger(ExampleConfigKeys.SOURCE_PARALLELISM));

            PWindowSource<IEdge<Integer, Integer>> prEdges = pipelineTaskCxt.buildSource(new FileSource<>("email_edge",
                    line -> {
   
   
                        String[] fields = line.split(",");
                        IEdge<Integer, Integer> edge = new ValueEdge<>(Integer.valueOf(fields[0]), Integer.valueOf(fields[1]), 1);
                        return Collections.singletonList(edge);
                    }), AllWindow.getInstance())
                .withParallelism(conf.getInteger(ExampleConfigKeys.SOURCE_PARALLELISM));

            int iterationParallelism = conf.getInteger(ExampleConfigKeys.ITERATOR_PARALLELISM);
            GraphViewDesc graphViewDesc = GraphViewBuilder
                .createGraphView(GraphViewBuilder.DEFAULT_GRAPH)
                .withShardNum(2)
                .withBackend(BackendType.Memory)
                .build();
            PGraphWindow<Integer, Double, Integer> graphWindow =
                pipelineTaskCxt.buildWindowStreamGraph(prVertices, prEdges, graphViewDesc);

            SinkFunction<IVertex<Integer, Double>> sink = ExampleSinkFunctionFactory.getSinkFunction(conf);
            graphWindow.compute(new PRAlgorithms(10))
                .compute(iterationParallelism)
                .getVertices()
                .sink(v -> {
   
   
                    LOGGER.info("result {}", v);
                })
                .withParallelism(conf.getInteger(ExampleConfigKeys.SINK_PARALLELISM));
        });

        return pipeline.execute();
    }

    public static class PRAlgorithms extends VertexCentricCompute<Integer, Double, Integer, Double> {
   
   

        public PRAlgorithms(long iterations) {
   
   
            super(iterations);
        }

        @Override
        public VertexCentricComputeFunction<Integer, Double, Integer, Double> getComputeFunction() {
   
   
            return new PRVertexCentricComputeFunction();
        }

        @Override
        public VertexCentricCombineFunction<Double> getCombineFunction() {
   
   
            return null;
        }

    }

    public static class PRVertexCentricComputeFunction extends AbstractVcFunc<Integer, Double, Integer, Double> {
   
   

        @Override
        public void compute(Integer vertexId,
                            Iterator<Double> messageIterator) {
   
   
            IVertex<Integer, Double> vertex = this.context.vertex().get();
            List<IEdge<Integer, Integer>> outEdges = context.edges().getOutEdges();
            if (this.context.getIterationId() == 1) {
   
   
                if (!outEdges.isEmpty()) {
   
   
                    this.context.sendMessageToNeighbors(vertex.getValue() / outEdges.size());
                }

            } else {
   
   
                double sum = 0;
                while (messageIterator.hasNext()) {
   
   
                    double value = messageIterator.next();
                    sum += value;
                }
                double pr = sum * alpha + (1 - alpha);
                this.context.setNewVertexValue(pr);

                if (!outEdges.isEmpty()) {
   
   
                    this.context.sendMessageToNeighbors(pr / outEdges.size());
                }
            }
        }

    }
}

提交API作业

(以容器模式,PageRank算法示例)

算法打包

在新的项目中新建一个PageRank的demo,pom中引入geaflow依赖

<dependency>
    <groupId>com.antgroup.tugraph</groupId>
    <artifactId>geaflow-assembly</artifactId>
    <version>0.2-SNAPSHOT</version>
</dependency>

新建PageRank类,编写上述相关代码。
在项目resources路径下,创建测试数据文件email_vertex和email_edge,代码中会从resources://资源路径读取数据进行构图。

 PWindowSource<IVertex<Integer, Double>> prVertices =
                pipelineTaskCxt.buildSource(new FileSource<>("email_vertex",
                        line -> {
   
   
                            String[] fields = line.split(",");
                            IVertex<Integer, Double> vertex = new ValueVertex<>(
                                Integer.valueOf(fields[0]), Double.valueOf(fields[1]));
                            return Collections.singletonList(vertex);
                        }), AllWindow.getInstance())
                    .withParallelism(conf.getInteger(ExampleConfigKeys.SOURCE_PARALLELISM));

   PWindowSource<IEdge<Integer, Integer>> prEdges = pipelineTaskCxt.buildSource(new FileSource<>("email_edge",
                    line -> {
   
   
                        String[] fields = line.split(",");
                        IEdge<Integer, Integer> edge = new ValueEdge<>(Integer.valueOf(fields[0]), Integer.valueOf(fields[1]), 1);
                        return Collections.singletonList(edge);
                    }), AllWindow.getInstance())
                .withParallelism(conf.getInteger(ExampleConfigKeys.SOURCE_PARALLELISM));

email_vertex

0,1
1,1
2,1
3,1
4,1
5,1
6,1
7,1
8,1
9,1

email_edge

4,3
0,1
2,3
4,6
2,4
6,8
0,2
4,8
0,5
0,7
0,8
9,0
7,0
7,1
7,2
9,5
3,0
7,4
5,3
7,5
1,0
5,4
9,8
3,4
7,9
3,7
3,8
1,6
8,0
6,0
6,2
8,5
4,2

maven打包,在target目录获取算法的jar包

mvn clean install

新增HLA图任务

在GeaFlow Console中新增图任务,任务类型选择“HLA”, 并上传jar包(或者选择已存在的jar包),其中entryClass为算法主函数所在的类。 点击“提交”,创建任务。

002.png

提交作业

003.png

点击"发布",可进入作业详情界面,点击“提交”即可提交作业。

004.png

查看运行结果

进入容器 /tmp/logs/task/ 目录下,查看对应作业的日志,可看到日志中打印了最终计算得到的每个点的pageRank值。

2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:0, value:1.5718675107490019)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:1, value:0.5176947080197076)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:2, value:1.0201253300467092)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:3, value:1.3753756869824914)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:4, value:1.4583114077692536)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:5, value:1.1341668910561529)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:6, value:0.6798184364673463)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:7, value:0.70935427506243)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:8, value:1.2827529511906106)
2023-08-01 16:51:38 INFO  PageRank:107 - result ValueVertex(vertexId:9, value:0.2505328026562969)

可在作业详情中查看运行详情,

005.png

至此,我们就成功使用Geaflow实现并运行API任务了!是不是超简单!快来试一试吧!


GeaFlow(品牌名TuGraph-Analytics) 已正式开源,欢迎大家关注!!!

欢迎给我们 Star 哦!

Welcome to give us a Star!

GitHub👉https://github.com/TuGraph-family/tugraph-analytics

更多精彩内容,关注我们的博客 https://geaflow.github.io/

相关文章
|
存储 算法 关系型数据库
实时计算 Flink版产品使用合集之在Flink Stream API中,可以在任务启动时初始化一些静态的参数并将其存储在内存中吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
227 4
|
4月前
|
JSON 数据可视化 API
产品经理的技术必修课:四步掌握API设计核心逻辑
产品经理的技术必修课:四步掌握API设计核心逻辑
209 83
|
29天前
|
数据采集 API
京东:调用用户行为API分析购买路径,优化页面跳转逻辑
京东通过整合用户行为API,构建购买路径分析体系,运用马尔可夫链模型识别高损耗、断裂与冗余路径,优化页面跳转逻辑。实施流程合并、预加载及实时干预策略,转化率提升30.2%,路径缩短34.9%,跳转失败率下降78.7%,实现数据驱动的精细化运营。
241 0
|
2月前
|
安全 API 区块链
数据资产化新范式:API如何重构企业增长逻辑?
在数字经济时代,数据已成为企业核心资产,而API作为连接数据与业务的桥梁,正驱动企业释放数据价值、实现业务增长。本文通过电商、金融、医疗与政务领域的典型案例,解析API如何助力企业提升效率、优化服务、拓展生态,并探讨其商业价值实现路径与未来趋势。
数据资产化新范式:API如何重构企业增长逻辑?
|
3月前
|
缓存 安全 前端开发
3个月GMV破5000万:揭秘某家居品牌靠API接口逆袭的底层逻辑
本文详解如何利用电商API从零搭建定制化商城,涵盖需求分析、技术选型、开发流程与安全优化,并结合实战案例,助力中小商家突破竞争,实现高效电商转型。
|
6月前
|
存储 安全 API
秘密任务 1.0:为什么 DTO 是 API 设计效率和安全性的秘密武器?
在软件开发中,确保API安全与高效至关重要。本文通过“间谍机构”场景,介绍数据传输对象(DTO)的作用。DTO是一种设计模式,用于格式化数据并隐藏敏感信息,仅传送必要内容。例如,在特工数据中,DTO可过滤掉密码和任务详情,仅返回代号和权限等级。使用DTO能简化前后端通信、提升性能和安全性。 文中示例展示如何用DTO处理GET与POST请求:GET响应只含安全字段,POST创建新特工时隐藏密码。借助工具如APIPost,可更高效管理API设计,实现安全、结构化的数据交互。总结来说,DTO让API更简洁、安全且高效。
|
11月前
|
前端开发 JavaScript API
探究 React Hooks:如何利用全新 API 优化组件逻辑复用与状态管理
本文深入探讨React Hooks的使用方法,通过全新API优化组件逻辑复用和状态管理,提升开发效率和代码可维护性。
|
API Python
4. salt-api请求salt-minion执行任务 tornado超时报错
4. salt-api请求salt-minion执行任务 tornado超时报错
|
前端开发 API
vue3【详解】选项式 API 实现逻辑复用
vue3【详解】选项式 API 实现逻辑复用
132 1
|
DataWorks 安全 API
DataWorks产品使用合集之有api或者是sdk可以获取到 dataworks 的任务运行的结果吗
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
112 1

热门文章

最新文章

下一篇
oss教程