Flink 使用 SQL 读取 Kafka 写入到Doris表中

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 这次我们演示的是整合Flink Doris Connector 到 Flink 里使用,通过Flink Kafka Connector,通过SQL的方式流式实时消费Kafka数据,利用Doris Flink Connector将数据插入到Doris表中。

这次我们演示的是整合Flink Doris Connector 到 Flink 里使用,通过Flink Kafka Connector,通过SQL的方式流式实时消费Kafka数据,利用Doris Flink Connector将数据插入到Doris表中。


这里的演示我们是用过Flink SQL Client来进行的,


1. 准备条件


这里我们使用的环境是

Doris-0.14.7
doris-flink-1.0-SNAPSHOT.jar,这个可以自己去编译
Flink-1.12.5
flink-connector-kafka_2.11-1.12.1.jar
kafka-clients-2.2.2.jar
kafka-2.2.2

2. Kafka数据准备


首先我们在kafka下创建一个topic:

bin/kafka-topics.sh --create --topic user_behavior --replication-factor 1 --partitions 1 --zookeeper 10.220.147.155:2181,10.220.147.156:2181,10.220.147.157:2181

向user_behavior topic队列中添加数据

bin/kafka-console-producer.sh --broker-list 10.220.147.155:9092,10.220.147.156:9092,10.220.147.157:9092 --topic user_behavior

示例数据如下:


{"user_id": "54346222", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662863337", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

这里是演示,你可以将这个示例数据中的数据进行复制修改


3. doris 数据库建表


这里我们采用的是唯一主键模型


CREATE TABLE user_log (
    user_id VARCHAR(20),
    item_id VARCHAR(30),
    category_id VARCHAR(30),
    behavior VARCHAR(30),
    ts varchar(20)
) ENGINE=OLAP
UNIQUE KEY(`user_id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
   "replication_num" = "1",
   "in_memory" = "false",
    "storage_format" = "V2"
);

4.实战演示


4.1 Flink 安装部署


这里我们使用的是单机模式


因为Doris Flink Connector 要求Scala 2.12.x版本,这里我们下载是Flink 适配 scala 2.12的版本



解压Flink到指定目录,然后将下面依赖的 JAR 包拷贝到 Flink lib 目录下:

doris-flink-1.0-SNAPSHOT.jar
flink-connector-kafka_2.12-1.12.1.jar
kafka-clients-2.2.2.jar

image.png


4.2 启动Flink


bin/start-cluster.sh

启动以后我们在浏览器里访问:可以看到Flink的界面

image.png


4.3 启动 Flink SQL Client


./bin/sql-client.sh embedded

image.png


首先我们通过Flink SQL 创建Kafka表


CREATE TABLE user_log (
  user_id VARCHAR,
  item_id VARCHAR,
  category_id VARCHAR,
  behavior VARCHAR,
  ts varchar
) WITH (
  'connector.type' = 'kafka', 
  'connector.version' = 'universal',  
  'connector.topic' = 'user_behavior', 
  'connector.startup-mode' = 'earliest-offset', 
  'connector.properties.0.key' = 'zookeeper.connect', 
  'connector.properties.0.value' = '10.220.147.155:2181,10.220.147.156:2181,10.220.147.157:2181', 
  'connector.properties.1.key' = 'bootstrap.servers',
  'connector.properties.1.value' = '10.220.147.155:9092,10.220.147.156:9092,10.220.147.157:9092', 
  'update-mode' = 'append',
  'format.type' = 'json', 
  'format.derive-schema' = 'true' 
);

image.png


执行查询验证是否正常

select * from user_log;

执行以后我们可以看到下面的界面,显示正常

image.png


利用 Doris Flink Connector 创建 Doris 映射表

CREATE TABLE doris_test_sink_1 (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts varchar
) 
WITH (
  'connector' = 'doris',
  'fenodes' = '10.220.146.10:8030',
  'table.identifier' = 'test_2.user_log',
  'sink.batch.size' = '2',
  'username' = 'root',
  'password' = ''
)

执行下面SQL 查询Doris数据库表里的数据,验证是否正常

select * from doris_test_sink_1;

执行以后可以看到下面的界面,显示有四条数据,是正常的

image.png


提交 插入语句,从Kafka表中读取数据插入到Doris中

insert into doris_test_sink_1 select * from user_log;

显示任务已经提交到FLink 集群上运行


image.png


在Flink 集群的web界面我们也可以看到这个Job

image.png


然后我们朝Kafka topic中推送两条数据,看看是否在Doris表中能查询到

{"user_id": "123456", "item_id":"123", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "1765543", "item_id":"456", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

image.png


验证结果

image.png




我们在Flink SQL Client界面查询可以看到红色标识出来额的两条数据已经插入进去,说明是正常的。


5.总结


我们到这里整个的演示就结束了,使用Doris flink connector可以很容易的通过Flink SQL方式整合各种异构数据源,导入到Doris数仓中,非常的方便




目录
相关文章
|
1月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
311 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
消息中间件 存储 传感器
193 0
|
5月前
|
消息中间件 SQL 关系型数据库
Flink CDC + Kafka 加速业务实时化
Flink CDC 是一种支持流批一体的分布式数据集成工具,通过 YAML 配置实现数据传输过程中的路由与转换操作。它已从单一数据源的 CDC 数据流发展为完整的数据同步解决方案,支持 MySQL、Kafka 等多种数据源和目标端(如 Delta Lake、Iceberg)。其核心功能包括多样化数据输入链路、Schema Evolution、Transform 和 Routing 模块,以及丰富的监控指标。相比传统 SQL 和 DataStream 作业,Flink CDC 提供更灵活的 Schema 变更控制和原始 binlog 同步能力。
|
6月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
222 12
|
SQL 消息中间件 分布式数据库
flink sql问题之连接HBase报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
761 0
|
SQL JSON Java
Flink SQL 问题之重启报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
401 3
|
SQL Java API
Flink SQL 问题之窗口函数报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
202 1
|
SQL 消息中间件 关系型数据库
Flink SQL 问题之提交执行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
744 2
|
SQL 资源调度 分布式数据库
Flink SQL 问题之服务器报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
340 3
|
SQL 消息中间件 Oracle
Flink SQL 问题之写入ES报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
213 4