基于MaxCompute的图计算实践分享-Resolver简介

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: Resolver简介 在学习使用MaxCompute-Graph计算模型时,resolver是一个不容易理解的概念。在MaxCompute帮助文档 https://help.aliyun.com/document_detail/27903.

更多精彩内容参见云栖社区大数据频道https://yq.aliyun.com/big-data,此外,通过Maxcompute及其配套产品,大数据分析仅需几步,详情访问https://www.aliyun.com/product/odps

Resolver简介

在学习使用MaxCompute-Graph计算模型时,resolver是一个不容易理解的概念。在MaxCompute帮助文档 https://help.aliyun.com/document_detail/27903.html?spm=5176.doc27901.6.662.O0sGBI 中对于Resolver有相关介绍:VertexResolver 用于自定义图拓扑修改时的冲突处理逻辑.

​在Graph的迭代计算模型中,图的拓扑结构会在下面两个场景中发生变化:

​1,在图加载阶段(LOAD),worker会读取输入数据,然后发送修改图拓扑结构的请求;在RESOLVE_LOADING_MUTATION阶段,worker会根据LOAD阶段收到的图拓扑结构变更请求,对图的拓扑结构进行修改;

​2,在每个superstep的COMPUTE阶段,worker也会发送修改图拓扑结构的请求;在下一个superstep的RESOLVE_COMPUTING_MUTATION阶段,会处理收集到上个superstep中发送到该Vertex的请求,然后对图的拓扑结构进行修改。

 

图拓扑结构变化的流程

​下图是COMPUTE阶段图拓扑结构发生变化的示例:

alt

1,在superstep:k的compute阶段,vertex会试图改变图的拓扑结构,修改图拓扑结构的请求包括以下四种:

  1. ​addVertexRequest:添加顶点
  2. ​removeVertexRequest:删除顶点
  3. ​addEdgeRequest:添加边
  4. ​removeEdgeRequest:删除边

这些改变图拓扑结构的请求并不会马上执行并返回,而只是记录在了目标顶点所在的worker上。

2,这些改变图拓扑结构的请求会根据操作顶点的id,partition分发到对应worker上,每个worker会将在superstep:k中收集到的图变更请求保存在serverData.partitionMutations中。这样在superstep:k+1时,每个顶点可以从serverData中获取到对应自己操作的所有请求。

3,在superstep:K+1的RESOLVE_COMPUTING_MUTATION阶段,worker会从serverData获取到对应每个vertexID的vertexChanges(即上一个superstep中对改顶点的图修改请求集合),分别对应然后将其作为参数调用用户定义的resole方法,在resolve方法中完成对当前Vertex结构的修改。

4,完成RESOLVE_COMPUTING_MUTATION阶段之后,开始superstep:K+1的compute阶段。

 

如何定义resolver

用户可以自定义VertexResolver的实现,即自定义类继承VertexResolver并重写VertexResolver.resolve方法,然后在提交作业时通过:

  1. job.setLoadingVertexResolverClass(VertexResolver.class)
  2. job.setComputingVertexResolverClass(VertexResolver.class)
 
分别指定在图加载阶段和COMPUTE阶段处理图拓扑结构修改请求的resolver实现类。对于LoadingVertexResolver,如果用户没有指定,框架会提供一个默认的Default LoadingVertexResolver实现;但是对于 ComputingVertexResolver,如果没有指定,是没有默认实现的,就是说如果用户需要在compute阶段对图的拓扑结构进行修改,则必须指定Resolver实现,否则会抛异常。

关于“冲突”

为什么graph的框架需要用户去自定义Resolver的实现,一个很重要的原因是在处理修改图拓扑结构时可能会出现各种“冲突”。

在graph模型中,修改图拓扑结构是一个异“步(superstep)”的请求,在某次superstep中发起的请求,需要到下一个superstep中才能生效。同时对于同一个顶点的修改请求,可能来自各个不同的worker,彼此之间不能相互感知,来自不同worker的请求先后顺序也是不确定的,所以会出现很多冲突的场景,例如对通过一个顶点进行多次添加,删除不存在的边等等。

