准备工作
开通百度消息服务
百度消息服务(BMS)基于 Kafka 在百度智能云提供托管服务,请先按照以下流程开通服务。
- 请根据 (BMS快速入门) https://cloud.baidu.com/doc/Kafka/s/9jwvygf3k 文档开通消息服务
- 下载证书压缩包 kafka-key.zip 并解压,解压后将得到以下文件
3.上传证书文件到 HTTP 服务器。
因为后续 Doris 需要从某个 HTTP 服务器上下载这些整数以供访问 Kafka。因此我们需要先将这些证书上传到 HTTP 服务器。这个 HTTP 服务器必须要能够被 Doris 的 Leader Node 节点所访问。
如果您没有合适的 HTTP 服务器,可以参照以下方式借助百度对象存储(BOS)来完成:
1. 根据 https://cloud.baidu.com/doc/BOS/s/Jk4xttg03(开始使用),https://cloud.baidu.com/doc/BOS/s/Fk4xtwbze(创建Bucket) 文档开通BOS服务并创建一个 Bucket。注意,Bucket所在地域必须和 Doris 集群所在地域相同
2. 将以下三个文件上传到 Bucket
- ca.pem
- client.key
- client.pem
- 3. 在 BOS Bucket 文件列表页面,点击文件右侧的
文件信息
,可以获取 HTTP 访问连接。请将连接有效时间
设为-1
,即永久。
注:请不要使用带有 cdn 加速的 http 下载地址。这个地址某些情况无法被 Doris 访问。
自建 Kafka 服务
如果使用自建 Kafka 服务,请确保 Kafka 服务和 Doris 集群在同一个 VPC 内,并且相互之间的网络能够互通。
订阅 Kafka 消息
订阅 Kafka 消息使用了 Doris 中的例行导入(Routine Load)功能。
用户首先需要创建一个例行导入作业。作业会通过例行调度,不断地发送一系列的任务,每个任务会消费一定数量 Kafka 中的消息。
请注意以下使用限制:
- 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。
- 支持的消息格式如下:
- csv 文本格式。每一个 message 为一行,且行尾不包含换行符。
- Json 格式,详见 http://palo.baidu.com/docs/%E6%93%8D%E4%BD%9C%E6%89%8B%E5%86%8C/%E6%95%B0%E6%8D%AE%E5%AF%BC%E5%85%A5/JSON%E6%A0%BC%E5%BC%8F%E6%95%B0%E6%8D%AE%E5%AF%BC%E5%85%A5%E8%AF%B4%E6%98%8E/ (导入JSON数据)
- 仅支持 Kafka 0.10.0.0(含) 以上版本。
访问 SSL 认证的 Kafka 集群
例行导入功能支持无认证的 Kafka 集群,以及通过 SSL 认证的 Kafka 集群。
访问 SSL 认证的 Kafka 集群需要用户提供用于认证 Kafka Broker 公钥的证书文件(ca.pem)。如果 Kafka 集群同时开启了客户端认证,则还需提供客户端的公钥(client.pem)、密钥文件(client.key),以及密钥密码。这里所需的文件需要先通过 CREAE FILE
命令上传到 Plao 中,并且 catalog 名称为 kafka
。
这里给出示例:
上传文件
CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka"); CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka"); CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
上传完成后,可以通过 http://palo.baidu.com/docs/SQL%E6%89%8B%E5%86%8C/%E8%AF%AD%E6%B3%95%E5%B8%AE%E5%8A%A9/%E4%BF%A1%E6%81%AF%E6%9F%A5%E7%9C%8B%E8%AF%AD%E5%8F%A5/SHOW-FILE/ (SHOW FLIES) 命令查看已上传的文件。
创建例行导入作业
访问无认证的 Kafka 集群
CREATE ROUTINE LOAD example_db.my_first_job ON example_tbl COLUMNS TERMINATED BY "," PROPERTIES ( "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "property.group.id" = "xxx", "property.client.id" = "xxx", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );
max_batch_interval/max_batch_rows/max_batch_size
用于控制一个子任务的运行周期。一个子任务的运行周期由最长运行时间、最多消费行数和最大消费数据量共同决定。
- 2. 访问 SSL 认证的 Kafka 集群
CREATE ROUTINE LOAD example_db.my_first_job ON example_tbl COLUMNS TERMINATED BY ",", PROPERTIES ( "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", ) FROM KAFKA ( "kafka_broker_list"= "broker1:9091,broker2:9091", "kafka_topic" = "my_topic", "property.security.protocol" = "ssl", "property.ssl.ca.location" = "FILE:ca.pem", "property.ssl.certificate.location" = "FILE:client.pem", "property.ssl.key.location" = "FILE:client.key", "property.ssl.key.password" = "abcdefg" );
- 对于百度消息服务,
property.ssl.key.password
属性可以在client.properties
文件中获取。
查看导入作业状态
查看作业状态的具体命令和示例请参阅 http://palo.baidu.com/docs/SQL%E6%89%8B%E5%86%8C/%E8%AF%AD%E6%B3%95%E5%B8%AE%E5%8A%A9/%E4%BF%A1%E6%81%AF%E6%9F%A5%E7%9C%8B%E8%AF%AD%E5%8F%A5/SHOW-ROUTINE-LOAD/ (SHOW ROUTINE LOAD) 命令文档。
查看某个作业的任务运行状态的具体命令和示例请参阅 http://palo.baidu.com/docs/SQL%E6%89%8B%E5%86%8C/%E8%AF%AD%E6%B3%95%E5%B8%AE%E5%8A%A9/%E4%BF%A1%E6%81%AF%E6%9F%A5%E7%9C%8B%E8%AF%AD%E5%8F%A5/SHOW-ROUTINE-LOAD-TASK/ (SHOW ROUTINE LOAD TASK) 命令文档。
只能查看当前正在运行中的任务,已结束和未开始的任务无法查看。
修改作业属性
用户可以修改已经创建的作业的部分属性。具体说明请参阅 http://palo.baidu.com/docs/SQL%E6%89%8B%E5%86%8C/%E8%AF%AD%E6%B3%95%E5%B8%AE%E5%8A%A9/%E8%BE%85%E5%8A%A9%E5%91%BD%E4%BB%A4/ALTER-ROUTINE-LOAD/ (ALTER ROUTINE LOAD) 命令手册。
作业控制
用户可以通过 STOP/PAUSE/RESUME
三个命令来控制作业的停止,暂停和重启。
使用JDBC同步数据
用户可以通过 JDBC 协议,使用 INSERT 语句进行数据导入。
INSERT 语句的使用方式和 MySQL 等数据库中 INSERT 语句的使用方式类似。INSERT 语句支持以下两种语法:
* INSERT INTO table SELECT ... * INSERT INTO table VALUES(...)
单次写入
单次写入是指用户直接执行一个 INSERT 命令。示例如下:
INSERT INTO example_tbl (col1, col2, col3) VALUES (1000, "baidu", 3.25);
对于 Doris 来说,一个 INSERT 命令就是一个完整的导入事务。
因此不论是导入一条数据,还是多条数据,我们都不建议在生产环境使用这种方式进行数据导入。高频词的 INSERT 操作会导致在存储层产生大量的小文件,会严重影响系统性能。
该方式仅用于线下简单测试或低频少量的操作。
或者可以使用以下方式进行批量的插入操作:
INSERT INTO example_tbl VALUES (1000, "baidu1", 3.25) (2000, "baidu2", 4.25) (3000, "baidu3", 5.25);
我们建议一批次插入条数在尽量大,比如几千甚至一万条一次。或者可以通过下面的程序的方式,使用 PreparedStatement 来进行批量插入。
JDBC 示例
这里我们给出一个简单的 JDBC 批量 INSERT 代码示例:
package demo.doris; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; public class DorisJDBCDemo { private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver"; private static final String DB_URL_PATTERN = "jdbc:mysql://%s:%d/%s?rewriteBatchedStatements=true"; private static final String HOST = "127.0.0.1"; // Leader Node host private static final int PORT = 8030; // http port of Leader Node private static final String DB = "example_db"; private static final String TBL = "example_tbl"; private static final String USER = "admin"; private static final String PASSWD = "my_pass"; private static final int INSERT_BATCH_SIZE = 10000; public static void main(String[] args) { insert(); } private static void insert() { // 注意末尾不要加 分号 ";" String query = "insert into " + TBL + " values(?, ?)"; // 设置 Label 以做到幂等。 // String query = "insert into " + TBL + " WITH LABEL my_label values(?, ?)"; Connection conn = null; PreparedStatement stmt = null; String dbUrl = String.format(DB_URL_PATTERN, HOST, PORT, DB); try { Class.forName(JDBC_DRIVER); conn = DriverManager.getConnection(dbUrl, USER, PASSWD); stmt = conn.prepareStatement(query); for (int i =0; i < INSERT_BATCH_SIZE; i++) { stmt.setInt(1, i); stmt.setInt(2, i * 100); stmt.addBatch(); } int[] res = stmt.executeBatch(); System.out.println(res); } catch (Exception e) { e.printStackTrace(); } finally { try { if (stmt != null) { stmt.close(); } } catch (SQLException se2) { se2.printStackTrace(); } try { if (conn != null) conn.close(); } catch (SQLException se) { se.printStackTrace(); } } } }
请注意以下几点:
1. JDBC 连接串需添加 rewriteBatchedStatements=true
参数,并使用 PreparedSta tement
方式。
目前 Doris 暂不支持服务器端的 PrepareStatemnt,所以 JDBC Driver 会在客户端进行批量 Prepare。
rewriteBatchedStatements=true
会确保 Driver 执行批处理。并最终形成如下形式的 INSERT 语句发往 Doris:
INSERT INTO example_tbl VALUES (1000, "baidu1", 3.25) (2000, "baidu2", 4.25) (3000, "baidu3", 5.25);
2. 批次大小
因为是在客户端进行批量处理,因此一批次如果过大的话,话占用客户端的内存资源,需关注。
Doris 后续会支持服务端的 PrepareStatemnt,敬请期待。
3. 导入原子性
和其他到导入方式一样,INSERT 操作本身也支持原子性。每一个 INSERT 操作都是一个导入事务,能够保证一个 INSERT 中的所有数据原子性的写入。
前面提到,我们建议在使用 INSERT 导入数据时,采用 ”批“ 的方式进行导入,而不是单条插入。
同时,我们可以为每次 INSERT 操作设置一个 Label。通过 Label 机制 可以保证操作的幂等性和原子性,最终做到数据的不丢不重。
通过外部表同步数据
Doris 可以创建通过 ODBC 协议访问的外部表。创建完成后,可以通过 SELECT 语句直接查询外部表的数据,也可以通过 INSERT INTO SELECT
的方式导入外部表的数据。
本文档主要介绍如何创建通过 ODBC 协议访问的外部表,以及如何导入这些外部表的数据。目前支持的数据源包括:
- MySQL
- Oracle
- PostgreSQL
创建外部表
这里仅通过示例说明使用方式。
- 创建 ODBC Resource
ODBC Resource 的目的是用于统一管理外部表的连接信息。
CREATE EXTERNAL RESOURCE `oracle_odbc` PROPERTIES ( "type" = "odbc_catalog", "host" = "192.168.0.1", "port" = "8086", "user" = "test", "password" = "test", "database" = "test", "odbc_type" = "oracle", "driver" = "Oracle" );
这里我们创建了一个名为 oracle_odbc
的 Resource,其类型为 odbc_catalog
,表示这是一个用于存储 ODBC 信息的 Resource。odbc_type
为 oracle
,表示这个 OBDC Resource 是用于连接 Oracle 数据库的。
2. 创建外部表
CREATE EXTERNAL TABLE `ext_oracle_tbl` ( `k1` decimal(9, 3) NOT NULL COMMENT "", `k2` char(10) NOT NULL COMMENT "", `k3` datetime NOT NULL COMMENT "", `k5` varchar(20) NOT NULL COMMENT "", `k6` double NOT NULL COMMENT "" ) ENGINE=ODBC COMMENT "ODBC" PROPERTIES ( "odbc_catalog_resource" = "oracle_odbc", "database" = "test", "table" = "baseall" );
这里我们创建一个 ext_oracle_tbl
外部表,并引用了之前创建的 oracle_odbc
Resource。
连接百度云数据库 RDS
1. 创建 RDS
注意:创建 RDS 实例时,网络类型 -> 选择网络
处,需要选择和 Doris 集群相同的网络(VPC)。可用区可以不同。
2. 创建资源
CREATE EXTERNAL RESOURCE `rds_odbc` PROPERTIES ( "type" = "odbc_catalog", "host" = "mysql56.rdsxxxxx.rds.gz.baidubce.com", "port" = "3306", "user" = "rdsroot", "password" = "12345", "odbc_type" = "mysql", "driver" = "MySQL" );
需修改其中 host
,port
,user
,password
对应的参数。host port 可以在 RDS 实例信息也查看。user 和 password 需要在 RDS 控制台创建账户后获取。
3. 创建外部表
CREATE EXTERNAL TABLE `mysql_table` ( k1 int, k2 int ) ENGINE=ODBC PROPERTIES ( "odbc_catalog_resource" = "rds_odbc", "database" = "mysql_db", "table" = "mysql_tbl" );
创建之后,就可以进行查询等操作了。
导入数据
1. 创建 Doris 表
这里我们创建一张 Doris 的表,列信息和上一步创建的外部表 ext_oracle_tbl
一样:
CREATE EXTERNAL TABLE `doris_tbl` ( `k1` decimal(9, 3) NOT NULL COMMENT "", `k2` char(10) NOT NULL COMMENT "", `k3` datetime NOT NULL COMMENT "", `k5` varchar(20) NOT NULL COMMENT "", `k6` double NOT NULL COMMENT "" ) COMMENT "Doris Table" DISTRIBUTED BY HASH(k1) BUCKETS 2; PROPERTIES ( "replication_num" = "1" );
2. 导入数据 (从 ext_oracle_tbl
表 导入到 doris_tbl
表)
INSERT INTO doris_tbl SELECT k1,k2,k3 FROM ext_oracle_tbl limit 100;
INSERT 命令是同步命令,返回成功,即表示导入成功。
注意事项
- 必须保证外部数据源与 Doris 集群在同一个VPC内,并且 Compute Node 可以和外部数据源的网络是互通的。
- ODBC 外部表本质上是通过单一 ODBC 客户端访问数据源,因此并不合适一次性导入大量的数据,建议分批多次导入。