MaxCompute Graph开发-eclipse.sql

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 参考了官方文档,把过程做了一遍,验证成功。https://help.aliyun.com/document_detail/27813.html

操作步骤
1.运行MaxCompute客户端。
image.png

2.执行如下命令创建输入表sssp_in和输出表sssp_out。

CREATE TABLE sssp_in (v bigint, es string);
CREATE TABLE sssp_out (v bigint, l bigint);

3.上传数据至表sssp_in中。
示例数据如下,建议您创建sssp.txt文件将数据保存至本地。假设保存在本地路径为D:\IntelliJ\data\sssp.txt

1 2:2,3:1,4:4
2 1:2,3:2,4:1
3 1:1,2:2,5:1
4 1:4,2:1,5:1
5 3:1,4:1

4.执行Tunnel命令上传数据至表sssp_in中, 以空格键做两列的分隔符。

tunnel u -fd " " D:\IntelliJ\data\sssp.txt sssp_in;
odps@ yunbee>select * from sssp_in;

image.png

5.在eclipse上创建odps object(graph小写)
File -> New -> Other -> ODPS -> ODPS Project -> next -> Packge ->Project name (graph) ->
Config ODPS console installation path->Brower->C:\odpscmd\yunbee

image.png

image.png

6.创建Java class (SSSP大写)
Graph(右键)->new -> Class ->Name(例如命名成 SSSP)->Finish
image.png

7.编写SSSP示例。
本地编译、调试SSSP算法示例,假设代码被打包为名为odps-graph-example-sssp.jar的文件。
说明 仅需要将SSSP代码打包即可,不需要同时将SDK打包入odps-graph-example-sssp.jar中。

import java.io.IOException;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.Combiner;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.data.TableInfo;

public class SSSP {
  public static final String START_VERTEX = "sssp.start.vertex.id";
  public static class SSSPVertex extends
      Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
    private static long startVertexId = -1;
    public SSSPVertex() {
      this.setValue(new LongWritable(Long.MAX_VALUE));
    }

    public boolean isStartVertex(
        ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
      if (startVertexId == -1) {
        String s = context.getConfiguration().get(START_VERTEX);
        startVertexId = Long.parseLong(s);
      }
      return getId().get() == startVertexId;
    }

