基于MaxCompute的图计算实践分享-图加载过程

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 一、前言 MaxCompute Graph 是基于飞天平台实现的面向迭代的图处理框架,为用户提供了类似于 Pregel 的编程接口。MaxCompute Graph(以下简称 Graph )作业包含图加载和计算两个阶段: 加载,将存储在表中的数据载入到内存中,以点和边的形式存在;

免费开通大数据服务:https://www.aliyun.com/product/odps

一、前言

MaxCompute Graph 是基于飞天平台实现的面向迭代的图处理框架,为用户提供了类似于 Pregel 的编程接口。MaxCompute Graph(以下简称 Graph )作业包含图加载和计算两个阶段:

  • 加载,将存储在表中的数据载入到内存中,以点和边的形式存在;
  • 计算,遍历内存中的点,经过不断的迭代,直至达到迭代终止; Graph 模型有点(vertex)和边(edge)组成,以邻接表的形式进行组织,如下图: vertex

图的原始数据存在于MaxCompute的表(table)中,每个 table 包含多个记录(record),每个 record 又包含多个列(field),图加载就是将这种形式的数据,转换成 vertex 和 edge的过程。

二、图加载阶段

我们将图的加载过程实现为一个网络版的MapReduce过程,如下图所示:
shuffle

下面我们分三个阶段,分别解析图载入的具体实现。

1、输入分解

图作业将MaxCompute中的table作为输入,需要做的第一步就是将输入表中的record分块,以便并行载入。输入的分解与MaxCompute MR相同,都是经过一下步骤:

  • 获取图作业的所有输入表或者分区;
  • 拿到输入表或分区下的所有数据文件;
  • 按照指定字节数(块大小)将大文件进行切割,或者小文件合并;
  • 切割后的块或者多个小文件组成的块,就是最终每个worker的输入(split);

额外解释一下第三步的切割(或合并),我们知道表或者分区的数据都是存储在一个或多个文件中的,而文件有大有小。在根据指定的字节数 split size划分块时,我们将大于split size的文件称为大文件,小于split size的小文件。
因此,为了保证每个块字节数基本相等,需要将大文件按照split size切分成小块,具体形式就是有三元组(文件名,起始偏移字节,块长度)表示,文件尾剩余的字节数如果过小,会合并到最后一块中。而对于小文件们,则需要将多个小文件作为一个块,具体形式有多个三元组组成【(文件名,0,文件长度),(文件名,0,文件长度)】,最终块的大小也会受块中小文件的个数限制(保证不会因为过多的小文件导致载入时间过长)。

至此,我们已经将输入划分成了块,假设块的个数为m,根据不同的作业配置,数据并行载入的方式也不相同。

  • 默认情况下,我们会创建m个worker(m最多为1000),每个worker按照编号读取对应的块,split和worker对应情况如下: one_to_one

对于此种情况,worker和split是一对一的关系,worker数由split的个数决定,如果split个数超过1000,则作业会因为worker数超过1000报错。

  • Graph作业同时支持用户手工设置worker数,这时候又会出现一个worker对应零个或多个split的情况。

如果split size比较小,导致split的个数多于设定的worker数,则worker与split的对应关系如下图:
one_to_more

如果split size比较大,导致split的个数少于设定的worker数,则worker与split的对应关系如下图:
one_to_more_2

因此,用户手工设定 worker 数的情况下,调整split size的大小,有助于提高数据的载入速度。

总结一下,本小节将输入分解成了块,并确定了worker和块之间的关系。不管哪种情况,最终的结果是,一个 worker 会分到零个或多个 split,下面就需要解析一个 worker 是怎么处理 split 的。

2、加载点和边

如上节所述,本节解释单个 worker 处理零个或多个 split 的执行过程。此阶段的实现需要用户实现 GraphLoader 接口,我们会为每个 split 创建单独的 GraphLoader 实例,这个实例负责将 split 中的 record 解释为 vertex 或者 edge。根据此 worker 分到的 split 的个数,分为三种情况:

  • worker 分到零个 split。此种情况下该 worker 无需进入此阶段,直接进入下一阶段。
  • worker 分到一个 split。我们将处理过程用下图表示: split_worker

