对于有向边(u,v),定义所有满足 u<v 的顶点序列为拓扑序列,拓扑排序就是求一个有向图的拓扑序列的算法。
算法步骤如下:
从图中找到一个没有入边的顶点,并输出。
从图中删除该点,及其所有出边。
重复以上步骤,直到所有点都已输出。
代码示例
拓扑排序算法的代码,如下所示:
- import java.io.IOException;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import com.aliyun.odps.data.TableInfo;
- import com.aliyun.odps.graph.Aggregator;
- import com.aliyun.odps.graph.Combiner;
- import com.aliyun.odps.graph.ComputeContext;
- import com.aliyun.odps.graph.GraphJob;
- import com.aliyun.odps.graph.GraphLoader;
- import com.aliyun.odps.graph.MutationContext;
- import com.aliyun.odps.graph.Vertex;
- import com.aliyun.odps.graph.WorkerContext;
- import com.aliyun.odps.io.LongWritable;
- import com.aliyun.odps.io.NullWritable;
- import com.aliyun.odps.io.BooleanWritable;
- import com.aliyun.odps.io.WritableRecord;
- public class TopologySort {
- private final static Log LOG = LogFactory.getLog(TopologySort.class);
- public static class TopologySortVertex extends
- Vertex<LongWritable, LongWritable, NullWritable, LongWritable> {
- @Override
- public void compute(
- ComputeContext<LongWritable, LongWritable, NullWritable, LongWritable> context,
- Iterable<LongWritable> messages) throws IOException {
- // in superstep 0, each vertex sends message whose value is 1 to its
- // neighbors
- if (context.getSuperstep() == 0) {
- if (hasEdges()) {
- context.sendMessageToNeighbors(this, new LongWritable(1L));
- }
- } else if (context.getSuperstep() >= 1) {
- // compute each vertex's indegree
- long indegree = getValue().get();
- for (LongWritable msg : messages) {
- indegree += msg.get();
- }
- setValue(new LongWritable(indegree));
- if (indegree == 0) {
- voteToHalt();
- if (hasEdges()) {
- context.sendMessageToNeighbors(this, new LongWritable(-1L));
- }
- context.write(new LongWritable(context.getSuperstep()), getId());
- LOG.info("vertex: " + getId());
- }
- context.aggregate(new LongWritable(indegree));
- }
- }
- }
- public static class TopologySortVertexReader extends
- GraphLoader<LongWritable, LongWritable, NullWritable, LongWritable> {
- @Override
- public void load(
- LongWritable recordNum,
- WritableRecord record,
- MutationContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
- throws IOException {
- TopologySortVertex vertex = new TopologySortVertex();
- vertex.setId((LongWritable) record.get(0));
- vertex.setValue(new LongWritable(0));
- String[] edges = record.get(1).toString().split(",");
- for (int i = 0; i < edges.length; i++) {
- long edge = Long.parseLong(edges);
- if (edge >= 0) {
- vertex.addEdge(new LongWritable(Long.parseLong(edges)),
- NullWritable.get());
- }
- }
- LOG.info(record.toString());
- context.addVertexRequest(vertex);
- }
- }
- public static class LongSumCombiner extends
- Combiner<LongWritable, LongWritable> {
- @Override
- public void combine(LongWritable vertexId, LongWritable combinedMessage,
- LongWritable messageToCombine) throws IOException {
- combinedMessage.set(combinedMessage.get() + messageToCombine.get());
- }
- }
- public static class TopologySortAggregator extends
- Aggregator<BooleanWritable> {
- @SuppressWarnings("rawtypes")
- @Override
- public BooleanWritable createInitialValue(WorkerContext context)
- throws IOException {
- return new BooleanWritable(true);
- }
- @Override
- public void aggregate(BooleanWritable value, Object item)
- throws IOException {
- boolean hasCycle = value.get();
- boolean inDegreeNotZero = ((LongWritable) item).get() == 0 ? false : true;
- value.set(hasCycle && inDegreeNotZero);
- }
- @Override
- public void merge(BooleanWritable value, BooleanWritable partial)
- throws IOException {
- value.set(value.get() && partial.get());
- }
- @SuppressWarnings("rawtypes")
- @Override
- public boolean terminate(WorkerContext context, BooleanWritable value)
- throws IOException {
- if (context.getSuperstep() == 0) {
- // since the initial aggregator value is true, and in superstep we don't
- // do aggregate
- return false;
- }
- return value.get();
- }
- }
- public static void main(String[] args) throws IOException {
- if (args.length != 2) {
- System.out.println("Usage : <inputTable> <outputTable>");
- System.exit(-1);
- }
- // 输入表形式为
- // 0 1,2
- // 1 3
- // 2 3
- // 3 -1
- // 第一列为vertexid,第二列为该点边的destination vertexid,若值为-1,表示该点无出边
- // 输出表形式为
- // 0 0
- // 1 1
- // 1 2
- // 2 3
- // 第一列为supstep值,隐含了拓扑顺序,第二列为vertexid
- // TopologySortAggregator用来判断图中是否有环
- // 若输入的图有环,则当图中active的点入度都不为0时,迭代结束
- // 用户可以通过输入表和输出表的记录数来判断一个有向图是否有环
- GraphJob job = new GraphJob();
- job.setGraphLoaderClass(TopologySortVertexReader.class);
- job.setVertexClass(TopologySortVertex.class);
- job.addInput(TableInfo.builder().tableName(args[0]).build());
- job.addOutput(TableInfo.builder().tableName(args[1]).build());
- job.setCombinerClass(LongSumCombiner.class);
- job.setAggregatorClass(TopologySortAggregator.class);
- long startTime = System.currentTimeMillis();
- job.run();
- System.out.println("Job Finished in "
- + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
- }
- }