开发者社区> 问答> 正文

MaxCompute用户指南:图模型:示例程序:输入点表



输入点表的代码,如下所示:

  1. import java.io.IOException;
  2. import com.aliyun.odps.conf.Configuration;
  3. import com.aliyun.odps.data.TableInfo;
  4. import com.aliyun.odps.graph.ComputeContext;
  5. import com.aliyun.odps.graph.GraphJob;
  6. import com.aliyun.odps.graph.GraphLoader;
  7. import com.aliyun.odps.graph.Vertex;
  8. import com.aliyun.odps.graph.VertexResolver;
  9. import com.aliyun.odps.graph.MutationContext;
  10. import com.aliyun.odps.graph.VertexChanges;
  11. import com.aliyun.odps.graph.Edge;
  12. import com.aliyun.odps.io.LongWritable;
  13. import com.aliyun.odps.io.WritableComparable;
  14. import com.aliyun.odps.io.WritableRecord;
  15. /**
  16. * 本示例是用于展示,对于不同类型的数据类型,如何编写图作业程序载入数据。主要展示GraphLoader和
  17. * VertexResolver的配合完成图的构建。
  18. *
  19. * ODPS Graph的作业输入都为ODPS的Table,假设作业输入有两张表,一张存储点的信息,一张存储边的信息。
  20. * 存储点信息的表的格式,如:
  21. * +------------------------+
  22. * | VertexID | VertexValue |
  23. * +------------------------+
  24. * |       id0|            9|
  25. * +------------------------+
  26. * |       id1|            7|
  27. * +------------------------+
  28. * |       id2|            8|
  29. * +------------------------+
  30. *
  31. * 存储边信息的表的格式,如
  32. * +-----------------------------------+
  33. * | VertexID | DestVertexID| EdgeValue|
  34. * +-----------------------------------+
  35. * |       id0|          id1|         1|
  36. * +-----------------------------------+
  37. * |       id0|          id2|         2|
  38. * +-----------------------------------+
  39. * |       id2|          id1|         3|
  40. * +-----------------------------------+
  41. *
  42. * 结合两张表的数据,表示id0有两条出边,分别指向id1和id2;id2有一条出边,指向id1;id1没有出边。
  43. *
  44. * 对于此种类型的数据,在GraphLoader::load(LongWritable, Record, MutationContext)
  45. * ,可以使用 MutationContext#addVertexRequest(Vertex)向图中请求添加点,使用
  46. * link MutationContext#addEdgeRequest(WritableComparable, Edge)向图中请求添加边,然后,在
  47. * link VertexResolver#resolve(WritableComparable, Vertex, VertexChanges, boolean)
  48. * 中,将load 方法中添加的点和边,合并到一个Vertex对象中,作为返回值,添加到最后参与计算的图中。
  49. *
  50. **/
  51. public class VertexInputFormat {
  52.   private final static String EDGE_TABLE = "edge.table";
  53.   /**
  54.    * 将Record解释为Vertex和Edge,每个Record根据其来源,表示一个Vertex或者一条Edge。
  55.    *
  56.    * 类似于com.aliyun.odps.mapreduce.Mapper#map
  57.    * ,输入Record,生成键值对,此处的键是Vertex的ID,
  58.    * 值是Vertex或Edge,通过上下文Context写出,这些键值对会在LoadingVertexResolver出根据Vertex的ID汇总。
  59.    *
  60.    * 注意:此处添加的点或边只是根据Record内容发出的请求,并不是最后参与计算的点或边,只有在随后的VertexResolver
  61.    * 中添加的点或边才参与计算。
  62.    **/
  63.   public static class VertexInputLoader extends
  64.       GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
  65.     private boolean isEdgeData;
  66.     /**
  67.      * 配置VertexInputLoader。
  68.      *
  69.      * @param conf
  70.      *          作业的配置参数,在main中使用GraphJob配置的,或者在console中set的
  71.      * @param workerId
  72.      *          当前工作的worker的序号,从0开始,可以用于构造唯一的vertex id
  73.      * @param inputTableInfo
  74.      *          当前worker载入的输入表信息,可以用于确定当前输入是哪种类型的数据,即Record的格式
  75.      **/
  76.     @Override
  77.     public void setup(Configuration conf, int workerId, TableInfo inputTableInfo) {
  78.       isEdgeData = conf.get(EDGE_TABLE).equals(inputTableInfo.getTableName());
  79.     }
  80.     /**
  81.      * 根据Record中的内容,解析为对应的边,并请求添加到图中。
  82.      *
  83.      * @param recordNum
  84.      *          记录序列号,从1开始,每个worker上单独计数
  85.      * @param record
  86.      *          输入表中的记录,三列,分别表示初点、终点、边的权重
  87.      * @param context
  88.      *          上下文,请求将解释后的边添加到图中
  89.      **/
  90.     @Override
  91.     public void load(
  92.         LongWritable recordNum,
  93.         WritableRecord record,
  94.         MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
  95.         throws IOException {
  96.       if (isEdgeData) {
  97.         /**
  98.          * 数据来源于存储边信息的表。
  99.          *
  100.          * 1、第一列表示初始点的ID
  101.          **/
  102.         LongWritable sourceVertexID = (LongWritable) record.get(0);
  103.         /**
  104.          * 2、第二列表示终点的ID
  105.          **/
  106.         LongWritable destinationVertexID = (LongWritable) record.get(1);
  107.         /**
  108.          * 3、地三列表示边的权重
  109.          **/
  110.         LongWritable edgeValue = (LongWritable) record.get(2);
  111.         /**
  112.          * 4、创建边,由终点ID和边的权重组成
  113.          **/
  114.         Edge<LongWritable, LongWritable> edge = new Edge<LongWritable, LongWritable>(
  115.             destinationVertexID, edgeValue);
  116.         /**
  117.          * 5、请求给初始点添加边
  118.          **/
  119.         context.addEdgeRequest(sourceVertexID, edge);
  120.         /**
  121.          * 6、如果每条Record表示双向边,重复4与5 Edge<LongWritable, LongWritable> edge2 = new
  122.          * Edge<LongWritable, LongWritable>( sourceVertexID, edgeValue);
  123.          * context.addEdgeRequest(destinationVertexID, edge2);
  124.          **/
  125.       } else {
  126.         /**
  127.          * 数据来源于存储点信息的表。
  128.          *
  129.          * 1、第一列表示点的ID
  130.          **/
  131.         LongWritable vertexID = (LongWritable) record.get(0);
  132.         /**
  133.          * 2、第二列表示点的值
  134.          **/
  135.         LongWritable vertexValue = (LongWritable) record.get(1);
  136.         /**
  137.          * 3、创建点,由点的ID和点的值组成
  138.          **/
  139.         MyVertex vertex = new MyVertex();
  140.         /**
  141.          * 4、初始化点
  142.          **/
  143.         vertex.setId(vertexID);
  144.         vertex.setValue(vertexValue);
  145.         /**
  146.          * 5、请求添加点
  147.          **/
  148.         context.addVertexRequest(vertex);
  149.       }
  150.     }
  151.   }
  152.   /**
  153.    * 汇总GraphLoader::load(LongWritable, Record, MutationContext)生成的键值对,类似于
  154.    * com.aliyun.odps.mapreduce.Reducer中的reduce。对于唯一的Vertex ID,所有关于这个ID上
  155.    * 添加\删除、点\边的行为都会放在VertexChanges中。
  156.    *
  157.    * 注意:此处并不只针对load方法中添加的有冲突的点或边才调用(冲突是指添加多个相同Vertex对象,添加重复边等),
  158.    * 所有在load方法中请求生成的ID都会在此处被调用。
  159.    **/
  160.   public static class LoadingResolver extends
  161.       VertexResolver<LongWritable, LongWritable, LongWritable, LongWritable> {
  162.     /**
  163.      * 处理关于一个ID的添加或删除、点或边的请求。
  164.      *
  165.      * VertexChanges有四个接口,分别与MutationContext的四个接口对应:
  166.      * VertexChanges::getAddedVertexList()与
  167.      * MutationContext::addVertexRequest(Vertex)对应,
  168.      * 在load方法中,请求添加的ID相同的Vertex对象,会被汇总在返回的List中
  169.      * VertexChanges::getAddedEdgeList()与
  170.      * MutationContext::addEdgeRequest(WritableComparable, Edge)
  171.      * 对应,请求添加的初始点ID相同的Edge对象,会被汇总在返回的List中
  172.      * VertexChanges::getRemovedVertexCount()与
  173.      * MutationContext::removeVertexRequest(WritableComparable)
  174.      * 对应,请求删除的ID相同的Vertex,汇总的请求删除的次数作为返回值
  175.      * VertexChanges#getRemovedEdgeList()与
  176.      * MutationContext#removeEdgeRequest(WritableComparable, WritableComparable)
  177.      * 对应,请求删除的初始点ID相同的Edge对象,会被汇总在返回的List中
  178.      *
  179.      * 用户通过处理关于这个ID的变化,通过返回值声明此ID是否参与计算,如果返回的Vertex不为null,
  180.      * 则此ID会参与随后的计算,如果返回null,则不会参与计算。
  181.      *
  182.      * @param vertexId
  183.      *          请求添加的点的ID,或请求添加的边的初点ID
  184.      * @param vertex
  185.      *          已存在的Vertex对象,数据载入阶段,始终为null
  186.      * @param vertexChanges
  187.      *          此ID上的请求添加\删除、点\边的集合
  188.      * @param hasMessages
  189.      *          此ID是否有输入消息,数据载入阶段,始终为false
  190.      **/
  191.     @Override
  192.     public Vertex<LongWritable, LongWritable, LongWritable, LongWritable> resolve(
  193.         LongWritable vertexId,
  194.         Vertex<LongWritable, LongWritable, LongWritable, LongWritable> vertex,
  195.         VertexChanges<LongWritable, LongWritable, LongWritable, LongWritable> vertexChanges,
  196.         boolean hasMessages) throws IOException {
  197.       /**
  198.        * 1、获取Vertex对象,作为参与计算的点。
  199.        **/
  200.       MyVertex computeVertex = null;
  201.       if (vertexChanges.getAddedVertexList() == null
  202.           || vertexChanges.getAddedVertexList().isEmpty()) {
  203.         computeVertex = new MyVertex();
  204.         computeVertex.setId(vertexId);
  205.       } else {
  206.         /**
  207.          * 此处假设存储点信息的表中,每个Record表示唯一的点。
  208.          **/
  209.         computeVertex = (MyVertex) vertexChanges.getAddedVertexList().get(0);
  210.       }
  211.       /**
  212.        * 2、将请求给此点添加的边,添加到Vertex对象中,如果数据有重复的可能,根据算法需要决定是否去重。
  213.        **/
  214.       if (vertexChanges.getAddedEdgeList() != null) {
  215.         for (Edge<LongWritable, LongWritable> edge : vertexChanges
  216.             .getAddedEdgeList()) {
  217.           computeVertex.addEdge(edge.getDestVertexId(), edge.getValue());
  218.         }
  219.       }
  220.       /**
  221.        * 3、将Vertex对象返回,添加到最终的图中参与计算。
  222.        **/
  223.       return computeVertex;
  224.     }
  225.   }
  226.   /**
  227.    * 确定参与计算的Vertex的行为。
  228.    *
  229.    **/
  230.   public static class MyVertex extends
  231.       Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
  232.     /**
  233.      * 将vertex的边,按照输入表的格式再写到结果表。输入表与输出表的格式和数据都相同。
  234.      *
  235.      * @param context
  236.      *          运行时上下文
  237.      * @param messages
  238.      *          输入消息
  239.      **/
  240.     @Override
  241.     public void compute(
  242.         ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
  243.         Iterable<LongWritable> messages) throws IOException {
  244.       /**
  245.        * 将点的ID和值,写到存储点的结果表
  246.        **/
  247.       context.write("vertex", getId(), getValue());
  248.       /**
  249.        * 将点的边,写到存储边的结果表
  250.        **/
  251.       if (hasEdges()) {
  252.         for (Edge<LongWritable, LongWritable> edge : getEdges()) {
  253.           context.write("edge", getId(), edge.getDestVertexId(),
  254.               edge.getValue());
  255.         }
  256.       }
  257.       /**
  258.        * 只迭代一轮
  259.        **/
  260.       voteToHalt();
  261.     }
  262.   }
  263.   /**
  264.    * @param args
  265.    * @throws IOException
  266.    */
  267.   public static void main(String[] args) throws IOException {
  268.     if (args.length < 4) {
  269.       throw new IOException(
  270.           "Usage: VertexInputFormat <vertex input> <edge input> <vertex output> <edge output>");
  271.     }
  272.     /**
  273.      * GraphJob用于对Graph作业进行配置
  274.      */
  275.     GraphJob job = new GraphJob();
  276.     /**
  277.      * 1、指定输入的图数据,并指定存储边数据所在的表。
  278.      */
  279.     job.addInput(TableInfo.builder().tableName(args[0]).build());
  280.     job.addInput(TableInfo.builder().tableName(args[1]).build());
  281.     job.set(EDGE_TABLE, args[1]);
  282.     /**
  283.      * 2、指定载入数据的方式,将Record解释为Edge,此处类似于map,生成的 key为vertex的ID,value为edge。
  284.      */
  285.     job.setGraphLoaderClass(VertexInputLoader.class);
  286.     /**
  287.      * 3、指定载入数据阶段,生成参与计算的vertex。此处类似于reduce,将map 生成的edge合并成一个vertex。
  288.      */
  289.     job.setLoadingVertexResolverClass(LoadingResolver.class);
  290.     /**
  291.      * 4、指定参与计算的vertex的行为。每轮迭代执行vertex.compute方法。
  292.      */
  293.     job.setVertexClass(MyVertex.class);
  294.     /**
  295.      * 5、指定图作业的输出表,将计算生成的结果写到结果表中。
  296.      */
  297.     job.addOutput(TableInfo.builder().tableName(args[2]).label("vertex").build());
  298.     job.addOutput(TableInfo.builder().tableName(args[3]).label("edge").build());
  299.     /**
  300.      * 6、提交作业执行。
  301.      */
  302.     job.run();
  303.   }
  304. }

展开
收起
行者武松 2017-10-24 10:32:59 1691 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
大数据AI一体化的解读 立即下载
极氪大数据 Serverless 应用实践 立即下载
大数据&AI实战派 第2期 立即下载