使用 odps-jdbc 接入 ODPS,不再从零开始

简介: 还在为传统 DBMS 的性能问题而烦恼?想借助 ODPS 的分布计算能力?但是你又不想学习官方的 SDK ?如果你恰好在老项目中用了 JDBC 访问 Oracle 或 MySQL?那么你可能对这篇文章感兴趣。本文将结合几种常见的使用场景(数据查询、数据导入、第三方客户端工具)来介绍odps-jdbc

还在为传统 DBMS 的性能问题而烦恼?想借助 ODPS 的分布计算能力?但是你又不想学习官方的 SDK ?如果你恰好在老项目中用了 JDBC 访问 Oracle 或 MySQL?那么你可能对这篇文章感兴趣。

本文将结合几种常见的使用场景(数据查询、数据导入、第三方客户端工具)来介绍 odps-jdbc ,并附有代码示例级别的入门教程(比较长,所以放在了最后,并不建议看 :D)。

什么是 odps-jdbc?odps-jdbc 是 ODPS 官方提供的 JDBC 驱动,它向 Java 程序提供了一套执行 SQL 任务的接口。还记得吗? 当年 Java 红遍大江南北靠的就是一句“编写一次,处处运行”,JDBC 也是这种思潮下产物。

目前 hive-jdbc 支持的功能 odps-jdbc 都能够支持,hive-jdbc 不支持的一些功能,例如滚动游标的 ResultSet,也支持了。 我们的目标是使 ODPS 更加开放、灵活和易用。项目托管在 github。欢迎各位开源热心人士积极反馈,贡献代码。

场景1:数据查询

用 JDBC API 执行查询语句(Select)是最常见的场景。对于 ODPS 的 SQL 任务,我们只要通过几个简单的 API,就可以拿到带有类型信息的数据。

具体来说,就是调用 Statement 对象的 execute() 方法,Statement 对象可以通过 conn 对象的来创建(conn 是 JDBC 连接对象,后面的入门教程会具体描述它是如何创建的)。Statement 对象可以重复地使用来执行不同的 SQL。

Statement stmt = conn.createStatement();
ResultSet rs = stmt.execute("SELECT foo FROM bar");

Select 语句执行完成会返回一个 ResultSet 对象,它的用法类似一个迭代器,你可以在一个 while 循环中访问结果中每一行的数据。是不是很简单?

while (rs.next()) {
    ...
}

假设查询到的结果只有两列数据,第一列为整型,第二列为字符串,把所有行打印出来的代码应该长这样:

while (rs.next()) {
    System.out.printf("col1:%d, col2:%s", rs.getInt(1), rs.getString(2));
}

更多 API 的使用可以查看 JDBC 的文档

场景2:批量数据导入

JDBC 接口定义了批处理(batch)操作,结合 PreparedStatement 使用,odps-jdbc 就可以实现数据批量导入 ODPS 的功能。

具体地,只要把带有类型信息的数据先添加到 batch 中,当累计到一定数量(batchSize),就调用一次批量执行方法,这样就能批量地将你的数据上传到 odps。省去了额外序列化反序列化的麻烦,尤其适合从传统数据库导到 ODPS 的场景。

还是直接看代码吧:

String sql = "insert into employee (name, city, phone) values (?, ?, ?)";
PreparedStatement ps = conn.prepareStatement(sql);

final int batchSize = 1000;
int count = 0;

for (Employee employee: employees) {

    ps.setString(1, employee.getName());
    ps.setString(2, employee.getCity());
    ps.setString(3, employee.getPhone());
    ps.addBatch();
    
    if(++count % batchSize == 0) {
        ps.executeBatch();
    }
}
ps.executeBatch(); // insert remaining records

其中 PreparedStatement 通过一个 sql 语句的模板来创建,它可以让我们在一条 sql 语句中注入带有类型信息的数据。
我们将 batch 的大小设为 1000,用一个 count 来触发批量执行的操作(executeBatch())。

场景3:第三方客户端工具