从整体看,GraphLoader 实例会读取 split 中的每个 record,用户可根据此 record 中的语义(如来自哪张表,字段的含义等)解析成 vertex 和(或)edge,解析后的结果以键值对的形式存在,(id, vertex)表示为值为 id 的点增加一个 vertex 对象,(id, edge)表示为值为id的点增加一个 edge 对象。请注意,vertex 本身也是可以带有 edge 的。

此阶段,可以为单个 id 多次增加 vertex 或 edge 对象,无须关系是否重复,下个阶段会处理这种情况。此过程将 split 中的多个 record,转换成了多个 vertex 或 edge对象。用户可以将其理解成一个 map 过程,将 record 转换成一些(key, value),key本身会重复。

  • worker分到多个 split。第二种情况是此第三种情况的特例,表现为一个 worker 多次处理 split 的情况,每次与第二种情况相同。注意,每个 split 有单独的 GraphLoader 实例。

至此,我们已经将 split 中的 record,都转换成了(id, vertex)和(id, edge)这种形式的键值对,这些键值对是通过用户的 GraphLoader 接口生成的。我们在拿到这些键值对之后,会根据 id 将这些键值对分发到其它的 worker,默认的分发依据是 id 的hash值。同时,每个 worker 在接收到这些键值对后,会根据 id 分组,将相同 id的 vertex 和 edge 组织起来,具体组织形式见下图:
key_values

也就是组织成 key -> values 的形式,类似于 reduce 的输入形式,但不同的是,values 会根据 value 的类型进行划分,这里会将一个 id 对应的 values 划分为 vertex 和 edge 集合。

总结一下,本小节将所有 split 解析成了(id, vertex)和(id, edge),并将这些键值对 shuffle 到了指定的 worker,下面就需要解析 worker 是怎么处理这些键值对的。

3、规约点和边

Graph 中图是以邻接表的形式存储的,因此,内存中的对象组织形式就是,每个点对应一个 vertex 对象,而边作为 vertex 对象的成员变量存在,表示此 vertex 的出边。
因此,我们此阶段要做的工作,就是将从上一节得到的键值对,封装到一个 vertex 对象中,并把此 vertex 对象添加到最终的图中,此 vertex 对象也是计算阶段迭代遍历的vertex对象。
我们用下图表示此阶段的执行过程:
resolver

此过程,针对每个 id 汇总后的结果,将上一节中针对此 id 的添加的所有 vertex 和 edge 都封装到一个 vertex 对象中。封装的过程是通过 VertexResolver 接口完成的,此接口我们会提供一个默认实现,用户也可以自定义,根据自己作业的情况进行特殊处理。
针对一个 id 添加的 vertex 和 edge,我们提供的默认实现有如下步骤:

  • 检查是否添加了 vertex,没有添加 vertex 或者添加了多个 vertex时报错;
  • 检查 vertex 中的 edge是否有重复,如果存在重复则报错;
  • 如果添加了 edge,检查 vertex 中的 edge 和添加的 edge 是否有重复,如果存在重复则报错;
  • 以上检查通过后,将所有的 edge 都放入 vertex 中,并将此 vertex 添加到最终的图中。 用流程图的形式表述如下: default_resolver

总结一下,本小节我们将 shuffle 之后的(id, vertex)和(id, edge)键值对,成功的封装成了最终的 vertex 对象,这些 vertex 对象最终也将参与后续的迭代计算。请注意,对于每个 id,都会经过这个阶段,并不是只有冲突的时候才会这样。

三、编程接口

经过上一章节的介绍,我们已经对整个图数据加载的原理有了初步的认识,下面我们开始介绍主要的接口,从上一章节得知,用户只需要实现 GraphLoader 和 VertexResolver 接口,而 VertexResolver 接口我们提供了默认实现,因此一般情况下,用户只需要自定义 GraphLoader 接口就可以了。

1、GraphLoader

