怎么使用 Flink 向 Apache Doris 表中写 Bitmap 类型的数据

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 怎么使用 Flink 向 Apache Doris 表中写 Bitmap 类型的数据

Bitmap是一种经典的数据结构,用于高效地对大量的二进制数据进行压缩存储和快速查询。Doris支持bitmap数据类型,在Flink计算场景中,可以结合Flink doris Connector对bitmap数据做计算。


社区里很多小伙伴在是Doris Flink Connector的时候,不知道怎么写Bitmap类型的数据,本文将介绍如何使用 Flink Doris Connector 如何将 bitmap 数据写入 Doris 中。


前置准备


Doris2.0.1的环境


Flink1.16,同时将 Doris Flink Connector的Jar包放在<FLINK_HOME>/lib 下面。


创建Doris表

CREATE TABLE `page_view_bitmap` (
`dt` int,
`page` varchar(256),
`user_id` bitmap bitmap_union
)
AGGREGATE KEY(`dt`, page)
DISTRIBUTED BY HASH(`dt`) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
)

写入Bitmap数据


这里模拟Flink读取MySQL数据写入Doris,同时将user_id存储到bitmap中。


模拟数据


创建MySQL表

CREATE TABLE `page_view` (
 `id` int NOT NULL,
 `dt` int,
 `page` varchar(256),
 `user_id` int,
 PRIMARY KEY (`id`)
);
#模拟数据
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (1, 20230921, 'home', 1001);
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (2, 20230921, 'home', 1002);
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (3, 20230921, 'search', 1003);
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (4, 20230922, 'mine', 1001);
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (5, 20230922, 'home', 1002);
FlinkSQL写入Bitmap
#使用JDBC读取mysql数据
CREATE TABLE page_view (
   `dt` int,
   `page` string,
   `user_id` int
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://127.0.0.1:3306/test',
   'table-name' = 'page_view',
   'username' = 'root',
   'password' = '123456'
);

doris connector写入数据

CREATE TABLE page_view_bitmap (
dt int,
page string,
user_id int
)
WITH (
 'connector' = 'doris',
 'fenodes' = '127.0.0.1:8030',
 'table.identifier' = 'test.page_view_bitmap',
 'username' = 'root',
 'password' = '',
 'sink.label-prefix' = 'doris_label1',
 'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
);

insert into page_view_bitmap select * from page_view


我们知道 Doris Flink Connector Sink 底层是基于 Doris Stream Load 来实现的,同样 Stream load 在 Connector 里也是一样适用,我们将这个参数封装在了 :sink.properties 参数里,


这里我们可以看到上面这个例子里我们在是 With 属性里加入了我们 Columns 参数,这里我们配置了列的转换操作,将 user_id 通过 to_bitmap 函数进行转换,并导入到 Doris 表里。


查询结果

mysql> select dt,page,bitmap_to_string(user_id) from `test`.`page_view_bitmap`;
+----------+--------+---------------------------+
| dt       | page   | bitmap_to_string(user_id) |
+----------+--------+---------------------------+
| 20230921 | home   | 1001,1002                 |
| 20230921 | search | 1003                      |
| 20230922 | home   | 1002                      |
| 20230922 | mine   | 1001                      |
+----------+--------+---------------------------+
4 rows in set (0.00 sec)

Flink DataStream


使用 DataStream API 模拟数据写入刚才的表中。


DataStream API 对 Bitmap 的操作也是和上面 SQL 操作的方式一样。

public static void main(String[] args) throws Exception {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setParallelism(1);
       env.setRuntimeMode(RuntimeExecutionMode.BATCH);
       DorisSink.Builder<String> builder = DorisSink.builder();
       final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
       Properties properties = new Properties();
       properties.setProperty("column_separator", ",");
       properties.setProperty("format", "csv");
       properties.setProperty("columns", "dt,page,user_id,user_id=to_bitmap(user_id)");
       DorisOptions.Builder dorisBuilder = DorisOptions.builder();
       dorisBuilder.setFenodes("127.0.0.1:8030")
              .setTableIdentifier("test.page_view_bitmap")
              .setUsername("root")
              .setPassword("");
       DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
       executionBuilder.setLabelPrefix("doris_label")
              .setStreamLoadProp(properties)
              .setDeletable(false);
       builder.setDorisReadOptions(readOptionBuilder.build())
              .setDorisExecutionOptions(executionBuilder.build())
              .setSerializer(new SimpleStringSerializer())
              .setDorisOptions(dorisBuilder.build());
       //mock data
       DataStreamSource<String> stringDataStreamSource = env.fromCollection(
               Arrays.asList("20230921,home,1003", "20230921,search,1001", "20230923,home,1001"));
       stringDataStreamSource.sinkTo(builder.build());
       env.execute("doris bitmap write");
  }

查询结果

mysql> select dt,page,bitmap_to_string(user_id) from `test`.`page_view_bitmap`;
+----------+--------+---------------------------+
| dt       | page   | bitmap_to_string(user_id) |
+----------+--------+---------------------------+
| 20230921 | home   | 1001,1002,1003            |
| 20230921 | search | 1001,1003                 |
| 20230922 | home   | 1002                      |
| 20230922 | mine   | 1001                      |
| 20230923 | home   | 1001                      |
+----------+--------+---------------------------+
5 rows in set (0.00 sec)


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
14天前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
299 33
The Past, Present and Future of Apache Flink
|
2月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
858 13
Apache Flink 2.0-preview released
|
2月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
97 3
|
1月前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
172 61
|
2月前
|
运维 数据处理 Apache
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版
|
2月前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
191 0
|
3月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
1月前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
1266 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
zdl
|
1月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
159 56
|
5月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
875 7
阿里云实时计算Flink在多行业的应用和实践

推荐镜像

更多