Apache Doris Routine Load快速体验之案例(2)
Failed to get all partitions of kafka topic
current error rows is more than max error num
环境信息
硬件信息
- 1.CPU :4C
- 2.CPU型号:ARM64
- 3.内存 :10GB
- 4.硬盘 :66GB SSD
软件信息
- 1.VM镜像版本 :CentOS-7
- 2.Apahce Doris版本 :1.2.4.1
- 3.Kafka版本:3.2.0
Routine Load介绍
Routine Load适合Kafka直接实时写数据到Doris的场景;它支持用户提交一个常驻的导入任务,通过不断地从指定的数据源中读取数据,将数据导入到 Doris 中。
如上图,Client 向 FE 提交一个Routine Load 作业。
1.FE 通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被 TaskScheduler 分配到指定的 BE 上执行。
2.在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后,向 FE 汇报。
3.FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试。
4.整个 Routine Load 作业通过不断的产生新的 Task,来完成数据不间断的导入。
Routine Load案例
创建Doris结果测试表
-- 创建测试库 create database routine_load; -- 切换为测试库 use routine_load; -- 创建测试结果表 CREATE TABLE rl_test01 ( `id` varchar(1000) NULL COMMENT "来源库表键", `test01` BIGINT SUM DEFAULT "0" COMMENT "测试" ) ENGINE=OLAP AGGREGATE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 1", "in_memory" = "false", "storage_format" = "V2" );
创建Routine Load任务
CREATE ROUTINE LOAD routine_load.rl_test01 ON rl_test01 COLUMNS TERMINATED BY ",", COLUMNS(id,test01) -- 字段名和表里对应 PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "200000", "max_batch_size" = "209715200", "strict_mode" = "false" ) FROM KAFKA ( "kafka_broker_list" = "192.168.1.61:9092", "kafka_topic" = "rl_test01", "property.group.id" = "rl_test01_group", "property.client.id" = "rl_test01_client", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );