开发者社区> 问答> 正文

MaxCompute用户指南:图模型:示例程序:连通分量



如果两个顶点之间存在路径,则称这两个顶点是连通的。如果无向图 G 中任意两个顶点都是连通的,则称 G 为连通图,否则称为非连通图。G 的极大连通子图称为 G 的连通分量。
本算法计算每个顶点所属的连通分量:算法结束时,每个顶点的值为其所在连通分量中的最小顶点 ID。实现思路是将最小顶点 ID 沿着边传播到连通分量的所有顶点。

代码示例


连通分量的代码,如下所示:

  1. import java.io.IOException;
  2. import com.aliyun.odps.data.TableInfo;
  3. import com.aliyun.odps.graph.ComputeContext;
  4. import com.aliyun.odps.graph.GraphJob;
  5. import com.aliyun.odps.graph.GraphLoader;
  6. import com.aliyun.odps.graph.MutationContext;
  7. import com.aliyun.odps.graph.Vertex;
  8. import com.aliyun.odps.graph.WorkerContext;
  9. import com.aliyun.odps.graph.examples.SSSP.MinLongCombiner;
  10. import com.aliyun.odps.io.LongWritable;
  11. import com.aliyun.odps.io.NullWritable;
  12. import com.aliyun.odps.io.WritableRecord;
  13. /**
  14. * Compute the connected component membership of each vertex and output
  15. * each vertex which's value containing the smallest id in the connected
  16. * component containing that vertex.
  17. *
  18. * Algorithm: propagate the smallest vertex id along the edges to all
  19. * vertices of a connected component.
  20. *
  21. */
  22. public class ConnectedComponents {
  23.   public static class CCVertex extends
  24.     Vertex<LongWritable, LongWritable, NullWritable, LongWritable> {
  25.     @Override
  26.     public void compute(
  27.         ComputeContext<LongWritable, LongWritable, NullWritable, LongWritable> context,
  28.         Iterable<LongWritable> msgs) throws IOException {
  29.       if (context.getSuperstep() == 0L) {
  30.         this.setValue(getId());
  31.         context.sendMessageToNeighbors(this, getValue());
  32.         return;
  33.       }
  34.       long minID = Long.MAX_VALUE;
  35.       for (LongWritable id : msgs) {
  36.         if (id.get() < minID) {
  37.           minID = id.get();
  38.         }
  39.       }
  40.       if (minID < this.getValue().get()) {
  41.         this.setValue(new LongWritable(minID));
  42.         context.sendMessageToNeighbors(this, getValue());
  43.       } else {
  44.         this.voteToHalt();
  45.       }
  46.     }
  47.     /**
  48.      * Output Table Description:
  49.      * +-----------------+----------------------------------------+
  50.      * | Field | Type    | Comment                                |
  51.      * +-----------------+----------------------------------------+
  52.      * | v     | bigint  | vertex id                              |
  53.      * | minID | bigint  | smallest id in the connected component |
  54.      * +-----------------+----------------------------------------+
  55.      */
  56.     @Override
  57.     public void cleanup(
  58.         WorkerContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
  59.         throws IOException {
  60.       context.write(getId(), getValue());
  61.     }
  62.   }
  63.   /**
  64.    * Input Table Description:
  65.    * +-----------------+----------------------------------------------------+
  66.    * | Field | Type    | Comment                                            |
  67.    * +-----------------+----------------------------------------------------+
  68.    * | v     | bigint  | vertex id                                          |
  69.    * | es    | string  | comma separated target vertex id of outgoing edges |
  70.    * +-----------------+----------------------------------------------------+
  71.    *
  72.    * Example:
  73.    * For graph:
  74.    *       1 ----- 2
  75.    *       |       |
  76.    *       3 ----- 4
  77.    * Input table:
  78.    * +-----------+
  79.    * | v  | es   |
  80.    * +-----------+
  81.    * | 1  | 2,3  |
  82.    * | 2  | 1,4  |
  83.    * | 3  | 1,4  |
  84.    * | 4  | 2,3  |
  85.    * +-----------+
  86.    */
  87.   public static class CCVertexReader extends
  88.     GraphLoader<LongWritable, LongWritable, NullWritable, LongWritable> {
  89.     @Override
  90.     public void load(
  91.         LongWritable recordNum,
  92.         WritableRecord record,
  93.         MutationContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
  94.     throws IOException {
  95.       CCVertex vertex = new CCVertex();
  96.       vertex.setId((LongWritable) record.get(0));
  97.       String[] edges = record.get(1).toString().split(",");
  98.       for (int i = 0; i < edges.length; i++) {
  99.         long destID = Long.parseLong(edges);
  100.         vertex.addEdge(new LongWritable(destID), NullWritable.get());
  101.       }
  102.       context.addVertexRequest(vertex);
  103.     }
  104.   }
  105.   public static void main(String[] args) throws IOException {
  106.     if (args.length < 2) {
  107.       System.out.println("Usage: <input> <output>");
  108.       System.exit(-1);
  109.     }
  110.     GraphJob job = new GraphJob();
  111.     job.setGraphLoaderClass(CCVertexReader.class);
  112.     job.setVertexClass(CCVertex.class);
  113.     job.setCombinerClass(MinLongCombiner.class);
  114.     job.addInput(TableInfo.builder().tableName(args[0]).build());
  115.     job.addOutput(TableInfo.builder().tableName(args[1]).build());
  116.     long startTime = System.currentTimeMillis();
  117.     job.run();
  118.     System.out.println("Job Finished in "
  119.         + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  120.   }
  121. }

展开
收起
行者武松 2017-10-24 10:29:45 1690 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
Data+AI时代大数据平台应该如何建设 立即下载
大数据AI一体化的解读 立即下载
极氪大数据 Serverless 应用实践 立即下载