你甚至不一定需要通过代码的方式来接入 JDBC。兼容 JDBC,也就意味着 ODPS 可以兼容所有 JDBC 兼容的第三方工具,例如:

  • Pentahao 报表工具
  • Squirrel SQL 客户端工具
  • SQL Workbench/J 客户端工具
  • TalentD ETL 工具
  • ...

也就是说如果你是以上这些软件的用户,都可以毫无成本地迁移到 ODPS 平台上工作。下面就简单介绍一下 SQL Workbench/J 的使用。笔者曾经试用过很多利用 JDBC 连接数据库的客户端软件,大多都因为操作实在不方便,不得不放弃,直到遇见了 SQL Workbench/J,好像突然遇到了老朋友,非常推荐!

SQL Workbench/J 下载地址

首先你需要加载我们的 jdbc 驱动,点开 Manager Driver 加载我们的独立 jar 包(下载地址会在后面的入门教程中介绍),然后正确输入 Classname。

_2016_02_17_2_32_42

其次新建一个 Connection Profile,在 Driver 下拉框中选择我们刚才添加的驱动,然后填入 url(详细定义见入门教程),username 和 password 分别为你的阿里云账号和钥匙。

_2016_02_17_2_32_26

点击确认,这个时候你会发现数据库已经连接好了。于是你就可以在输入框中敲入查询语句、DDL 语句,并在下面的结果栏中查看结果。

_2016_02_17_2_38_24

现在你已经大概了解了 odps-jdbc 能用来处理哪些场景,如果你真的需要开始使用它,可以继续阅读以下内容。

入门教程(TL;DR)

使用前的准备工作

1.你可以通过以下两种方式来安装 JDBC 驱动

<dependency>
  <groupId>com.aliyun.odps</groupId>
  <artifactId>odps-jdbc</artifactId>
  <version>x.y</version>
</dependency>

2.手动在代码中将 JDBC 驱动加载到 JVM

Class.forName("com.aliyun.odps.jdbc.OdpsDriver");

3.通过下面这条语句可以定义一个 JDBC 连接

Connection conn = DriverManager.getConnection(url, accessId, accessKey);

其中 url 是 JDBC 的资源定位符,用过 JDBC 的人一定知道,JDBC 就是利用它来找到数据库的入口,并且进行一些初始化的配置。一个完整的 url 就长下面这个样子,它将使用名为 jdbc_test 的 ODPS 项目,并使用 UTF-8 编码:

"jdbc:odps:https://service.odps.aliyun.com/api?project=jdbc_test&charset=UTF-8"

accessId 和 accessKey 分别对应了阿里云账号和钥匙。

首先你要有一个阿里云的账号 :)

4.如果配置参数太多,也可以先添加到 Properties 中,然后通过它来创建

Properties config = new Properties();
config.put("access_id", "...");
config.put("access_key", "...");
config.put("project_name", "...");
config.put("charset", "...");
Connection conn = DriverManager.getConnection("jdbc:odps:<endpoint>", config);

一个完整的例子

下面的代码展示 JDBC 中的常见操作,包含了删表、建表、获取表 meta、执行 insert、执行 select 以及遍历结果集:

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

public class Main {

  private static String driverName = "com.aliyun.odps.jdbc.OdpsDriver";

