怎么使用 Flink 向 Apache Doris 表中写 Bitmap 类型的数据

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 怎么使用 Flink 向 Apache Doris 表中写 Bitmap 类型的数据

Bitmap是一种经典的数据结构,用于高效地对大量的二进制数据进行压缩存储和快速查询。Doris支持bitmap数据类型,在Flink计算场景中,可以结合Flink doris Connector对bitmap数据做计算。


社区里很多小伙伴在是Doris Flink Connector的时候,不知道怎么写Bitmap类型的数据,本文将介绍如何使用 Flink Doris Connector 如何将 bitmap 数据写入 Doris 中。


前置准备


Doris2.0.1的环境


Flink1.16,同时将 Doris Flink Connector的Jar包放在<FLINK_HOME>/lib 下面。


创建Doris表

CREATE TABLE `page_view_bitmap` (
`dt` int,
`page` varchar(256),
`user_id` bitmap bitmap_union
)
AGGREGATE KEY(`dt`, page)
DISTRIBUTED BY HASH(`dt`) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
)

写入Bitmap数据


这里模拟Flink读取MySQL数据写入Doris,同时将user_id存储到bitmap中。


模拟数据


创建MySQL表

CREATE TABLE `page_view` (
 `id` int NOT NULL,
 `dt` int,
 `page` varchar(256),
 `user_id` int,
 PRIMARY KEY (`id`)
);
#模拟数据
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (1, 20230921, 'home', 1001);
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (2, 20230921, 'home', 1002);
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (3, 20230921, 'search', 1003);
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (4, 20230922, 'mine', 1001);
INSERT INTO `test`.`page_view` (`id`, `dt`, `page`, `user_id`) VALUES (5, 20230922, 'home', 1002);
FlinkSQL写入Bitmap
#使用JDBC读取mysql数据
CREATE TABLE page_view (
   `dt` int,
   `page` string,
   `user_id` int
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://127.0.0.1:3306/test',
   'table-name' = 'page_view',
   'username' = 'root',
   'password' = '123456'
);

doris connector写入数据

CREATE TABLE page_view_bitmap (
dt int,
page string,
user_id int
)
WITH (
 'connector' = 'doris',
 'fenodes' = '127.0.0.1:8030',
 'table.identifier' = 'test.page_view_bitmap',
 'username' = 'root',
 'password' = '',
 'sink.label-prefix' = 'doris_label1',
 'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
);

insert into page_view_bitmap select * from page_view


我们知道 Doris Flink Connector Sink 底层是基于 Doris Stream Load 来实现的,同样 Stream load 在 Connector 里也是一样适用,我们将这个参数封装在了 :sink.properties 参数里,


这里我们可以看到上面这个例子里我们在是 With 属性里加入了我们 Columns 参数,这里我们配置了列的转换操作,将 user_id 通过 to_bitmap 函数进行转换,并导入到 Doris 表里。


查询结果

mysql> select dt,page,bitmap_to_string(user_id) from `test`.`page_view_bitmap`;
+----------+--------+---------------------------+
| dt       | page   | bitmap_to_string(user_id) |
+----------+--------+---------------------------+
| 20230921 | home   | 1001,1002                 |
| 20230921 | search | 1003                      |
| 20230922 | home   | 1002                      |
| 20230922 | mine   | 1001                      |
+----------+--------+---------------------------+
4 rows in set (0.00 sec)

Flink DataStream


使用 DataStream API 模拟数据写入刚才的表中。


DataStream API 对 Bitmap 的操作也是和上面 SQL 操作的方式一样。

public static void main(String[] args) throws Exception {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setParallelism(1);
       env.setRuntimeMode(RuntimeExecutionMode.BATCH);
       DorisSink.Builder<String> builder = DorisSink.builder();
       final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
       Properties properties = new Properties();
       properties.setProperty("column_separator", ",");
       properties.setProperty("format", "csv");
       properties.setProperty("columns", "dt,page,user_id,user_id=to_bitmap(user_id)");
       DorisOptions.Builder dorisBuilder = DorisOptions.builder();
       dorisBuilder.setFenodes("127.0.0.1:8030")
              .setTableIdentifier("test.page_view_bitmap")
              .setUsername("root")
              .setPassword("");
       DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
       executionBuilder.setLabelPrefix("doris_label")
              .setStreamLoadProp(properties)
              .setDeletable(false);
       builder.setDorisReadOptions(readOptionBuilder.build())
              .setDorisExecutionOptions(executionBuilder.build())
              .setSerializer(new SimpleStringSerializer())
              .setDorisOptions(dorisBuilder.build());
       //mock data
       DataStreamSource<String> stringDataStreamSource = env.fromCollection(
               Arrays.asList("20230921,home,1003", "20230921,search,1001", "20230923,home,1001"));
       stringDataStreamSource.sinkTo(builder.build());
       env.execute("doris bitmap write");
  }

