MaxCompute Graph开发-eclipse.sql

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 参考了官方文档,把过程做了一遍,验证成功。https://help.aliyun.com/document_detail/27813.html

操作步骤
1.运行MaxCompute客户端。
image.png

2.执行如下命令创建输入表sssp_in和输出表sssp_out。

CREATE TABLE sssp_in (v bigint, es string);
CREATE TABLE sssp_out (v bigint, l bigint);

3.上传数据至表sssp_in中。
示例数据如下,建议您创建sssp.txt文件将数据保存至本地。假设保存在本地路径为D:\IntelliJ\data\sssp.txt

1 2:2,3:1,4:4
2 1:2,3:2,4:1
3 1:1,2:2,5:1
4 1:4,2:1,5:1
5 3:1,4:1

4.执行Tunnel命令上传数据至表sssp_in中, 以空格键做两列的分隔符。

tunnel u -fd " " D:\IntelliJ\data\sssp.txt sssp_in;
odps@ yunbee>select * from sssp_in;

image.png

5.在eclipse上创建odps object(graph小写)
File -> New -> Other -> ODPS -> ODPS Project -> next -> Packge ->Project name (graph) ->
Config ODPS console installation path->Brower->C:\odpscmd\yunbee

image.png

image.png

6.创建Java class (SSSP大写)
Graph(右键)->new -> Class ->Name(例如命名成 SSSP)->Finish
image.png

7.编写SSSP示例。
本地编译、调试SSSP算法示例,假设代码被打包为名为odps-graph-example-sssp.jar的文件。
说明 仅需要将SSSP代码打包即可,不需要同时将SDK打包入odps-graph-example-sssp.jar中。

import java.io.IOException;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.Combiner;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.data.TableInfo;

public class SSSP {
  public static final String START_VERTEX = "sssp.start.vertex.id";
  public static class SSSPVertex extends
      Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
    private static long startVertexId = -1;
    public SSSPVertex() {
      this.setValue(new LongWritable(Long.MAX_VALUE));
    }

    public boolean isStartVertex(
        ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
      if (startVertexId == -1) {
        String s = context.getConfiguration().get(START_VERTEX);
        startVertexId = Long.parseLong(s);
      }
      return getId().get() == startVertexId;
    }

    @Override
    public void compute(
        ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
        Iterable<LongWritable> messages) throws IOException {
      long minDist = isStartVertex(context) ? 0 : Integer.MAX_VALUE;
      for (LongWritable msg : messages) {
        if (msg.get() < minDist) {
          minDist = msg.get();
        }
      }

      if (minDist < this.getValue().get()) {
        this.setValue(new LongWritable(minDist));
        if (hasEdges()) {
          for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
            context.sendMessage(e.getDestVertexId(), new LongWritable(minDist
                + e.getValue().get()));
          }
        }
      } else {
        voteToHalt();
      }
    }

    @Override
    public void cleanup(
        WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
        throws IOException {
      context.write(getId(), getValue());
    }
  }

  public static class MinLongCombiner extends
      Combiner<LongWritable, LongWritable> {

    @Override
    public void combine(LongWritable vertexId, LongWritable combinedMessage,
        LongWritable messageToCombine) throws IOException {
      if (combinedMessage.get() > messageToCombine.get()) {
        combinedMessage.set(messageToCombine.get());
      }
    }

  }

  public static class SSSPVertexReader extends
      GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {

    @Override
    public void load(
        LongWritable recordNum,
        WritableRecord record,
        MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
        throws IOException {
      SSSPVertex vertex = new SSSPVertex();
      vertex.setId((LongWritable) record.get(0));
      String[] edges = record.get(1).toString().split(",");
      for (int i = 0; i < edges.length; i++) {
        String[] ss = edges[i].split(":");
        vertex.addEdge(new LongWritable(Long.parseLong(ss[0])),
            new LongWritable(Long.parseLong(ss[1])));
      }

      context.addVertexRequest(vertex);
    }

  }

  public static void main(String[] args) throws IOException {
    if (args.length < 2) {
      System.out.println("Usage: <startnode> <input> <output>");
      System.exit(-1);
    }

    GraphJob job = new GraphJob();
    job.setGraphLoaderClass(SSSPVertexReader.class);
    job.setVertexClass(SSSPVertex.class);
    job.setCombinerClass(MinLongCombiner.class);

    job.set(START_VERTEX, args[0]);
    job.addInput(TableInfo.builder().tableName(args[1]).build());
    job.addOutput(TableInfo.builder().tableName(args[2]).build());

    long startTime = System.currentTimeMillis();
    job.run();
    System.out.println("Job Finished in "
        + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  }
}

image.png

8.在eclipse上配置参数并运行

run -> run configuations -> ODPS Mapreduce(双击) -> main class (自动生成:graph.SSSP)
->ODPS config (yunbee)->Arguments -> apply -> run
传参数:1 sssp_in sssp_out

    3 sssp_in sssp_out
    5 sssp_in sssp_out

image.png

9.查看数据
D:\eclipse\eclipse-workspace\graph\temp
D:\eclipse\eclipse-workspace\graph\warehouse\yunbee\_tables_\sssp_out
image.png

10.maven打jar包
点maxcompute->右键 -> Run AS -> 3.Maven build
#Goal:clean package -DskipTests
image.png

11.检查jar包是否正常
D:\eclipse\eclipse-workspace\maxcompute\target
maxcompute-0.0.1-SNAPSHOT.jar

image.png

12.在maxcompute上添加Jar资源
odps@ yunbee>add jar D:\eclipse\eclipse-workspace\maxcompute\target\maxcompute-0.0.1-SNAPSHOT.jar -f;
image.png

13.运行SSSP
jar -libjars maxcompute-0.0.1-SNAPSHOT.jar -classpath D:\eclipse\eclipse-workspace\maxcompute\target\maxcompute-0.0.1-SNAPSHOT.jar graph.SSSP 1 sssp_in sssp_out;
image.png
image.png

14.查看数据

odps@ yunbee>select * from sssp_out;

image.png

在maxcomupte跑的结果和本地eclipse跑的结果完成吻合。

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
2月前
|
SQL 关系型数据库 MySQL
【MySQL】根据binlog日志获取回滚sql的一个开发思路
【MySQL】根据binlog日志获取回滚sql的一个开发思路
|
2月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何开发ODPS Spark任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
12天前
|
SQL 安全 Go
SQL注入不可怕,XSS也不难防!Python Web安全进阶教程,让你安心做开发!
在Web开发中,安全至关重要,尤其要警惕SQL注入和XSS攻击。SQL注入通过在数据库查询中插入恶意代码来窃取或篡改数据,而XSS攻击则通过注入恶意脚本来窃取用户敏感信息。本文将带你深入了解这两种威胁,并提供Python实战技巧,包括使用参数化查询和ORM框架防御SQL注入,以及利用模板引擎自动转义和内容安全策略(CSP)防范XSS攻击。通过掌握这些方法,你将能够更加自信地应对Web安全挑战,确保应用程序的安全性。
43 3
|
1月前
|
SQL JSON 分布式计算
ODPS SQL ——列转行、行转列这回让我玩明白了!
本文详细介绍了在MaxCompute中如何使用TRANS_ARRAY和LATERAL VIEW EXPLODE函数来实现列转行的功能。
|
2月前
|
SQL 分布式计算 MaxCompute
一种基于ODPS SQL的全局字典索引分布式计算思路
本文提供一种能充分利用分布式计算资源来计算全局字典索引的方法,以解决在大数据量下使用上诉方式导致所有数据被分发到单个reducer进行单机排序带来的性能瓶颈。
|
2月前
|
SQL NoSQL 数据库
开发效率与灵活性:SQL vs NoSQL
【8月更文第24天】随着大数据和实时应用的兴起,数据库技术也在不断发展以适应新的需求。传统的SQL(结构化查询语言)数据库因其成熟的数据管理机制而被广泛使用,而NoSQL(Not Only SQL)数据库则以其灵活性和扩展性赢得了众多开发者的青睐。本文将从开发者的视角出发,探讨这两种数据库类型的优缺点,并通过具体的代码示例来说明它们在实际开发中的应用。
50 1
|
2月前
|
SQL 存储 分布式计算
我在淘宝写SQL|ODPS SQL 优化总结
本文结合作者多年的数仓开发经验,结合ODPS平台分享数据仓库中的SQL优化经验。
|
2月前
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之未保存的ODPS SQL语句该如何找回
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
27天前
|
SQL 分布式计算 大数据
代码编码原则和规范大数据开发
此文档详细规定了SQL代码的编写规范,包括代码的清晰度,执行效率,以及注释的必要性。它强调所有SQL关键字需统一使用大写或小写,并禁止使用select *操作。此外,还规定了代码头部的信息模板,字段排列方式,INSERT, SELECT子句的格式,运算符的使用,CASE语句编写规则,查询嵌套规范,表别名定义,以及SQL注释的添加方法。这些规则有助于提升代码的可读性和可维护性。
15 0
|
27天前
|
SQL 分布式计算 大数据
大数据开发SQL代码编码原则和规范
这段SQL编码原则强调代码的功能完整性、清晰度、执行效率及可读性,通过统一关键词大小写、缩进量以及禁止使用模糊操作如select *等手段提升代码质量。此外,SQL编码规范还详细规定了代码头部信息、字段与子句排列、运算符前后间隔、CASE语句编写、查询嵌套、表别名定义以及SQL注释的具体要求,确保代码的一致性和维护性。
26 0

推荐镜像

更多
下一篇
无影云桌面