在默认的DefaultLoadingVertexResolver实现中,对于下面这些场景会认为是“冲突”,然后会抛出异常:

  • 添加重复点:通常使用 addVertexRequest 添加点,而相同 ID 的点已经存在。
  • 添加重复边:通常使用 addEdgeRequest 添加边时,而相同起点和终点的边已经存在;或者使用 addVertexRequest 添加点时,待添加点中存在重复边。
  • 删除不存在的边:通常使用 removeEdgeRequest 删除边时,待删除的边不存在。
  • 删除不存在的点:通常使用 removeVertexRequest 删除点时,待删除的点不存在。
  • 发送消息到不存在的点:通常使用 sendMessage 发送消息时,目标点不存在。

用户自定义的Resolver中可以自行定义冲突场景,或者对上述冲突场景做其他处理。

​SSSP_Split的例子

​在graph提供的example中提供的三个例子都没有自定义resolver,也没有在迭代过程中改变图拓扑结构。下面这个例子是在原先最简单的例子SSSP(单源最短路径)的基础上增加了分裂顶点的功能,即在某轮迭代中发现某个顶点的出边数大于某个阈值时,将其分裂成两个顶点,这两个顶点之间的距离为0。


package com.test;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import com.aliyun.odps.Record;
import com.aliyun.odps.graph.Combiner;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.DefaultLoadingVertexResolver;
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.VertexChanges;
import com.aliyun.odps.graph.VertexResolver;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.TableInfo;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.WritableComparable;

public class SSSP_Split {

  public static final String START_VERTEX = "sssp.start.vertex.id";
  public static final String MAX_DEGREE = "sssp.vertex.max.degree";

  public static class SSSPVertex extends
      Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {

    private static long startVertexId = -1;
    private static long STEP_BASE = 100;
    
    public SSSPVertex() {
      this.setValue(new LongWritable(Long.MAX_VALUE));
    }

    public boolean isStartVertex(
        ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
      if (startVertexId == -1) {
        String s = context.getConfiguration().get(START_VERTEX);
        startVertexId = Long.parseLong(s);
      }
      return getId().get() == startVertexId;
    }

    @Override
    public void compute(
        ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
        Iterable<LongWritable> messages) throws IOException {
      long minDist = isStartVertex(context) ? 0 : Long.MAX_VALUE;

      for (LongWritable msg : messages) {
        if (msg.get() < minDist) {
          minDist = msg.get();
        }
      }
      
      if (minDist < this.getValue().get()) {
        this.setValue(new LongWritable(minDist));
        if (hasEdges()) {
          for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
            context.sendMessage(e.getDestVertexId(), new LongWritable(minDist
                + e.getValue().get()));
          }
        }
      } else {
        voteToHalt();
      }
      // 这里执行分裂顶点的逻辑,当顶点的出边数大于阈值sssp.vertex.max.degree时执行分裂,
      // 并且分裂行为只在前3轮发生.
      if (this.getEdges().size() > context.getConfiguration().getInt(
          MAX_DEGREE, Integer.MAX_VALUE)
          && context.getSuperstep() < 3) {
        SSSPVertex splitVertex = new SSSPVertex();
        // 分裂的顶点id为原id+100*2^迭代轮次,这样避免id出现冲突(初始顶点数小于100)
        // 即顶点3在第0步时分裂出来顶点103,顶点103在第2轮时分裂出顶点503
        splitVertex.setId(new LongWritable(getId().get()
            + (STEP_BASE << context.getSuperstep())));
        splitVertex.setValue(getValue());
        context.addVertexRequest(splitVertex);
        // 添加原始顶点与分裂顶点之间的边,距离为0
        context.addEdgeRequest(splitVertex.getId(),
            new Edge<LongWritable, LongWritable>(this.getId(),
                new LongWritable(0)));
        context.addEdgeRequest(getId(), new Edge<LongWritable, LongWritable>(
            splitVertex.getId(), new LongWritable(0)));
        // 将原始顶点上一半的出边转移到分裂顶点上。
        for (int i = 0; i < this.getEdges().size(); i++) {
          if (i % 2 == 0) {
            context.removeEdgeRequest(getId(), getEdges().get(i).getDestVertexId());
            context.addEdgeRequest(splitVertex.getId(), getEdges().get(i));
          } 
        }
      }
    }

