操作步骤
1.运行MaxCompute客户端。
2.执行如下命令创建输入表sssp_in和输出表sssp_out。
CREATE TABLE sssp_in (v bigint, es string);
CREATE TABLE sssp_out (v bigint, l bigint);
3.上传数据至表sssp_in中。
示例数据如下,建议您创建sssp.txt文件将数据保存至本地。假设保存在本地路径为D:\IntelliJ\data\sssp.txt
1 2:2,3:1,4:4
2 1:2,3:2,4:1
3 1:1,2:2,5:1
4 1:4,2:1,5:1
5 3:1,4:1
4.执行Tunnel命令上传数据至表sssp_in中, 以空格键做两列的分隔符。
tunnel u -fd " " D:\IntelliJ\data\sssp.txt sssp_in;
odps@ yunbee>select * from sssp_in;
5.在Eclipse上创建ODPS Project(项目名为graph,小写)
File -> New -> Other -> ODPS -> ODPS Project -> Next -> Package -> Project name (graph) ->
Config ODPS console installation path -> Browse -> C:\odpscmd\yunbee
6.创建Java class (SSSP大写)
Graph(右键)->new -> Class ->Name(例如命名成 SSSP)->Finish
7.编写SSSP示例。
本地编译、调试SSSP算法示例,假设代码被打包为名为odps-graph-example-sssp.jar的文件。
说明 仅需要将SSSP代码打包即可,不需要同时将SDK打包入odps-graph-example-sssp.jar中。
import java.io.IOException;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.Combiner;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.data.TableInfo;
/**
 * Single-source shortest path (SSSP) example for the MaxCompute (ODPS) Graph
 * framework.
 *
 * <p>Each input record describes one vertex: column 0 is the vertex id and
 * column 1 is a comma-separated list of {@code destId:weight} edges. The job
 * writes one {@code (vertexId, distance)} record per vertex.
 *
 * <p>Command-line arguments: {@code <startnode> <input> <output>}.
 */
public class SSSP {

  /** Job configuration key under which the source vertex id is stored. */
  public static final String START_VERTEX = "sssp.start.vertex.id";

  /**
   * Vertex whose value is the best distance from the source found so far.
   * The value starts at {@link Long#MAX_VALUE}, meaning "not reached yet".
   */
  public static class SSSPVertex extends
      Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {

    // Cached source vertex id; -1 means "not read from the configuration yet".
    private static long startVertexId = -1;

    /** Creates a vertex whose distance is initialised to "not reached". */
    public SSSPVertex() {
      this.setValue(new LongWritable(Long.MAX_VALUE));
    }

    /**
     * Tells whether this vertex is the configured source vertex.
     *
     * @param context compute context used to read {@link #START_VERTEX} the
     *     first time this method is called
     * @return {@code true} if this vertex's id equals the configured start id
     */
    public boolean isStartVertex(
        ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
      if (startVertexId == -1) {
        String s = context.getConfiguration().get(START_VERTEX);
        startVertexId = Long.parseLong(s);
      }
      return getId().get() == startVertexId;
    }

    /**
     * Takes the smallest incoming distance; if it improves on the stored
     * value, stores it and sends {@code distance + edgeWeight} to every
     * neighbour. Otherwise the vertex votes to halt.
     *
     * @param context compute context used to send messages
     * @param messages candidate distances received by this vertex
     * @throws IOException declared by the overridden framework method
     */
    @Override
    public void compute(
        ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
        Iterable<LongWritable> messages) throws IOException {
      // Use the same "not reached" sentinel as the constructor. The previous
      // Integer.MAX_VALUE seed was smaller than the initial Long.MAX_VALUE
      // value, so every non-source vertex wrongly "improved" to 2147483647 and
      // sent bogus messages in the first superstep.
      long minDist = isStartVertex(context) ? 0 : Long.MAX_VALUE;
      for (LongWritable msg : messages) {
        if (msg.get() < minDist) {
          minDist = msg.get();
        }
      }

      if (minDist < this.getValue().get()) {
        this.setValue(new LongWritable(minDist));
        if (hasEdges()) {
          for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
            context.sendMessage(e.getDestVertexId(),
                new LongWritable(minDist + e.getValue().get()));
          }
        }
      } else {
        voteToHalt();
      }
    }

    /**
     * Writes this vertex's id and its final distance as one output record.
     * A vertex that was never reached is written with {@link Long#MAX_VALUE}.
     *
     * @param context worker context used to write the output record
     * @throws IOException if writing the record fails
     */
    @Override
    public void cleanup(
        WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
        throws IOException {
      context.write(getId(), getValue());
    }
  }

