3秒学不会Palo Doris的数据导入你打我!(四)

简介: 3秒学不会Palo Doris的数据导入你打我!

Label 机制

Doris 的导入作业都可以设置一个 Label。这个 Label 通常是用户自定义的、具有一定业务逻辑属性的字符串。

Label 的主要作用是唯一标识一个导入任务,并且能够保证相同的 Label 仅会被成功导入一次。

Label 机制可以保证导入数据的不丢不重。如果上游数据源能够保证 At-Least-Once 语义,则配合 Doris 的 Label 机制,能够保证 Exactly-Once 语义。

Label 在一个数据库下具有唯一性。Label 的保留期限默认是 3 天。即 3 天后,已完成的 Label 会被自动清理,之后 Label 可以被重复使用。


最佳实践

Label 通常被设置为 业务逻辑+时间 的格式。如 my_business1_20201010_125000

这个 Label 通常用于表示:业务 my_business1 这个业务在 2020-10-10 12:50:00 产生的一批数据。通过这种 Label 设定,业务上可以通过 Label 查询导入任务状态,来明确的获知该时间点批次的数据是否已经导入成功。如果没有成功,则可以使用这个 Label 继续重试导入。



列的映射、转换与过滤

Doris 支持丰富的列映射、转换和过滤操作。可以非常灵活的处理需要导入的原始数据。

本章节主要介绍如何在导入中使用这些功能。

总体介绍

Doris 在导入过程中对数据处理步骤分为以下几步:

  1. 数据按原始文件中的列的顺序读入到 Doris
  2. 通过前置过滤条件(PRECEDING FILTER)对原始数据进行一次过滤。
  3. 通过列映射和转换,将原始数据映射到目标列顺序。
  4. 通过后置过滤条件(WHERE)对转换后的数据在进行一次过滤。
  5. 写入最终数据。

列的映射、转换和过滤参数在导入作业中皆为可选操作。在默认空缺的情况下,Doris 会将源文件中的行按默认的列分割符 \t 分割后,按顺序对应到表中的列。如果源文件中的列数量和表中的列数量不匹配,则会产生数据质量问题,导致数据无法导入。此时则需要显式的描述列的映射、转换和过滤信息。


支持的导入方式

BROKER LOAD

LOAD LABEL example_db.label1(
    DATA INFILE("bos://bucket/input/file")
    INTO TABLE `my_table`
    (k1, k2, tmpk3)
    PRECEDING FILTER k1 = 1
    SET (
        k3 = tmpk3 + 1
    )
    WHERE k1 > k2
)
WITH BROKER bos
(
    ...
);
STREAM LOAD
curl--location-trusted
-u user:passwd
-H "columns: k1, k2, tmpk3, k3 = tmpk3 + 1"
-H "where: k1 > k2"
-T file.txt
http://host:port/api/testDb/testTbl/_stream_load
ROUTINE LOAD
CREATE ROUTINE LOAD example_db.label1 ON my_tableCOLUMNS(k1, k2, tmpk3, k3 = tmpk3 +1),
PRECEDING FILTER k1 = 1,
WHERE k1 > k2
...

以上导入方式都支持对源数据进行列映射、转换和过滤操作:

  • 前置过滤:对读取到的原始数据进行一次过滤。
PRECEDING FILTER k1 = 1

映射:定义源数据中的列。如果定义的列名和表中的列相同,则直接映射为表中的列。如果不同,则这个被定义的列可以用于之后的转换操作。如上面示例中的:

(k1, k2, tmpk3)

转换:将第一步中经过映射的列进行转换,可以使用内置表达式、函数、自定义函数进行转化,并重新映射到表中对应的列上。如上面示例中的:

k3 = tmpk3 + 1

后置过滤:对经过映射和转换后的列,通过表达式进行过滤。被过滤的数据行不会导入到系统中。如上面示例中的:

WHERE k1 > k2


列映射

列映射的目的主要是描述导入文件中各个列的信息,相当于为源数据中的列定义名称。通过描述列映射关系,我们可以将于表中列顺序不同、列数量不同的源文件导入到 Doris 中。下面我们通过示例说明:

假设源文件有4列,内容如下(表头列名仅为方便表述,实际并无表头):

假设源文件有4列,内容如下(表头列名仅为方便表述,实际并无表头):

列1 列2 列3 列4
1 100 beijing 1.1
2 200 shanghai 1.2
3 300 guangzhou 1.3
4 \N chongqing 1.4