    @Override
    public void cleanup(
        WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
        throws IOException {
      String edges = "";
      for (int i =0;i<getEdges().size();i++){
        edges+=getEdges().get(i).getDestVertexId()+":";
      }
      // 输出每个顶点的id,出边,和与目标顶点的距离
      context.write(getId(), new Text(edges), getValue());
    }
  }

  public static class MinLongCombiner extends
      Combiner<LongWritable, LongWritable> {

    @Override
    public void combine(LongWritable vertexId, LongWritable combinedMessage,
        LongWritable messageToCombine) throws IOException {
      if (combinedMessage.get() > messageToCombine.get()) {
        combinedMessage.set(messageToCombine.get());
      }
    }
  }
  
  public static class SSSPVertexReader extends
      GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
    @Override
    public void load(
        LongWritable recordNum,
        Record record,
        MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
        throws IOException {
      SSSPVertex vertex = new SSSPVertex();
      vertex.setId((LongWritable) record.get(0));
      String[] edges = record.get(1).toString().split(",");
      for (int i = 0; i < edges.length; i++) {
        String[] ss = edges[i].split(":");
        vertex.addEdge(new LongWritable(Long.parseLong(ss[0])),
            new LongWritable(Long.parseLong(ss[1])));
      }
      context.addVertexRequest(vertex);
    }
  }
  
