开发者社区> 问答> 正文

MaxCompute用户指南:图模型:示例程序:PageRank



PageRank 算法是计算网页排名的经典算法:输入是一个有向图 G,其中顶点表示网页,如果存在网页 A 到网页 B 的链接,那么存在连接 A 到 B 的边。
算法基本原理,如下所示:


  • 初始化:点值表示 PageRank 的 rank 值(double 类型),初始时,所有点取值为 1/TotalNumVertices。

  • 迭代公式:PageRank(i)=0.15/TotalNumVertices+0.85*sum,其中 sum 为所有指向 i 点的点(设为 j) PageRank(j)/out_degree(j) 的累加值。

由算法基本原理可以看出,此算法非常适合使用 MaxCompute Graph 程序进行求解:每个点 j 维护其 PageRank值,每一轮迭代都将 PageRank(j)/out_degree(j) 发给其邻接点(向其投票),下一轮迭代时,每个点根据迭代公式重新计算PageRank 取值。

代码示例

  1. import java.io.IOException;
  2. import org.apache.log4j.Logger;
  3. import com.aliyun.odps.io.WritableRecord;
  4. import com.aliyun.odps.graph.ComputeContext;
  5. import com.aliyun.odps.graph.GraphJob;
  6. import com.aliyun.odps.graph.GraphLoader;
  7. import com.aliyun.odps.graph.MutationContext;
  8. import com.aliyun.odps.graph.Vertex;
  9. import com.aliyun.odps.graph.WorkerContext;
  10. import com.aliyun.odps.io.DoubleWritable;
  11. import com.aliyun.odps.io.LongWritable;
  12. import com.aliyun.odps.io.NullWritable;
  13. import com.aliyun.odps.data.TableInfo;
  14. import com.aliyun.odps.io.Text;
  15. import com.aliyun.odps.io.Writable;
  16. public class PageRank {
  17.   private final static Logger LOG = Logger.getLogger(PageRank.class);
  18.   public static class PageRankVertex extends
  19.       Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> {
  20.     @Override
  21.     public void compute(
  22.         ComputeContext<Text, DoubleWritable, NullWritable, DoubleWritable> context,
  23.         Iterable<DoubleWritable> messages) throws IOException {
  24.       if (context.getSuperstep() == 0) {
  25.         setValue(new DoubleWritable(1.0 / context.getTotalNumVertices()));
  26.       } else if (context.getSuperstep() >= 1) {
  27.         double sum = 0;
  28.         for (DoubleWritable msg : messages) {
  29.           sum += msg.get();
  30.         }
  31.         DoubleWritable vertexValue = new DoubleWritable(
  32.             (0.15f / context.getTotalNumVertices()) + 0.85f * sum);
  33.         setValue(vertexValue);
  34.       }
  35.       if (hasEdges()) {
  36.         context.sendMessageToNeighbors(this, new DoubleWritable(getValue()
  37.             .get() / getEdges().size()));
  38.       }
  39.     }
  40.     @Override
  41.     public void cleanup(
  42.         WorkerContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
  43.         throws IOException {
  44.       context.write(getId(), getValue());
  45.     }
  46.   }
  47.   public static class PageRankVertexReader extends
  48.       GraphLoader<Text, DoubleWritable, NullWritable, DoubleWritable> {
  49.     @Override
  50.     public void load(
  51.         LongWritable recordNum,
  52.         WritableRecord record,
  53.         MutationContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
  54.         throws IOException {
  55.       PageRankVertex vertex = new PageRankVertex();
  56.       vertex.setValue(new DoubleWritable(0));
  57.       vertex.setId((Text) record.get(0));
  58.       System.out.println(record.get(0));
  59.       for (int i = 1; i < record.size(); i++) {
  60.         Writable edge = record.get(i);
  61.         System.out.println(edge.toString());
  62.         if (!(edge.equals(NullWritable.get()))) {
  63.           vertex.addEdge(new Text(edge.toString()), NullWritable.get());
  64.         }
  65.       }
  66.       LOG.info("vertex edgs size: "
  67.           + (vertex.hasEdges() ? vertex.getEdges().size() : 0));
  68.       context.addVertexRequest(vertex);
  69.     }
  70.   }
  71.   private static void printUsage() {
  72.     System.out.println("Usage: <in> <out> [Max iterations (default 30)]");
  73.     System.exit(-1);
  74.   }
  75.   public static void main(String[] args) throws IOException {
  76.     if (args.length < 2)
  77.       printUsage();
  78.     GraphJob job = new GraphJob();
  79.     job.setGraphLoaderClass(PageRankVertexReader.class);
  80.     job.setVertexClass(PageRankVertex.class);
  81.     job.addInput(TableInfo.builder().tableName(args[0]).build());
  82.     job.addOutput(TableInfo.builder().tableName(args[1]).build());
  83.     // default max iteration is 30
  84.     job.setMaxIteration(30);
  85.     if (args.length >= 3)
  86.       job.setMaxIteration(Integer.parseInt(args[2]));
  87.     long startTime = System.currentTimeMillis();
  88.     job.run();
  89.     System.out.println("Job Finished in "
  90.         + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  91.   }
  92. }

上述代码,说明如下:

  • 第 23 行:定义 PageRankVertex ,其中:
    点值表示该点(网页)的当前 PageRank 取值。

  • compute() 方法使用迭代公式:PageRank(i)=0.15/TotalNumVertices+0.85*sum更新点值。

  • cleanup() 方法把点及其 PageRank 取值写到结果表中。

第 55 行:定义 PageRankVertexReader 类,加载图,将表中每一条记录解析为一个点,记录的第一列是起点,其他列为终点。
第 88 行:主程序(main 函数),定义 GraphJob,指定 Vertex/GraphLoader 等的实现,以及最大迭代次数(默认 30),并指定输入输出表。

展开
收起
行者武松 2017-10-24 10:27:17 1918 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
Data+AI时代大数据平台应该如何建设 立即下载
大数据AI一体化的解读 立即下载
极氪大数据 Serverless 应用实践 立即下载