先看一下 GraphLoader 接口的代码:

public abstract class GraphLoader<VERTEX_ID extends WritableComparable, VERTEX_VALUE extends Writable, EDGE_VALUE extends Writable, MESSAGE extends Writable> {

  public void setup(Configuration conf, int workerId, TableInfo tableInfo)
      throws IOException {
  }

  public abstract void load(LongWritable recordNum, Record record,
      MutationContext<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE, MESSAGE> context)
      throws IOException;
}

首先解释一下四个泛型参数的含义:

  • VERTEX_ID 代表点的 id 的数据类型;
  • VERTEX_VALUE 代表点的值的数据类型;
  • EDGE_VALUE 代表边的值的数据类型;
  • MESSAGE 代表迭代时消息的数据类型; 这些类型都是用户可以自定义的,但是都必须都实现 Writable 接口。

我们现在来看 GraphLoader 的两个方法的含义:

  • setup是 GraphLoader 的配置方法,该方法在每个 GraphLoader 实例只被调用一次,我们会将作业的配置信息,当前 worker 的id,当前输入的 table 或 partiton 信息作为参数传递给 setup,用户可以使用这些参数初始化上下文信息。其中,在 main 方法中配置的信息可以在此处通过 Configuration 拿到,当前输入的表名或分区名可以通过 TableInfo 拿到。
  • load 针对 split 中的每个 record 调用一次,用户可以读取 record 中各列的数据,转换成 vertex 和(或)edge,并通过 MutationContext 接口请求添加到图中。MutationContext接口的方法:
public interface MutationContext<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {

  public abstract Configuration getConfiguration();
  public abstract int getNumWorkers();

  public void addVertexRequest(Vertex<I, V, E, M> vertex) 
      throws IOException;
  public void removeVertexRequest(I vertexId) throws IOException;
  public abstract void addEdgeRequest(I sourceVertexId, Edge<I, E> edge)
      throws IOException;
  public void removeEdgeRequest(I sourceVertexId, I targetVertexId)
      throws IOException;

  public abstract Counter getCounter(Enum<?> name);
  public abstract Counter getCounter(String group, String name);
}

其中 getConfiguration 方法的返回值与 setup 的 Configuration 参数相同,geNumWorkers获取作业总的 worker 的个数,getCounter 是允许用户自定义 counter 来做一些统计,在作业结束时,这些 counter 可以通过 sdk 被获取到。最重要的是剩余的四个方法,其中 removeVertexRequest 和

RemoveEdgeRequest 方法可以暂时忽略,它们的作用是可以根据输入数据,可以选择去删除一些点和边,在数据载入阶段,这种需求并不常见。

我们重点关注addVertexRequest和addEdgeRequest,这两个方法的作用,就是生成我们之前提到的(id, vertex)和(id, edge)。addVertexRequest(Vertex vertex)会生成(id, vertex)的键值对,id 是 vertex 的 id。addEdgeRequest(I sourceVertexId, Edge edge) 会生成(id, edge)的键值对,id是边的起始点,edge包含边的终点和值。

总结一下,实现 GraphLoader 的目的,就是根据上下文,在 load 方法中,解释 record,并调用 MutationContext 接口,请求将 vertex 和 edge 添加到图中。

请注意,这里的方法名字都带有Request字样,意思就是说调用这些方法,只代表一种请求,最终这些请求是否生效,取决于 VertexResolver 的实现。

2、VertexResolver

在所有的 split 被 GraphLoader 处理结束后,我们会做一次同步,使得所有 worker 统一开始执行 VertexResolver,我们先看一下 VertexResolver的方法:

public abstract class VertexResolver<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {

  public void configure(Configuration conf) throws IOException {
  }

  public abstract Vertex<I, V, E, M> resolve(I vertexId,
      Vertex<I, V, E, M> vertex, VertexChanges<I, V, E, M> vertexChanges,
      boolean hasMessages) throws IOException;

}

