本文作者:周波,阿里云智能高级开发工程师, Apache RocketMQ Committer 。
01 从问题中来的RocketMQ Connect
在电商系统、金融系统及物流系统,我们经常可以看到 RocketMQ 的身影。原因不难理解,随着数字化转型范围的扩大及进程的加快,业务系统的数据也在每日暴增,此时为了保证系统的稳定运行,就需要把运行压力分担出去。RocketMQ就担任着这样的角色,它的异步消息处理与高并发读写能力,决定了系统底层的重构不会影响上层应用的功能。而RocketMQ的另一个优势——可伸缩能力,使系统在面临流量的不确定性时,实现对流量的缓冲处理。此外,RocketMQ的顺序设计特性使其成为一个天然的排队引擎,例如,三个应用同时对一个后台引擎发起请求,确保不引起“撞车”事故。因此,RocketMQ被用在异步解耦、削峰填谷以及事务消息等场景中。
但是,数字化转型浪潮也带来了更多用户对数据价值的关注——如何让数据产生更大利用价值?RocketMQ自身不具备数据分析能力,但是有不少用户希望从 RocketMQ Topic 中获取数据并进行在线或离线的数据分析。然而,使用市面上的数据集成或数据同步工具,将 RocketMQ Topic 数据同步到一些分析系统中虽然是一种可行方案,却会引入新的组件,造成数据同步的链路较长,时延相对较高,用户体验不佳。
举个例子,假设业务场景中使用 OceanBase 作为数据存储,同时希望将这些数据同步到 Elasticsearch进行全文搜索,有两种可行的数据同步方案。
方案一:从 OceanBase 中获取数据,写入 Elasticsearch组件并进行数据同步,在数据源较少时此方案没什么问题,一旦数据增多,开发和维护都非常复杂,此时就要用到第二种方案。
[]()
方案二:引入消息中间件对上下游进行解藕,这能解决第一种方案的问题,但是一些较为复杂的问题还没有完全解决。比如,如何将数据从源数据同步到目标系统并保证高性能,如果保证同步任务的部分节点挂掉,数据同步依然正常进行,节点恢复依然可以断点续传,同时随着数据管道的增多,如何管理数据管道也变得十分困难。
[]()
总的来说,数据集成过程中的挑战主要有五个。
挑战一: 数据源多,市面上可能有上百个数据源,且各数据源的系统差异较大,实现任意数据源之间的数据同步工作量较大,研发周期很长。
挑战二: 高性能问题,如何高效地从源数据系统同步到目的数据系统,并保障其性能。
挑战三: 高可用问题,即Failover能力,当一个节点挂掉是否这个节点的任务就停止了,任务重新启动是否还可以断点续传。
挑战四: 弹性扩缩容能力,根据系统流量动态增加或减少节点数量,既能通过扩容满足高峰期业务,也能在低峰期缩减节点,节省成本。
挑战五: 数据管道的管理运维,随着数据管道的增多,运维监控的数据管道也会变得越来越复杂,如何高效管理监控众多的同步任务。
面对上述挑战 RocketMQ 如何解决?
第一,标准化数据集成 API (Open Messaging Connect API)。在 RocketMQ 生态中增加 Connect 组件,一方面对数据集成过程抽象,抽象标准的数据格式以及描述数据的 Schema,另一方面对同步任务进行抽象,任务的创建、分片都抽象成一套标准化的流程。
第二,基于标准的 API 实现 Connect Runtime。Runtime提供了集群管理、配置管理、位点管理、负载均衡相关的能力,拥有了这些能力,开发者或者用户就只需要关注数据如何获取或如何写入,从而快速构建数据生态,如与OceanBase、MySQL、Elasticsearc等快速建立连接,搭建数据集成平台。整个数据集成平台的构建也非常简单,通过 Runtime 提供的 RESTFull API 进行简单调用即可。
第三,提供完善的运维工具,方便管理同步任务,同时提供丰富的Metrics 信息,方便查看同步任务的TPS,流量等信息。
02 RocketMQ Connect 两大使用场景
这里为大家整理了 RocketMQ Connect的两大使用场景。
场景一,RocketMQ 作为中间媒介,可以将上下游数据打通,比如在新旧系统迁移的过程中,如果在业务量不大时使用 MySQL 就可以满足业务需求,而随着业务的增长,MySQL 性能无法满足业务要求时,需要对系统进行升级,选用分布式数据库OceanBase 提升系统性能。
如何将旧系统数据无缝迁移到 OceanBase 中呢?在这个场景中RocketMQ Connect 就可以发挥作用,RocketMQ Connect可以构建一个从 MySQL 到 OceanBase 的数据管道,实现数据的平滑迁移。RocketMQ Connect 还可以用在搭建数据湖、搜索引擎、ETL 平台等场景。例如将各个数据源的数据集成到 RocketMQ Topic当中,目标存储只需要对接 Elasticsearch 就可以构建一个搜索平台,目标存储如果是数据湖就可以构建一个数据湖平台。
除此之外,RocketMQ 自身也可以作为一个数据源,将一个 RocketMQ 集群的数据同步到另一个集群,可以构建 RocketMQ 多活容灾能力,这是社区正在孵化的Replicator可以实现的能力。
[]()
场景二,RocketMQ 作为端点。 RocketMQ 的生态中提供了流计算能力组件-RocketMQ Streams,Connector将各个存储系统的数据集成到RocketMQ Topic当中,下游使用 RocketMQ Streams流计算的能力就可以构建一个实时的流计算平台。当然也可以配合业务系统的Service 实现业务系统快速从其它存储统一快速获取数据的能力。
[]()
还可以将 RocketMQ 作为端点的上游,将业务消息发到 Topic 中,使用 Connector 对数据做持久化或转存的操作。
[]()
如此一来,RocketMQ 就具备数据集成能力,可以实现任意任意异构数据源之间的数据同步,同时也具备统一的集群管理、监控能力及配置化搭建数据管道搭建能力,开发者或者用户只需要专注于数据拷贝,简单配置就可以得到一个具备配置化、低代码、低延时、高可用,支持故障处理和动态扩缩容数据集成平台。
那么, RocketMQ Connect 是如何实现的呢?
03RocketMQ Connect 实现原理
在介绍实现原理前,先来了解两个概念。
概念一,什么是 Connector(连接器)? 它定义数据从哪复制到哪,是从源数据系统读取数据写入RocketMQ,这种是SourceConnector,或从RocketMQ读数据写入到目标系统,这种是SinkConnector。Connector决定需要创建任务的数量,从Worker接收配置传递给任务。
[]()
概念二,什么是 Task ? Task 是 Connector 任务分片的最小分配单位,是实际将源数据源数据复制到 RocketMQ(SourceTask),或者将数据从RocketMQ 读出写入到目标系统(SinkTask)真正的执行者,Task是无状态的,可以动态的启停任务,多个Task可以并行执行,Connector 复制数据的并行度主要体现在 Task 上。一个 Task 任务可以理解为一个线程,多个 Task 则以多线程的方式运行。
[]()
通过Connect的API也可以看到Connector和Task各自的职责,Connector实现时就已经确定数据复制的流向,Connector接收数据源相关的配置,taskClass获取需要创建的任务类型,通过taskConfigs的数量确定任务数量,并且为Task分配好配置。Task拿到配置以后数据源建立连接并获取数据写入到目标存储。通过下面的两张图可以清楚的看到,Connector和Task处理基本流程。
[]()
一个 RocketMQ Connect 集群中会有多个 Connector ,每个 Connector 会对应一个或多个 Task,这些任务运行在 Worker(进程)中。Worker进程是Connector和Task运行环境,它提供RESTFull能力,接收HTTP请求,将获取到的配置传递给Connector和Task,它还负责启动Connector和Task,保存Connector配置信息,保存Task同步数据的位点信息,除此以外,Worker还提供负载均衡能力,Connect集群高可用、扩缩容、故障处理主要依赖Worker的负责均衡能力实现的。Worker 提供服务的流程如下:
[]()
Worker 提供的服务发现及负载均衡的实现原理如下:
服务发现:
[]()
用过 RocketMQ 的开发者应该知道,它的使用很简单,就是发送和接收消息。消费模式分为集群模式和广播模式两种,集群消费模式下一个 Topic 可以有多个 Consumer 消费消息,任意一个 Consumer 的上线或下线 RocketMQ 服务端都有感知,并且还可以将客户端上下线信息通知给其它节点,利用RocketMQ这个特性就实现了 Worker的服务发现。
配置/Offset同步:
[]()
Connector 的配置/Offset信息同步通过每个 Worker订阅相同的 Topic,不同Worker 使用不同的 Consumer Group 实现的, Worker 节点可以通过这种方式消费到相同Topic的所有数据,即Connector配置/Offset信息,这类似于广播消费模式,这种数据同步模式可以保证任何一个 Worker 挂掉,该Worker上的任务依旧可以在存活的 Worker 正常拉起运行 ,并且可以获取到任务对应的 Offset 信息实现断点续传, 这是故障转移以及高可用能力的基础。
负载均衡
RocketMQ 消费场景中,消费客户端 与Topic Queue 之间有负载均衡能力,Connector 在这一部分也是类似的,只不过它负载均衡的对象不一样,Connector 是 Worker 节点和 Task之间的负载均衡,与RocketMQ客户端负载均衡一样,可以根据使用场景选择不同负载均衡算法。
[]()
上文提到过 RocketMQ Connect 提供 RESTFull API能力。通过 RESTFull AP可以创建 Connector,管理Connector 以及查看 Connector 状态,简单列举:
- POST /connectors/{connector name}
- GET /connectors/{connector name}/config
- GET /connectors/{connector name}/status
- POST /connectors/{connector name}/stop
目前 Connector 支持单机、集群两种部署模式。集群模式至少要有两个节点,才能保证它的高可用。并且集群可以动态增加或者减少,做到了动态控制提升集群性能和节省成本节省的能力。单机模式更多方便了开发者开发测试 Connector 。
如何如何实现一个 Connector呢? 还是结合一个具体的场景看一看,例如业务数据当前是写入MySQL 数据库中的,希望将MySQL中数据实时同步到数据湖Hudi当中。只要实现 MySQL Source Connector 、Hudi Sink Connector这两个 Connector 即可。
[]()
下面就以 MySQLSource Connector 为例,来看一下具体的如何实现。
实现 Connector最主要的就是实现两个 API 。第一个是 Connector API ,除了实现它生命周期相关的 API 外,还有任务如何分配,是通过 Topic、Table 还是通过数据库的维度去分。第二个API是需要创建的 Task,Connector 通过任务分配将相关的配置信息传递给 Task, Task 拿到这些信息,例如数据库账号,密码,IP,端口后就会创建数据库连接,再通过 MySQL 提供的 BINLOG 机智获取到表的数据,将这些数据写到一个阻塞队列中。Task有个 Poll 方法,实现Connector时只要调用到Poll方法时可以获取到数据即可,这样 Connector 就基本写完了。然后打包以Jar包的形式提供出来,将它加载到 Worker 的节点中。
[]()
创建 Connector 任务后, Worker 中会创建一个或者多个线程,不停的轮询 Poll方法,从而获取到MySQL表中的数据,再通过 RocketMQ Producer 发送到 RocketMQ Broker中,这就是 Connector 从实现到运行的整体过程(见下图)。
[]()
04 RocketMQ Connect 现状与未来
RocketMQ Connect 的发展历程分为三个阶段。
第一阶段:Preview 阶段
RocketMQ Connect发展的初期也即Preview阶段,实现了Open Messaging Connect API 1.0 版本,基于该版本实现了 RocketMQ Connect Runtime ,同时提供了 10+ Connector 实现(MySQL,Redis,Kafka,Jms,MongoDB……)。在该阶段,RocketMQ Connect 可以简单实现端到端的数据源同步,但功能还不够完善,不支持数据转换,序列化等能力,生态相对还比较贫乏。
第二阶段:1.0 阶段
在 1.0 阶段,Open Messaging Connect API 进行了升级,支持Schema、Transform,Converter等能力,在此基础上对 Connect Runtime 也进行了重大升级,对数据转换,序列化做了支持,复杂Schema也做了完善的支持。该阶段的 API、Runtime 能力已经基本完善,在此基础上,还有30+ Connecotor 实现,覆盖了 CDC、JDBC、SFTP、NoSQL、缓存Redis、HTTP、AMQP、JMS、数据湖、实时数仓、Replicator、等Connector实现,还做了Kafka Connector Adaptor可以运行Kafka生态的Connector。
第三阶段:2.0 阶段
RocketMQ Connect当前处于这个阶段,重点发展Connector生态,当 RocketMQ 的 Connector生态达到 100 + 时,RocketMQ 基本上可以与任意的一个数据系统去做连接。
目前 RocketMQ 社区正在和 OceanBase 社区合作,进行 OceanBase 到 RocketMQ Connect 的研发工作,提供 JDBC 和 CDC 两种模式接入模式,后续会在社区中发布,欢迎感兴趣的同学试用。
05 总结
RocketMQ是一个可靠的数据集成组件,具备分布式、伸缩性、故障容错等能力,可以实现RocketMQ与其他数据系统之间的数据流入与流出。通过 RocketMQ Connect 可以实现CDC,构建数据湖,结合流计算可实现数据价值。