Dijkstra 算法是求解有向图中单源最短距离(Single Source Shortest Path,简称为 SSSP)的经典算法。
最短距离:对一个有权重的有向图 G=(V,E),从一个源点 s 到汇点 v 有很多路径,其中边权和最小的路径,称从 s 到 v 的最短距离。
算法基本原理,如下所示:
初始化:源点 s 到 s 自身的距离(d[s]=0),其他点 u 到 s 的距离为无穷(d=∞)。
迭代:若存在一条从 u 到 v 的边,那么从 s 到 v 的最短距离更新为:d[v]=min(d[v], d+weight(u, v)),直到所有的点到 s 的距离不再发生变化时,迭代结束。
由算法基本原理可以看出,此算法非常适合使用 MaxCompute Graph程序进行求解:每个点维护到源点的当前最短距离值,当这个值变化时,将新值加上边的权值发送消息通知其邻接点,下一轮迭代时,邻接点根据收到的消息更新其当前最短距离,当所有点当前最短距离不再变化时,迭代结束。
代码示例
单源最短距离的代码,如下所示:
  - import java.io.IOException;
 - import com.aliyun.odps.io.WritableRecord;
 - import com.aliyun.odps.graph.Combiner;
 - import com.aliyun.odps.graph.ComputeContext;
 - import com.aliyun.odps.graph.Edge;
 - import com.aliyun.odps.graph.GraphJob;
 - import com.aliyun.odps.graph.GraphLoader;
 - import com.aliyun.odps.graph.MutationContext;
 - import com.aliyun.odps.graph.Vertex;
 - import com.aliyun.odps.graph.WorkerContext;
 - import com.aliyun.odps.io.LongWritable;
 - import com.aliyun.odps.data.TableInfo;
 - public class SSSP {
 -   public static final String START_VERTEX = "sssp.start.vertex.id";
 -   public static class SSSPVertex extends
 -       Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
 -     private static long startVertexId = -1;
 -     public SSSPVertex() {
 -       this.setValue(new LongWritable(Long.MAX_VALUE));
 -     }
 -     public boolean isStartVertex(
 -         ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
 -       if (startVertexId == -1) {
 -         String s = context.getConfiguration().get(START_VERTEX);
 -         startVertexId = Long.parseLong(s);
 -       }
 -       return getId().get() == startVertexId;
 -     }
 -     @Override
 -     public void compute(
 -         ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
 -         Iterable<LongWritable> messages) throws IOException {
 -       long minDist = isStartVertex(context) ? 0 : Integer.MAX_VALUE;
 -       for (LongWritable msg : messages) {
 -         if (msg.get() < minDist) {
 -           minDist = msg.get();
 -         }
 -       }
 -       if (minDist < this.getValue().get()) {
 -         this.setValue(new LongWritable(minDist));
 -         if (hasEdges()) {
 -           for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
 -             context.sendMessage(e.getDestVertexId(), new LongWritable(minDist
 -                 + e.getValue().get()));
 -           }
 -         }
 -       } else {
 -         voteToHalt();
 -       }
 -     }
 -     @Override
 -     public void cleanup(
 -         WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
 -         throws IOException {
 -       context.write(getId(), getValue());
 -     }
 -   }
 -   public static class MinLongCombiner extends
 -       Combiner<LongWritable, LongWritable> {
 -     @Override
 -     public void combine(LongWritable vertexId, LongWritable combinedMessage,
 -         LongWritable messageToCombine) throws IOException {
 -       if (combinedMessage.get() > messageToCombine.get()) {
 -         combinedMessage.set(messageToCombine.get());
 -       }
 -     }
 -   }
 -   public static class SSSPVertexReader extends
 -       GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
 -     @Override
 -     public void load(
 -         LongWritable recordNum,
 -         WritableRecord record,
 -         MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
 -         throws IOException {
 -       SSSPVertex vertex = new SSSPVertex();
 -       vertex.setId((LongWritable) record.get(0));
 -       String[] edges = record.get(1).toString().split(",");
 -       for (int i = 0; i < edges.length; i++) {
 -         String[] ss = edges.split(":");
 -         vertex.addEdge(new LongWritable(Long.parseLong(ss[0])),
 -             new LongWritable(Long.parseLong(ss[1])));
 -       }
 -       context.addVertexRequest(vertex);
 -     }
 -   }
 -   public static void main(String[] args) throws IOException {
 -     if (args.length < 2) {
 -       System.out.println("Usage: <startnode> <input> <output>");
 -       System.exit(-1);
 -     }
 -     GraphJob job = new GraphJob();
 -     job.setGraphLoaderClass(SSSPVertexReader.class);
 -     job.setVertexClass(SSSPVertex.class);
 -     job.setCombinerClass(MinLongCombiner.class);
 -     job.set(START_VERTEX, args[0]);
 -     job.addInput(TableInfo.builder().tableName(args[1]).build());
 -     job.addOutput(TableInfo.builder().tableName(args[2]).build());
 -     long startTime = System.currentTimeMillis();
 -     job.run();
 -     System.out.println("Job Finished in "
 -         + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
 -   }
 - }
 
上述代码,说明如下:
   
第 19 行:定义 SSSPVertex ,其中:
点值表示该点到源点 startVertexId 的当前最短距离。
compute() 方法使用迭代公式:d[v]=min(d[v], d+weight(u, v)) 更新点值。
cleanup() 方法把点及其到源点的最短距离写到结果表中。
第 58 行:当点值没发生变化时,调用 voteToHalt() 告诉框架该点进入 halt 状态,当所有点都进入 halt 状态时,计算结束。
第 70 行:定义 MinLongCombiner,对发送给同一个点的消息进行合并,优化性能,减少内存占用。
第 83 行:定义 SSSPVertexReader 类,加载图,将表中每一条记录解析为一个点,记录的第一列是点标识,第二列存储该点起始的所有的边集,内容如:2:2,3:1,4:4。
第 106 行:主程序(main 函数),定义 GraphJob,指定 Vertex/GraphLoader/Combiner 等的实现,指定输入输出表。