每个 worker 上只会创建一个 VertexResolver的实例,这个实例负责处理属于此 worker 的所有点,即将关于一个 id 的 vertex 和 edge 封装成一个 vertex 对象。

  • configure 方法 VertexResolver 的配置方法,每个实例只会被调用一次,用于获取作业配置信息;
  • resolve 方法针对每个 id 调用一次,用于处理所有关于此 id 的 vertex 和 edge,它的返回值是一个vertex 对象,表示将此 vertex 添加到最终的图中。参数比较复杂,我们展开介绍:
    • vertexId 是当前要处理的 vertex 的 id,也就是之前所说的 key
    • vertex 是当前已存在的 vertex 对象,在数据载入阶段,这个参数一定为 null;
    • vertexChanges 是关于此 id 所有的变动集合,主要是 GraphLoader 中添加的 vertex 和 edge,也就是之前所说的 values
    • hasMessages 表示此 id 是否有收到message,在数据载入阶段,这个参数一定为false; 因此,这四个参数中,只需要关心 vertexId 和 VertexChanges 两个参数,而 vertexChanges就是个集合,它来自于 GraphLoader.load方法中,调用 MutationContext.add*Request接口添加的 vertex 和 edge,vertexChanges 内部也按照 vertex 和 edge 划分了不同的子集合,我们先看一下 VertexChanges 接口的方法:
public interface VertexChanges<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> {

  public List<Vertex<I, V, E, M>> getAddedVertexList();
  public int getRemovedVertexCount();
  public List<Edge<I, E>> getAddedEdgeList();
  public List<I> getRemovedEdgeList();
}

看这个四个方法,是不是和 MutationContext 中的方法相似?是的, MutationContext中的add/remove*Reuqest方法分别与这里的get*List对应,也解释了上一小节所说的。MutationContext 的调用都是些请求,并不一定是最终的 vertex 和 edge,因为它们要在这里进行经过处理才能决定是否真正要添加到最终的图中。

总结一下,VertexResolver 是数据加载的最后一步,它的作用就是构造最终的图结构,构造过程就是将 GraphLoader 里生成的键值对进行封装,使得最终的图结构以一种邻接表的形式存在。

四、举例

我们举两种类型的示例说明图的加载过程。输入数据一般可以划分成边类型和点类型的数据,我们分开举例说明。

1、边类型数据

边类型的数据可以用下面的表格表示:

SourceVertexID DestinationVertexID EdgeValue
id0 id1 9
id0 id2 5
id2 id1 4

每条Record表示图中的一条边的格式,上图表示id0有两条出边,分别指向id1和id2;id2有一条出边,指向id1;id1没有出边。

对应的 GraphLoader 的实现为:

