TIS 整合 ChunJun 实操
B 站视频:
https://www.bilibili.com/video/BV1QM411z7w5/?spm_id_from=333.999.0.0
一、ChunJun 概述
ChunJun 是一款易用、稳定、高效的批流统一的数据集成框架,可基于实时计算引擎 Flink 实现多种异构数据源之间的数据同步与计算,既可以采集静态的数据,比如 MySQL,HDFS 等,也可以采集实时变化的数据,比如 Binlog,Kafka 等。
目前的核心功能包括:
・多源异构数据汇聚
作为一个开放式系统,用户可以根据需要开发新的插件,接入新的数据库类型,也可以使用内置的数据库插件。目前兼容 30 + 异构数据源的数据读写与 SQL 计算。
・断点续传
针对网络波动等异常情况,导致数据同步失败的任务,在下一次任务时自动从上一次失败的数据点进行数据同步,避免全部重跑。
・数据还原
除了 DML 操作以外,一些源端数据库的 DDL 操作也能做到同步,最大程度保证源端数据库和目标端数据库的数据统一和结构统一,做到数据还原。
・脏数据管理
数据传输过程中,因数据质量或主键约束等其他因素导致数据无法同步到目标数据库,针对这些脏数据进行统计和管理,便于后续进行脏数据分析。
・速率控制
数据同步过程中,数据传输效率是关键。ChunJun 针对各种场景,有的放矢地控制速率,最大程度保证数据同步的正常进行。
更多详见:
Github:https://github.com/DTStack/chunjun
Gitee:https://gitee.com/dtstack_dev_0/chunjun
官网:https://dtstack.github.io/chunjun/
ChunJun 架构:
二、TIS 概述
TIS 最早是基于 Solr 为用户提供一站式开箱即用、自助服务的搜索引擎中台产品。在 2020 年之前,当 Flink 和 MPP 引擎还没有形成影响力时 ,TIS 就已经在为互联网企业内部提供实时 OLAP 分析需求的服务。
为满足大数据业务需求,快速将工具栈进行整合。TIS 从 2019 年底开始转型,开始全方位支持现有实时数仓中台,从原先与搜索引擎强耦合的技术架构进行重构。从只处理搜索引擎一个场景,兼容到所有数据端的大数据生态场景。
经过 TIS 开发者的努力,现在的 TIS 内部有一套强大的元数据管理系统,根据用户需求大部分的工作脚本可自动生成(TIS 是基于模型的 DataOps,区别于市面上其他基于脚本任务的 DevOps 系统,摒弃掉所有繁琐的脚本操作),等到任务所需资源准备好,用户轻点数据系统就开始运行。
另外更为关键的是,TIS 能够将专业大数据技术人员和大数据分析师这两种角色解耦。一个实时数仓中台,使用它的人并不需要了解里面的技术细节,并不需要知道 Flink、Hive、Hadoop 的技术细节,只要知道他们是干什么的就行。基于以上,TIS 改造之初并没有针对实时数仓进行编码,而是花了将近一年时间对 TIS 产品底座进行构建,着重进行了以下几方面的构建:
插件仓库 / 热生效机制
现有行业中提供的工具栈,需要在后台系统中自行部署,TIS 则简化了这一流程,TIS 在构建项目之时会统一将第三方的依赖包进行打包,预先部署到远端仓库中,用户在 TIS 中可以查看到可用插件清单。在使用时,只需鼠标点击下载且热生效就可使用,操作体验流畅。
全流程建模
针对 ETL 的各流程进行建模,将可变因素进行抽象,抽取成一个 TIS 系统中的扩展点,统一归档到 TIS 的主工程中,在主工程中没有任何具体业务代码的实现,这样在进行具体业务逻辑实现中就不需要更改任何主工程的代码,在架构层面最大限度地贯彻了 OCP 原则。
例如以下是对 ETL 中,针对结构化(支持 JDBC 接口)和非结构化数据源的执行流程图:
构建 UI-DSL 系统
随着整合进 TIS 的功能组件越来越多,需要单独开发的 UI 工作量巨大且风格难以统一,大量重新代码维护困难,同时由于行业分工精细化,流程需要前后端工程师相互协作,导致开发效率低,如何让没有前端开发经验的后端开发工程师,能够独立且畅快地完成一个 UI 组件的开发,成为一个重要的课题。为解决这个问题,TIS 在底座中实现了一个 UI-DSL 的系统,后端开发工程师使用 JAVA 语言编写一个表单对应的 MetaData 脚本,里面定义表单的布局,输入项的校验等信息,运行期会自动将 MetaData 脚本渲染成前端的表单,从而完美解决这个课题。
@FormField(ordinal = 3, // 表单中的排位顺序 type = FormFieldType.INPUTTEXT // 表单中控件类型 , validate = {Validator.require, Validator.identity}) // 输入项的校验规则 public String dbName;
DataSourceFactory.json
{ "dbName": { "label": "数据库名", "help": "数据库名,创建JDBC实例时用" }}
三、整合 ChunJun 完善 TIS 生态
经过几个月时间的研发,TIS V3.6.0-alpha 版本终于发布了。该版本的最大亮点,即整合了大数据领域数据同步工具的翘楚 ChunJun,将 TIS 的业务能力提升到了新高度。
TIS 的最新版本:
https://github.com/qlangtech/tis/releases/tag/v3.6.0-alpha
早在 V3.6.0-alpha 之前,TIS 已经整合了 Alibaba DataX 和 Flink-CDC。离线批量同步利用 DataX 组件实现,而在实时数据变更 Source 组件方面,TIS 是基于 Flink-CDC 来实现的。至于 Sink 部分,则一直是基于各种数据端提供的生态 API 包经过二次开发完成的。
其中存在的问题是,开发周期长,调试困难,例如,仅仅为了实现 StarRocks 一个 Sink 端实现一个基于 StreamFunction 的 Sink 实现,连开发带测试花去了整整三个星期的时间。
直到整合 ChunJun 之后才解决了这些问题。ChunJun 已经很好地支持了大数据领域的大部分数据端,包括 Source 和 Sink。它的 Source 端基于 Polling 轮询机制来实现,相较与 Flink CDC 实现的 Source 端是有自己的特色的。
例如,并不是所有的端都支持类似 MySQL binlog 这样的实时同步机制,即使支持类似 Oracle 的 LogMiner,如需开启,也需要专业 Oracle DBA 协助,不然设置权限就会吓退很多用户。而基于 Polling 机制的实时更新订阅却可以支持所有的 Source 端,只要实现了 JDBC 接口就行。
所以 ChunJun 的 Source 端通用性非常好,比之于 Flink CDC 的唯一劣势是实时性要低,不过一般在大部份 OLAP 的场景下用户对实时性的要求并没有那么高,所以一般情况下推荐使用 ChunJun 的 Source 来监听实时数据变更。
另外,ChunJun 的 Sink 端实现也是一大特色,一般情况下数据端的生态产品中会提供 Flink Sink 的实现,例如:ElasticSearch 的 Flink 官网提供了一个基于 SinkFunction 的实现,StarRocks 在官网也提供了 Sink 实现。但是各家实现方式各不相同,没有一个统一的抽象模型。另外各厂商提供的实现中基本上只是一些半成品,像容灾、监控等都没有提供,导致 TIS 在整合各家 Sink 端时着实花了不少精力且很难做得完美。
因此在 TIS v3.6.0 中利用 ChunJun v1.12.5 全面改写了 TIS 原有的 Sink 端实现,由于 ChunJun 实现是一个封装好并且已经在生产环境中经过检验的,并且在实现方式上已经通过统一建模,每种端的接入方式可以统一,对 TIS 来说大大提高了整合开发效率,而且将容灾、监控、脏数据管理也一并实现。
ChunJun 支持的 Connector 端非常丰富,TIS v3.6.0 中只是拣取了几个用户高频使用的端来封装,其他端的封装会在后续版本中逐步实现。以下是 v3.6.0 版本中实现的端类型:
四、TIS 是如何整合 ChunJun
利用 TIS 元数据管理系统接管 ChunJun 流数据类型控制
ChunJun 流处理中构建的 RowData 实例是通过目标端 Jdbc MetaData 自动生成的(用户不需要在 JSON 配置文件中设置),内部需要通过目标端(Source/Sink)字段 JDBC 中的元数据信息的 fieldType 作为参数来映射 flink 的 DataType 实例,调用的接口是 com.dtstack.chunjun.converter.RawTypeConverter,
public interface RawTypeConverter { DataType apply(String type); }
在实际处理过程中发现,仅仅利用 JDBC col metaDatafieldType 作为参数还是不够,例如:MySQL 的表定义为 bigint,int,smallint 的整型,当用户添加 unsigned 修饰,bigint 在 Flink 中的映射类型需要从 BigIntType 变成 DataTypes.DECIMAL,原 smallint 类型需要变成 IntType,不然执行就会出错。另外像 Oracle 的 Jdbc 内部实现了一套区别于 Jdbc 标准的类型规范 oracle.jdbc.OracleTypes,当得到 Oracle 的类型之后需要归一化成 Jdbc 的类型 java.sql.Types,不然没法正常执行。
类型映射虽然很简单,但由于 Java 是强类型语言,在流处理执行过程中稍有不慎就会出现 ClassCastException,所以得格外小心地处理,因此 TIS 在 ChunJun 中引入了一个新的类型抽象 com.qlangtech.tis.plugin.ds.ColMeta 来封装 Jdbc MetaData 的列信息,在具体执行过程中可以更加细腻地控制 Flink 内部的列类型。
public interface RawTypeConverter { DataType apply(ColMeta type); } public class ColMeta implements Serializable { public final String name; public final DataType type; public final boolean pk; public ColMeta(String name, DataType type, boolean pk) { this.name = name; this.type = type; this.pk = pk; } //... } public class DataType implements Serializable { public final int type; public final int columnSize; public final String typeName; // decimal 的小数位长度 private Integer decimalDigits; public DataType(int type, String typeName, int columnSize) { this.type = type; this.columnSize = columnSize; this.typeName = typeName; } /** * is UNSIGNED */ public boolean isUnsigned() { //... } }
取代基于 JSON 配置驱动的任务变为基于元数据模型驱动任务
有了 TIS 底层元数据关系管理的支持,数据同步任务定义的大部分工作可以自动生成,用户只需要做一些辅助工作,例如,用户需要导入一个张表,表有 10 列,用户需要做的是辅助确认:对于 Source 端确认表主键,Polling 策略的轮询间隔时间及轮询列名,对于 Sink 端选取 Insert 的插入策略,这些都只需要点击鼠标就能完成,页面 UI 中的显示逻辑和 ChunJun 的规则相一致。
为 ChunJun 添加新的 TIS 扩展点
想要在 v3.6.0 版本顺利地将 ChunJun Connector 整合进 TIS,需要添加两个功能扩展点,一是为增量 Source 端表的属性设置 com.qlangtech.tis.plugins.incr.flink.chunjun.source.SelectedTabPropsExtends,二是为 Sink 端表的属性设置 com.qlangtech.tis.plugins.incr.flink.chunjun.sink.SinkTabPropsExtends
五、开源共建,繁荣生态
TIS 的构建理念是坚决避免重复造轮子,必须站在行业的巨人的肩膀上,做大数据行业中优秀工具栈的粘合剂。TIS V3.6.0alpha 有幸能按时发布,得益于行业中有像 ChunJun、DataX、Flink-CDC、Flink 这样优秀的开源项目存在 ,使得 TIS 整体可靠性得到保障。特别要感谢 Apache Flink,提供了一个强大的实时计算生态,Flink CDC、ChunJun 和 TIS 都是生长在这个生态中的茁壮成长的小树苗,每个项目都专注于自己擅长的领域,且相互补充。
临近发布,发现一个很有意思的使用场景,那就是用户可以选择基于 Flink-CDC 的 MySQL Source 插件来监听 MySQL 表的增量变更,将数据同步到以 ChunJun 构建的 Sink 中去,这样的混搭使用方式给用户带来了更多的选择自由度,也避免了在 Flink-CDC 和 ChunJun 各自的框架内部重复造轮子从而造成生态内卷。
六、拥抱 CloudNative
云原生(CloudNative)时代的到来为我们描绘了一副美好的画卷,对于终端用户来说提供了低成本、可靠的 IT 基础服务,可以专注于业务开发,这非常好。
但对于互联网技术从业者来说,似乎有隐忧,那就是互联网红利将会被阿里云这样的云厂商通吃,小厂商只有干瞪眼的份,那我们煞费苦心构建的像 TIS 这样的开源项目在云时代还有用武之地吗?其实这样的担心是多余的。
一个健康的生态,必须要保证生物多样性,生态中各个物种并不是独立,他们之间存在相互依存的关系。同样在大数据生态中如果只有像阿里云、亚马逊这样互联网大厂活得很滋润,并且构成了一个人才黑洞,把其他小厂的资源全部吸干了,想必这样的生态也不可能长远。
从本质来说,促成任何个人或组织之间的合作都有一个前提,那就是存在比较优势,就如同瞎子背瘸子相互协助前行,国家之间的合作也是,中国具有廉价劳动力和广阔的市场与发达国家的技术优势进行互补,这种合作是可持续的。
云大厂可以把昂贵的互联网基础设置,用集约化采购的规模优势大大地降低成本,然后用技术手段将这些设备云化成 IAAS 服务提供给客户,小厂技术具有灵活高效与较低的技术人员薪资成本优势,以这种优势在 IAAS 之上构建 PAAS 服务,类似任务调度,实时数仓非常合适。国外也已经有成功的案例,比如 Snowflake 提供的云原生实时数仓和亚马逊等云厂商之间的合作,有同学肯定会问:"为啥亚马逊不能自己搞一个像 snowflake 呢?",其实答案前面已经提到。
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szalykfz
添加【小袋鼠:dtstack001】入qun,免费获取大数据&开源干货
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术 qun」,交流最新开源技术信息,qun 号码:30537511,项目地址:https://github.com/DTStack