基于MaxCompute的图计算实践分享-Aggregator机制介绍

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: Aggregator是MaxCompute-GRAPH作业中常用的feature之一,特别是解决机器学习问题时。MaxCompute-GRAPH中Aggregator用于汇总并处理全局信息。本文将详细介绍的Aggregator的执行机制、相关API,并以Kmeans Clustering为例子说明Aggregator的具体用法。

更多精彩内容参见云栖社区大数据频道https://yq.aliyun.com/big-data,此外,通过Maxcompute及其配套产品,大数据分析仅需几步,详情访问https://www.aliyun.com/product/odps


Aggregator是MaxCompute-GRAPH作业中常用的feature之一,特别是解决机器学习问题时。MaxCompute-GRAPH中Aggregator用于汇总并处理全局信息。本文将详细介绍的Aggregator的执行机制、相关API,并以Kmeans Clustering为例子说明Aggregator的具体用法。

Aggregator机制

如图1所示,Aggregator的逻辑分两部分,一部分在所有Worker上执行,即分布式执行,另一部分只在AggregatorOwner所在Worker上执行,即单点。其中在所有Worker上执行的操作包括创建初始值及局部聚合,然后将局部聚合结果发送给AggregatorOwner所在Worker上。AggregatorOwner所在Worker上聚合普通Worker发送过来的局部聚合对象,得到全局聚合结果,然后判断迭代是否结束。全局聚合的结果会在下一轮超步分发给所有Worker,供下一轮迭代使用。

aggregator
图1 Aggregator机制

Aggregator的API

Aggregator共提供了五个API供用户实现。下面逐个介绍5个API的调用时机及常规用途。

1. createStartupValue(context)

该API在所有Worker上执行一次,调用时机是所有超步开始之前,通常用以初始化AggregatorValue。在第0轮超步中,调用WorkerContext.getLastAggregatedValue() 或ComputeContext.getLastAggregatedValue()可以获取该API初始化的AggregatorValue对象。

2. createInitialValue(context)

该API在所有Worker上每轮超步开始时调用一次,用以初始化本轮迭代所用的AggregatorValue。通常操作是通过WorkerContext.getLastAggregatedValue() 得到上一轮迭代的结果,然后执行部分初始化操作。

3. aggregate(value, item)

该API同样在所有Worker上执行,与上述API不同的是,该API由用户显示调用ComputeContext#aggregate(item)来触发,而上述两个API,则由框架自动调用。该API用以执行局部聚合操作,其中第一个参数value是本Worker在该轮超步已经聚合的结果(初始值是createInitialValue返回的对象),第二个参数是用户代码调用ComputeContext#aggregate(item)传入的参数。该API中通常用item来更新value实现聚合。所有aggregate执行完后,得到的value就是该Worker的局部聚合结果,然后由框架发送给AggregatorOwner所在的Worker。

4. merge(value, partial)

该API执行于AggregatorOwner所在Worker,用以合并各Worker局部聚合的结果,达到全局聚合对象。与aggregate类似,value是已经聚合的结果,而partial待聚合的对象,同样用partial更新value。
假定有3个worker,分别是w0、w1、w2,其局部聚合结果是p0、p1、p2。假定发送到AggregatorOwner所在Worker的顺序为p1、p0、p2。那么merge执行次序为,首先执行merge(p1, p0),这样p1和p0就聚合为p1',然后执行merge(p1', p2),p1'和p2聚合为p1'',而p1''即为本轮超步全局聚合的结果。
从上述示例可以看出,当只有一个worker时,不需要执行merge方法,也就是说merge()不会被调用。

5. terminate(context, value)

当AggregatorOwner所在Worker执行完merge()后,框架会调用terminate(context, value)执行最后的处理。其中第二个参数value,即为merge()最后得到全局聚合,在该方法中可以对全局聚合继续修改。执行完terminate()后,框架会将全局聚合对象分发给所有Worker,供下一轮超步使用。
terminate()方法的一个特殊之处在于,如果返回true,则整个作业就结束迭代,否则继续执行。在机器学习场景中,通常判断收敛后返回true以结束作业。

Kmeans Clustering示例

下面以典型的KmeansClustering作为示例,来看下Aggregator具体用法。附件有完整代码,这里我们逐个部分解析代码。

1. GraphLoader部分

GraphLoader部分用以加载输入表,并转换为图的点或边。这里我们输入表的每行数据为一个样本,一个样本构造一个点,并用Vertex的value来存放样本。
我们首先定义一个Writable类KmeansValue作为Vertex的value类型。

  public static class KmeansValue implements Writable {

    DenseVector sample;

    public KmeansValue() { 
    }

    public KmeansValue(DenseVector v) {
      this.sample = v;
    }

    @Override
    public void write(DataOutput out) throws IOException {
      wirteForDenseVector(out, sample);

    }

    @Override
    public void readFields(DataInput in) throws IOException {
      sample = readFieldsForDenseVector(in);
    }
  }

KmeansValue中封装一个DenseVector对象来存放一个样本,这里DenseVector类型来自matrix-toolkits-java,而wirteForDenseVector()及readFieldsForDenseVector()用以实现序列化及反序列化,可参见附件中的完整代码。
我们自定义的KmeansReader代码如下:

  public static class KmeansReader extends 
    GraphLoader<LongWritable, KmeansValue, NullWritable, NullWritable> {

    @Override
    public void load(
        LongWritable recordNum,
        WritableRecord record,
        MutationContext<LongWritable, KmeansValue, NullWritable, NullWritable> context)
        throws IOException {
      KmeansVertex v = new KmeansVertex();
      v.setId(recordNum);

      int n = record.size();
      DenseVector dv = new DenseVector(n);
      for (int i = 0; i < n; i++) {
        dv.set(i, ((DoubleWritable)record.get(i)).get());
      }
      v.setValue(new KmeansValue(dv));

      context.addVertexRequest(v);
    }
  }

KmeansReader中,每读入一行数据(一个Record)创建一个点,这里用recordNum作为点的ID,将record内容转换成DenseVector对象并封装进VertexValue中。

2. Vertex部分

自定义的KmeansVertex代码如下。逻辑非常简单,每轮迭代要做的事情就是将自己维护的样本执行局部聚合。具体逻辑参见下面Aggregator的实现。

  public static class KmeansVertex extends
    Vertex<LongWritable, KmeansValue, NullWritable, NullWritable> {

    @Override
    public void compute(
        ComputeContext<LongWritable, KmeansValue, NullWritable, NullWritable> context,
        Iterable<NullWritable> messages) throws IOException {
      context.aggregate(getValue());
    }
  }
3. Aggregator部分

整个Kmeans的主要逻辑集中在Aggregator中。首先是自定义的KmeansAggrValue,用以维护要聚合及分发的内容。

  public static class KmeansAggrValue implements Writable {

    DenseMatrix centroids;
    DenseMatrix sums; // used to recalculate new centroids
    DenseVector counts; // used to recalculate new centroids

    @Override
    public void write(DataOutput out) throws IOException {
      wirteForDenseDenseMatrix(out, centroids);
      wirteForDenseDenseMatrix(out, sums);
      wirteForDenseVector(out, counts);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      centroids = readFieldsForDenseMatrix(in);
      sums = readFieldsForDenseMatrix(in);
      counts = readFieldsForDenseVector(in);
    }
  }

