开源共建 | Dinky 扩展批流统一数据集成框架 ChunJun 的实践分享

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: Dinky 扩展批流统一数据集成框架 ChunJun 的实践分享

一、前言

ChunJun(原 FlinkX)是一个基于 Flink 提供易用、稳定、高效的批流统一的数据集成工具,既可以采集静态的数据,比如 MySQL,HDFS 等,也可以采集实时变化的数据,比如 binlog,Kafka 等。同时 ChunJun 也是一个支持原生 FlinkSql 所有语法和特性的计算框架。

ChunJun 具有丰富的插件种类,多达 40 种,如常见的 mysql、binlog、logminer 等,大部分插件都支持 source/reader、sink/writer 及维表功能。目前很多用户在思考能否在 Dinky 上使用 ChunJun 的插件以提供更全面的能力。那本文将带来如何在 Dinky 上集成 ChunJun 丰富的插件,其实简单,那我们开始吧。

二、部署 Flink+ChunJun

编译

注意,如果需要集成 Dinky,需要将 ChunJun 项目下的 chunjun-core 的 pom 文件中的 logback-classic 和 logback-core 注释掉,否则容易在 Dinky 执行 sql 任务的时候报错。

网络异常,图片无法展示
|
然后执行:

网络异常,图片无法展示
|

部署

使用 ChunJun 需要先部署 Flink 集群,其部署本文不再做指导。

值得注意的是,如果你需要调用 Flinkx 的 connect jar 的话,则需要将 classloader.resolve-order 改成 parent-first。修改完成配置以后,把 Flinkx 的 jar 包复制过来,主要是 chunjun-clients-master.jar(Flinkx 现在改名 ChunJun )以及 chunjun 的其它 connector 放到 flink/lib 目录下,如图所示。

网络异常,图片无法展示
|

异常处理

如果启动集群时出现异常,即 Flink standalone 集群加载 flinkx-dist 里 jar 包之后,集群无法启动,日志报错:Exception in thread "main" java.lang.NoSuchFieldError: EMPTY_BYTE_ARRAY.

Exception in thread"main"java.lang.NoSuchFieldError:EMPTY_BYTE_ARRAY
      at org.apache.logging.log4j.core.config.ConfigurationSource.<clinit>(ConfigurationSource.java:56)
      at org.apache.logging.log4j.core.config.NullConfiguration.<init>(NullConfiguration.java:32)
      at org.apache.logging.log4j.core.LoggerContext.<clinit>(LoggerContext.java:85)
      at java.lang.Class.forName0(Native Method)
      at java.lang.Class.forName(Class.java:264)
      at org.apache.log4j.LogManager.<clinit>(LogManager.java:72)
      at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73)
      at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:285)
      at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:305)
      at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.<clinit>(ClusterEntrypoint.java:107)

原因:这个报错是因为 log4j 版本不统一导致的,因为 flinkx-dist 中部分插件引用的还是旧版本的 log4j 依赖,导致集群启动过程中,出现了类冲突问题;

方案:临时方案是将 flink lib 中 log4j 相关的 jar 包名字前加上字符 ‘a‘,使得 flink standalone jvm 优先加载。

网络异常,图片无法展示
|

三、部署 Dinky

编译

网络异常,图片无法展示
|
编译完成后的压缩包在 Dinky 根目录下的 build 文件夹下。

部署

1、上传 dlink 压缩包到部署服务器

2、解压

网络异常,图片无法展示
|
3、数据库初始化

4、把 flink 的 jar 放到 dlink 目录下

网络异常,图片无法展示
|

切换 Dinky 的 Flink 版本

因为目前 flinkx 的稳定版本是 1.12.7,所以我们把 dlink 默认的 client 版本修改为 1.12

网络异常,图片无法展示
|
lib 下的目录如图:

网络异常,图片无法展示
|
注意:因为我没有用上 dlink-connector-jdbc 的 jar 包,所以图中的 dlink-connector-jdbc-1.13-0.6.4-SNAPSHOT.jar 没有换成 1.12 版本的,可以去掉。

启动

启动命令

网络异常,图片无法展示
|

注册集群实例

在集群实例中注册已经启动的 Flink 集群。

网络异常,图片无法展示
|

四、示例分享

添加依赖

这里演示 mysql->mysql 的同步作业,所以需要 Flinkx 的 mysql-connector.jar 以及核心 jar。

网络异常,图片无法展示
|

编写作业

Mysql DDL:

