Apache Doris Routine Load快速体验之案例(2)1

简介: Apache Doris Routine Load快速体验之案例(2)1

Apache Doris Routine Load快速体验之案例(2)

环境信息

硬件信息

软件信息

Routine Load介绍

Routine Load案例

创建Doris结果测试表

创建Routine Load任务

查看Routine Load

发送测试Kafka测试数据

查看Doris结果数据

常见问题

Failed to get all partitions of kafka topic

current error rows is more than max error num

环境信息

硬件信息

  1. 1.CPU :4C
  2. 2.CPU型号:ARM64
  3. 3.内存 :10GB
  4. 4.硬盘 :66GB SSD

软件信息

  1. 1.VM镜像版本 :CentOS-7
  2. 2.Apahce Doris版本 :1.2.4.1
  3. 3.Kafka版本:3.2.0

Routine Load介绍

Routine Load适合Kafka直接实时写数据到Doris的场景;它支持用户提交一个常驻的导入任务,通过不断地从指定的数据源中读取数据,将数据导入到 Doris 中。

如上图,Client 向 FE 提交一个Routine Load 作业。

1.FE 通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被 TaskScheduler 分配到指定的 BE 上执行。

2.在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后,向 FE 汇报。

3.FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试。

4.整个 Routine Load 作业通过不断的产生新的 Task,来完成数据不间断的导入。

Routine Load案例

创建Doris结果测试表

-- 创建测试库
create database routine_load;
-- 切换为测试库
use routine_load;
-- 创建测试结果表
CREATE TABLE rl_test01 (
  `id` varchar(1000) NULL COMMENT "来源库表键",
  `test01` BIGINT SUM DEFAULT "0" COMMENT "测试"
) ENGINE=OLAP
AGGREGATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
);

创建Routine Load任务

CREATE ROUTINE LOAD routine_load.rl_test01 ON rl_test01
        COLUMNS TERMINATED BY ",",
        COLUMNS(id,test01) -- 字段名和表里对应
        PROPERTIES
        (
            "desired_concurrent_number"="3",
            "max_batch_interval" = "20",
            "max_batch_rows" = "200000",
            "max_batch_size" = "209715200",
            "strict_mode" = "false"
        )
        FROM KAFKA
        (
            "kafka_broker_list" = "192.168.1.61:9092",
            "kafka_topic" = "rl_test01",
            "property.group.id" = "rl_test01_group",
            "property.client.id" = "rl_test01_client",
            "property.kafka_default_offsets" = "OFFSET_BEGINNING"
        );
相关文章
|
10天前
|
存储 SQL BI
毫秒级查询性能优化实践!基于阿里云数据库 SelectDB 版内核:Apache Doris 在极越汽车数字化运营和营销方向的解决方案
毫秒级查询性能优化实践!基于阿里云数据库 SelectDB 版内核:Apache Doris 在极越汽车数字化运营和营销方向的解决方案
毫秒级查询性能优化实践!基于阿里云数据库 SelectDB 版内核:Apache Doris 在极越汽车数字化运营和营销方向的解决方案
|
10天前
|
弹性计算 JSON Cloud Native
Apache Doris 2.0.11 版本正式发布
Apache Doris 2.0.11 版本已于 2024 年 6 月 5 日正式与大家见面,该版本提交了 123 个改进项以及问题修复,进一步提升了系统的性能及稳定性,欢迎大家下载体验。
|
16天前
|
存储 运维 5G
基于阿里云数据库 SelectDB 内核 Apache Doris 的实时/离线一体化架构,赋能中国联通 5G 全连接工厂解决方案
数据是 5G 全连接工厂的核心要素,为支持全方位的数据收集、存储、分析等工作的高效进行,联通 5G 全连接工厂从典型的 Lambda 架构演进为 All in [Apache Doris](https://c.d4t.cn/vwDf8R) 的实时/离线一体化架构,并凭借 Doris 联邦查询能力打造统一查询网关,数据处理及查询链路大幅简化,为联通 5G 全连接工厂带来数据时效性、查询响应、存储成本、开发效率全方位的提升。
基于阿里云数据库 SelectDB 内核 Apache Doris 的实时/离线一体化架构,赋能中国联通 5G 全连接工厂解决方案
|
19天前
|
OLAP 数据处理 Apache
众安保险 CDP 平台:借助阿里云数据库 SelectDB 版内核 Apache Doris 打破数据孤岛,人群圈选提速4倍
众安保险在CDP(Customer Data Platform,客户数据平台)建设中,通过引入阿里云数据库SelectDB版内核Apache Doris,成功打破了数据孤岛,并显著提升了人群圈选的速度
184 1
|
20天前
|
运维 Cloud Native Apache
云计算新宠:探索Apache Doris的云原生策略
云计算新宠:探索Apache Doris的云原生策略
|
3天前
|
监控 大数据 Java
使用Apache Flink进行大数据实时流处理
Apache Flink是开源流处理框架,擅长低延迟、高吞吐量实时数据流处理。本文深入解析Flink的核心概念、架构(包括客户端、作业管理器、任务管理器和数据源/接收器)和事件时间、窗口、状态管理等特性。通过实战代码展示Flink在词频统计中的应用,讨论其实战挑战与优化。Flink作为大数据处理的关键组件,将持续影响实时处理领域。
38 5
|
24天前
|
消息中间件 Java Kafka
实时计算 Flink版操作报错之Apache Flink中的SplitFetcher线程在读取数据时遇到了未预期的情况,该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4天前
|
数据采集 关系型数据库 MySQL
使用Apache Flink实现MySQL数据读取和写入的完整指南
使用Apache Flink实现MySQL数据读取和写入的完整指南
使用Apache Flink实现MySQL数据读取和写入的完整指南
|
8天前
|
消息中间件 Kafka 数据处理
Apache Flink:流式数据处理的强大引擎
【6月更文挑战第8天】Apache Flink是开源的流处理框架,专注于高效、低延迟的无界和有界数据流处理。它提供统一编程模型,支持实时与批量数据。核心概念包括DataStreams、DataSets、时间语义和窗口操作。使用Flink涉及环境设置、数据源配置(如Kafka)、数据转换(如map、filter)、窗口聚合及数据输出。通过丰富API和灵活时间语义,Flink适于构建复杂流处理应用,在实时数据处理领域具有广阔前景。
|
17天前
|
数据处理 Apache 流计算

推荐镜像

更多