  public static void main(String[] args) throws SQLException {
    try {
      Class.forName(driverName);
    } catch (ClassNotFoundException e) {
      e.printStackTrace();
      System.exit(1);
    }

    Properties config = new Properties();
    config.put("access_id", "your_access_id");
    config.put("access_key", "your_access_key");
    config.put("project_name", "your_project");
    config.put("charset", "utf-8");
    Connection
        conn = DriverManager.getConnection("jdbc:odps:https://service.odps.aliyun.com/api", config);
    ResultSet rs;

    // create a table
    Statement stmt = conn.createStatement();
    String tableName = "jdbc_test";
    stmt.execute("drop table if exists " + tableName);
    stmt.execute("create table " + tableName + " (key int, value string)");

    // get meta data
    DatabaseMetaData metaData = conn.getMetaData();
    System.out.println("product = " + metaData.getDatabaseProductName());
    System.out.println("jdbc version = " + metaData.getDriverMajorVersion() + ", "
                       + metaData.getDriverMinorVersion());
    rs = metaData.getTables(null, null, tableName, null);
    while (rs.next()) {
      String name = rs.getString(3);
      System.out.println("inspecting table: " + name);
      ResultSet rs2 = metaData.getColumns(null, null, name, null);
      while (rs2.next()) {
        System.out.println(rs2.getString("COLUMN_NAME") + "\t"
                           + rs2.getString("TYPE_NAME") + "(" + rs2.getInt("DATA_TYPE") + ")");
      }
    }

    // run sql
    String sql;

    // insert a record
    sql = String.format("insert into table %s select 24 key, 'hours' value from (select count(1) from %s) a", tableName, tableName);
    System.out.println("Running: " + sql);
    int count = stmt.executeUpdate(sql);
    System.out.println("updated records: " + count);

    // select * query
    sql = "select * from " + tableName;
    System.out.println("Running: " + sql);
    rs = stmt.executeQuery(sql);
    while (rs.next()) {
      System.out.println(String.valueOf(rs.getInt(1)) + "\t" + rs.getString(2));
    }

  }

运行可以获得类似如下结果:

product = ODPS JDBC
jdbc version = 1, 0
inspecting table: jdbc_test
key BIGINT(-5)
value   STRING(12)
Running: insert into table jdbc_test select 24 key, 'hours' value from (select count(1) from jdbc_test) a
updated records: 1
Running: select * from jdbc_test
24  hours

FAQ

1.如何获取 ODPS SQL 的 logview?

JDBC 驱动默认会以日志的形式将 LOGVIEW 打印在程序的 stderr 中。

2.如何调试 JDBC 驱动?

在连接串中将 loglevel 设置为 DEBUG,例如:

"jdbc:odps:<endpoint>?project=jdbc_test&loglevel=DEBUG"

如果你遇到了什么问题,可以在这个地方提问~

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
13天前
|
SQL 分布式计算 DataWorks
DataWorks报错问题之集成hive数据源报错如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
7月前
|
SQL 分布式计算 大数据
MAXCOMPUTE和ODPS的区别是什么?
MAXCOMPUTE和ODPS的区别是什么?
297 1
|
13天前
|
DataWorks 关系型数据库 调度
DataWorks产品使用合集之在DataWorks中,将ODPS数据导入到MySQL的步骤如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
33 0
|
7月前
|
分布式计算 安全 大数据
MAXCOMPUTE和ODPS的区别2
MAXCOMPUTE和ODPS的区别2
95 1
|
7月前
|
SQL 分布式计算 Java
阿里云MaxCompute-Hive UDF(Java)迁移上云实践
阿里云MaxCompute-Hive UDF(Java)迁移上云实践
|
SQL 分布式计算 DataWorks
阿里云-DataWorks- ODPS SQL开发2
阿里云-DataWorks- ODPS SQL开发2 本文主要讲解日常大量会接触到的一些常用函数与关键词
|
SQL 存储 分布式计算
阿里云-DataWorks- ODPS SQL开发
阿里云-DataWorks- ODPS SQL开发
|
分布式计算 DataX MaxCompute
DataX使用指南——ODPS to ODPS
1. DataX是什么 DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、OTS、ODPS 等各种异构数据源之间高效的数据同步功能。
47071 2
|
数据采集 分布式计算 项目管理
通过 Connector 同步到 Odps|学习笔记
快速学习通过 Connector 同步到 Odps
136 0
通过 Connector 同步到 Odps|学习笔记
|
SQL 机器学习/深度学习 XML
ODPS的学习
流计算用的是ODPS,ODPS的所有对象都隶属于项目空间。
377 0

热门文章

最新文章