3秒学不会Palo Doris的数据导入你打我!(一)

简介: 3秒学不会Palo Doris的数据导入你打我!

支持的数据源

Doris 提供多种数据导入方案,可以针对不同的数据源进行选择。


数据源 导入方式
百度对象存储(BOS)、HDFS、AFS 使用 Broker Load 导入数据
本地文件 导入本地数据
百度消息服务(Kafka) 订阅 Kafka 日志
MySQL、Oracle、PostgreSQL 通过外部表同步数据
通过 JDBC 导入数据 使用JDBC同步数据
导入 JSON 格式数据 JSON 格式数据导入说明
MySQL binlog 敬请期待


数据导入总体说明

Doris 的数据导入实现有以下共性特征,这里分别介绍,以帮助大家更好的使用数据导入功能


原子性保证

Doris 的每一个导入作业,不论是使用 Broker Load 进行批量导入,还是使用 INSERT 语句进行单条导入,都是一个完整的事务操作。导入事务可以保证一批次内的数据原子生效,不会出现部分数据写入的情况。

同时,一个导入作业都会有一个 Label。这个 Label 是在一个数据库(Database)下唯一的,用于唯一标识一个导入作业。Label 可以由用户指定,部分导入功能也会由系统自动生成。

Label 是用于保证对应的导入作业,仅能成功导入一次。一个被成功导入的 Label,再次使用时,会被拒绝并报错 Label already used。通过这个机制,可以在 Doris 测做到 At-Most-Once 语义。如果结合上游系统的 At-Least-Once 语义,则可以实现导入数据的 Exactly-Once 语义。


同步和异步

导入方式分为同步和异步。对于同步导入方式,返回结果即表示导入成功还是失败。而对于异步导入方式,返回成功仅代表作业提交成功,不代表数据导入成功,需要使用对应的命令查看导入作业的运行状态。


支持的数据格式

不同的导入方式支持的数据格式略有不同。

导入方式 支持的格式
Broker Load Parquet,ORC,csv,gzip
Stream Load csv, gzip, json
Routine Load csv, json


正文来了 伙计们!!!!!!!!!!!!!!!!


导入本地数据


Stream Load 用于将本地文件导入到 Doris 中。

不同于其他命令的提交方式,Stream Load 是通过 HTTP 协议与 Doris 进行连接交互的。

该方式中涉及 HOST:PORT 应为 HTTP 协议端口。

  • 公有云用户必须使用 Compute Node(BE)的 HTTP 协议端口,默认为 8040。
  • 私有化部署用户可以使用 Leader Node(FE)的 HTTP 协议端口,默认为 8030。但须保证客户端所在机器网络能够联通 Compute Node 所在机器。

本章节我们以 curl 命令为例演示如何进行数据导入。

文章最后,我给出一个使用 Java 导入数据的代码示例。


导入数据

Stream Load 的请求体如下:

PUT /api/{db}/{table}/_stream_load
  1. 创建一张表
    通过 CREATE TABLE 命令创建一张表用于存储待导入的数据。具体的导入方式请查阅 CREATE TABLE 命令手册。示例如下:
CREATE TABLE IF NOT EXISTS load_test
(
    id INT,
    name VARCHAR(128)
)
DISTRIBUTED BY HASH(id) BUCKETS 8;


2. 导入数据

执行以下 curl 命令导入本地文件:

curl -u user:passwd -H "label:example_label_1" -T /path/to/local/your_file.txt http://host:port/api/example_db/load_test/_stream_load
  • user:passwd 为在 Doris 中创建的用户。初始用户为 admin,密码为创建 Doris 集群时设置的密码。
  • host:port 为 Compute Node 的 HTTP 协议端口,默认是 8040,可以在智能云 Doris 集群详情页面查看。
  • label: 可以在 Header 中指定 Label 唯一标识这个导入任务。


3. 等待导入结果

Stream Load 命令是同步命令,返回成功即表示导入成功。如果导入数据较大,可能需要较长的等待时间。示例如下:

{
    "TxnId": 1003,
    "Label": "example_label_1",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 1000000,
    "NumberLoadedRows": 1000000,
    "NumberFilteredRows": 1,
    "NumberUnselectedRows": 0,
    "LoadBytes": 40888898,
    "LoadTimeMs": 2144,
    "BeginTxnTimeMs": 1,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 325,
    "WriteDataTimeMs": 1933,
    "CommitAndPublishTimeMs": 106,
    "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
}
  • Status 字段状态为 Success 即表示导入成功。