  /** Combiner that keeps only the smallest of the messages it is given. */
  public static class MinLongCombiner extends
      Combiner<LongWritable, LongWritable> {

    /**
     * Lowers {@code combinedMessage} to {@code messageToCombine} when the
     * latter is smaller.
     *
     * @param vertexId id of the destination vertex (not used here)
     * @param combinedMessage running minimum, updated in place
     * @param messageToCombine next message to fold into the minimum
     * @throws IOException declared by the overridden framework method
     */
    @Override
    public void combine(LongWritable vertexId, LongWritable combinedMessage,
        LongWritable messageToCombine) throws IOException {
      if (combinedMessage.get() > messageToCombine.get()) {
        combinedMessage.set(messageToCombine.get());
      }
    }
  }

  /**
   * Loader that turns one input record into one {@link SSSPVertex} with its
   * outgoing edges.
   */
  public static class SSSPVertexReader extends
      GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {

    /**
     * Builds a vertex from a record: column 0 is the id, column 1 is a
     * comma-separated list of {@code destId:weight} entries.
     *
     * @param recordNum record number (not used here)
     * @param record input record to convert
     * @param context mutation context that receives the add-vertex request
     * @throws IOException declared by the overridden framework method
     */
    @Override
    public void load(
        LongWritable recordNum,
        WritableRecord record,
        MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
        throws IOException {
      SSSPVertex vertex = new SSSPVertex();
      vertex.setId((LongWritable) record.get(0));

      // A null or empty edge column means the vertex has no outgoing edges.
      // Without this guard "".split(",") yields [""] and ss[1] is out of bounds.
      Object rawEdges = record.get(1);
      String edgeList = rawEdges == null ? "" : rawEdges.toString().trim();
      if (!edgeList.isEmpty()) {
        String[] edges = edgeList.split(",");
        for (int i = 0; i < edges.length; i++) {
          String[] ss = edges[i].split(":");
          vertex.addEdge(new LongWritable(Long.parseLong(ss[0])),
              new LongWritable(Long.parseLong(ss[1])));
        }
      }
      context.addVertexRequest(vertex);
    }
  }

  /**
   * Configures and runs the SSSP graph job, then prints the elapsed time.
   *
   * @param args {@code <startnode> <input> <output>}: source vertex id, input
   *     table name and output table name
   * @throws IOException if the job fails with an I/O error
   */
  public static void main(String[] args) throws IOException {
    // All three arguments are read below, so require three (the previous
    // "< 2" check let a two-argument call fail on args[2]).
    if (args.length < 3) {
      System.out.println("Usage: <startnode> <input> <output>");
      System.exit(-1);
    }

    GraphJob job = new GraphJob();
    job.setGraphLoaderClass(SSSPVertexReader.class);
    job.setVertexClass(SSSPVertex.class);
    job.setCombinerClass(MinLongCombiner.class);
    job.set(START_VERTEX, args[0]);
    job.addInput(TableInfo.builder().tableName(args[1]).build());
    job.addOutput(TableInfo.builder().tableName(args[2]).build());

    long startTime = System.currentTimeMillis();
    job.run();
    System.out.println("Job Finished in "
        + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  }
}
8.在eclipse上配置参数并运行
Run -> Run Configurations -> ODPS Mapreduce(双击) -> main class (自动生成:graph.SSSP)
->ODPS config (yunbee)->Arguments -> apply -> run
传参数:1 sssp_in sssp_out
3 sssp_in sssp_out
5 sssp_in sssp_out
9.查看数据
D:\eclipse\eclipse-workspace\graph\temp
D:\eclipse\eclipse-workspace\graph\warehouse\yunbee\_tables_\sssp_out
10.maven打jar包
点maxcompute->右键 -> Run AS -> 3.Maven build
#Goal:clean package -DskipTests
11.检查jar包是否正常
D:\eclipse\eclipse-workspace\maxcompute\target
maxcompute-0.0.1-SNAPSHOT.jar
12.在maxcompute上添加Jar资源
odps@ yunbee>add jar D:\eclipse\eclipse-workspace\maxcompute\target\maxcompute-0.0.1-SNAPSHOT.jar -f;
13.运行SSSP
jar -libjars maxcompute-0.0.1-SNAPSHOT.jar -classpath D:\eclipse\eclipse-workspace\maxcompute\target\maxcompute-0.0.1-SNAPSHOT.jar graph.SSSP 1 sssp_in sssp_out;
14.查看数据
odps@ yunbee>select * from sssp_out;
在MaxCompute上运行的结果与本地Eclipse运行的结果完全吻合。