/**
   * 将Record解释为Edge,此处对于上述第一种情况,每个Record只表示一条Edge。
   * <p>
   * 类似于{@link com.aliyun.odps.mapreduce.Mapper#map}
   * ,输入Record,生成键值对,此处的键是Vertex的ID,
   * 值是Edge,通过上下文Context写出,这些键值对会在LoadingVertexResolver出根据Vertex的ID汇总。
   * 
   * 注意:此处添加的点或边只是根据Record内容发出的请求,并不是最后参与计算的点或边,只有在随后的{@link VertexResolver}
   * 中添加的点或边才参与计算。
   */
  public static class EdgeInputLoader extends
      GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {

    /**
     * 配置EdgeInputLoader。
     * 
     * @param conf
     *          作业的配置参数,在main中使用GraphJob配置的,或者在console中set的
     * @param workerId
     *          当前工作的worker的序号,从0开始,可以用于构造唯一的vertex id
     * @param inputTableInfo
     *          当前worker载入的输入表信息,可以用于确定当前输入是哪种类型的数据,即Record的格式
     */
    @Override
    public void setup(Configuration conf, int workerId, TableInfo inputTableInfo) {

    }

    /**
     * 根据Record中的内容,解析为对应的边,并请求添加到图中。
     * 
     * @param recordNum
     *          记录序列号,从1开始,每个worker上单独计数
     * @param record
     *          输入表中的记录,三列,分别表示初点、终点、边的权重
     * @param context
     *          上下文,请求将解释后的边添加到图中
     */
    @Override
    public void load(
        LongWritable recordNum,
        Record record,
        MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
        throws IOException {
      /**
       * 1、第一列表示初始点的ID
       */
      LongWritable sourceVertexID = (LongWritable) record.get(0);

      /**
       * 2、第二列表示终点的ID
       */
      LongWritable destinationVertexID = (LongWritable) record.get(1);

      /**
       * 3、地三列表示边的权重
       */
      LongWritable edgeValue = (LongWritable) record.get(2);

      /**
       * 4、创建边,由终点ID和边的权重组成
       */
      Edge<LongWritable, LongWritable> edge = new Edge<LongWritable, LongWritable>(
          destinationVertexID, edgeValue);

      /**
       * 5、请求给初始点添加边
       */
      context.addEdgeRequest(sourceVertexID, edge);

      /**
       * 6、如果每条Record表示双向边,重复4与5 Edge<LongWritable, LongWritable> edge2 = new
       * Edge<LongWritable, LongWritable>( sourceVertexID, edgeValue);
       * context.addEdgeRequest(destinationVertexID, edge2);
       */
    }

  }

对应的 VertexResolver的实现为:

  /**
   * 汇总{@link GraphLoader#load(LongWritable, Record, MutationContext)}生成的键值对,类似于
   * {@link com.aliyun.odps.mapreduce.Reducer#reduce}。对于唯一的Vertex ID,所有关于这个ID上
   * 添加\删除、点\边的行为都会放在{@link VertexChanges}中。
   * 
   * 注意:此处并不只针对load方法中添加的有冲突的点或边才调用(冲突是指添加多个相同Vertex对象,添加重复边等),
   * 所有在load方法中请求生成的ID都会在此处被调用。
   */
  public static class EdgeResolver extends
      VertexResolver<LongWritable, LongWritable, LongWritable, LongWritable> {

    /**
     * 处理关于一个ID的添加或删除、点或边的请求。
     * 
     * <p>
     * {@link VertexChanges}有四个接口,分别与{@link MutationContext}的四个接口对应:
     * <ul>
     * <li>{@link VertexChanges#getAddedVertexList()}与
     * {@link MutationContext#addVertexRequest(Vertex)}对应,
     * 在load方法中,请求添加的ID相同的Vertex对象,会被汇总在返回的List中
     * <li>{@link VertexChanges#getAddedEdgeList()}与
     * {@link MutationContext#addEdgeRequest(WritableComparable, Edge)}
     * 对应,请求添加的初始点ID相同的Edge对象,会被汇总在返回的List中
     * <li>{@link VertexChanges#getRemovedVertexCount()}与
     * {@link MutationContext#removeVertexRequest(WritableComparable)}
     * 对应,请求删除的ID相同的Vertex,汇总的请求删除的次数作为返回值
     * <li>{@link VertexChanges#getRemovedEdgeList()}与
     * {@link MutationContext#removeEdgeRequest(WritableComparable, WritableComparable)}
     * 对应,请求删除的初始点ID相同的Edge对象,会被汇总在返回的List中
     * <ul>
     * 
     * <p>
     * 用户通过处理关于这个ID的变化,通过返回值声明此ID是否参与计算,如果返回的{@link Vertex}不为null,
     * 则此ID会参与随后的计算,如果返回null,则不会参与计算。
     * 
     * @param vertexId
     *          请求添加的点的ID,或请求添加的边的初点ID
     * @param vertex
     *          已存在的Vertex对象,数据载入阶段,始终为null
     * @param vertexChanges
     *          此ID上的请求添加\删除、点\边的集合
     * @param hasMessages
     *          此ID是否有输入消息,数据载入阶段,始终为false
     */
    @Override
    public Vertex<LongWritable, LongWritable, LongWritable, LongWritable> resolve(
        LongWritable vertexId,
        Vertex<LongWritable, LongWritable, LongWritable, LongWritable> vertex,
        VertexChanges<LongWritable, LongWritable, LongWritable, LongWritable> vertexChanges,
        boolean hasMessages) throws IOException {
      /**
       * 1、生成Vertex对象,作为参与计算的点。
       */
      EdgeVertex computeVertex = new EdgeVertex();
      computeVertex.setId(vertexId);

      /**
       * 2、将请求给此点添加的边,添加到Vertex对象中,如果数据有重复的可能,根据算法需要决定是否去重。
       */
      if (vertexChanges.getAddedEdgeList() != null) {
        for (Edge<LongWritable, LongWritable> edge : vertexChanges
            .getAddedEdgeList()) {
          computeVertex.addEdge(edge.getDestVertexId(), edge.getValue());
        }
      }

      /**
       * 3、将Vertex对象返回,添加到最终的图中参与计算。
       */
      return computeVertex;
    }

  }

2、点类型数据

我们分析一个多路输入的例子。Graph作业指定两张表作为输入,一张是边类型的数据,格式如1中所示,另一张是点类型的数据,格式可以用下面的表格表示:

VertexID VertexValue
id0 9
id1 7
id2 8

表示有三个点,id分别是id0,id1,id2,对应的点的值分别是9,7,8。

对应的 GraphLoader 的实现为:

  /**
   * 将Record解释为Vertex和Edge,每个Record根据其来源,表示一个Vertex或者一条Edge。
   * <p>
   * 类似于{@link com.aliyun.odps.mapreduce.Mapper#map}
   * ,输入Record,生成键值对,此处的键是Vertex的ID,
   * 值是Vertex或Edge,通过上下文Context写出,这些键值对会在LoadingVertexResolver出根据Vertex的ID汇总。
   * 
   * 注意:此处添加的点或边只是根据Record内容发出的请求,并不是最后参与计算的点或边,只有在随后的{@link VertexResolver}
   * 中添加的点或边才参与计算。
   */
  public static class VertexInputLoader extends
      GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {

    private boolean isEdgeData;

    /**
     * 配置VertexInputLoader。
     * 
     * @param conf
     *          作业的配置参数,在main中使用GraphJob配置的,或者在console中set的
     * @param workerId
     *          当前工作的worker的序号,从0开始,可以用于构造唯一的vertex id
     * @param inputTableInfo
     *          当前worker载入的输入表信息,可以用于确定当前输入是哪种类型的数据,即Record的格式
     */
    @Override
    public void setup(Configuration conf, int workerId, TableInfo inputTableInfo) {
      isEdgeData = conf.get(EDGE_TABLE).equals(inputTableInfo.getTableName());
    }

    /**
     * 根据Record中的内容,解析为对应的边,并请求添加到图中。
     * 
     * @param recordNum
     *          记录序列号,从1开始,每个worker上单独计数
     * @param record
     *          输入表中的记录,三列,分别表示初点、终点、边的权重
     * @param context
     *          上下文,请求将解释后的边添加到图中
     */
    @Override
    public void load(
        LongWritable recordNum,
        Record record,
        MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
        throws IOException {
      if (isEdgeData) {
        /**
         * 数据来源于存储边信息的表。
         * 
         * 1、第一列表示初始点的ID
         */
        LongWritable sourceVertexID = (LongWritable) record.get(0);

        /**
         * 2、第二列表示终点的ID
         */
        LongWritable destinationVertexID = (LongWritable) record.get(1);

        /**
         * 3、地三列表示边的权重
         */
        LongWritable edgeValue = (LongWritable) record.get(2);

        /**
         * 4、创建边,由终点ID和边的权重组成
         */
        Edge<LongWritable, LongWritable> edge = new Edge<LongWritable, LongWritable>(
            destinationVertexID, edgeValue);

        /**
         * 5、请求给初始点添加边
         */
        context.addEdgeRequest(sourceVertexID, edge);

        /**
         * 6、如果每条Record表示双向边,重复4与5 Edge<LongWritable, LongWritable> edge2 = new
         * Edge<LongWritable, LongWritable>( sourceVertexID, edgeValue);
         * context.addEdgeRequest(destinationVertexID, edge2);
         */
      } else {
        /**
         * 数据来源于存储点信息的表。
         * 
         * 1、第一列表示点的ID
         */
        LongWritable vertexID = (LongWritable) record.get(0);

        /**
         * 2、第二列表示点的值
         */
        LongWritable vertexValue = (LongWritable) record.get(1);

        /**
         * 3、创建点,由点的ID和点的值组成
         */
        MyVertex vertex = new MyVertex();

        /**
         * 4、初始化点
         */
        vertex.setId(vertexID);
        vertex.setValue(vertexValue);

        /**
         * 5、请求添加点
         */
        context.addVertexRequest(vertex);
      }

    }

  }

对应的 VertexResolver 的实现为:

  /**
   * 汇总{@link GraphLoader#load(LongWritable, Record, MutationContext)}生成的键值对,类似于
   * {@link com.aliyun.odps.mapreduce.Reducer#reduce}。对于唯一的Vertex ID,所有关于这个ID上
   * 添加\删除、点\边的行为都会放在{@link VertexChanges}中。
   * 
   * 注意:此处并不只针对load方法中添加的有冲突的点或边才调用(冲突是指添加多个相同Vertex对象,添加重复边等),
   * 所有在load方法中请求生成的ID都会在此处被调用。
   */
  public static class LoadingResolver extends
      VertexResolver<LongWritable, LongWritable, LongWritable, LongWritable> {

    /**
     * 处理关于一个ID的添加或删除、点或边的请求。
     * 
     * <p>
     * {@link VertexChanges}有四个接口,分别与{@link MutationContext}的四个接口对应:
     * <ul>
     * <li>{@link VertexChanges#getAddedVertexList()}与
     * {@link MutationContext#addVertexRequest(Vertex)}对应,
     * 在load方法中,请求添加的ID相同的Vertex对象,会被汇总在返回的List中
     * <li>{@link VertexChanges#getAddedEdgeList()}与
     * {@link MutationContext#addEdgeRequest(WritableComparable, Edge)}
     * 对应,请求添加的初始点ID相同的Edge对象,会被汇总在返回的List中
     * <li>{@link VertexChanges#getRemovedVertexCount()}与
     * {@link MutationContext#removeVertexRequest(WritableComparable)}
     * 对应,请求删除的ID相同的Vertex,汇总的请求删除的次数作为返回值
     * <li>{@link VertexChanges#getRemovedEdgeList()}与
     * {@link MutationContext#removeEdgeRequest(WritableComparable, WritableComparable)}
     * 对应,请求删除的初始点ID相同的Edge对象,会被汇总在返回的List中
     * <ul>
     * 
     * <p>
     * 用户通过处理关于这个ID的变化,通过返回值声明此ID是否参与计算,如果返回的{@link Vertex}不为null,
     * 则此ID会参与随后的计算,如果返回null,则不会参与计算。
     * 
     * @param vertexId
     *          请求添加的点的ID,或请求添加的边的初点ID
     * @param vertex
     *          已存在的Vertex对象,数据载入阶段,始终为null
     * @param vertexChanges
     *          此ID上的请求添加\删除、点\边的集合
     * @param hasMessages
     *          此ID是否有输入消息,数据载入阶段,始终为false
     */
    @Override
    public Vertex<LongWritable, LongWritable, LongWritable, LongWritable> resolve(
        LongWritable vertexId,
        Vertex<LongWritable, LongWritable, LongWritable, LongWritable> vertex,
        VertexChanges<LongWritable, LongWritable, LongWritable, LongWritable> vertexChanges,
        boolean hasMessages) throws IOException {
      /**
       * 1、获取Vertex对象,作为参与计算的点。
       */
      MyVertex computeVertex = null;
      if (vertexChanges.getAddedVertexList() == null
          || vertexChanges.getAddedVertexList().isEmpty()) {
        computeVertex = new MyVertex();
        computeVertex.setId(vertexId);
      } else {
        /**
         * 此处假设存储点信息的表中,每个Record表示唯一的点。
         */
        computeVertex = (MyVertex) vertexChanges.getAddedVertexList().get(0);
      }

      /**
       * 2、将请求给此点添加的边,添加到Vertex对象中,如果数据有重复的可能,根据算法需要决定是否去重。
       */
      if (vertexChanges.getAddedEdgeList() != null) {
        for (Edge<LongWritable, LongWritable> edge : vertexChanges
            .getAddedEdgeList()) {
          computeVertex.addEdge(edge.getDestVertexId(), edge.getValue());
        }
      }

      /**
       * 3、将Vertex对象返回,添加到最终的图中参与计算。
       */
      return computeVertex;
    }

  }
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
3月前
|
SQL 分布式计算 运维
如何对付一个耗时6h+的ODPS任务:慢节点优化实践
本文描述了大数据处理任务(特别是涉及大量JOIN操作的任务)中遇到的性能瓶颈问题及其优化过程。
|
2月前
|
机器学习/深度学习 算法 搜索推荐
从理论到实践,Python算法复杂度分析一站式教程,助你轻松驾驭大数据挑战!
【10月更文挑战第4天】在大数据时代,算法效率至关重要。本文从理论入手,介绍时间复杂度和空间复杂度两个核心概念,并通过冒泡排序和快速排序的Python实现详细分析其复杂度。冒泡排序的时间复杂度为O(n^2),空间复杂度为O(1);快速排序平均时间复杂度为O(n log n),空间复杂度为O(log n)。文章还介绍了算法选择、分而治之及空间换时间等优化策略,帮助你在大数据挑战中游刃有余。
89 4
|
29天前
|
存储 负载均衡 算法
大数据散列分区计算哈希值
大数据散列分区计算哈希值
46 4
|
2月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
69 5
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
56 3
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
72 0
|
25天前
|
存储 消息中间件 分布式计算
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
Cisco WebEx 早期数据平台采用了多系统架构(包括 Trino、Pinot、Iceberg 、 Kyuubi 等),面临架构复杂、数据冗余存储、运维困难、资源利用率低、数据时效性差等问题。因此,引入 Apache Doris 替换了 Trino、Pinot 、 Iceberg 及 Kyuubi 技术栈,依赖于 Doris 的实时数据湖能力及高性能 OLAP 分析能力,统一数据湖仓及查询分析引擎,显著提升了查询性能及系统稳定性,同时实现资源成本降低 30%。
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
|
1月前
|
边缘计算 人工智能 搜索推荐
大数据与零售业:精准营销的实践
【10月更文挑战第31天】在信息化社会,大数据技术正成为推动零售业革新的重要驱动力。本文探讨了大数据在零售业中的应用,包括客户细分、个性化推荐、动态定价、营销自动化、预测性分析、忠诚度管理和社交网络洞察等方面,通过实际案例展示了大数据如何帮助商家洞悉消费者行为,优化决策,实现精准营销。同时,文章也讨论了大数据面临的挑战和未来展望。
|
1月前
|
分布式计算 Java MaxCompute
ODPS MR节点跑graph连通分量计算代码报错java heap space如何解决
任务启动命令:jar -resources odps-graph-connect-family-2.0-SNAPSHOT.jar -classpath ./odps-graph-connect-family-2.0-SNAPSHOT.jar ConnectFamily 若是设置参数该如何设置
|
3月前
|
人工智能 分布式计算 大数据
超级计算与大数据:推动科学研究的发展
【9月更文挑战第30天】在信息时代,超级计算和大数据技术正成为推动科学研究的关键力量。超级计算凭借强大的计算能力,在尖端科研、国防军工等领域发挥重要作用;大数据技术则提供高效的数据处理工具,促进跨学科合作与创新。两者融合不仅提升了数据处理效率,还推动了人工智能、生物科学等领域的快速发展。未来,随着技术进步和跨学科合作的加深,超级计算与大数据将在科学研究中扮演更加重要的角色。

相关产品

  • 云原生大数据计算服务 MaxCompute