使用建议

  • Stream Load 只能导入本地文件。
  • 建议一个导入请求的数据量控制在 1 GB 以内。如果有大量本地文件,可以分批并发提交。

Java 代码示例

这里通过一个简单的 JAVA 示例来执行 Stream Load:

package demo.doris;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.FileEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/*
这是一个 Doris Stream Load 示例,需要依赖
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
</dependency>
 */
public class DorisStreamLoader {
    // 1. 对于公有云公户,这里填写 Compute Node 地址以及 HTTP 协议访问端口(8040)。
    // 2. 对于开源用户,可以选择填写 FE 地址以及 FE 的 http_port,但须保证客户端和 BE 节点的连通性。
    private final static String HOST = "your_host";
    private final static int PORT = 8040;
    private final static String DATABASE = "db1";   // 要导入的数据库
    private final static String TABLE = "tbl1";     // 要导入的表
    private final static String USER = "root";      // Doris 用户名
    private final static String PASSWD = "";        // Doris 密码
    private final static String LOAD_FILE_NAME = "/path/to/1.txt"; // 要导入的本地文件路径
    private final static String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
            HOST, PORT, DATABASE, TABLE);
    private final static HttpClientBuilder httpClientBuilder = HttpClients
            .custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
                @Override
                protected boolean isRedirectable(String method) {
                    // 如果连接目标是 FE,则需要处理 307 redirect。
                    return true;
                }
            });
    public void load(File file) throws Exception {
        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));
            // 可以在 Header 中设置 stream load 相关属性,这里我们设置 label 和 column_separator。
            put.setHeader("label","label1");
            put.setHeader("column_separator",",");
            // 设置导入文件。
            // 这里也可以使用 StringEntity 来传输任意数据。
            FileEntity entity = new FileEntity(file);
            put.setEntity(entity);
            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                }
                final int statusCode = response.getStatusLine().getStatusCode();
                if (statusCode != 200) {
                    throw new IOException(
                            String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
                }
                System.out.println("Get load result: " + loadResult);
            }
        }
    }
    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded);
    }
    public static void main(String[] args) throws Exception{
        DorisStreamLoader loader = new DorisStreamLoader();
        File file = new File(LOAD_FILE_NAME);
        loader.load(file);
    }
}


导入BOS中的数据


这一章主要介绍如何导入 BOS 中存储的数据。


准备工作

请先通过以下步骤,在百度对象存储(Baidu Object Storage,BOS)上存放需导入到Doris中的数据。

  1. 开通 BOS 服务
    请参阅 https://cloud.baidu.com/doc/BOS/s/Jk4xttg03使用BOS
  2. 创建 Bucket
    请参阅 https://cloud.baidu.com/doc/BOS/s/Fk4xtwbze创建Bucket
    注意:Bucket 所属地域必须和 Doris 集群所属地域相同。Doris 地域通常可以在 Doris 控制台页面左上角查看
  3. 上传文件到 Bucket
    有两种方式可以上传文件到 Bucket。
    通过控制台直接上传,请参阅文档 https://cloud.baidu.com/doc/BOS/s/Gk4xty0f2上传Object)。
    通过命令行工具上传:
  1. 请先https://cloud.baidu.com/doc/BOS/s/Ejwvyqobd#bos-cli%E4%B8%8B%E8%BD%BD%E5%9C%B0%E5%9D%80下载 BOS CLI 命令行工具 )。这里以Linux操作系统的 bce-cli-0.10.10.zip 为例。
  2. 解压后,执行以下命令配置 BOS CLI:
./bce -c
BOS Access Key ID []: 353b8dexxxxxxxxxxb156d3
BOS Secret Access Key []: ea15a18xxxxxx29f78e8d77
BCE Security Token [None]:
Default region name [bj]:
Default domain [bj.bcebos.com]:
Default use auto switch domain [yes]:
Default breakpoint_file_expiration [7] days:
Default use https protocol [no]:
Default multi upload thread num [10]:

3. 使用以下命令上传文件:

./bce bos cp /path/to/local/your_file.txt bos:/your_bucket_name


开始导入

Doris 支持通过以下两种方式导入 BOS 中的数据。