    @Override
    public void compute(
        ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
        Iterable<LongWritable> messages) throws IOException {
      long minDist = isStartVertex(context) ? 0 : Integer.MAX_VALUE;
      for (LongWritable msg : messages) {
        if (msg.get() < minDist) {
          minDist = msg.get();
        }
      }

      if (minDist < this.getValue().get()) {
        this.setValue(new LongWritable(minDist));
        if (hasEdges()) {
          for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
            context.sendMessage(e.getDestVertexId(), new LongWritable(minDist
                + e.getValue().get()));
          }
        }
      } else {
        voteToHalt();
      }
    }

    @Override
    public void cleanup(
        WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
        throws IOException {
      context.write(getId(), getValue());
    }
  }

  public static class MinLongCombiner extends
      Combiner<LongWritable, LongWritable> {

    @Override
    public void combine(LongWritable vertexId, LongWritable combinedMessage,
        LongWritable messageToCombine) throws IOException {
      if (combinedMessage.get() > messageToCombine.get()) {
        combinedMessage.set(messageToCombine.get());
      }
    }

  }

  public static class SSSPVertexReader extends
      GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {

    @Override
    public void load(
        LongWritable recordNum,
        WritableRecord record,
        MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
        throws IOException {
      SSSPVertex vertex = new SSSPVertex();
      vertex.setId((LongWritable) record.get(0));
      String[] edges = record.get(1).toString().split(",");
      for (int i = 0; i < edges.length; i++) {
        String[] ss = edges[i].split(":");
        vertex.addEdge(new LongWritable(Long.parseLong(ss[0])),
            new LongWritable(Long.parseLong(ss[1])));
      }

      context.addVertexRequest(vertex);
    }

  }

  public static void main(String[] args) throws IOException {
    if (args.length < 2) {
      System.out.println("Usage: <startnode> <input> <output>");
      System.exit(-1);
    }

    GraphJob job = new GraphJob();
    job.setGraphLoaderClass(SSSPVertexReader.class);
    job.setVertexClass(SSSPVertex.class);
    job.setCombinerClass(MinLongCombiner.class);

    job.set(START_VERTEX, args[0]);
    job.addInput(TableInfo.builder().tableName(args[1]).build());
    job.addOutput(TableInfo.builder().tableName(args[2]).build());

    long startTime = System.currentTimeMillis();
    job.run();
    System.out.println("Job Finished in "
        + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  }
}

image.png

8.在eclipse上配置参数并运行

run -> run configuations -> ODPS Mapreduce(双击) -> main class (自动生成:graph.SSSP)
->ODPS config (yunbee)->Arguments -> apply -> run
传参数:1 sssp_in sssp_out

    3 sssp_in sssp_out
    5 sssp_in sssp_out

image.png

9.查看数据
D:\eclipse\eclipse-workspace\graph\temp
D:\eclipse\eclipse-workspace\graph\warehouse\yunbee\_tables_\sssp_out
image.png

10.maven打jar包
点maxcompute->右键 -> Run AS -> 3.Maven build
#Goal:clean package -DskipTests
image.png

11.检查jar包是否正常
D:\eclipse\eclipse-workspace\maxcompute\target
maxcompute-0.0.1-SNAPSHOT.jar

image.png

12.在maxcompute上添加Jar资源
odps@ yunbee>add jar D:\eclipse\eclipse-workspace\maxcompute\target\maxcompute-0.0.1-SNAPSHOT.jar -f;
image.png

13.运行SSSP
jar -libjars maxcompute-0.0.1-SNAPSHOT.jar -classpath D:\eclipse\eclipse-workspace\maxcompute\target\maxcompute-0.0.1-SNAPSHOT.jar graph.SSSP 1 sssp_in sssp_out;
image.png
image.png

14.查看数据

odps@ yunbee>select * from sssp_out;

image.png

在maxcomupte跑的结果和本地eclipse跑的结果完成吻合。

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
相关文章
|
26天前
|
SQL 存储 分布式计算
【万字长文,建议收藏】《高性能ODPS SQL章法》——用古人智慧驾驭大数据战场
本文旨在帮助非专业数据研发但是有高频ODPS使用需求的同学们(如数分、算法、产品等)能够快速上手ODPS查询优化,实现高性能查数看数,避免日常工作中因SQL任务卡壳、失败等情况造成的工作产出delay甚至集群资源稳定性问题。
640 32
【万字长文,建议收藏】《高性能ODPS SQL章法》——用古人智慧驾驭大数据战场
|
2月前
|
SQL 分布式计算 大数据
SparkSQL 入门指南:小白也能懂的大数据 SQL 处理神器
在大数据处理的领域,SparkSQL 是一种非常强大的工具,它可以让开发人员以 SQL 的方式处理和查询大规模数据集。SparkSQL 集成了 SQL 查询引擎和 Spark 的分布式计算引擎,使得我们可以在分布式环境下执行 SQL 查询,并能利用 Spark 的强大计算能力进行数据分析。
|
4月前
|
SQL 人工智能 分布式计算
别再只会写SQL了!这五个大数据趋势正在悄悄改变行业格局
别再只会写SQL了!这五个大数据趋势正在悄悄改变行业格局
59 0
|
6月前
|
SQL 关系型数据库 MySQL
大数据新视界--大数据大厂之MySQL数据库课程设计:MySQL 数据库 SQL 语句调优方法详解(2-1)
本文深入介绍 MySQL 数据库 SQL 语句调优方法。涵盖分析查询执行计划,如使用 EXPLAIN 命令及理解关键指标;优化查询语句结构,包括避免子查询、减少函数使用、合理用索引列及避免 “OR”。还介绍了索引类型知识,如 B 树索引、哈希索引等。结合与 MySQL 数据库课程设计相关文章,强调 SQL 语句调优重要性。为提升数据库性能提供实用方法,适合数据库管理员和开发人员。
|
6月前
|
关系型数据库 MySQL 大数据
大数据新视界--大数据大厂之MySQL 数据库课程设计:MySQL 数据库 SQL 语句调优的进阶策略与实际案例(2-2)
本文延续前篇,深入探讨 MySQL 数据库 SQL 语句调优进阶策略。包括优化索引使用,介绍多种索引类型及避免索引失效等;调整数据库参数,如缓冲池、连接数和日志参数;还有分区表、垂直拆分等其他优化方法。通过实际案例分析展示调优效果。回顾与数据库课程设计相关文章,强调全面认识 MySQL 数据库重要性。为读者提供综合调优指导,确保数据库高效运行。
|
7月前
|
SQL 大数据 数据挖掘
玩转大数据:从零开始掌握SQL查询基础
玩转大数据:从零开始掌握SQL查询基础
268 35
|
9月前
|
数据采集 机器学习/深度学习 DataWorks
DataWorks产品评测:大数据开发治理的深度体验
DataWorks产品评测:大数据开发治理的深度体验
373 1
|
11月前
|
SQL 算法 大数据
为什么大数据平台会回归SQL
在大数据领域,尽管非结构化数据占据了大数据平台80%以上的存储空间,结构化数据分析依然是核心任务。SQL因其广泛的应用基础和易于上手的特点成为大数据处理的主要语言,各大厂商纷纷支持SQL以提高市场竞争力。然而,SQL在处理复杂计算时表现出的性能和开发效率低下问题日益凸显,如难以充分利用现代硬件能力、复杂SQL优化困难等。为了解决这些问题,出现了像SPL这样的开源计算引擎,它通过提供更高效的开发体验和计算性能,以及对多种数据源的支持,为大数据处理带来了新的解决方案。
|
11月前
|
NoSQL 编译器 C语言
C语言调试是开发中的重要技能,涵盖基本技巧如打印输出、断点调试和单步执行,以及使用GCC、GDB、Visual Studio和Eclipse CDT等工具。
C语言调试是开发中的重要技能,涵盖基本技巧如打印输出、断点调试和单步执行,以及使用GCC、GDB、Visual Studio和Eclipse CDT等工具。高级技巧包括内存检查、性能分析和符号调试。通过实践案例学习如何有效定位和解决问题,同时注意保持耐心、合理利用工具、记录过程并避免过度调试,以提高编程能力和开发效率。
279 1

热门文章

最新文章

推荐镜像

更多