开发者社区> 问答> 正文

MaxCompute用户指南:图模型:示例程序:拓扑排序



对于有向图中的每一条有向边 (u,v),如果在某个顶点序列中 u 都排在 v 之前(记作 u<v),则称该序列为拓扑序列。拓扑排序就是求一个有向图的拓扑序列的算法。
算法步骤如下:


  1. 从图中找到一个没有入边的顶点,并输出。

  2. 从图中删除该点,及其所有出边。

  3. 重复以上步骤,直到所有点都已输出。


代码示例


拓扑排序算法的代码,如下所示:
  1. import java.io.IOException;
  2. import org.apache.commons.logging.Log;
  3. import org.apache.commons.logging.LogFactory;
  4. import com.aliyun.odps.data.TableInfo;
  5. import com.aliyun.odps.graph.Aggregator;
  6. import com.aliyun.odps.graph.Combiner;
  7. import com.aliyun.odps.graph.ComputeContext;
  8. import com.aliyun.odps.graph.GraphJob;
  9. import com.aliyun.odps.graph.GraphLoader;
  10. import com.aliyun.odps.graph.MutationContext;
  11. import com.aliyun.odps.graph.Vertex;
  12. import com.aliyun.odps.graph.WorkerContext;
  13. import com.aliyun.odps.io.LongWritable;
  14. import com.aliyun.odps.io.NullWritable;
  15. import com.aliyun.odps.io.BooleanWritable;
  16. import com.aliyun.odps.io.WritableRecord;
  17. public class TopologySort {
  18.   private final static Log LOG = LogFactory.getLog(TopologySort.class);
  19.   public static class TopologySortVertex extends
  20.       Vertex<LongWritable, LongWritable, NullWritable, LongWritable> {
  21.     @Override
  22.     public void compute(
  23.         ComputeContext<LongWritable, LongWritable, NullWritable, LongWritable> context,
  24.         Iterable<LongWritable> messages) throws IOException {
  25.       // in superstep 0, each vertex sends message whose value is 1 to its
  26.       // neighbors
  27.       if (context.getSuperstep() == 0) {
  28.         if (hasEdges()) {
  29.           context.sendMessageToNeighbors(this, new LongWritable(1L));
  30.         }
  31.       } else if (context.getSuperstep() >= 1) {
  32.         // compute each vertex's indegree
  33.         long indegree = getValue().get();
  34.         for (LongWritable msg : messages) {
  35.           indegree += msg.get();
  36.         }
  37.         setValue(new LongWritable(indegree));
  38.         if (indegree == 0) {
  39.           voteToHalt();
  40.           if (hasEdges()) {
  41.             context.sendMessageToNeighbors(this, new LongWritable(-1L));
  42.           }
  43.           context.write(new LongWritable(context.getSuperstep()), getId());
  44.           LOG.info("vertex: " + getId());
  45.         }
  46.         context.aggregate(new LongWritable(indegree));
  47.       }
  48.     }
  49.   }
  50.   public static class TopologySortVertexReader extends
  51.       GraphLoader<LongWritable, LongWritable, NullWritable, LongWritable> {
  52.     @Override
  53.     public void load(
  54.         LongWritable recordNum,
  55.         WritableRecord record,
  56.         MutationContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
  57.         throws IOException {
  58.       TopologySortVertex vertex = new TopologySortVertex();
  59.       vertex.setId((LongWritable) record.get(0));
  60.       vertex.setValue(new LongWritable(0));
  61.       String[] edges = record.get(1).toString().split(",");
  62.       for (int i = 0; i < edges.length; i++) {
  63.         long edge = Long.parseLong(edges);
  64.         if (edge >= 0) {
  65.           vertex.addEdge(new LongWritable(Long.parseLong(edges)),
  66.               NullWritable.get());
  67.         }
  68.       }
  69.       LOG.info(record.toString());
  70.       context.addVertexRequest(vertex);
  71.     }
  72.   }
  73.   public static class LongSumCombiner extends
  74.       Combiner<LongWritable, LongWritable> {
  75.     @Override
  76.     public void combine(LongWritable vertexId, LongWritable combinedMessage,
  77.         LongWritable messageToCombine) throws IOException {
  78.       combinedMessage.set(combinedMessage.get() + messageToCombine.get());
  79.     }
  80.   }
  81.   public static class TopologySortAggregator extends
  82.       Aggregator<BooleanWritable> {
  83.     @SuppressWarnings("rawtypes")
  84.     @Override
  85.     public BooleanWritable createInitialValue(WorkerContext context)
  86.         throws IOException {
  87.       return new BooleanWritable(true);
  88.     }
  89.     @Override
  90.     public void aggregate(BooleanWritable value, Object item)
  91.         throws IOException {
  92.       boolean hasCycle = value.get();
  93.       boolean inDegreeNotZero = ((LongWritable) item).get() == 0 ? false : true;
  94.       value.set(hasCycle && inDegreeNotZero);
  95.     }
  96.     @Override
  97.     public void merge(BooleanWritable value, BooleanWritable partial)
  98.         throws IOException {
  99.       value.set(value.get() && partial.get());
  100.     }
  101.     @SuppressWarnings("rawtypes")
  102.     @Override
  103.     public boolean terminate(WorkerContext context, BooleanWritable value)
  104.         throws IOException {
  105.       if (context.getSuperstep() == 0) {
  106.         // since the initial aggregator value is true, and in superstep we don't
  107.         // do aggregate
  108.         return false;
  109.       }
  110.       return value.get();
  111.     }
  112.   }
  113.   public static void main(String[] args) throws IOException {
  114.     if (args.length != 2) {
  115.       System.out.println("Usage : <inputTable> <outputTable>");
  116.       System.exit(-1);
  117.     }
  118.     // 输入表形式为
  119.     // 0 1,2
  120.     // 1 3
  121.     // 2 3
  122.     // 3 -1
  123.     // 第一列为vertexid,第二列为该点边的destination vertexid,若值为-1,表示该点无出边
  124.     // 输出表形式为
  125.     // 0 0
  126.     // 1 1
  127.     // 1 2
  128.     // 2 3
  129.     // 第一列为supstep值,隐含了拓扑顺序,第二列为vertexid
  130.     // TopologySortAggregator用来判断图中是否有环
  131.     // 若输入的图有环,则当图中active的点入度都不为0时,迭代结束
  132.     // 用户可以通过输入表和输出表的记录数来判断一个有向图是否有环
  133.     GraphJob job = new GraphJob();
  134.     job.setGraphLoaderClass(TopologySortVertexReader.class);
  135.     job.setVertexClass(TopologySortVertex.class);
  136.     job.addInput(TableInfo.builder().tableName(args[0]).build());
  137.     job.addOutput(TableInfo.builder().tableName(args[1]).build());
  138.     job.setCombinerClass(LongSumCombiner.class);
  139.     job.setAggregatorClass(TopologySortAggregator.class);
  140.     long startTime = System.currentTimeMillis();
  141.     job.run();
  142.     System.out.println("Job Finished in "
  143.         + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  144.   }
  145. }

展开
收起
行者武松 2017-10-24 10:30:12 1748 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
大数据AI一体化的解读 立即下载
极氪大数据 Serverless 应用实践 立即下载
大数据&AI实战派 第2期 立即下载