通过 Broker Load 命令提交导入作业

Broker 是一个无状态的进程服务,已经内置在 Doris 集群中,主要用于对外部数据源的文件进行读写操作。Broker Load 则是利用 Broker 服务访问源数据,进行数据导入的一种方式。


1. 创建一张表

通过 CREATE TABLE 命令创建一张表用于存储待导入的数据。

示例如下:

CREATE TABLE IF NOT EXISTS load_test
(
    id INT,
    name VARCHAR(128)
)
DISTRIBUTED BY HASH(id) BUCKETS 8;


2. 提交 Broker Load 导入作业

   示例如下:

LOAD LABEL example_db.exmpale_label_1
(
    DATA INFILE("bos://your_bucket_name/your_file.txt")
    INTO TABLE load_test
    COLUMNS TERMINATED BY ","
)
WITH BROKER "bos"
(
    "bos_endpoint" = "http://bj.bcebos.com",
    "bos_accesskey" = "353b8dexxxxxxxxxxb156d3",
    "bos_secret_accesskey" = "ea15a18xxxxxx29f78e8d77"
)
PROPERTIES
(
    "timeout" = "3600"
);
  • LABEL:每个导入作业都需要指定一个唯一的 Label,后续可以通过这个 Label 查看导入作业的运行状态。
  • WITH BROKER "bos":"bos" 仅仅是 Broker 服务进程的名称,并不代表需要访问的数据源。Broker的名称可以使用 admin 用户连接 Doris 后,通过 SHOW BROKER 命令查看。
  • "bos_accesskey" 和 "bos_secret_accesskey" 可在公有云页面右上角点击 账户头像 -> 安全认证 获取。
  • "bos_endpoint" 和 BOS Bucket 所在地域有关。


3. 查看导入作业状态

Broker Load 是一个异步命令,第二步中的命令执行成功,仅代表作业提交成功。具体执行情况,须通过以下命令查看。

mysql> SHOW LOAD FROM example_db WHERE LABEL="exmpale_label_1"
*************************** 1. row ***************************
         JobId: 10041
         Label: exmpale_label_1
         State: FINISHED
      Progress: ETL:100%; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=100000000
      TaskInfo: cluster:N/A; timeout(s):14400; max_filter_ratio:0.0
      ErrorMsg: NULL
    CreateTime: 2020-11-17 09:38:04
  EtlStartTime: 2020-11-17 09:38:09
 EtlFinishTime: 2020-11-17 09:38:09
 LoadStartTime: 2020-11-17 09:38:09
LoadFinishTime: 2020-11-17 09:42:07
           URL: N/A
    JobDetails: {"Unfinished backends":{},"ScannedRows":0,"TaskNumber":0,"All backends":{},"FileNumber":0,"FileSize":0}
1 row in set (0.01 sec)

其中 State 字段状态为 FINISHED 则代表导入成功,数据可查询。关于 SHOW LOAD 返回结果的具体说明,可参阅 SHOW LOAD 命令文档。


4. 取消导入作业

正在运行中的 Broker Load 导入作业可以使用以下命令取消:

CANCEL LOAD WHERE LABEL="exmpale_label_1";

取消成功后,所有已导入的数据也会被回滚。Doris 会自动保证一个导入作业中的数据原子生效。


通过外部表进行导入

Doris 也支持通过创建一张 Broker 外部表的方式引用BOS上存储的数据,然后通过 INSERT INTO SELECT 的方式导入数据。


  1. 创建一张表
    创建一张用于存储数据的表。同上,不再赘述。

2. 创建 Broker 外部表

CREATE EXTERNAL TABLE IF NOT EXISTS example_db.example_ext_table
(
    id INT,
    name VARCHAR(128)
)
ENGINE=BROKER
PROPERTIES
(
    "broker_name" = "bos",
    "path" = "bos://your_bucket_name/your_file.txt",
)
BROKER PROPERTIES
(
    "bos_endpoint" = "http://bj.bcebos.com",
    "bos_accesskey" = "353b8dexxxxxxxxxxb156d3",
    "bos_secret_accesskey" = "ea15a18xxxxxx29f78e8d77"
);


  • ENGINE:ENGINE 的类型为 BROKER,表示这是一张借助 Broker 服务访问数据的外部表。
  • "broker_name" 为 "bos","bos" 仅仅是 Broker 服务进程的名称,并不代表需要访问的数据源。Broker的名称可以使用 admin 用户连接 Doris 后,通过 SHOW BROKER 命令查看。
  • "bos_accesskey" 和 "bos_secret_accesskey" 可在公有云页面右上角点击 账户头像 -> 安全认证 获取。
  • "bos_endpoint" 和 BOS Bucket 所在地域有关。


