开发者社区 问答 正文

MaxCompute用户指南:图模型:示例程序:三角形计数



三角形计数算法用于计算通过每个顶点的三角形个数。
算法实现的流程如下:


  1. 每个顶点将其 ID 发送给所有出边邻居。

  2. 存储入边和出边邻居并发送给出边邻居。

  3. 对每条入边,计算其起点发来的邻居集合与当前顶点(该边终点)自身邻居集合的交集大小,并对所有入边求和,结果输出到表。

  4. 将表中的输出结果求和并除以三(每个三角形会被重复统计三次),即得到三角形个数。


代码示例


三角形计数算法的代码,如下所示:
  1. import java.io.IOException;
  2. import com.aliyun.odps.data.TableInfo;
  3. import com.aliyun.odps.graph.ComputeContext;
  4. import com.aliyun.odps.graph.Edge;
  5. import com.aliyun.odps.graph.GraphJob;
  6. import com.aliyun.odps.graph.GraphLoader;
  7. import com.aliyun.odps.graph.MutationContext;
  8. import com.aliyun.odps.graph.Vertex;
  9. import com.aliyun.odps.graph.WorkerContext;
  10. import com.aliyun.odps.io.LongWritable;
  11. import com.aliyun.odps.io.NullWritable;
  12. import com.aliyun.odps.io.Tuple;
  13. import com.aliyun.odps.io.Writable;
  14. import com.aliyun.odps.io.WritableRecord;
  15. /**
  16. * Compute the number of triangles passing through each vertex.
  17. *
  18. * The algorithm can be computed in three supersteps:
  19. * I. Each vertex sends a message with its ID to all its outgoing
  20. * neighbors.
  21. * II. The incoming neighbors and outgoing neighbors are stored and
  22. * send to outgoing neighbors.
  23. * III. For each edge compute the intersection of the sets at destination
  24. * vertex and sum them, then output to table.
  25. *
  26. * The triangle count is the sum of output table and divide by three since
  27. * each triangle is counted three times.
  28. *
  29. **/
  30. public class TriangleCount {
  31.   public static class TCVertex extends
  32.     Vertex<LongWritable, Tuple, NullWritable, Tuple> {
  33.     @Override
  34.     public void setup(
  35.         WorkerContext<LongWritable, Tuple, NullWritable, Tuple> context)
  36.       throws IOException {
  37.       // collect the outgoing neighbors
  38.       Tuple t = new Tuple();
  39.       if (this.hasEdges()) {
  40.         for (Edge<LongWritable, NullWritable> edge : this.getEdges()) {
  41.           t.append(edge.getDestVertexId());
  42.         }
  43.       }
  44.       this.setValue(t);
  45.     }
  46.     @Override
  47.     public void compute(
  48.         ComputeContext<LongWritable, Tuple, NullWritable, Tuple> context,
  49.         Iterable<Tuple> msgs) throws IOException {
  50.       if (context.getSuperstep() == 0L) {
  51.         // sends a message with its ID to all its outgoing neighbors
  52.         Tuple t = new Tuple();
  53.         t.append(getId());
  54.         context.sendMessageToNeighbors(this, t);
  55.       } else if (context.getSuperstep() == 1L) {
  56.         // store the incoming neighbors
  57.         for (Tuple msg : msgs) {
  58.           for (Writable item : msg.getAll()) {
  59.             if (!this.getValue().getAll().contains((LongWritable)item)) {
  60.               this.getValue().append((LongWritable)item);
  61.             }
  62.           }
  63.         }
  64.         // send both incoming and outgoing neighbors to all outgoing neighbors
  65.         context.sendMessageToNeighbors(this, getValue());
  66.       } else if (context.getSuperstep() == 2L) {
  67.         // count the sum of intersection at each edge
  68.         long count = 0;
  69.         for (Tuple msg : msgs) {
  70.           for (Writable id : msg.getAll()) {
  71.             if (getValue().getAll().contains(id)) {
  72.               count ++;
  73.             }
  74.           }
  75.         }
  76.         // output to table
  77.         context.write(getId(), new LongWritable(count));
  78.         this.voteToHalt();
  79.       }
  80.     }
  81.   }
  82.   public static class TCVertexReader extends
  83.   GraphLoader<LongWritable, Tuple, NullWritable, Tuple> {
  84.     @Override
  85.     public void load(
  86.         LongWritable recordNum,
  87.         WritableRecord record,
  88.         MutationContext<LongWritable, Tuple, NullWritable, Tuple> context)
  89.     throws IOException {
  90.       TCVertex vertex = new TCVertex();
  91.       vertex.setId((LongWritable) record.get(0));
  92.       String[] edges = record.get(1).toString().split(",");
  93.       for (int i = 0; i < edges.length; i++) {
  94.         try {
  95.           long destID = Long.parseLong(edges);
  96.           vertex.addEdge(new LongWritable(destID), NullWritable.get());
  97.         } catch(NumberFormatException nfe) {
  98.           System.err.println("Ignore " + nfe);
  99.         }
  100.       }
  101.       context.addVertexRequest(vertex);
  102.     }
  103.   }
  104.   public static void main(String[] args) throws IOException {
  105.     if (args.length < 2) {
  106.       System.out.println("Usage: <input> <output>");
  107.       System.exit(-1);
  108.     }
  109.     GraphJob job = new GraphJob();
  110.     job.setGraphLoaderClass(TCVertexReader.class);
  111.     job.setVertexClass(TCVertex.class);
  112.     job.addInput(TableInfo.builder().tableName(args[0]).build());
  113.     job.addOutput(TableInfo.builder().tableName(args[1]).build());
  114.     long startTime = System.currentTimeMillis();
  115.     job.run();
  116.     System.out.println("Job Finished in "
  117.         + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  118.   }
  119. }

展开
收起
行者武松 2017-10-24 10:32:11 1876 分享 版权
0 条回答
写回答
取消 提交回答