  // 这里定义computer阶段的resolver
  public static class SSSPVertexResolver extends VertexResolver{
    @Override
    public Vertex resolve(WritableComparable vertexId, Vertex vertex,
        VertexChanges vertexChanges, boolean hasMessages) throws IOException {
      // 处理添加顶点的请求
      if (vertexChanges.getAddedVertexList() != null
          && vertexChanges.getAddedVertexList().size() > 0) {
        vertex = (Vertex) vertexChanges.getAddedVertexList().get(0);
      }
      // 处理添加边的请求
      if (vertexChanges.getAddedEdgeList() != null
          && vertexChanges.getAddedEdgeList().size() > 0) {
        for (Edge<LongWritable, LongWritable> edge : (List<Edge<LongWritable, LongWritable>>) (vertexChanges
            .getAddedEdgeList())) {
          vertex.addEdge(edge.getDestVertexId(), edge.getValue());
        }
      }
      // 处理删除边的请求
      if (vertexChanges.getRemovedEdgeList() != null
          && vertexChanges.getRemovedEdgeList().size() > 0) {
        for (LongWritable removedDestVertex : (List<LongWritable>)vertexChanges.getRemovedEdgeList()) {
          List<Edge<LongWritable, LongWritable>> edgeList = vertex.getEdges();
          for (Iterator<Edge<LongWritable, LongWritable>> edges = edgeList.iterator(); edges.hasNext();) {
            Edge<LongWritable, LongWritable> edge = edges.next();
            if (edge.getDestVertexId().equals(removedDestVertex)) {
              edges.remove();
            }
          }
        }
      }
      // 处理删除顶点的请求
      if (vertexChanges.getRemovedVertexCount() > 0) {
        // do nothing
      }
      return vertex;
    }
    
  }
  public static void main(String[] args) throws IOException {
    if (args.length < 3) {
      System.out.println("Usage: <startnode> <max degree> <input> <output>");
      System.exit(-1);
    }

    GraphJob job = new GraphJob();
    job.setGraphLoaderClass(SSSPVertexReader.class);
    job.setVertexClass(SSSPVertex.class);
    job.setCombinerClass(MinLongCombiner.class);
    // 设置compute阶段的resolver
    job.setComputingVertexResolverClass(SSSPVertexResolver.class);
    job.set(START_VERTEX, args[0]);
    job.set(MAX_DEGREE, args[1]);
    job.addInput(new TableInfo(args[2]));
    job.addOutput(new TableInfo(args[3]));

    long startTime = System.currentTimeMillis();
    job.run();
    System.out.println("Job Finished in "
        + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  }
}

输入表数据ssp_in:

1,"2:2,3:1,4:4,5:5"
2,"1:2,3:2,4:1,5:4"
3,"1:1,2:2,4:2,5:3"
4,"1:4,2:1,3:2,5:1"
5,"1:5,2:4,3:1,4:1"

输出表数据sssp_out:

102,2:1:4:,2
103,3:1:4:,1
1,3:5:101:,0
2,3:5:102:,2
101,1:2:4:,0
3,2:5:103:,1
4,2:5:104:,3
5,2:4:105:,4
104,4:1:3:,3
105,5:1:3:,4

输出数据中,分裂出来的点与源点(顶点103和顶点3)与目秒顶点的距离相同,两者实际仍然是同一个点。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
2月前
|
机器学习/深度学习 算法 搜索推荐
从理论到实践,Python算法复杂度分析一站式教程,助你轻松驾驭大数据挑战!
【10月更文挑战第4天】在大数据时代,算法效率至关重要。本文从理论入手,介绍时间复杂度和空间复杂度两个核心概念,并通过冒泡排序和快速排序的Python实现详细分析其复杂度。冒泡排序的时间复杂度为O(n^2),空间复杂度为O(1);快速排序平均时间复杂度为O(n log n),空间复杂度为O(log n)。文章还介绍了算法选择、分而治之及空间换时间等优化策略,帮助你在大数据挑战中游刃有余。
87 4
|
27天前
|
存储 负载均衡 算法
大数据散列分区计算哈希值
大数据散列分区计算哈希值
42 4
|
2月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
67 5
|
2月前
|
存储 缓存 分布式计算
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
大数据-83 Spark 集群 RDD编程简介 RDD特点 Spark编程模型介绍
44 4
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
54 3
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
72 0
|
23天前
|
存储 消息中间件 分布式计算
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
Cisco WebEx 早期数据平台采用了多系统架构(包括 Trino、Pinot、Iceberg 、 Kyuubi 等),面临架构复杂、数据冗余存储、运维困难、资源利用率低、数据时效性差等问题。因此,引入 Apache Doris 替换了 Trino、Pinot 、 Iceberg 及 Kyuubi 技术栈,依赖于 Doris 的实时数据湖能力及高性能 OLAP 分析能力,统一数据湖仓及查询分析引擎,显著提升了查询性能及系统稳定性,同时实现资源成本降低 30%。
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
|
1月前
|
边缘计算 人工智能 搜索推荐
大数据与零售业:精准营销的实践
【10月更文挑战第31天】在信息化社会,大数据技术正成为推动零售业革新的重要驱动力。本文探讨了大数据在零售业中的应用,包括客户细分、个性化推荐、动态定价、营销自动化、预测性分析、忠诚度管理和社交网络洞察等方面,通过实际案例展示了大数据如何帮助商家洞悉消费者行为,优化决策,实现精准营销。同时,文章也讨论了大数据面临的挑战和未来展望。
|
1月前
|
分布式计算 Java MaxCompute
ODPS MR节点跑graph连通分量计算代码报错java heap space如何解决
任务启动命令:jar -resources odps-graph-connect-family-2.0-SNAPSHOT.jar -classpath ./odps-graph-connect-family-2.0-SNAPSHOT.jar ConnectFamily 若是设置参数该如何设置
|
2月前
|
存储 缓存 NoSQL
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
73 4

相关产品

  • 云原生大数据计算服务 MaxCompute
  • 下一篇
    DataWorks