CREATE TABLE `datasource_classify` (
`id` int unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
`classify_code` varchar(64) NOT NULL COMMENT '类型栏唯一编码',
`sorted` int NOT NULL DEFAULT '0' COMMENT '类型栏排序字段 默认从0开始',
`classify_name` varchar(64) NOT NULL COMMENT '类型名称 包含全部和常用栏',
`is_deleted` tinyint NOT NULL DEFAULT '0' COMMENT '是否删除,1删除,0未删除',
`gmt_create` datetime DEFAULT CURRENT_TIMESTAMP,
`gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `classify_code` (`classify_code`)
) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='数据源分类表';

Flink Sql:

CREATE TABLE source
(
  id          bigint,
  classify_code     STRING,
  sorted     int,
  classify_name   STRING,
  is_deleted    int,
  gmt_create        timestamp(9),
  gmt_modified timestamp(9),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
     'connector' = 'mysql-x',
     'url' = 'jdbc:mysql://192.168.31.101:3306/datasource?useSSL=false',
     'table-name' = 'datasource_classify',
     'username' = 'root',
     'password' = 'root'
    ,'scan.fetch-size' = '2'
    ,'scan.query-timeout' = '10'
    );
CREATE TABLE sink
(
  id          bigint,
  classify_code     STRING,
  sorted     int,
  classify_name   STRING,
  is_deleted    int,
  gmt_create        timestamp(9),
  gmt_modified timestamp(9),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
     'connector' = 'mysql-x',
     'url' = 'jdbc:mysql://192.168.31.106:3306/test?useSSL=false',
     'table-name' = 'datasource_classify',
     'username' = 'root',
     'password' = 'root'
    ,'scan.fetch-size' = '2'
    ,'scan.query-timeout' = '10'
    );
insert into sink
select *
from source u;


执行任务

网络异常,图片无法展示
|
选中 Yarn Session 模式提交作业。
网络异常,图片无法展示
|
提交后可从执行历史查看作业提交状况。
网络异常,图片无法展示
|
进程中可以看的 Flink 集群上批作业执行完成。

对比数据

源库:

网络异常,图片无法展示
|
目标库:
网络异常,图片无法展示
|
同步成功,很丝滑。

五、总结

在集成 ChunJun 的时候遇到的问题大部分都是缺包以及包冲突,所以只需要注意一下这个问题就能比较好的进行集成。

在集成服务的时候建议是,先把 Flink 和 ChunJun 进行集成,确保服务能够正常启用以后再进行 Dinky 的集成,这样有利于快速定位查找问题,如果遇到文章之外的问题,也可以查看 Dinky 官网 FAQ | Dinky (dlink.top) chunjun 的官网 QuickStart | ChunJun 纯钧 (dtstack.github.io/chunjun/),看看是否有类似问题的解决办法作为参考。

六、用户体验

因为本人目前还是处于学习使用的过程中,所以很多功能没有好好使用,待自己研究更加透彻后希望写一篇文章,优化官网的用户手册。以下的优缺点以及建议都是目前我在使用学习的过程中遇到的问题。

优点

Dinky 最吸引我的地方应该就是 sql 编辑模版了,直接快捷键生成 sql 模版,在开发测试中屡试不爽。在集成了 ChunJun (Flinkx) 以后,能够做到多源数据的离线跑批任务及日常小批量实时任务的同步。支持各种类型的任务执行方式。

缺点

ui 上适配还有点小问题,例如:打开 F12 调整宽度后,再关闭,页面 ui 不会自适应,需要刷新。

期待改进点

1、更多的自定义异常、业务异常

2、增加新的向导模式,结合数据源,通过 webUI 可以一键引入字段或者勾选需要的字段,生成 Flink Sql 的一大部分配置

CREATE TABLE 表名
(
-- 页面勾选字段,字段从元数据直接拉取
  id          bigint,
  classify_code     STRING,
  sorted     int,
  classify_name   STRING,
  is_deleted    int,
  gmt_create        timestamp(9),
  gmt_modified timestamp(9),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
-- 从选择的数据中获取
     'connector' = 'mysql-x',
     'url' = 'jdbc:mysql://192.168.31.106:3306/test?useSSL=false',
     'table-name' = 'datasource_classify',
     'username' = 'root',
     'password' = 'root'
    ,
-- 其它非主要配置有用户自己填写
    );

3、sql 历史版本管理,目前我已经提交 Feature 并被合并到 0.6.5 版本中。

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szalykfz

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术 qun」,交流最新开源技术信息,qun 号码:30537511,项目地址:https://github.com/DTStack/chunjun

目录
相关文章
|
9天前
|
运维 Cloud Native Devops
云原生时代的DevOps实践:自动化、持续集成与持续部署
【9月更文挑战第3天】未来,随着人工智能、大数据等技术的不断融入,DevOps实践将更加智能化和自动化。我们将看到更多创新的技术和工具涌现出来,为软件开发和运维带来更多便利和效益。同时,跨团队协作和集成也将得到进一步加强,推动软件开发向更加高效、可靠和灵活的方向发展。
|
7天前
|
Devops jenkins Shell
DevOps实践:持续集成与持续部署(CI/CD)的探索之旅
【9月更文挑战第3天】在软件开发的世界里,DevOps已经成为了提升效率、加速产品迭代的关键。本文将深入浅出地探讨DevOps文化中的核心实践——持续集成(Continuous Integration,CI)和持续部署(Continuous Deployment,CD),并展示如何通过实际操作来优化开发流程。我们将一起踏上这段旅程,解锁自动化的魅力,让代码更流畅地转化为价值。
|
12天前
|
Java Devops 持续交付
探索Java中的Lambda表达式:简化代码,提升效率DevOps实践:持续集成与部署的自动化之路
【8月更文挑战第30天】本文深入探讨了Java 8中引入的Lambda表达式如何改变了我们编写和管理代码的方式。通过简化代码结构,提高开发效率,Lambda表达式已成为现代Java开发不可或缺的一部分。文章将通过实际例子展示Lambda表达式的强大功能和优雅用法。
|
12天前
|
存储 消息中间件 前端开发
Web2py框架下的神秘力量:如何轻松集成第三方API,让你的应用不再孤单!
【8月更文挑战第31天】在开发现代Web应用时,常需集成第三方服务如支付网关、数据存储等。本文将指导你使用Web2py框架无缝接入第三方API。通过实例演示从注册获取API密钥、创建控制器、发送HTTP请求到处理响应的全过程。利用`requests`库与Web2py的内置功能,轻松实现API交互。文章详细介绍了如何编写RESTful控制器,处理API请求及响应,确保数据安全传输。通过本教程,你将学会如何高效整合第三方服务,拓展应用功能。欢迎留言交流心得与建议。
26 1
|
12天前
|
监控 安全 Devops
DevOps实践:持续集成和部署的自动化之旅
【8月更文挑战第30天】在软件开发的快节奏世界中,DevOps已成为推动项目成功的关键因素。本文将深入探讨如何通过持续集成(CI)和持续部署(CD)实现自动化,以加速开发流程、提升软件质量并确保快速交付。我们将从基础概念出发,逐步过渡到实际操作,最后讨论如何克服实施过程中的挑战。
|
11天前
|
开发者 C# UED
WPF与多媒体:解锁音频视频播放新姿势——从界面设计到代码实践,全方位教你如何在WPF应用中集成流畅的多媒体功能
【8月更文挑战第31天】本文以随笔形式介绍了如何在WPF应用中集成音频和视频播放功能。通过使用MediaElement控件,开发者能轻松创建多媒体应用程序。文章详细展示了从创建WPF项目到设计UI及实现媒体控制逻辑的过程,并提供了完整的示例代码。此外,还介绍了如何添加进度条等额外功能以增强用户体验。希望本文能为WPF开发者提供实用的技术指导与灵感。
22 0
|
11天前
|
测试技术 Java Spring
Spring 框架中的测试之道:揭秘单元测试与集成测试的双重保障,你的应用真的安全了吗?
【8月更文挑战第31天】本文以问答形式深入探讨了Spring框架中的测试策略,包括单元测试与集成测试的有效编写方法,及其对提升代码质量和可靠性的重要性。通过具体示例,展示了如何使用`@MockBean`、`@SpringBootTest`等注解来进行服务和控制器的测试,同时介绍了Spring Boot提供的测试工具,如`@DataJpaTest`,以简化数据库测试流程。合理运用这些测试策略和工具,将助力开发者构建更为稳健的软件系统。
21 0
|
11天前
|
Java Spring UED
Spring框架的异常处理秘籍:打造不败之身的应用!
【8月更文挑战第31天】在软件开发中,异常处理对应用的稳定性和健壮性至关重要。Spring框架提供了一套完善的异常处理机制,包括使用`@ExceptionHandler`注解和配置`@ControllerAdvice`。本文将详细介绍这两种方式,并通过示例代码展示其具体应用。`@ExceptionHandler`可用于控制器类中的方法,处理特定异常;而`@ControllerAdvice`则允许定义全局异常处理器,捕获多个控制器中的异常。
29 0
|
11天前
|
测试技术 持续交付 开发者
Xamarin 高效移动应用测试最佳实践大揭秘,从框架选择到持续集成,让你的应用质量无敌!
【8月更文挑战第31天】竞争激烈的移动应用市场,Xamarin 作为一款优秀的跨平台开发工具,提供了包括单元测试、集成测试及 UI 测试在内的全面测试方案。借助 Xamarin.UITest 框架,开发者能便捷地用 C# 编写测试案例,如登录功能测试;通过 Xamarin 模拟框架,则可在无需真实设备的情况下模拟各种环境测试应用表现;Xamarin.TestCloud 则支持在真实设备上执行自动化测试,确保应用兼容性。结合持续集成与部署策略,进一步提升测试效率与应用质量。掌握 Xamarin 的测试最佳实践,对确保应用稳定性和优化用户体验至关重要。
24 0
|
11天前
|
数据库 Java 数据库连接
Struts 2 与 Hibernate 的完美邂逅:如何无缝集成两大框架,轻松玩转高效 CRUD 操作?
【8月更文挑战第31天】本文通过具体示例介绍了如何在 Struts 2 中整合 Hibernate,实现基本的 CRUD 操作。首先创建 Maven 项目并添加相关依赖,接着配置 Hibernate 并定义实体类及其映射文件。然后创建 DAO 接口及实现类处理数据库操作,再通过 Struts 2 的 Action 类处理用户请求。最后配置 `struts.xml` 文件并创建 JSP 页面展示用户列表及编辑表单。此示例展示了如何配置和使用这两个框架,使代码更加模块化和可维护。
21 0

热门文章

最新文章