支持的数据源
Doris 提供多种数据导入方案,可以针对不同的数据源进行选择。
数据源 | 导入方式 |
百度对象存储(BOS)、HDFS、AFS | 使用 Broker Load 导入数据 |
本地文件 | 导入本地数据 |
百度消息服务(Kafka) | 订阅 Kafka 日志 |
MySQL、Oracle、PostgreSQL | 通过外部表同步数据 |
通过 JDBC 导入数据 | 使用JDBC同步数据 |
导入 JSON 格式数据 | JSON 格式数据导入说明 |
MySQL binlog | 敬请期待 |
数据导入总体说明
Doris 的数据导入实现有以下共性特征,这里分别介绍,以帮助大家更好的使用数据导入功能
原子性保证
Doris 的每一个导入作业,不论是使用 Broker Load 进行批量导入,还是使用 INSERT 语句进行单条导入,都是一个完整的事务操作。导入事务可以保证一批次内的数据原子生效,不会出现部分数据写入的情况。
同时,一个导入作业都会有一个 Label。这个 Label 是在一个数据库(Database)下唯一的,用于唯一标识一个导入作业。Label 可以由用户指定,部分导入功能也会由系统自动生成。
Label 是用于保证对应的导入作业,仅能成功导入一次。一个被成功导入的 Label,再次使用时,会被拒绝并报错 Label already used
。通过这个机制,可以在 Doris 测做到 At-Most-Once
语义。如果结合上游系统的 At-Least-Once
语义,则可以实现导入数据的 Exactly-Once
语义。
同步和异步
导入方式分为同步和异步。对于同步导入方式,返回结果即表示导入成功还是失败。而对于异步导入方式,返回成功仅代表作业提交成功,不代表数据导入成功,需要使用对应的命令查看导入作业的运行状态。
支持的数据格式
不同的导入方式支持的数据格式略有不同。
导入方式 | 支持的格式 |
Broker Load | Parquet,ORC,csv,gzip |
Stream Load | csv, gzip, json |
Routine Load | csv, json |
正文来了 伙计们!!!!!!!!!!!!!!!!
导入本地数据
Stream Load 用于将本地文件导入到 Doris 中。
不同于其他命令的提交方式,Stream Load 是通过 HTTP 协议与 Doris 进行连接交互的。
该方式中涉及 HOST:PORT 应为 HTTP 协议端口。
- 公有云用户必须使用 Compute Node(BE)的 HTTP 协议端口,默认为 8040。
- 私有化部署用户可以使用 Leader Node(FE)的 HTTP 协议端口,默认为 8030。但须保证客户端所在机器网络能够联通 Compute Node 所在机器。
本章节我们以 curl 命令为例演示如何进行数据导入。
文章最后,我给出一个使用 Java 导入数据的代码示例。
导入数据
Stream Load 的请求体如下:
PUT /api/{db}/{table}/_stream_load
- 创建一张表
通过CREATE TABLE
命令创建一张表用于存储待导入的数据。具体的导入方式请查阅 CREATE TABLE 命令手册。示例如下:
CREATE TABLE IF NOT EXISTS load_test ( id INT, name VARCHAR(128) ) DISTRIBUTED BY HASH(id) BUCKETS 8;
2. 导入数据
执行以下 curl 命令导入本地文件:
curl -u user:passwd -H "label:example_label_1" -T /path/to/local/your_file.txt http://host:port/api/example_db/load_test/_stream_load
- user:passwd 为在 Doris 中创建的用户。初始用户为 admin,密码为创建 Doris 集群时设置的密码。
- host:port 为 Compute Node 的 HTTP 协议端口,默认是 8040,可以在智能云 Doris 集群详情页面查看。
- label: 可以在 Header 中指定 Label 唯一标识这个导入任务。
3. 等待导入结果
Stream Load 命令是同步命令,返回成功即表示导入成功。如果导入数据较大,可能需要较长的等待时间。示例如下:
{ "TxnId": 1003, "Label": "example_label_1", "Status": "Success", "Message": "OK", "NumberTotalRows": 1000000, "NumberLoadedRows": 1000000, "NumberFilteredRows": 1, "NumberUnselectedRows": 0, "LoadBytes": 40888898, "LoadTimeMs": 2144, "BeginTxnTimeMs": 1, "StreamLoadPutTimeMs": 2, "ReadDataTimeMs": 325, "WriteDataTimeMs": 1933, "CommitAndPublishTimeMs": 106, "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005" }
Status
字段状态为Success
即表示导入成功。
使用建议
- Stream Load 只能导入本地文件。
- 建议一个导入请求的数据量控制在 1 GB 以内。如果有大量本地文件,可以分批并发提交。
Java 代码示例
这里通过一个简单的 JAVA 示例来执行 Stream Load:
package demo.doris; import org.apache.commons.codec.binary.Base64; import org.apache.http.HttpHeaders; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.FileEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.DefaultRedirectStrategy; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; /* 这是一个 Doris Stream Load 示例,需要依赖 <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> </dependency> */ public class DorisStreamLoader { // 1. 对于公有云公户,这里填写 Compute Node 地址以及 HTTP 协议访问端口(8040)。 // 2. 对于开源用户,可以选择填写 FE 地址以及 FE 的 http_port,但须保证客户端和 BE 节点的连通性。 private final static String HOST = "your_host"; private final static int PORT = 8040; private final static String DATABASE = "db1"; // 要导入的数据库 private final static String TABLE = "tbl1"; // 要导入的表 private final static String USER = "root"; // Doris 用户名 private final static String PASSWD = ""; // Doris 密码 private final static String LOAD_FILE_NAME = "/path/to/1.txt"; // 要导入的本地文件路径 private final static String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load", HOST, PORT, DATABASE, TABLE); private final static HttpClientBuilder httpClientBuilder = HttpClients .custom() .setRedirectStrategy(new DefaultRedirectStrategy() { @Override protected boolean isRedirectable(String method) { // 如果连接目标是 FE,则需要处理 307 redirect。 return true; } }); public void load(File file) throws Exception { try (CloseableHttpClient client = httpClientBuilder.build()) { HttpPut put = new HttpPut(loadUrl); put.setHeader(HttpHeaders.EXPECT, "100-continue"); put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD)); // 可以在 Header 中设置 stream load 相关属性,这里我们设置 label 和 column_separator。 put.setHeader("label","label1"); put.setHeader("column_separator",","); // 设置导入文件。 // 这里也可以使用 StringEntity 来传输任意数据。 FileEntity entity = new FileEntity(file); put.setEntity(entity); try (CloseableHttpResponse response = client.execute(put)) { String loadResult = ""; if (response.getEntity() != null) { loadResult = EntityUtils.toString(response.getEntity()); } final int statusCode = response.getStatusLine().getStatusCode(); if (statusCode != 200) { throw new IOException( String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult)); } System.out.println("Get load result: " + loadResult); } } } private String basicAuthHeader(String username, String password) { final String tobeEncode = username + ":" + password; byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8)); return "Basic " + new String(encoded); } public static void main(String[] args) throws Exception{ DorisStreamLoader loader = new DorisStreamLoader(); File file = new File(LOAD_FILE_NAME); loader.load(file); } }
导入BOS中的数据
这一章主要介绍如何导入 BOS 中存储的数据。
准备工作
请先通过以下步骤,在百度对象存储(Baidu Object Storage,BOS)上存放需导入到Doris中的数据。
- 开通 BOS 服务
请参阅 https://cloud.baidu.com/doc/BOS/s/Jk4xttg03(使用BOS) - 创建 Bucket
请参阅 https://cloud.baidu.com/doc/BOS/s/Fk4xtwbze(创建Bucket)
注意:Bucket 所属地域必须和 Doris 集群所属地域相同。Doris 地域通常可以在 Doris 控制台页面左上角查看 - 上传文件到 Bucket
有两种方式可以上传文件到 Bucket。
通过控制台直接上传,请参阅文档 https://cloud.baidu.com/doc/BOS/s/Gk4xty0f2(上传Object)。
通过命令行工具上传:
- 请先https://cloud.baidu.com/doc/BOS/s/Ejwvyqobd#bos-cli%E4%B8%8B%E8%BD%BD%E5%9C%B0%E5%9D%80(下载 BOS CLI 命令行工具 )。这里以Linux操作系统的 bce-cli-0.10.10.zip 为例。
- 解压后,执行以下命令配置 BOS CLI:
./bce -c BOS Access Key ID []: 353b8dexxxxxxxxxxb156d3 BOS Secret Access Key []: ea15a18xxxxxx29f78e8d77 BCE Security Token [None]: Default region name [bj]: Default domain [bj.bcebos.com]: Default use auto switch domain [yes]: Default breakpoint_file_expiration [7] days: Default use https protocol [no]: Default multi upload thread num [10]:
- BOS Access Key ID 和 BOS Secret Access Key 可在公有云页面右上角点击
账户头像 -> 安全认证
获取。 - Default region name 和 Default domain 请填写Bucket所在地域的缩写,可查阅 https://cloud.baidu.com/doc/BOS/s/Ck1rk80hn#%E8%AE%BF%E9%97%AE%E5%9F%9F%E5%90%8D%EF%BC%88endpoint%EF%BC%89(访问域名)获取。
- 其他配置使用默认即可。
3. 使用以下命令上传文件:
./bce bos cp /path/to/local/your_file.txt bos:/your_bucket_name
开始导入
Doris 支持通过以下两种方式导入 BOS 中的数据。
通过 Broker Load 命令提交导入作业
Broker 是一个无状态的进程服务,已经内置在 Doris 集群中,主要用于对外部数据源的文件进行读写操作。Broker Load 则是利用 Broker 服务访问源数据,进行数据导入的一种方式。
1. 创建一张表
通过 CREATE TABLE
命令创建一张表用于存储待导入的数据。
示例如下:
CREATE TABLE IF NOT EXISTS load_test ( id INT, name VARCHAR(128) ) DISTRIBUTED BY HASH(id) BUCKETS 8;
2. 提交 Broker Load 导入作业
示例如下:
LOAD LABEL example_db.exmpale_label_1 ( DATA INFILE("bos://your_bucket_name/your_file.txt") INTO TABLE load_test COLUMNS TERMINATED BY "," ) WITH BROKER "bos" ( "bos_endpoint" = "http://bj.bcebos.com", "bos_accesskey" = "353b8dexxxxxxxxxxb156d3", "bos_secret_accesskey" = "ea15a18xxxxxx29f78e8d77" ) PROPERTIES ( "timeout" = "3600" );
- LABEL:每个导入作业都需要指定一个唯一的 Label,后续可以通过这个 Label 查看导入作业的运行状态。
- WITH BROKER "bos":"bos" 仅仅是 Broker 服务进程的名称,并不代表需要访问的数据源。Broker的名称可以使用 admin 用户连接 Doris 后,通过
SHOW BROKER
命令查看。 - "bos_accesskey" 和 "bos_secret_accesskey" 可在公有云页面右上角点击
账户头像 -> 安全认证
获取。 - "bos_endpoint" 和 BOS Bucket 所在地域有关。
3. 查看导入作业状态
Broker Load 是一个异步命令,第二步中的命令执行成功,仅代表作业提交成功。具体执行情况,须通过以下命令查看。
mysql> SHOW LOAD FROM example_db WHERE LABEL="exmpale_label_1" *************************** 1. row *************************** JobId: 10041 Label: exmpale_label_1 State: FINISHED Progress: ETL:100%; LOAD:100% Type: BROKER EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=100000000 TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0 ErrorMsg: NULL CreateTime: 2020-11-17 09:38:04 EtlStartTime: 2020-11-17 09:38:09 EtlFinishTime: 2020-11-17 09:38:09 LoadStartTime: 2020-11-17 09:38:09 LoadFinishTime: 2020-11-17 09:42:07 URL: N/A JobDetails: {"Unfinished backends":{},"ScannedRows":0,"TaskNumber":0,"All backends":{},"FileNumber":0,"FileSize":0} 1 row in set (0.01 sec)
其中 State
字段状态为 FINISHED
则代表导入成功,数据可查询。关于 SHOW LOAD
返回结果的具体说明,可参阅 SHOW LOAD
命令文档。
4. 取消导入作业
正在运行中的 Broker Load 导入作业可以使用以下命令取消:
CANCEL LOAD WHERE LABEL="exmpale_label_1";
取消成功后,所有已导入的数据也会被回滚。Doris 会自动保证一个导入作业中的数据原子生效。
通过外部表进行导入
Doris 也支持通过创建一张 Broker 外部表的方式引用BOS上存储的数据,然后通过 INSERT INTO SELECT
的方式导入数据。
- 创建一张表
创建一张用于存储数据的表。同上,不再赘述。
2. 创建 Broker 外部表
CREATE EXTERNAL TABLE IF NOT EXISTS example_db.example_ext_table ( id INT, name VARCHAR(128) ) ENGINE=BROKER PROPERTIES ( "broker_name" = "bos", "path" = "bos://your_bucket_name/your_file.txt", ) BROKER PROPERTIES ( "bos_endpoint" = "http://bj.bcebos.com", "bos_accesskey" = "353b8dexxxxxxxxxxb156d3", "bos_secret_accesskey" = "ea15a18xxxxxx29f78e8d77" );
- ENGINE:ENGINE 的类型为 BROKER,表示这是一张借助 Broker 服务访问数据的外部表。
- "broker_name" 为 "bos","bos" 仅仅是 Broker 服务进程的名称,并不代表需要访问的数据源。Broker的名称可以使用 admin 用户连接 Doris 后,通过
SHOW BROKER
命令查看。 - "bos_accesskey" 和 "bos_secret_accesskey" 可在公有云页面右上角点击
账户头像 -> 安全认证
获取。 - "bos_endpoint" 和 BOS Bucket 所在地域有关。
注:外部表中的数据也可以通过 SELECT
直接查询,但效率较低,推荐导入到 Doris 中后在执行查询。
3. 导入数据
使用以下命令从外部表导入数据到内部表。
INSERT INTO load_test SELECT * FROM example_ext_table;
该命令为同步命令(异步提交 INSERT 作业的操作正在开发中),命令返回成功即表示数据导入完成。当导入数据量较大时,可能会因查询超时而任务取消。
订阅Kafka日志
用户可以通过提交例行导入作业,直接订阅Kafka中的消息数据,以近实时的方式进行数据同步。
Doris 自身能够保证不丢不重的订阅 Kafka 中的消息,即 Exactly-Once
消费语义。