Label 机制
Doris 的导入作业都可以设置一个 Label。这个 Label 通常是用户自定义的、具有一定业务逻辑属性的字符串。
Label 的主要作用是唯一标识一个导入任务,并且能够保证相同的 Label 仅会被成功导入一次。
Label 机制可以保证导入数据的不丢不重。如果上游数据源能够保证 At-Least-Once 语义,则配合 Doris 的 Label 机制,能够保证 Exactly-Once 语义。
Label 在一个数据库下具有唯一性。Label 的保留期限默认是 3 天。即 3 天后,已完成的 Label 会被自动清理,之后 Label 可以被重复使用。
最佳实践
Label 通常被设置为 业务逻辑+时间
的格式。如 my_business1_20201010_125000
。
这个 Label 通常用于表示:业务 my_business1
这个业务在 2020-10-10 12:50:00
产生的一批数据。通过这种 Label 设定,业务上可以通过 Label 查询导入任务状态,来明确的获知该时间点批次的数据是否已经导入成功。如果没有成功,则可以使用这个 Label 继续重试导入。
列的映射、转换与过滤
Doris 支持丰富的列映射、转换和过滤操作。可以非常灵活的处理需要导入的原始数据。
本章节主要介绍如何在导入中使用这些功能。
总体介绍
Doris 在导入过程中对数据处理步骤分为以下几步:
- 数据按原始文件中的列的顺序读入到 Doris
- 通过前置过滤条件(PRECEDING FILTER)对原始数据进行一次过滤。
- 通过列映射和转换,将原始数据映射到目标列顺序。
- 通过后置过滤条件(WHERE)对转换后的数据在进行一次过滤。
- 写入最终数据。
列的映射、转换和过滤参数在导入作业中皆为可选操作。在默认空缺的情况下,Doris 会将源文件中的行按默认的列分割符 \t
分割后,按顺序对应到表中的列。如果源文件中的列数量和表中的列数量不匹配,则会产生数据质量问题,导致数据无法导入。此时则需要显式的描述列的映射、转换和过滤信息。
支持的导入方式
BROKER LOAD
LOAD LABEL example_db.label1( DATA INFILE("bos://bucket/input/file") INTO TABLE `my_table` (k1, k2, tmpk3) PRECEDING FILTER k1 = 1 SET ( k3 = tmpk3 + 1 ) WHERE k1 > k2 ) WITH BROKER bos ( ... ); STREAM LOAD curl--location-trusted -u user:passwd -H "columns: k1, k2, tmpk3, k3 = tmpk3 + 1" -H "where: k1 > k2" -T file.txt http://host:port/api/testDb/testTbl/_stream_load ROUTINE LOAD CREATE ROUTINE LOAD example_db.label1 ON my_tableCOLUMNS(k1, k2, tmpk3, k3 = tmpk3 +1), PRECEDING FILTER k1 = 1, WHERE k1 > k2 ...
以上导入方式都支持对源数据进行列映射、转换和过滤操作:
- 前置过滤:对读取到的原始数据进行一次过滤。
PRECEDING FILTER k1 = 1
映射:定义源数据中的列。如果定义的列名和表中的列相同,则直接映射为表中的列。如果不同,则这个被定义的列可以用于之后的转换操作。如上面示例中的:
(k1, k2, tmpk3)
转换:将第一步中经过映射的列进行转换,可以使用内置表达式、函数、自定义函数进行转化,并重新映射到表中对应的列上。如上面示例中的:
k3 = tmpk3 + 1
后置过滤:对经过映射和转换后的列,通过表达式进行过滤。被过滤的数据行不会导入到系统中。如上面示例中的:
WHERE k1 > k2
列映射
列映射的目的主要是描述导入文件中各个列的信息,相当于为源数据中的列定义名称。通过描述列映射关系,我们可以将于表中列顺序不同、列数量不同的源文件导入到 Doris 中。下面我们通过示例说明:
假设源文件有4列,内容如下(表头列名仅为方便表述,实际并无表头):
假设源文件有4列,内容如下(表头列名仅为方便表述,实际并无表头):
列1 | 列2 | 列3 | 列4 |
1 | 100 | beijing | 1.1 |
2 | 200 | shanghai | 1.2 |
3 | 300 | guangzhou | 1.3 |
4 | \N | chongqing | 1.4 |
注:\N
在源文件中表示 null。
1. 调整映射顺序
假设表中有 k1,k2,k3,k4
4列。我们希望的导入映射关系如下:
列1 -> k1 列2 -> k3 列3 -> k2 列4 -> k4
则列映射的书写顺序应如下:
(k1, k3, k2, k4)
2. 源文件中的列数量多于表中的列
假设表中有 k1,k2,k3
3列。我们希望的导入映射关系如下:
列1 -> k1 列2 -> k3 列3 -> k2
则列映射的书写顺序应如下:
(k1, k3, k2, tmpk4)
其中 tmpk4
为一个自定义的、表中不存在的列名。Doris 会忽略这个不存在的列名。
3. 源文件中的列数量少于表中的列,使用默认值填充
假设表中有 k1,k2,k3,k4,k5
5列。我们希望的导入映射关系如下:
列1 -> k1 列2 -> k3 列3 -> k2
这里我们仅使用源文件中的前3列。k4,k5
两列希望使用默认值填充。
则列映射的书写顺序应如下:
(k1, k3, k2)
如果 k4,k5
列有默认值,则会填充默认值。否则如果是 nullable
的列,则会填充 null
值。否则,导入作业会报错。
列前置过滤
前置过滤是对读取到的原始数据进行一次过滤。目前仅支持 BROKER LOAD 和 ROUTINE LOAD。
前置过滤有以下应用场景:
1. 转换前做过滤
希望在列映射和转换前做过滤的场景。能够先行过滤掉部分不需要的数据。
2.过滤列不存在于表中,仅作为过滤标识
比如源数据中存储了多张表的数据(或者多张表的数据写入了同一个 Kafka 消息队列)。数据中每行有一列表名来标识该行数据属于哪个表。用户可以通过前置过滤条件来筛选对应的表数据进行导入。
列转换
列转换功能允许用户对源文件中列值进行变换。目前 Doris 支持使用绝大部分内置函数、用户自定义函数进行转换。
注:自定义函数隶属于某一数据库下,在使用自定义函数进行转换时,需要用户对这个数据库有读权限。
转换操作通常是和列映射一起定义的。即先对列进行映射,再进行转换。下面我们通过示例说明:
假设源文件有4列,内容如下(表头列名仅为方便表述,实际并无表头):
列1 | 列2 | 列3 | 列4 |
1 | 100 | beijing | 1.1 |
2 | 200 | shanghai | 1.2 |
3 | 300 | guangzhou | 1.3 |
4 | 400 | chongqing | 1.4 |
1. 将源文件中的列值经转换后导入表中
假设表中有 k1,k2,k3,k4
4列。我们希望的导入映射和转换关系如下:
列1 -> k1 列2 * 100 -> k3 列3 -> k2 列4 -> k4
则列映射的书写顺序应如下:
(k1, tmpk3, k2, k4, k3 = tmpk3 * 100)
这里相当于我们将源文件中的第2列命名为 tmpk3
,同时指定表中 k3
列的值为 tmpk3 * 100
。最终表中的数据如下:
k1 | k2 | k3 | k4 |
1 | beijing | 10000 | 1.1 |
2 | shanghai | 20000 | 1.2 |
3 | guangzhou | 30000 | 1.3 |
null | chongqing | 40000 | 1.4 |
2. 通过 case when 函数,有条件的进行列转换。
假设表中有 k1,k2,k3,k4
4列。我们希望对于源数据中的 beijing, shanghai, guangzhou, chongqing
分别转换为对应的地区id后导入:
列1 -> k1 列2 -> k2 列3 进行地区id转换后 -> k3 列4 -> k4
则列映射的书写顺序应如下:
(k1, k2, tmpk3, k4, k3 = case tmpk3 when "beijing" then 1 when "shanghai" then 2 when "guangzhou" then 3 when "chongqing" then 4 else null end)
最终表中的数据如下:
k1 | k2 | k3 | k4 |
1 | 100 | 1 | 1.1 |
2 | 200 | 2 | 1.2 |
3 | 300 | 3 | 1.3 |
null | 400 | 4 | 1.4 |
3. 将源文件中的 null 值转换成 0 导入。同时也进行示例2中的地区id转换。
假设表中有 k1,k2,k3,k4
4列。在对地区id转换的同时,我们也希望对于源数据中 k1 列的 null 值转换成 0 导入:
列1 如果为null 则转换成0 -> k1 列2 -> k2 列3 -> k3 列4 -> k4
则列映射的书写顺序应如下:
(tmpk1, k2, tmpk3, k4, k1 = ifnull(tmpk1, 0), k3 = case tmpk3 when "beijing" then 1 when "shanghai" then 2 when "guangzhou" then 3 when "chongqing" then 4 else null end)
最终表中的数据如下:
k1 | k2 | k3 | k4 |
1 | 100 | 1 | 1.1 |
2 | 200 | 2 | 1.2 |
3 | 300 | 3 | 1.3 |
0 | 400 | 4 | 1.4 |
列过滤
经过列映射和转换后,我们可以通过过滤条件将不希望导入到Doris中的数据进行过滤。下面我们通过示例说明:
假设源文件有4列,内容如下(表头列名仅为方便表述,实际并无表头):
列1 | 列2 | 列3 | 列4 |
1 | 100 | beijing | 1.1 |
2 | 200 | shanghai | 1.2 |
3 | 300 | guangzhou | 1.3 |
4 | 400 | chongqing | 1.4 |
1. 在列映射和转换缺省的情况下,直接过滤
假设表中有 k1,k2,k3,k4
4列。我们可以在缺省列映射和转换的情况下,直接定义过滤条件。如我们希望只导入源文件中第4列为大于 1.2 的数据行,则过滤条件如下:
where k4 > 1.2
最终表中的数据如下:
k1 | k2 | k3 | k4 |
3 | 300 | guangzhou | 1.3 |
null | 400 | chongqing | 1.4 |
缺省情况下,Doris 会按照顺序进行列映射,因此源文件中的第4列自动被映射到表中的 k4
列。
2. 对经过列转换的数据进行过滤
假设表中有 k1,k2,k3,k4
4列。在 列转换 示例中,我们将省份名称转换成了id。这里我们想过滤掉 id 为 3 的数据。则转换、过滤条件如下:
(k1, k2, tmpk3, k4, k3 = case tmpk3 when "beijing" then 1 when "shanghai" then 2 when "guangzhou" then 3 when "chongqing" then 4 else null end) where k3 != 3
最终表中的数据如下:
k1 | k2 | k3 | k4 |
1 | 100 | 1 | 1.1 |
2 | 200 | 2 | 1.2 |
null | 400 | 4 | 1.4 |
这里我们看到,执行过滤时的列值,为经过映射和转换后的最终列值,而不是原始数据。
3. 多条件过滤
假设表中有 k1,k2,k3,k4
4列。我们想过滤掉 k1
列为 null
的数据,同时过滤掉 k4
列小于 1.2 的数据,则过滤条件如下:
where k1 is null and k4 < 1.2
最终表中的数据如下:
k1 | k2 | k3 | k4 |
2 | 200 | 2 | 1.2 |
3 | 300 | 3 | 1.3 |
数据质量问题和过滤阈值
导入作业中被处理的数据行可以分为如下三种:
1. Filtered Rows
因数据质量不合格而被过滤掉的数据。数据质量不合格包括类型错误、精度错误、字符串长度超长、文件列数不匹配等数据格式问题,以及因没有对应的分区而被过滤掉的数据行。
2. Unselected Rows
这部分为因 preceding filter
或 where
列过滤条件而被过滤掉的数据行。
3. Loaded Rows
被正确导入的数据行。
Doris 的导入任务允许用户设置最大错误率(max_filter_ratio
)。如果导入的数据的错误率低于阈值,则这些错误行将被忽略,其他正确的数据将被导入。
错误率的计算方式为:
#Filtered Rows / (#Filtered Rows + #Loaded Rows)
也就是说 Unselected Rows
不会参与错误率的计算。
严格模式
严格模式(strict_mode)为导入操作中的一个参数配置。该参数会影响某些数值的导入行为和最终导入的数据。
本文档主要说明如何设置严格模式,以及严格模式产生的影响。
如何设置
严格模式默认情况下都为 False,即关闭状态。
不同的导入方式设置严格模式的方式不尽相同。
1. BROKER LOAD
LOAD LABEL example_db.label1( DATA INFILE("bos://my_bucket/input/file.txt") INTO TABLE `my_table` COLUMNS TERMINATED BY "," ) WITH BROKER bos ( "bos_endpoint" = "http://bj.bcebos.com", "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx", "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy" ) PROPERTIES ( "strict_mode" = "true" )
2.STREAM LOAD
curl --location-trusted -u user:passwd \-H "strict_mode: true" \ -T 1.txt \ http://host:port/api/example_db/my_table/_stream_load
3. ROUTINE LOAD
CREATE ROUTINE LOAD example_db.test_job ON my_tablePROPERTIES ( "strict_mode" = "true" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic" );
4. INSERT
通过会话变量设置:
SET enable_insert_strict = true; INSERT INTO my_table ...;
严格模式的作用
严格模式的意思是,对于导入过程中的列类型转换进行严格过滤。
严格过滤的策略如下:
对于列类型转换来说,如果开启严格模式,则错误的数据将被过滤。这里的错误数据是指:原始数据并不为 null
,而在进行列类型转换后结果为 null
的这一类数据。
这里说指的 列类型转换
,并不包括用函数计算得出的 null
值。
对于导入的某列类型包含范围限制的,如果原始数据能正常通过类型转换,但无法通过范围限制的,严格模式对其也不产生影响。例如:如果类型是 decimal(1,0)
, 原始数据为 10,则属于可以通过类型转换但不在列声明的范围内。这种数据 strict 对其不产生影响。
1. 以列类型为 TinyInt 来举例:
原始数据类型 | 原始数据举例 | 转换为 TinyInt 后的值 | 严格模式 | 结果 |
空值 | \N | NULL | 开启或关闭 | NULL |
非空值 | "abc" or 2000 | NULL | 开启 | 非法值(被过滤) |
非空值 | "abc" | NULL | 关闭 | NULL |
非空值 | 1 | 1 | 开启或关闭 | 正确导入 |
说明:
1. 表中的列允许导入空值
2. abc
及2000
在转换为 TinyInt 后,会因类型或精度问题变为 NULL。在严格模式开启的情况下,这类数据将会被过滤。而如果是关闭状态,则会导入null
。
- 以列类型为 Decimal(1,0) 举例
原始数据类型 | 原始数据举例 | 转换为 Decimal 后的值 | 严格模式 | 结果 |
空值 | \N | null | 开启或关闭 | NULL |
非空值 | aaa | NULL | 开启 | 非法值(被过滤) |
非空值 | aaa | NULL | 关闭 | NULL |
非空值 | 1 or 10 | 1 or 10 | 开启或关闭 | 正确导入 |
说明:
1. 表中的列允许导入空值2.abc
在转换为 Decimal 后,会因类型问题变为 NULL。在严格模式开启的情况下,这类数据将会被过滤。而如果是关闭状态,则会导入null
。3.10
虽然是一个超过范围的值,但是因为其类型符合 decimal 的要求,所以严格模式对其不产生影响。10
最后会在其他导入处理流程中被过滤。但不会被严格模式过滤。