3秒学不会Palo Doris的数据导入你打我!(二)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用版 2核4GB 50GB
简介: 3秒学不会Palo Doris的数据导入你打我!

准备工作

开通百度消息服务

百度消息服务(BMS)基于 Kafka 在百度智能云提供托管服务,请先按照以下流程开通服务。

  1. 请根据 (BMS快速入门https://cloud.baidu.com/doc/Kafka/s/9jwvygf3k 文档开通消息服务
  2. 下载证书压缩包 kafka-key.zip 并解压,解压后将得到以下文件

3.上传证书文件到 HTTP 服务器。

因为后续 Doris 需要从某个 HTTP 服务器上下载这些整数以供访问 Kafka。因此我们需要先将这些证书上传到 HTTP 服务器。这个 HTTP 服务器必须要能够被 Doris 的 Leader Node 节点所访问。

如果您没有合适的 HTTP 服务器,可以参照以下方式借助百度对象存储(BOS)来完成:

   1. 根据 https://cloud.baidu.com/doc/BOS/s/Jk4xttg03开始使用),https://cloud.baidu.com/doc/BOS/s/Fk4xtwbze(创建Bucket) 文档开通BOS服务并创建一个 Bucket。注意,Bucket所在地域必须和 Doris 集群所在地域相同

           2. 将以下三个文件上传到 Bucket

  • ca.pem
  • client.key
  • client.pem
  1.  3. 在 BOS Bucket 文件列表页面,点击文件右侧的 文件信息,可以获取 HTTP 访问连接。请将 连接有效时间 设为 -1,即永久。

注:请不要使用带有 cdn 加速的 http 下载地址。这个地址某些情况无法被 Doris 访问。


自建 Kafka 服务

如果使用自建 Kafka 服务,请确保 Kafka 服务和 Doris 集群在同一个 VPC 内,并且相互之间的网络能够互通。


订阅 Kafka 消息

订阅 Kafka 消息使用了 Doris 中的例行导入(Routine Load)功能。

用户首先需要创建一个例行导入作业。作业会通过例行调度,不断地发送一系列的任务,每个任务会消费一定数量 Kafka 中的消息。

请注意以下使用限制:

  1. 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。
  2. 支持的消息格式如下:
  1. 仅支持 Kafka 0.10.0.0(含) 以上版本。

访问 SSL 认证的 Kafka 集群

例行导入功能支持无认证的 Kafka 集群,以及通过 SSL 认证的 Kafka 集群。

访问 SSL 认证的 Kafka 集群需要用户提供用于认证 Kafka Broker 公钥的证书文件(ca.pem)。如果 Kafka 集群同时开启了客户端认证,则还需提供客户端的公钥(client.pem)、密钥文件(client.key),以及密钥密码。这里所需的文件需要先通过 CREAE FILE 命令上传到 Plao 中,并且 catalog 名称为 kafka

这里给出示例:

上传文件

CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");
CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");

上传完成后,可以通过 http://palo.baidu.com/docs/SQL%E6%89%8B%E5%86%8C/%E8%AF%AD%E6%B3%95%E5%B8%AE%E5%8A%A9/%E4%BF%A1%E6%81%AF%E6%9F%A5%E7%9C%8B%E8%AF%AD%E5%8F%A5/SHOW-FILE/     (SHOW FLIES) 命令查看已上传的文件。


创建例行导入作业

访问无认证的 Kafka 集群

CREATE ROUTINE LOAD example_db.my_first_job ON example_tbl
COLUMNS TERMINATED BY ","
PROPERTIES
(
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "property.group.id" = "xxx",
    "property.client.id" = "xxx",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
  • max_batch_interval/max_batch_rows/max_batch_size 用于控制一个子任务的运行周期。一个子任务的运行周期由最长运行时间、最多消费行数和最大消费数据量共同决定。

  1. 2. 访问 SSL 认证的 Kafka 集群
CREATE ROUTINE LOAD example_db.my_first_job ON example_tbl
COLUMNS TERMINATED BY ",",
PROPERTIES
(
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
)
FROM KAFKA
(
   "kafka_broker_list"= "broker1:9091,broker2:9091",
   "kafka_topic" = "my_topic",
   "property.security.protocol" = "ssl",
   "property.ssl.ca.location" = "FILE:ca.pem",
   "property.ssl.certificate.location" = "FILE:client.pem",
   "property.ssl.key.location" = "FILE:client.key",
   "property.ssl.key.password" = "abcdefg"
);
  • 对于百度消息服务,property.ssl.key.password 属性可以在 client.properties 文件中获取。

查看导入作业状态

查看作业状态的具体命令和示例请参阅 http://palo.baidu.com/docs/SQL%E6%89%8B%E5%86%8C/%E8%AF%AD%E6%B3%95%E5%B8%AE%E5%8A%A9/%E4%BF%A1%E6%81%AF%E6%9F%A5%E7%9C%8B%E8%AF%AD%E5%8F%A5/SHOW-ROUTINE-LOAD/   (SHOW ROUTINE LOAD) 命令文档。

查看某个作业的任务运行状态的具体命令和示例请参阅 http://palo.baidu.com/docs/SQL%E6%89%8B%E5%86%8C/%E8%AF%AD%E6%B3%95%E5%B8%AE%E5%8A%A9/%E4%BF%A1%E6%81%AF%E6%9F%A5%E7%9C%8B%E8%AF%AD%E5%8F%A5/SHOW-ROUTINE-LOAD-TASK/    (SHOW ROUTINE LOAD TASK) 命令文档。

只能查看当前正在运行中的任务,已结束和未开始的任务无法查看。

修改作业属性

用户可以修改已经创建的作业的部分属性。具体说明请参阅 http://palo.baidu.com/docs/SQL%E6%89%8B%E5%86%8C/%E8%AF%AD%E6%B3%95%E5%B8%AE%E5%8A%A9/%E8%BE%85%E5%8A%A9%E5%91%BD%E4%BB%A4/ALTER-ROUTINE-LOAD/   (ALTER ROUTINE LOAD) 命令手册。


作业控制

用户可以通过 STOP/PAUSE/RESUME 三个命令来控制作业的停止,暂停和重启。



使用JDBC同步数据


用户可以通过 JDBC 协议,使用 INSERT 语句进行数据导入。

INSERT 语句的使用方式和 MySQL 等数据库中 INSERT 语句的使用方式类似。INSERT 语句支持以下两种语法:

* INSERT INTO table SELECT ...
* INSERT INTO table VALUES(...)

单次写入

单次写入是指用户直接执行一个 INSERT 命令。示例如下:

INSERT INTO example_tbl (col1, col2, col3) VALUES (1000, "baidu", 3.25);

对于 Doris 来说,一个 INSERT 命令就是一个完整的导入事务。

因此不论是导入一条数据,还是多条数据,我们都不建议在生产环境使用这种方式进行数据导入。高频词的 INSERT 操作会导致在存储层产生大量的小文件,会严重影响系统性能。

该方式仅用于线下简单测试或低频少量的操作。

或者可以使用以下方式进行批量的插入操作:

INSERT INTO example_tbl VALUES
(1000, "baidu1", 3.25)
(2000, "baidu2", 4.25)
(3000, "baidu3", 5.25);

我们建议一批次插入条数在尽量大,比如几千甚至一万条一次。或者可以通过下面的程序的方式,使用 PreparedStatement 来进行批量插入。

JDBC 示例

这里我们给出一个简单的 JDBC 批量 INSERT 代码示例:

package demo.doris;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class DorisJDBCDemo {
    private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
    private static final String DB_URL_PATTERN = "jdbc:mysql://%s:%d/%s?rewriteBatchedStatements=true";
    private static final String HOST = "127.0.0.1"; // Leader Node host
    private static final int PORT = 8030;   // http port of Leader Node
    private static final String DB = "example_db";
    private static final String TBL = "example_tbl";
    private static final String USER = "admin";
    private static final String PASSWD = "my_pass";
    private static final int INSERT_BATCH_SIZE = 10000;
    public static void main(String[] args) {
        insert();
    }
    private static void insert() {
        // 注意末尾不要加 分号 ";"
        String query = "insert into " + TBL + " values(?, ?)";
        // 设置 Label 以做到幂等。
        // String query = "insert into " + TBL + " WITH LABEL my_label values(?, ?)";
        Connection conn = null;
        PreparedStatement stmt = null;
        String dbUrl = String.format(DB_URL_PATTERN, HOST, PORT, DB);
        try {
            Class.forName(JDBC_DRIVER);
            conn = DriverManager.getConnection(dbUrl, USER, PASSWD);
            stmt = conn.prepareStatement(query);
            for (int i =0; i < INSERT_BATCH_SIZE; i++) {
                stmt.setInt(1, i);
                stmt.setInt(2, i * 100);
                stmt.addBatch();
            }
            int[] res = stmt.executeBatch();
            System.out.println(res);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (stmt != null) {
                    stmt.close();
                }
            } catch (SQLException se2) {
                se2.printStackTrace();
            }
            try {
                if (conn != null) conn.close();
            } catch (SQLException se) {
                se.printStackTrace();
            }
        }
    }
}

请注意以下几点:

1. JDBC 连接串需添加 rewriteBatchedStatements=true 参数,并使用 PreparedSta tement 方式。

目前 Doris 暂不支持服务器端的 PrepareStatemnt,所以 JDBC Driver 会在客户端进行批量 Prepare。

rewriteBatchedStatements=true 会确保 Driver 执行批处理。并最终形成如下形式的 INSERT 语句发往 Doris:

INSERT INTO example_tbl VALUES
(1000, "baidu1", 3.25)
(2000, "baidu2", 4.25)
(3000, "baidu3", 5.25);

2. 批次大小

   因为是在客户端进行批量处理,因此一批次如果过大的话,话占用客户端的内存资源,需关注。

Doris 后续会支持服务端的 PrepareStatemnt,敬请期待。

3. 导入原子性

和其他到导入方式一样,INSERT 操作本身也支持原子性。每一个 INSERT 操作都是一个导入事务,能够保证一个 INSERT 中的所有数据原子性的写入。

前面提到,我们建议在使用 INSERT 导入数据时,采用 ”批“ 的方式进行导入,而不是单条插入。

同时,我们可以为每次 INSERT 操作设置一个 Label。通过 Label 机制 可以保证操作的幂等性和原子性,最终做到数据的不丢不重。



通过外部表同步数据


Doris 可以创建通过 ODBC 协议访问的外部表。创建完成后,可以通过 SELECT 语句直接查询外部表的数据,也可以通过 INSERT INTO SELECT 的方式导入外部表的数据。

本文档主要介绍如何创建通过 ODBC 协议访问的外部表,以及如何导入这些外部表的数据。目前支持的数据源包括:

  • MySQL
  • Oracle
  • PostgreSQL

创建外部表

这里仅通过示例说明使用方式。

  1. 创建 ODBC Resource
    ODBC Resource 的目的是用于统一管理外部表的连接信息。
CREATE EXTERNAL RESOURCE `oracle_odbc`
PROPERTIES (
    "type" = "odbc_catalog",
    "host" = "192.168.0.1",
    "port" = "8086",
    "user" = "test",
    "password" = "test",
    "database" = "test",
    "odbc_type" = "oracle",
    "driver" = "Oracle"
);

这里我们创建了一个名为 oracle_odbc 的 Resource,其类型为 odbc_catalog,表示这是一个用于存储 ODBC 信息的 Resource。odbc_typeoracle,表示这个 OBDC Resource 是用于连接 Oracle 数据库的。


2. 创建外部表

CREATE EXTERNAL TABLE `ext_oracle_tbl` (
  `k1` decimal(9, 3) NOT NULL COMMENT "",
  `k2` char(10) NOT NULL COMMENT "",
  `k3` datetime NOT NULL COMMENT "",
  `k5` varchar(20) NOT NULL COMMENT "",
  `k6` double NOT NULL COMMENT ""
) ENGINE=ODBC
COMMENT "ODBC"
PROPERTIES (
    "odbc_catalog_resource" = "oracle_odbc",
    "database" = "test",
    "table" = "baseall"
);

这里我们创建一个 ext_oracle_tbl 外部表,并引用了之前创建的 oracle_odbc Resource。


连接百度云数据库 RDS

1. 创建 RDS

注意:创建 RDS 实例时,网络类型 -> 选择网络 处,需要选择和 Doris 集群相同的网络(VPC)。可用区可以不同。


2. 创建资源

CREATE EXTERNAL RESOURCE `rds_odbc`
PROPERTIES (
    "type" = "odbc_catalog",
    "host" = "mysql56.rdsxxxxx.rds.gz.baidubce.com",
    "port" = "3306",
    "user" = "rdsroot",
    "password" = "12345",
    "odbc_type" = "mysql",
    "driver" = "MySQL"
);

需修改其中 hostportuserpassword 对应的参数。host port 可以在 RDS 实例信息也查看。user 和 password 需要在 RDS 控制台创建账户后获取。


3. 创建外部表

CREATE EXTERNAL TABLE `mysql_table` (
   k1 int,
   k2 int
) ENGINE=ODBC
PROPERTIES (
    "odbc_catalog_resource" = "rds_odbc",
    "database" = "mysql_db",
    "table" = "mysql_tbl"
);

创建之后,就可以进行查询等操作了。


导入数据


1. 创建 Doris 表

这里我们创建一张 Doris 的表,列信息和上一步创建的外部表 ext_oracle_tbl 一样:

CREATE EXTERNAL TABLE `doris_tbl` (
  `k1` decimal(9, 3) NOT NULL COMMENT "",
  `k2` char(10) NOT NULL COMMENT "",
  `k3` datetime NOT NULL COMMENT "",
  `k5` varchar(20) NOT NULL COMMENT "",
  `k6` double NOT NULL COMMENT ""
)
COMMENT "Doris Table"
DISTRIBUTED BY HASH(k1) BUCKETS 2;
PROPERTIES (
    "replication_num" = "1"
);

2. 导入数据 (从 ext_oracle_tbl表 导入到 doris_tbl 表)

INSERT INTO doris_tbl SELECT k1,k2,k3 FROM ext_oracle_tbl limit 100;

INSERT 命令是同步命令,返回成功,即表示导入成功。


注意事项

  • 必须保证外部数据源与 Doris 集群在同一个VPC内,并且 Compute Node 可以和外部数据源的网络是互通的。
  • ODBC 外部表本质上是通过单一 ODBC 客户端访问数据源,因此并不合适一次性导入大量的数据,建议分批多次导入。
相关文章
|
8月前
|
SQL 消息中间件 分布式计算
Apache Doris 系列: 入门篇-数据导入及查询
Apache Doris 系列: 入门篇-数据导入及查询
210 0
|
数据采集 消息中间件 存储
3秒学不会Palo Doris的数据导入你打我!(四)
3秒学不会Palo Doris的数据导入你打我!
381 0
|
1天前
|
SQL 缓存 关系型数据库
ClickHouse(19)ClickHouse集成Hive表引擎详细解析
Hive引擎允许对HDFS Hive表执行 `SELECT` 查询。目前它支持如下输入格式: -文本:只支持简单的标量列类型,除了 `Binary` - ORC:支持简单的标量列类型,除了`char`; 只支持 `array` 这样的复杂类型 - Parquet:支持所有简单标量列类型;只支持 `array` 这样的复杂类型
11 1
|
1月前
|
缓存 NoSQL 数据库
Flink cdc到doris,starrocks,table store
Flink cdc到doris,starrocks,table store
|
1月前
|
NoSQL MongoDB 数据安全/隐私保护
Flink CDC支持MongoDB的CDC(Change Data Capture)连接器
Flink CDC支持MongoDB的CDC(Change Data Capture)连接器
130 4
|
10月前
|
消息中间件 JSON 关系型数据库
Flink导入mysql数据到doris
经过各种实践,发现比较适合中小公司的方式。分为全量和增量。
|
SQL 存储 关系型数据库
Flink CDC 系列 - 构建 MySQL 和 Postgres 上的 Streaming ETL
本篇教程将展示如何基于 Flink CDC 快速构建 MySQL 和 Postgres 的流式 ETL。
Flink CDC 系列 - 构建 MySQL 和 Postgres 上的 Streaming ETL
|
SQL 分布式计算 运维
【大数据开发运维解决方案】Sqoop增量同步mysql/oracle数据到hive(merge-key/append)测试文档
上一篇文章介绍了sqoop全量同步数据到hive, 本片文章将通过实验详细介绍如何增量同步数据到hive,以及sqoop job与crontab定时结合无密码登录的增量同步实现方法。
【大数据开发运维解决方案】Sqoop增量同步mysql/oracle数据到hive(merge-key/append)测试文档
|
SQL 数据安全/隐私保护 UED
Palo Doris版五分钟快速入门!
Palo Doris版五分钟快速入门!
209 0
Palo Doris版五分钟快速入门!
|
SQL 存储 搜索推荐
浅谈 Apache Doris FE 处理查询 SQL 源码解析
浅谈 Apache Doris FE 处理查询 SQL 源码解析
791 0
浅谈 Apache Doris FE 处理查询 SQL 源码解析