注:外部表中的数据也可以通过 SELECT 直接查询,但效率较低,推荐导入到 Doris 中后在执行查询。


3. 导入数据

使用以下命令从外部表导入数据到内部表。

INSERT INTO load_test SELECT * FROM example_ext_table;

该命令为同步命令(异步提交 INSERT 作业的操作正在开发中),命令返回成功即表示数据导入完成。当导入数据量较大时,可能会因查询超时而任务取消。


订阅Kafka日志


用户可以通过提交例行导入作业,直接订阅Kafka中的消息数据,以近实时的方式进行数据同步。

Doris 自身能够保证不丢不重的订阅 Kafka 中的消息,即 Exactly-Once 消费语义。

相关文章
|
数据采集 消息中间件 存储
3秒学不会Palo Doris的数据导入你打我!(四)
3秒学不会Palo Doris的数据导入你打我!
496 0
|
3月前
|
消息中间件 分布式计算 关系型数据库
大数据-140 - ClickHouse 集群 表引擎详解5 - MergeTree CollapsingMergeTree 与其他数据源 HDFS MySQL
大数据-140 - ClickHouse 集群 表引擎详解5 - MergeTree CollapsingMergeTree 与其他数据源 HDFS MySQL
66 0
|
5月前
|
SQL 关系型数据库 MySQL
实时数仓 Hologres操作报错合集之Flink CTAS Source(Mysql) 表字段从可空改为非空的原因是什么
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
5月前
|
SQL 分布式计算 数据处理
实时计算 Flink版产品使用问题之怎么将数据从Hive表中读取并写入到另一个Hive表中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
SQL 缓存 关系型数据库
ClickHouse(19)ClickHouse集成Hive表引擎详细解析
Hive引擎允许对HDFS Hive表执行 `SELECT` 查询。目前它支持如下输入格式: -文本:只支持简单的标量列类型,除了 `Binary` - ORC:支持简单的标量列类型,除了`char`; 只支持 `array` 这样的复杂类型 - Parquet:支持所有简单标量列类型;只支持 `array` 这样的复杂类型
235 1
|
7月前
|
SQL 分布式计算 安全
ClickHouse(22)ClickHouse集成HDFS表引擎详细解析
ClickHouse的HDFS引擎允许直接在Hadoop生态系统内管理数据。使用`ENGINE=HDFS(URI, format)`,其中URI指定HDFS路径,format定义文件格式(如TSV、CSV或ORC)。表可读写,但不支持`ALTER`、`SELECT...SAMPLE`、索引和复制操作。通配符可用于文件路径,如`*`、`?`和范围`{N..M}`。Kerberos认证可配置。虚拟列包括文件路径 `_path` 和文件名 `_file`。有关更多信息,参见相关文章系列。
181 0
|
8月前
|
缓存 NoSQL 数据库
Flink cdc到doris,starrocks,table store
Flink cdc到doris,starrocks,table store
|
8月前
|
SQL 分布式计算 关系型数据库
Sqoop数据导入到Hive表的最佳实践
Sqoop数据导入到Hive表的最佳实践
|
Java 关系型数据库 MySQL
[1.2.0新功能系列:二] Apache Doris 1.2.0 JDBC外表 及 Mutil Catalog
[1.2.0新功能系列:二] Apache Doris 1.2.0 JDBC外表 及 Mutil Catalog
177 0
|
SQL 消息中间件 监控
​实战:Flink 1.12 维表 Join Hive 最新分区功能体验
我们生产常有将实时数据流与 Hive 维表 join 来丰富数据的需求,其中 Hive 表是分区表,业务上需要关联上 Hive 最新分区的数据。上周 Flink 1.12 发布了,刚好支撑了这种业务场景,我也将 1.12 版本部署后做了一个线上需求并上线。对比之前生产环境中实现方案,最新分区直接作为时态表提升了很多开发效率,在这里做一些小的分享。
​实战:Flink 1.12 维表 Join Hive 最新分区功能体验