Flink Mysql CDC结合Doris flink connector实现数据实时入库

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: Flink Mysql CDC结合Doris flink connector实现数据实时入库,Apache doris通过扩展支持通过 Flink 读写 doris 数仓中的数据表。

Flink Mysql CDC结合Doris flink connector实现数据实时入库


Apache doris通过扩展支持通过 Flink 读写 doris 数仓中的数据表,


目前 doris 支持 Flink 1.11.x ,1.12.x,1.13.x,Scala版本:2.12.x


目前Flink doris connector目前控制入库通过两个参数:


sink.batch.size :每多少条写入一次,默认100条


sink.batch.interval :每个多少秒写入一下,默认1秒


这两参数同时起作用,那个条件先到就触发写doris表操作,


注意:


这里注意的是要启用 http v2 版本,具体在 fe.conf 中配置

enable_http_server_v2=true,同时因为是通过 fe http rest api 获取 be 列表,这俩需要配置的用户有 admin 权限。


Flink Doris Connector 编译


在 doris 的 docker 编译环境 apache/incubator-doris:build-env-1.2 下进行编译,因为 1.3 下面的JDK 版本是 11,会存在编译问题。


在 extension/flink-doris-connector/ 源码目录下执行:

sh build.sh


编译成功后,会在 output/ 目录下生成文件 doris-flink-1.0.0-SNAPSHOT.jar。将此文件复制到 Flink 的 ClassPath 中即可使用 Flink-Doris-Connector。例如,Local 模式运行的 Flink,将此文件放入 jars/ 文件夹下。Yarn集群模式运行的Flink,则将此文件放入预部署包中。


针对Flink 1.13.x版本适配问题


<properties>

<scala.version>2.12</scala.version>

<flink.version>1.11.2</flink.version>

<libthrift.version>0.9.3</libthrift.version>

<arrow.version>0.15.1</arrow.version>

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<doris.home>image.png{basedir}/../../thirdparty</doris.thirdparty>

</properties>


只需要将这里的 flink.version 改成和你 Flink 集群版本一致,重新编辑即可

使用示例


通过flink cdc实现mysql binlog日志数据的消费,然后通过flink doris connector sql实时导入mysql数据到doris表数据中


这个代码已经提交到apache doris的示例代码库里


org.apache.doris.demo.flink.FlinkConnectorMysqlCDCDemo


注意: 由于Flink doris connector jar包不在Maven中央仓库中,需要单独编译并添加到你项目的classpath中。参考Flink doris connector的编译和使用: Flink doris connector

首先Mysql 要开启 binlog 具体如何打开binlog请自行搜索或到Mysql官方文档查询

安装Flink,Flink的安装和使用这里不做介绍,只是在开发环境中给出代码示例

创建Mysql数据库表


CREATE TABLE `test` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
 ) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

创建doris表

CREATE TABLE `doris_test` (
  `id` int NULL COMMENT "",
  `name` varchar(100) NULL COMMENT ""
 ) ENGINE=OLAP
 DUPLICATE KEY(`id`)
 COMMENT "OLAP"
 DISTRIBUTED BY HASH(`id`) BUCKETS 1
 PROPERTIES (
 "replication_num" = "3",
 "in_memory" = "false",
 "storage_format" = "V2"
 );

创建Flink Mysql CDC

tEnv.executeSql(
  "CREATE TABLE orders (\n" +
  "  id INT,\n" +
  "  name STRING\n" +
  ") WITH (\n" +
  "  'connector' = 'mysql-cdc',\n" +
  "  'hostname' = 'localhost',\n" +
  "  'port' = '3306',\n" +
  "  'username' = 'root',\n" +
  "  'password' = 'zhangfeng',\n" +
  "  'database-name' = 'demo',\n" +
  "  'table-name' = 'test'\n" +
  ")");

创建Flink Doris Table 映射表

tEnv.executeSql(
  "CREATE TABLE doris_test_sink (" +
  "id INT," +
  "name STRING" +
  ") " +
  "WITH (\n" +
  "  'connector' = 'doris',\n" +
  "  'fenodes' = '10.220.146.10:8030',\n" +
  "  'table.identifier' = 'test_2.doris_test',\n" +
  "  'sink.batch.size' = '2',\n" +
  "  'username' = 'root',\n" +
  "  'password' = ''\n" +
  ")");

执行插入操作

tEnv.executeSql("INSERT INTO doris_test_sink select id,name from orders");




相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
2月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
3月前
|
SQL 关系型数据库 MySQL
“震撼揭秘!Flink CDC如何轻松实现SQL Server到MySQL的实时数据同步?一招在手,数据无忧!”
【8月更文挑战第7天】随着大数据技术的发展,实时数据同步变得至关重要。Apache Flink作为高性能流处理框架,在实时数据处理领域扮演着核心角色。Flink CDC(Change Data Capture)组件的加入,使得数据同步更为高效。本文介绍如何使用Flink CDC实现从SQL Server到MySQL的实时数据同步,并提供示例代码。首先确保SQL Server启用了CDC功能,接着在Flink环境中引入相关连接器。通过定义源表与目标表,并执行简单的`INSERT INTO SELECT`语句,即可完成数据同步。
356 1
|
3月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何实现MVSOL同步到Doris,并且源库和目标库的库名不同
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之运行mysql to doris pipeline时报错,该如何排查
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之要将MySQL同步到Doris,并设置整库同步,只变更库名、表名和表结构都不变,该如何设置
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用问题之在从MySQL同步数据到Doris时,delete语句无法同步,是什么原因
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
14天前
|
SQL 关系型数据库 MySQL
12 PHP配置数据库MySQL
路老师分享了PHP操作MySQL数据库的方法,包括安装并连接MySQL服务器、选择数据库、执行SQL语句(如插入、更新、删除和查询),以及将结果集返回到数组。通过具体示例代码,详细介绍了每一步的操作流程,帮助读者快速入门PHP与MySQL的交互。
29 1
|
17天前
|
SQL 关系型数据库 MySQL
go语言数据库中mysql驱动安装
【11月更文挑战第2天】
30 4
|
1月前
|
存储 关系型数据库 MySQL
Mysql(4)—数据库索引
数据库索引是用于提高数据检索效率的数据结构,类似于书籍中的索引。它允许用户快速找到数据,而无需扫描整个表。MySQL中的索引可以显著提升查询速度,使数据库操作更加高效。索引的发展经历了从无索引、简单索引到B-树、哈希索引、位图索引、全文索引等多个阶段。
64 3
Mysql(4)—数据库索引
|
23天前
|
监控 关系型数据库 MySQL
数据库优化:MySQL索引策略与查询性能调优实战
【10月更文挑战第27天】本文深入探讨了MySQL的索引策略和查询性能调优技巧。通过介绍B-Tree索引、哈希索引和全文索引等不同类型,以及如何创建和维护索引,结合实战案例分析查询执行计划,帮助读者掌握提升查询性能的方法。定期优化索引和调整查询语句是提高数据库性能的关键。
119 1

热门文章

最新文章

下一篇
无影云桌面