查询结果

mysql> select dt,page,bitmap_to_string(user_id) from `test`.`page_view_bitmap`;
+----------+--------+---------------------------+
| dt       | page   | bitmap_to_string(user_id) |
+----------+--------+---------------------------+
| 20230921 | home   | 1001,1002,1003            |
| 20230921 | search | 1001,1003                 |
| 20230922 | home   | 1002                      |
| 20230922 | mine   | 1001                      |
| 20230923 | home   | 1001                      |
+----------+--------+---------------------------+
5 rows in set (0.00 sec)


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
0
0
0
18
分享
相关文章
基于 Flink 的中国电信星海时空数据多引擎实时改造
本文整理自中国电信集团大数据架构师李新虎老师在Flink Forward Asia 2024的分享,围绕星海时空智能系统展开,涵盖四个核心部分:时空数据现状、实时场景多引擎化、典型应用及未来展望。系统日处理8000亿条数据,具备亚米级定位能力,通过Flink多引擎架构解决数据膨胀与响应时效等问题,优化资源利用并提升计算效率。应用场景包括运动状态识别、个体行为分析和群智感知,未来将推进湖仓一体改造与三维时空服务体系建设,助力数字化转型与智慧城市建设。
238 1
基于 Flink 的中国电信星海时空数据多引擎实时改造
网易游戏 x Apache Doris:湖仓一体架构演进之路
网易游戏 Apache Doris 集群超 20 个 ,总节点数百个,已对接内部 200+ 项目,日均查询量超过 1500 万,总存储数据量 PB 级别。
网易游戏 x Apache Doris:湖仓一体架构演进之路
Apache Doris & SelectDB 技术能力全面解析
本文将对 Doris & SelectDB 适合的分析场景和技术能力进行概述解析
Apache Doris & SelectDB 技术能力全面解析
Apache Doris 2025 Roadmap:构建 GenAI 时代实时高效统一的数据底座
秉承“以场景驱动创新” 的核心理念,持续深耕三大核心场景的关键能力,并对大模型 GenAI 场景的融合应用进行重点投入,为智能时代构建实时、高效、统一的数据底座。
Apache Doris 2025 Roadmap:构建 GenAI 时代实时高效统一的数据底座
为什么 Apache Doris 是比 Elasticsearch 更好的实时分析替代方案?
本文将从技术选型的视角,从开放性、系统架构、实时写入、实时存储、实时查询等多方面,深入分析 Apache Doris 与 Elasticsearch 的能力差异及性能表现
为什么 Apache Doris 是比 Elasticsearch 更好的实时分析替代方案?
Flink + Doris 实时湖仓解决方案
本文整理自SelectDB技术副总裁陈明雨在Flink Forward Asia 2024的分享,聚焦Apache Doris与湖仓一体解决方案。内容涵盖三部分:一是介绍Apache Doris,一款高性能实时分析数据库,支持多场景应用;二是基于Doris、Flink和Paimon的湖仓解决方案,解决批流融合与数据一致性挑战;三是Doris社区生态及云原生发展,包括存算分离架构与600多位贡献者的活跃社区。文章深入探讨了Doris在性能、易用性及场景支持上的优势,并展示了其在多维分析、日志分析和湖仓分析中的实际应用案例。
176 17
Flink + Doris 实时湖仓解决方案
拉卡拉 x Apache Doris:统一金融场景 OLAP 引擎,查询提速 15 倍,资源直降 52%
拉卡拉早期基于 Lambda 架构构建数据系统面临存储成本高、实时写入性能差、复杂查询耗时久、组件维护复杂等问题。为此,拉卡拉选择使用 Apache Doris 替换 Elasticsearch、Hive、Hbase、TiDB、Oracle / MySQL 等组件,实现了 OLAP 引擎的统一、查询性能提升 15 倍、资源减少 52% 的显著成效。
拉卡拉 x Apache Doris:统一金融场景 OLAP 引擎,查询提速 15 倍,资源直降 52%
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
本文介绍通过Flink CDC实现Oracle数据实时同步至崖山数据库(YashanDB)的方法,支持全量与增量同步,并涵盖新增、修改和删除的DML操作。内容包括环境准备(如JDK、Flink版本等)、Oracle日志归档启用、用户权限配置、增量日志记录设置、元数据迁移、Flink安装与配置、生成Flink SQL文件、Streampark部署,以及创建和启动实时同步任务的具体步骤。适合需要跨数据库实时同步方案的技术人员参考。
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
Apache Doris 2.1.9 版本正式发布
Apache Doris 2.1.9 版本正式发布,欢迎使用~
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
518 33
The Past, Present and Future of Apache Flink

热门文章

最新文章

推荐镜像

更多
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等