注:\N 在源文件中表示 null。


1. 调整映射顺序

假设表中有 k1,k2,k3,k4 4列。我们希望的导入映射关系如下:

列1 -> k1
列2 -> k3
列3 -> k2
列4 -> k4

则列映射的书写顺序应如下:

(k1, k3, k2, k4)

2. 源文件中的列数量多于表中的列

假设表中有 k1,k2,k3 3列。我们希望的导入映射关系如下:

列1 -> k1
列2 -> k3
列3 -> k2

则列映射的书写顺序应如下:

(k1, k3, k2, tmpk4)

其中 tmpk4 为一个自定义的、表中不存在的列名。Doris 会忽略这个不存在的列名。

3. 源文件中的列数量少于表中的列,使用默认值填充

假设表中有 k1,k2,k3,k4,k5 5列。我们希望的导入映射关系如下:

列1 -> k1
列2 -> k3
列3 -> k2

这里我们仅使用源文件中的前3列。k4,k5 两列希望使用默认值填充。

则列映射的书写顺序应如下:

(k1, k3, k2)

如果 k4,k5 列有默认值,则会填充默认值。否则如果是 nullable 的列,则会填充 null 值。否则,导入作业会报错。



列前置过滤

前置过滤是对读取到的原始数据进行一次过滤。目前仅支持 BROKER LOAD 和 ROUTINE LOAD。

前置过滤有以下应用场景:

1. 转换前做过滤

希望在列映射和转换前做过滤的场景。能够先行过滤掉部分不需要的数据。

2.过滤列不存在于表中,仅作为过滤标识

比如源数据中存储了多张表的数据(或者多张表的数据写入了同一个 Kafka 消息队列)。数据中每行有一列表名来标识该行数据属于哪个表。用户可以通过前置过滤条件来筛选对应的表数据进行导入。


列转换

列转换功能允许用户对源文件中列值进行变换。目前 Doris 支持使用绝大部分内置函数、用户自定义函数进行转换。

注:自定义函数隶属于某一数据库下,在使用自定义函数进行转换时,需要用户对这个数据库有读权限。

转换操作通常是和列映射一起定义的。即先对列进行映射,再进行转换。下面我们通过示例说明:

假设源文件有4列,内容如下(表头列名仅为方便表述,实际并无表头):

列1 列2 列3 列4
1 100 beijing 1.1
2 200 shanghai 1.2
3 300 guangzhou 1.3
4 400 chongqing 1.4

1. 将源文件中的列值经转换后导入表中

假设表中有 k1,k2,k3,k4 4列。我们希望的导入映射和转换关系如下:

列1       -> k1
列2 * 100 -> k3
列3       -> k2
列4       -> k4

则列映射的书写顺序应如下:

(k1, tmpk3, k2, k4, k3 = tmpk3 * 100)

这里相当于我们将源文件中的第2列命名为 tmpk3,同时指定表中 k3 列的值为 tmpk3 * 100。最终表中的数据如下:

k1 k2 k3 k4
1 beijing 10000 1.1
2 shanghai 20000 1.2
3 guangzhou 30000 1.3
null chongqing 40000 1.4

2. 通过 case when 函数,有条件的进行列转换。

假设表中有 k1,k2,k3,k4 4列。我们希望对于源数据中的 beijing, shanghai, guangzhou, chongqing 分别转换为对应的地区id后导入:

列1                  -> k1
列2                  -> k2
列3 进行地区id转换后    -> k3
列4                  -> k4

则列映射的书写顺序应如下:

(k1, k2, tmpk3, k4, k3 = case tmpk3 when "beijing" then 1 when "shanghai" then 2 when "guangzhou" then 3 when "chongqing" then 4 else null end)

最终表中的数据如下:

k1 k2 k3 k4
1 100 1 1.1
2 200 2 1.2
3 300 3 1.3
null 400 4 1.4

3. 将源文件中的 null 值转换成 0 导入。同时也进行示例2中的地区id转换。

假设表中有 k1,k2,k3,k4 4列。在对地区id转换的同时,我们也希望对于源数据中 k1 列的 null 值转换成 0 导入:

列1 如果为null 则转换成0   -> k1
列2                      -> k2
列3                      -> k3
列4                      -> k4

则列映射的书写顺序应如下:

(tmpk1, k2, tmpk3, k4, k1 = ifnull(tmpk1, 0), k3 = case tmpk3 when "beijing" then 1 when "shanghai" then 2 when "guangzhou" then 3 when "chongqing" then 4 else null end)

