MaxCompute用户指南:图模型:示例程序:输入点表
输入点表的代码,如下所示:
- import java.io.IOException;
- import com.aliyun.odps.conf.Configuration;
- import com.aliyun.odps.data.TableInfo;
- import com.aliyun.odps.graph.ComputeContext;
- import com.aliyun.odps.graph.GraphJob;
- import com.aliyun.odps.graph.GraphLoader;
- import com.aliyun.odps.graph.Vertex;
- import com.aliyun.odps.graph.VertexResolver;
- import com.aliyun.odps.graph.MutationContext;
- import com.aliyun.odps.graph.VertexChanges;
- import com.aliyun.odps.graph.Edge;
- import com.aliyun.odps.io.LongWritable;
- import com.aliyun.odps.io.WritableComparable;
- import com.aliyun.odps.io.WritableRecord;
- /**
- * 本示例是用于展示,对于不同类型的数据类型,如何编写图作业程序载入数据。主要展示GraphLoader和
- * VertexResolver的配合完成图的构建。
- *
- * ODPS Graph的作业输入都为ODPS的Table,假设作业输入有两张表,一张存储点的信息,一张存储边的信息。
- * 存储点信息的表的格式,如:
- * +------------------------+
- * | VertexID | VertexValue |
- * +------------------------+
- * | id0| 9|
- * +------------------------+
- * | id1| 7|
- * +------------------------+
- * | id2| 8|
- * +------------------------+
- *
- * 存储边信息的表的格式,如
- * +-----------------------------------+
- * | VertexID | DestVertexID| EdgeValue|
- * +-----------------------------------+
- * | id0| id1| 1|
- * +-----------------------------------+
- * | id0| id2| 2|
- * +-----------------------------------+
- * | id2| id1| 3|
- * +-----------------------------------+
- *
- * 结合两张表的数据,表示id0有两条出边,分别指向id1和id2;id2有一条出边,指向id1;id1没有出边。
- *
- * 对于此种类型的数据,在GraphLoader::load(LongWritable, Record, MutationContext)
- * ,可以使用 MutationContext#addVertexRequest(Vertex)向图中请求添加点,使用
- * link MutationContext#addEdgeRequest(WritableComparable, Edge)向图中请求添加边,然后,在
- * link VertexResolver#resolve(WritableComparable, Vertex, VertexChanges, boolean)
- * 中,将load 方法中添加的点和边,合并到一个Vertex对象中,作为返回值,添加到最后参与计算的图中。
- *
- **/
- public class VertexInputFormat {
- private final static String EDGE_TABLE = "edge.table";
- /**
- * 将Record解释为Vertex和Edge,每个Record根据其来源,表示一个Vertex或者一条Edge。
- *
- * 类似于com.aliyun.odps.mapreduce.Mapper#map
- * ,输入Record,生成键值对,此处的键是Vertex的ID,
- * 值是Vertex或Edge,通过上下文Context写出,这些键值对会在LoadingVertexResolver出根据Vertex的ID汇总。
- *
- * 注意:此处添加的点或边只是根据Record内容发出的请求,并不是最后参与计算的点或边,只有在随后的VertexResolver
- * 中添加的点或边才参与计算。
- **/
- public static class VertexInputLoader extends
- GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
- private boolean isEdgeData;
- /**
- * 配置VertexInputLoader。
- *
- * @param conf
- * 作业的配置参数,在main中使用GraphJob配置的,或者在console中set的
- * @param workerId
- * 当前工作的worker的序号,从0开始,可以用于构造唯一的vertex id
- * @param inputTableInfo
- * 当前worker载入的输入表信息,可以用于确定当前输入是哪种类型的数据,即Record的格式
- **/
- @Override
- public void setup(Configuration conf, int workerId, TableInfo inputTableInfo) {
- isEdgeData = conf.get(EDGE_TABLE).equals(inputTableInfo.getTableName());
- }
- /**
- * 根据Record中的内容,解析为对应的边,并请求添加到图中。
- *
- * @param recordNum
- * 记录序列号,从1开始,每个worker上单独计数
- * @param record
- * 输入表中的记录,三列,分别表示初点、终点、边的权重
- * @param context
- * 上下文,请求将解释后的边添加到图中
- **/
- @Override
- public void load(
- LongWritable recordNum,
- WritableRecord record,
- MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
- throws IOException {
- if (isEdgeData) {
- /**
- * 数据来源于存储边信息的表。
- *
- * 1、第一列表示初始点的ID
- **/
- LongWritable sourceVertexID = (LongWritable) record.get(0);
- /**
- * 2、第二列表示终点的ID
- **/
- LongWritable destinationVertexID = (LongWritable) record.get(1);
- /**
- * 3、地三列表示边的权重
- **/
- LongWritable edgeValue = (LongWritable) record.get(2);
- /**
- * 4、创建边,由终点ID和边的权重组成
- **/
- Edge<LongWritable, LongWritable> edge = new Edge<LongWritable, LongWritable>(
- destinationVertexID, edgeValue);
- /**
- * 5、请求给初始点添加边
- **/
- context.addEdgeRequest(sourceVertexID, edge);
- /**
- * 6、如果每条Record表示双向边,重复4与5 Edge<LongWritable, LongWritable> edge2 = new
- * Edge<LongWritable, LongWritable>( sourceVertexID, edgeValue);
- * context.addEdgeRequest(destinationVertexID, edge2);
- **/
- } else {
- /**
- * 数据来源于存储点信息的表。
- *
- * 1、第一列表示点的ID
- **/
- LongWritable vertexID = (LongWritable) record.get(0);
- /**
- * 2、第二列表示点的值
- **/
- LongWritable vertexValue = (LongWritable) record.get(1);
- /**
- * 3、创建点,由点的ID和点的值组成
- **/
- MyVertex vertex = new MyVertex();
- /**
- * 4、初始化点
- **/
- vertex.setId(vertexID);
- vertex.setValue(vertexValue);
- /**
- * 5、请求添加点
- **/
- context.addVertexRequest(vertex);
- }
- }
- }
- /**
- * 汇总GraphLoader::load(LongWritable, Record, MutationContext)生成的键值对,类似于
- * com.aliyun.odps.mapreduce.Reducer中的reduce。对于唯一的Vertex ID,所有关于这个ID上
- * 添加\删除、点\边的行为都会放在VertexChanges中。
- *
- * 注意:此处并不只针对load方法中添加的有冲突的点或边才调用(冲突是指添加多个相同Vertex对象,添加重复边等),
- * 所有在load方法中请求生成的ID都会在此处被调用。
- **/
- public static class LoadingResolver extends
- VertexResolver<LongWritable, LongWritable, LongWritable, LongWritable> {
- /**
- * 处理关于一个ID的添加或删除、点或边的请求。
- *
- * VertexChanges有四个接口,分别与MutationContext的四个接口对应:
- * VertexChanges::getAddedVertexList()与
- * MutationContext::addVertexRequest(Vertex)对应,
- * 在load方法中,请求添加的ID相同的Vertex对象,会被汇总在返回的List中
- * VertexChanges::getAddedEdgeList()与
- * MutationContext::addEdgeRequest(WritableComparable, Edge)
- * 对应,请求添加的初始点ID相同的Edge对象,会被汇总在返回的List中
- * VertexChanges::getRemovedVertexCount()与
- * MutationContext::removeVertexRequest(WritableComparable)
- * 对应,请求删除的ID相同的Vertex,汇总的请求删除的次数作为返回值
- * VertexChanges#getRemovedEdgeList()与
- * MutationContext#removeEdgeRequest(WritableComparable, WritableComparable)
- * 对应,请求删除的初始点ID相同的Edge对象,会被汇总在返回的List中
- *
- * 用户通过处理关于这个ID的变化,通过返回值声明此ID是否参与计算,如果返回的Vertex不为null,
- * 则此ID会参与随后的计算,如果返回null,则不会参与计算。
- *
- * @param vertexId
- * 请求添加的点的ID,或请求添加的边的初点ID
- * @param vertex
- * 已存在的Vertex对象,数据载入阶段,始终为null
- * @param vertexChanges
- * 此ID上的请求添加\删除、点\边的集合
- * @param hasMessages
- * 此ID是否有输入消息,数据载入阶段,始终为false
- **/
- @Override
- public Vertex<LongWritable, LongWritable, LongWritable, LongWritable> resolve(
- LongWritable vertexId,
- Vertex<LongWritable, LongWritable, LongWritable, LongWritable> vertex,
- VertexChanges<LongWritable, LongWritable, LongWritable, LongWritable> vertexChanges,
- boolean hasMessages) throws IOException {
- /**
- * 1、获取Vertex对象,作为参与计算的点。
- **/
- MyVertex computeVertex = null;
- if (vertexChanges.getAddedVertexList() == null
- || vertexChanges.getAddedVertexList().isEmpty()) {
- computeVertex = new MyVertex();
- computeVertex.setId(vertexId);
- } else {
- /**
- * 此处假设存储点信息的表中,每个Record表示唯一的点。
- **/
- computeVertex = (MyVertex) vertexChanges.getAddedVertexList().get(0);
- }
- /**
- * 2、将请求给此点添加的边,添加到Vertex对象中,如果数据有重复的可能,根据算法需要决定是否去重。
- **/
- if (vertexChanges.getAddedEdgeList() != null) {
- for (Edge<LongWritable, LongWritable> edge : vertexChanges
- .getAddedEdgeList()) {
- computeVertex.addEdge(edge.getDestVertexId(), edge.getValue());
- }
- }
- /**
- * 3、将Vertex对象返回,添加到最终的图中参与计算。
- **/
- return computeVertex;
- }
- }
- /**
- * 确定参与计算的Vertex的行为。
- *
- **/
- public static class MyVertex extends
- Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
- /**
- * 将vertex的边,按照输入表的格式再写到结果表。输入表与输出表的格式和数据都相同。
- *
- * @param context
- * 运行时上下文
- * @param messages
- * 输入消息
- **/
- @Override
- public void compute(
- ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
- Iterable<LongWritable> messages) throws IOException {
- /**
- * 将点的ID和值,写到存储点的结果表
- **/
- context.write("vertex", getId(), getValue());
- /**
- * 将点的边,写到存储边的结果表
- **/
- if (hasEdges()) {
- for (Edge<LongWritable, LongWritable> edge : getEdges()) {
- context.write("edge", getId(), edge.getDestVertexId(),
- edge.getValue());
- }
- }
- /**
- * 只迭代一轮
- **/
- voteToHalt();
- }
- }
- /**
- * @param args
- * @throws IOException
- */
- public static void main(String[] args) throws IOException {
- if (args.length < 4) {
- throw new IOException(
- "Usage: VertexInputFormat <vertex input> <edge input> <vertex output> <edge output>");
- }
- /**
- * GraphJob用于对Graph作业进行配置
- */
- GraphJob job = new GraphJob();
- /**
- * 1、指定输入的图数据,并指定存储边数据所在的表。
- */
- job.addInput(TableInfo.builder().tableName(args[0]).build());
- job.addInput(TableInfo.builder().tableName(args[1]).build());
- job.set(EDGE_TABLE, args[1]);
- /**
- * 2、指定载入数据的方式,将Record解释为Edge,此处类似于map,生成的 key为vertex的ID,value为edge。
- */
- job.setGraphLoaderClass(VertexInputLoader.class);
- /**
- * 3、指定载入数据阶段,生成参与计算的vertex。此处类似于reduce,将map 生成的edge合并成一个vertex。
- */
- job.setLoadingVertexResolverClass(LoadingResolver.class);
- /**
- * 4、指定参与计算的vertex的行为。每轮迭代执行vertex.compute方法。
- */
- job.setVertexClass(MyVertex.class);
- /**
- * 5、指定图作业的输出表,将计算生成的结果写到结果表中。
- */
- job.addOutput(TableInfo.builder().tableName(args[2]).label("vertex").build());
- job.addOutput(TableInfo.builder().tableName(args[3]).label("edge").build());
- /**
- * 6、提交作业执行。
- */
- job.run();
- }
- }
收起
行者武松
2017-10-24 10:32:59
1691
0
0
条回答
写回答
取消
提交回答