运行作业
MaxCompute 客户端提供一个 Jar 命令用于运行 MaxCompute Graph 作业,其使用方式与
MapReduce 中的
Jar 命令 相同。
简要介绍如下:
- Usage: jar [<GENERIC_OPTIONS>] <MAIN_CLASS> [ARGS]
- -conf <configuration_file> Specify an application configuration file
- -classpath <local_file_list> classpaths used to run mainClass
- -D <name>=<value> Property value pair, which will be used to run mainClass
- -local Run job in local mode
- -resources <resource_name_list> file/table resources used in graph, seperate by comma
其中 <GENERIC_OPTIONS> 包括以下参数(均为可选):
-conf <configuration file >:指定 JobConf 配置文件。
-classpath <local_file_list >:本地执行时的 classpath,主要用于指定 main 函数所在的 Jar 包。
大多数情况下,用户更习惯于将 main 函数与 Graph作业编写在一个包中,例如:单源最短距离算法。因此,在执行示例程序时,-resources 及 -classpath 的参数中都出现了用户的Jar 包,但二者意义不同:-resources 引用的是 Graph 作业,运行于分布式环境中,而 -classpath 引用的是 main函数,运行于本地,指定的 Jar 包路径也是本地文件路径。包名之间使用系统默认的文件分割符作分割(通常情况下,windows系统是分号,linux 系统是冒号)。
-D <prop_name > = < prop_value >:本地执行时,<mainClass > 的 Java 属性,可以定义多个。
-local:以本地模式执行 Graph 作业,主要用于程序调试。
-resources <resource_name_list >:Graph作业运行时使用的资源声明。一般情况下,resource_name_list 中需要指定 Graph 作业所在的资源名称。如果您在 Graph作业中读取了其他 MaxCompute 资源,那么,这些资源名称也需要被添加到 resource_name_list中。资源之间使用逗号分隔,使用跨项目空间使用资源时,需要在前面加上:PROJECT_NAME/resources/。示例:-resources otherproject/resources/resfile;。
同时,您也可以直接运行 Graph 作业的 main 函数,直接将作业提交到 MaxCompute,而不是通过 MaxCompute 客户端提交作业。以PageRank算法 为例,如下所示:
-
public static void main(String[] args) throws Exception {
- if (args.length < 2)
- printUsage();
- Account account = new AliyunAccount(accessId, accessKey);
- Odps odps = new Odps(account);
- odps.setEndpoint(endPoint);
- odps.setDefaultProject(project);
- SessionState ss = SessionState.get();
- ss.setOdps(odps);
- ss.setLocalRun(false);
- String resource = "mapreduce-examples.jar";
- GraphJob job = new GraphJob();
- // 将使用的jar及其他文件添加到class cache resource,对应于jar命令中 -libjars 中指定的资源
- job.addCacheResourcesToClassPath(resource);
- job.setGraphLoaderClass(PageRankVertexReader.class);
- job.setVertexClass(PageRankVertex.class);
- job.addInput(TableInfo.builder().tableName(args[0]).build());
- job.addOutput(TableInfo.builder().tableName(args[1]).build());
- // default max iteration is 30
- job.setMaxIteration(30);
- if (args.length >= 3)
- job.setMaxIteration(Integer.parseInt(args[2]));
- long startTime = System.currentTimeMillis();
- job.run();
- System.out.println("Job Finished in "
- + (System.currentTimeMillis() - startTime) / 1000.0
- + " seconds");
- }
输入及输出
MaxCompute Graph 作业不支持您自定义输入与输出格式。
定义作业输入,支持多路输入,如下所示:
- GraphJob job = new GraphJob();
- job.addInput(TableInfo.builder().tableName(“tblname”).build()); //表作为输入
- job.addInput(TableInfo.builder().tableName(“tblname”).partSpec("pt1=a/pt2=b").build()); //分区作为输入
- //只读取输入表的 col2 和 col0 列,在 GraphLoader 的 load 方法中,record.get(0) 得到的是col2列,顺序一致
- job.addInput(TableInfo.builder().tableName(“tblname”).partSpec("pt1=a/pt2=b").build(), new String[]{"col2", "col0"});
注意:
关于作业输入定义,更多详情请参见 GraphJob 的 addInput 相关方法说明,框架读取输入表的记录传给用户自定义的 GraphLoader 载入图数据。
限制:暂时不支持分区过滤条件。更多应用限制请参见 应用限制。
定义作业输出,支持多路输出,通过 label 标识每路输出。如下所示:
- GraphJob job = new GraphJob();
- //输出表为分区表时需要给到最末一级分区
- job.addOutput(TableInfo.builder().tableName("table_name").partSpec("pt1=a/pt2=b").build());
- // 下面的参数 true 表示覆盖tableinfo指定的分区,即INSERT OVERWRITE语义,false表示INSERT INTO语义
- job.addOutput(TableInfo.builder().tableName("table_name").partSpec("pt1=a/pt2=b").lable("output1").build(), true);
注意:
关于作业输出定义,更多详情请参见 GraphJob 的 addOutput 相关方法说明。
Graph 作业在运行时可以通过 WorkerContext 的 write 方法写出记录到输出表,多路输出需要指定标识,如上面的 output1。
更多应用限制请参见 应用限制。
读取资源
Graph 程序中添加资源
除了通过 Jar 命令指定 Graph 读取的资源外,还可以通过 GraphJob 的以下两种方法指定:
- void addCacheResources(String resourceNames)
- void addCacheResourcesToClassPath(String resourceNames)
Graph 程序中使用资源
在 Graph 程序中可以通过相应的上下文对象 WorkerContext 的下述方法读取资源:
- public byte[] readCacheFile(String resourceName) throws IOException;
- public Iterable<byte[]> readCacheArchive(String resourceName) throws IOException;
- public Iterable<byte[]> readCacheArchive(String resourceName, String relativePath)throws IOException;
- public Iterable<WritableRecord> readResourceTable(String resourceName);
- public BufferedInputStream readCacheFileAsStream(String resourceName) throws IOException;
- public Iterable<BufferedInputStream> readCacheArchiveAsStream(String resourceName) throws IOException;
- public Iterable<BufferedInputStream> readCacheArchiveAsStream(String resourceName, String relativePath) throws IOException;
注意:
通常在 WorkerComputer 的 setup 方法中读取资源,然后保存在 Worker Value 中,之后通过 getWorkerValue 方法获取。
建议用上面的流接口,边读边处理,内存耗费少。
更多应用限制请参见 应用限制。