最终表中的数据如下:

k1 k2 k3 k4
1 100 1 1.1
2 200 2 1.2
3 300 3 1.3
0 400 4 1.4

列过滤

经过列映射和转换后,我们可以通过过滤条件将不希望导入到Doris中的数据进行过滤。下面我们通过示例说明:

假设源文件有4列,内容如下(表头列名仅为方便表述,实际并无表头):

列1 列2 列3 列4
1 100 beijing 1.1
2 200 shanghai 1.2
3 300 guangzhou 1.3
4 400 chongqing 1.4

1. 在列映射和转换缺省的情况下,直接过滤

假设表中有 k1,k2,k3,k4 4列。我们可以在缺省列映射和转换的情况下,直接定义过滤条件。如我们希望只导入源文件中第4列为大于 1.2 的数据行,则过滤条件如下:

where k4 > 1.2

最终表中的数据如下:

k1 k2 k3 k4
3 300 guangzhou 1.3
null 400 chongqing 1.4

缺省情况下,Doris 会按照顺序进行列映射,因此源文件中的第4列自动被映射到表中的 k4 列。

2. 对经过列转换的数据进行过滤

假设表中有 k1,k2,k3,k4 4列。在 列转换 示例中,我们将省份名称转换成了id。这里我们想过滤掉 id 为 3 的数据。则转换、过滤条件如下:

(k1, k2, tmpk3, k4, k3 = case tmpk3 when "beijing" then 1 when "shanghai" then 2 when "guangzhou" then 3 when "chongqing" then 4 else null end)
where k3 != 3

最终表中的数据如下:

k1 k2 k3 k4
1 100 1 1.1
2 200 2 1.2
null 400 4 1.4

这里我们看到,执行过滤时的列值,为经过映射和转换后的最终列值,而不是原始数据。

3. 多条件过滤

假设表中有 k1,k2,k3,k4 4列。我们想过滤掉 k1 列为 null 的数据,同时过滤掉 k4 列小于 1.2 的数据,则过滤条件如下:

where k1 is null and k4 < 1.2

最终表中的数据如下:

k1 k2 k3 k4
2 200 2 1.2
3 300 3 1.3

数据质量问题和过滤阈值

导入作业中被处理的数据行可以分为如下三种:

1. Filtered Rows

因数据质量不合格而被过滤掉的数据。数据质量不合格包括类型错误、精度错误、字符串长度超长、文件列数不匹配等数据格式问题,以及因没有对应的分区而被过滤掉的数据行。

2. Unselected Rows

这部分为因 preceding filterwhere 列过滤条件而被过滤掉的数据行。

3. Loaded Rows

被正确导入的数据行。

Doris 的导入任务允许用户设置最大错误率(max_filter_ratio)。如果导入的数据的错误率低于阈值,则这些错误行将被忽略,其他正确的数据将被导入。

错误率的计算方式为:

#Filtered Rows / (#Filtered Rows + #Loaded Rows)

也就是说 Unselected Rows 不会参与错误率的计算。



严格模式

严格模式(strict_mode)为导入操作中的一个参数配置。该参数会影响某些数值的导入行为和最终导入的数据。

本文档主要说明如何设置严格模式,以及严格模式产生的影响。


如何设置

严格模式默认情况下都为 False,即关闭状态。

不同的导入方式设置严格模式的方式不尽相同。

1. BROKER LOAD

LOAD LABEL example_db.label1(
    DATA INFILE("bos://my_bucket/input/file.txt")
    INTO TABLE `my_table`
    COLUMNS TERMINATED BY ","
)
WITH BROKER bos
(
    "bos_endpoint" = "http://bj.bcebos.com",
    "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
    "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy"
)
PROPERTIES
(
    "strict_mode" = "true"
)

2.STREAM LOAD

curl --location-trusted -u user:passwd \-H "strict_mode: true" \
-T 1.txt \
http://host:port/api/example_db/my_table/_stream_load

3. ROUTINE LOAD

CREATE ROUTINE LOAD example_db.test_job ON my_tablePROPERTIES
(
    "strict_mode" = "true"
) 
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic"
);

4. INSERT

通过会话变量设置:

SET enable_insert_strict = true;
INSERT INTO my_table ...;

严格模式的作用

严格模式的意思是,对于导入过程中的列类型转换进行严格过滤。

严格过滤的策略如下:

对于列类型转换来说,如果开启严格模式,则错误的数据将被过滤。这里的错误数据是指:原始数据并不为 null,而在进行列类型转换后结果为 null 的这一类数据。

这里说指的 列类型转换,并不包括用函数计算得出的 null 值。

对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,严格模式对其也不产生影响。例如:如果类型是 decimal(1,0), 原始数据为 10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict 对其不产生影响。

1. 以列类型为 TinyInt 来举例:

原始数据类型 原始数据举例 转换为 TinyInt 后的值 严格模式 结果
空值 \N NULL 开启或关闭 NULL
非空值 "abc" or 2000 NULL 开启 非法值(被过滤)
非空值 "abc" NULL 关闭 NULL
非空值 1 1 开启或关闭 正确导入

说明:

1. 表中的列允许导入空值

2. abc2000 在转换为 TinyInt 后,会因类型或精度问题变为 NULL。在严格模式开启的情况下,这类数据将会被过滤。而如果是关闭状态,则会导入 null

  1. 以列类型为 Decimal(1,0) 举例
原始数据类型 原始数据举例 转换为 Decimal 后的值 严格模式 结果
空值 \N null 开启或关闭 NULL
非空值 aaa NULL 开启 非法值(被过滤)
非空值 aaa NULL 关闭 NULL
非空值 1 or 10 1 or 10 开启或关闭 正确导入

说明:
1. 表中的列允许导入空值
2.abc 在转换为 Decimal 后,会因类型问题变为 NULL。在严格模式开启的情况下,这类数据将会被过滤。而如果是关闭状态,则会导入 null
3.10 虽然是一个超过范围的值,但是因为其类型符合 decimal 的要求,所以严格模式对其不产生影响。10 最后会在其他导入处理流程中被过滤。但不会被严格模式过滤。

相关文章
|
8月前
|
SQL 消息中间件 分布式计算
Apache Doris 系列: 入门篇-数据导入及查询
Apache Doris 系列: 入门篇-数据导入及查询
210 0
|
1天前
|
SQL 缓存 关系型数据库
ClickHouse(19)ClickHouse集成Hive表引擎详细解析
Hive引擎允许对HDFS Hive表执行 `SELECT` 查询。目前它支持如下输入格式: -文本:只支持简单的标量列类型,除了 `Binary` - ORC:支持简单的标量列类型,除了`char`; 只支持 `array` 这样的复杂类型 - Parquet:支持所有简单标量列类型;只支持 `array` 这样的复杂类型
11 1
|
1月前
|
缓存 NoSQL 数据库
Flink cdc到doris,starrocks,table store
Flink cdc到doris,starrocks,table store
|
1月前
|
NoSQL MongoDB 数据安全/隐私保护
Flink CDC支持MongoDB的CDC(Change Data Capture)连接器
Flink CDC支持MongoDB的CDC(Change Data Capture)连接器
130 4
|
10月前
|
消息中间件 JSON 关系型数据库
Flink导入mysql数据到doris
经过各种实践,发现比较适合中小公司的方式。分为全量和增量。
|
SQL 存储 关系型数据库
Flink CDC 系列 - 构建 MySQL 和 Postgres 上的 Streaming ETL
本篇教程将展示如何基于 Flink CDC 快速构建 MySQL 和 Postgres 的流式 ETL。
Flink CDC 系列 - 构建 MySQL 和 Postgres 上的 Streaming ETL
|
SQL 分布式计算 运维
【大数据开发运维解决方案】Sqoop增量同步mysql/oracle数据到hive(merge-key/append)测试文档
上一篇文章介绍了sqoop全量同步数据到hive, 本片文章将通过实验详细介绍如何增量同步数据到hive,以及sqoop job与crontab定时结合无密码登录的增量同步实现方法。
【大数据开发运维解决方案】Sqoop增量同步mysql/oracle数据到hive(merge-key/append)测试文档
|
SQL 数据安全/隐私保护 UED
Palo Doris版五分钟快速入门!
Palo Doris版五分钟快速入门!
209 0
Palo Doris版五分钟快速入门!
|
SQL 存储 搜索推荐
浅谈 Apache Doris FE 处理查询 SQL 源码解析
浅谈 Apache Doris FE 处理查询 SQL 源码解析
791 0
浅谈 Apache Doris FE 处理查询 SQL 源码解析
|
存储 消息中间件 关系型数据库
flink cdc通过配置表动态感知mysql并分流
flink cdc通过配置表动态感知mysql并分流
flink cdc通过配置表动态感知mysql并分流