KmeansAggrValue中维护了三个对象,其中centroids是当前的K个中心点,假定样本是m维的话,centroids就是一个K*m的矩阵。sums是和centroids大小一样的矩阵,每个元素记录了到特定中心点最近的样本特定维之和,例如sums(i,j)是到第i个中心点最近的样本的第j维度之和。
counts是个K维的向量,记录到每个中心点距离最短的样本个数。sums和counts一起用以计算新的中心点,也是要聚合的主要内容。
接下来是自定义的Aggregator实现类KmeansAggregator,我们按照上述API的顺序逐个看其实现。
首先是createStartupValue().

  public static class KmeansAggregator extends Aggregator<KmeansAggrValue> {

    public KmeansAggrValue createStartupValue(WorkerContext context) throws IOException {
      KmeansAggrValue av = new KmeansAggrValue();

      byte[] centers = context.readCacheFile("centers");
      String lines[] = new String(centers).split("\n");

      int rows = lines.length;
      int cols = lines[0].split(",").length; // assumption rows >= 1 

      av.centroids = new DenseMatrix(rows, cols);
      av.sums = new DenseMatrix(rows, cols);
      av.sums.zero();
      av.counts = new DenseVector(rows);
      av.counts.zero();

      for (int i = 0; i < lines.length; i++) {
        String[] ss = lines[i].split(",");
        for (int j = 0; j < ss.length; j++) {
          av.centroids.set(i, j, Double.valueOf(ss[j]));
        }
      }
      return av;
    }

我们在该方法中初始化一个KmeansAggrValue对象,然后从资源文件centers中读取初始中心点,并赋值给centroids。而sums和counts初始化为0。
接来下是createInitialValue()的实现:

    @Override
    public KmeansAggrValue createInitialValue(WorkerContext context)
        throws IOException {
      KmeansAggrValue av = (KmeansAggrValue)context.getLastAggregatedValue(0);

      // reset for next iteration
      av.sums.zero();
      av.counts.zero();

      return av;
    }

该方法中,我们首先获取上一轮迭代的KmeansAggrValue,然后将sums和counts清零,其实是只保留了上一轮迭代出的centroids。
用以执行局部聚合的aggregate()实现如下:

    @Override
    public void aggregate(KmeansAggrValue value, Object item)
        throws IOException {
      DenseVector sample = ((KmeansValue)item).sample;

      // find the nearest centroid
      int min = findNearestCentroid(value.centroids, sample);

      // update sum and count
      for (int i = 0; i < sample.size(); i ++) {
        value.sums.add(min, i, sample.get(i));
      }
      value.counts.add(min, 1.0d);
    }

该方法中调用findNearestCentroid()(实现见附件)找到样本item欧拉距离最近的中心点索引,然后将其各个维度加到sums上,最后counts计数加1。
以上三个方法执行于所有worker上,实现局部聚合。接下来看下在AggregatorOwner所在Worker执行的全局聚合相关操作。
首先是merge的实现:

    @Override
    public void merge(KmeansAggrValue value, KmeansAggrValue partial)
        throws IOException {
      value.sums.add(partial.sums);
      value.counts.add(partial.counts);
    }

merge的实现逻辑很简单,就是把各个worker聚合出的sums和counts相加即可。
最后是terminate()的实现:

   @Override
    public boolean terminate(WorkerContext context, KmeansAggrValue value)
        throws IOException {
      // Calculate the new means to be the centroids (original sums)
      DenseMatrix newCentriods = calculateNewCentroids(value.sums, value.counts, value.centroids);

      // print old centroids and new centroids for debugging
      System.out.println("\nsuperstep: " + context.getSuperstep() + 
          "\nold centriod:\n" + value.centroids + " new centriod:\n" + newCentriods);

      boolean converged = isConverged(newCentriods, value.centroids, 0.05d);
      System.out.println("superstep: " + context.getSuperstep() + "/" 
          + (context.getMaxIteration() - 1) + " converged: " + converged);
      if (converged || context.getSuperstep() == context.getMaxIteration() - 1) {
        // converged or reach max iteration, output centriods
        for (int i = 0; i < newCentriods.numRows(); i++) {
          Writable[] centriod = new Writable[newCentriods.numColumns()];
          for (int j = 0; j < newCentriods.numColumns(); j++) {
            centriod[j] = new DoubleWritable(newCentriods.get(i, j));
          }
          context.write(centriod);
        }

        // true means to terminate iteration
        return true;
      }

      // update centriods
      value.centroids.set(newCentriods);
      // false means to continue iteration
      return false;
    }

teminate()中首先根据sums和counts调用calculateNewCentroids()求平均计算出新的中心点。然后调用isConverged()根据新老中心点欧拉距离判断是否已经收敛。如果收敛或迭代次数达到最大数,则将新的中心点输出并返回true,以结束迭代。否则更新中心点并返回false以继续迭代。其中calculateNewCentroids()和isConverged()的实现见附件。

4. main方法

main方法用以构造GraphJob,然后设置相应配置,并提交作业。代码如下:

  public static void main(String[] args) throws IOException {
    if (args.length < 2)
      printUsage();

    GraphJob job = new GraphJob();

    job.setGraphLoaderClass(KmeansReader.class);
    job.setRuntimePartitioning(false);
    job.setVertexClass(KmeansVertex.class);
    job.setAggregatorClass(KmeansAggregator.class);
    job.addInput(TableInfo.builder().tableName(args[0]).build());
    job.addOutput(TableInfo.builder().tableName(args[1]).build());

    // default max iteration is 30
    job.setMaxIteration(30);
    if (args.length >= 3)
      job.setMaxIteration(Integer.parseInt(args[2]));

    long start = System.currentTimeMillis();
    job.run();
    System.out.println("Job Finished in "
        + (System.currentTimeMillis() - start) / 1000.0 + " seconds");
  }

这里需要注意的是job.setRuntimePartitioning(false),设置为false后,各个worker加载的数据不再根据Partitioner重新分区,即谁加载的数据谁维护。

总结

本文介绍了MaxCompute-GRAPH中的Aggregator机制,API含义以及示例Kmeans Clustering。总的来说,Aggregator基本步骤是,
1)每个worker启动时执行createStartupValue用以创建AggregatorValue;
2)每轮迭代开始前,每个worker执行createInitialValue来初始化本轮的AggregatorValue;
3)一轮迭代中每个点通过context.aggregate()来执行aggregate()实现worker内的局部迭代;
4)每个Worker将局部迭代结果发送给AggregatorOwner所在的Worker;
5)AggregatorOwner所在worker执行多次merge,实现全局聚合;
6)AggregatorOwner所在Worker执行terminate用以对全局聚合结果做处理并决定是否结束迭代。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
3月前
|
机器学习/深度学习 算法 搜索推荐
从理论到实践,Python算法复杂度分析一站式教程,助你轻松驾驭大数据挑战!
【10月更文挑战第4天】在大数据时代,算法效率至关重要。本文从理论入手,介绍时间复杂度和空间复杂度两个核心概念,并通过冒泡排序和快速排序的Python实现详细分析其复杂度。冒泡排序的时间复杂度为O(n^2),空间复杂度为O(1);快速排序平均时间复杂度为O(n log n),空间复杂度为O(log n)。文章还介绍了算法选择、分而治之及空间换时间等优化策略,帮助你在大数据挑战中游刃有余。
97 3
|
2月前
|
存储 负载均衡 算法
大数据散列分区计算哈希值
大数据散列分区计算哈希值
52 4
|
3月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
80 5
|
3月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
61 3
|
7天前
|
数据采集 人工智能 分布式计算
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
阿里云推出的MaxFrame是链接大数据与AI的分布式Python计算框架,提供类似Pandas的操作接口和分布式处理能力。本文从部署、功能验证到实际场景全面评测MaxFrame,涵盖分布式Pandas操作、大语言模型数据预处理及企业级应用。结果显示,MaxFrame在处理大规模数据时性能显著提升,代码兼容性强,适合从数据清洗到训练数据生成的全链路场景...
24 5
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
81 0
|
2月前
|
存储 消息中间件 分布式计算
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
Cisco WebEx 早期数据平台采用了多系统架构(包括 Trino、Pinot、Iceberg 、 Kyuubi 等),面临架构复杂、数据冗余存储、运维困难、资源利用率低、数据时效性差等问题。因此,引入 Apache Doris 替换了 Trino、Pinot 、 Iceberg 及 Kyuubi 技术栈,依赖于 Doris 的实时数据湖能力及高性能 OLAP 分析能力,统一数据湖仓及查询分析引擎,显著提升了查询性能及系统稳定性,同时实现资源成本降低 30%。
Cisco WebEx 数据平台:统一 Trino、Pinot、Iceberg 及 Kyuubi,探索 Apache Doris 在 Cisco 的改造实践
|
2月前
|
边缘计算 人工智能 搜索推荐
大数据与零售业:精准营销的实践
【10月更文挑战第31天】在信息化社会,大数据技术正成为推动零售业革新的重要驱动力。本文探讨了大数据在零售业中的应用,包括客户细分、个性化推荐、动态定价、营销自动化、预测性分析、忠诚度管理和社交网络洞察等方面,通过实际案例展示了大数据如何帮助商家洞悉消费者行为,优化决策,实现精准营销。同时,文章也讨论了大数据面临的挑战和未来展望。
|
2月前
|
分布式计算 Java MaxCompute
ODPS MR节点跑graph连通分量计算代码报错java heap space如何解决
任务启动命令:jar -resources odps-graph-connect-family-2.0-SNAPSHOT.jar -classpath ./odps-graph-connect-family-2.0-SNAPSHOT.jar ConnectFamily 若是设置参数该如何设置
|
3月前
|
存储 缓存 NoSQL
大数据-45 Redis 持久化概念 RDB AOF机制 持久化原因和对比
大数据-45 Redis 持久化概念 RDB AOF机制 持久化原因和对比
60 2
大数据-45 Redis 持久化概念 RDB AOF机制 持久化原因和对比

相关产品

  • 云原生大数据计算服务 MaxCompute
  • 下一篇
    开通oss服务