基于 RocketMQ Connect 构建数据流转处理平台

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
可观测可视化 Grafana 版,10个用户账号 1个月
简介: 基于 RocketMQ Connect 构建数据流转处理平台

作者:周波,阿里云智能高级开发工程师, Apache RocketMQ Committer


01 从问题中来的 RocketMQ Connect


在电商系统、金融系统及物流系统,我们经常可以看到 RocketMQ 的身影。原因不难理解,随着数字化转型范围的扩大及进程的加快,业务系统的数据也在每日暴增,此时为了保证系统的稳定运行,就需要把运行压力分担出去。RocketMQ 就担任着这样的角色,它的异步消息处理与高并发读写能力,决定了系统底层的重构不会影响上层应用的功能。而 RocketMQ 的另一个优势——可伸缩能力,使系统在面临流量的不确定性时,实现对流量的缓冲处理。此外,RocketMQ 的顺序设计特性使其成为一个天然的排队引擎,例如,三个应用同时对一个后台引擎发起请求,确保不引起“撞车”事故。因此,RocketMQ 被用在异步解耦、削峰填谷以及事务消息等场景中。


但是,数字化转型浪潮也带来了更多用户对数据价值的关注——如何让数据产生更大利用价值?RocketMQ 自身不具备数据分析能力,但是有不少用户希望从 RocketMQ Topic 中获取数据并进行在线或离线的数据分析。然而,使用市面上的数据集成或数据同步工具,将 RocketMQ Topic 数据同步到一些分析系统中虽然是一种可行方案,却会引入新的组件,造成数据同步的链路较长,时延相对较高,用户体验不佳。


举个例子,假设业务场景中使用 OceanBase 作为数据存储,同时希望将这些数据同步到 Elasticsearch 进行全文搜索,有两种可行的数据同步方案。


方案一:从 OceanBase 中获取数据,写入 Elasticsearch 组件并进行数据同步,在数据源较少时此方案没什么问题,一旦数据增多,开发和维护都非常复杂,此时就要用到第二种方案。

image.png

方案二:引入消息中间件对上下游进行解藕,这能解决第一种方案的问题,但是一些较为复杂的问题还没有完全解决。比如,如何将数据从源数据同步到目标系统并保证高性能,如果保证同步任务的部分节点挂掉,数据同步依然正常进行,节点恢复依然可以断点续传,同时随着数据管道的增多,如何管理数据管道也变得十分困难。

image.png

总的来说,数据集成过程中的挑战主要有五个。


挑战一:数据源多,市面上可能有上百个数据源,且各数据源的系统差异较大,实现任意数据源之间的数据同步工作量较大,研发周期很长。


挑战二:高性能问题,如何高效地从源数据系统同步到目的数据系统,并保障其性能。


挑战三:高可用问题,即 Failover 能力,当一个节点挂掉是否这个节点的任务就停止了,任务重新启动是否还可以断点续传。


挑战四:弹性扩缩容能力,根据系统流量动态增加或减少节点数量,既能通过扩容满足高峰期业务,也能在低峰期缩减节点,节省成本。


挑战五:数据管道的管理运维,随着数据管道的增多,运维监控的数据管道也会变得越来越复杂,如何高效管理监控众多的同步任务。


面对上述挑战 RocketMQ 如何解决?

第一,标准化数据集成 API (Open Messaging Connect API)。在 RocketMQ 生态中增加 Connect 组件,一方面对数据集成过程抽象,抽象标准的数据格式以及描述数据的 Schema,另一方面对同步任务进行抽象,任务的创建、分片都抽象成一套标准化的流程。


第二,基于标准的 API 实现 Connect Runtime。Runtime 提供了集群管理、配置管理、位点管理、负载均衡相关的能力,拥有了这些能力,开发者或者用户就只需要关注数据如何获取或如何写入,从而快速构建数据生态,如与OceanBase、MySQL、Elasticsearc 等快速建立连接,搭建数据集成平台。整个数据集成平台的构建也非常简单,通过 Runtime 提供的 RESTFull API 进行简单调用即可。


第三,提供完善的运维工具,方便管理同步任务,同时提供丰富的 Metrics 信息,方便查看同步任务的 TPS、流量等信息。


02 RocketMQ Connect 两大使用场景


这里为大家整理了 RocketMQ Connect 的两大使用场景。


场景一,RocketMQ 作为中间媒介,可以将上下游数据打通,比如在新旧系统迁移的过程中,如果在业务量不大时使用 MySQL 就可以满足业务需求,而随着业务的增长,MySQL 性能无法满足业务要求时,需要对系统进行升级,选用分布式数据库OceanBase 提升系统性能。


如何将旧系统数据无缝迁移到 OceanBase 中呢?在这个场景中 RocketMQ  Connect 就可以发挥作用,RocketMQ Connect 可以构建一个从 MySQL 到 OceanBase 的数据管道,实现数据的平滑迁移。RocketMQ Connect 还可以用在搭建数据湖、搜索引擎、ETL 平台等场景。例如将各个数据源的数据集成到 RocketMQ Topic当中,目标存储只需要对接 Elasticsearch 就可以构建一个搜索平台,目标存储如果是数据湖就可以构建一个数据湖平台。


除此之外,RocketMQ 自身也可以作为一个数据源,将一个 RocketMQ 集群的数据同步到另一个集群,可以构建 RocketMQ 多活容灾能力,这是社区正在孵化的Replicator可以实现的能力。

image.png

场景二,RocketMQ 作为端点。RocketMQ 的生态中提供了流计算能力组件 -RocketMQ Streams,Connector 将各个存储系统的数据集成到 RocketMQ Topic 当中,下游使用 RocketMQ Streams 流计算的能力就可以构建一个实时的流计算平台。当然也可以配合业务系统的 Service 实现业务系统快速从其它存储统一快速获取数据的能力。

image.png

还可以将 RocketMQ 作为端点的上游,将业务消息发到 Topic 中,使用 Connector 对数据做持久化或转存的操作。

image.png

如此一来,RocketMQ 就具备数据集成能力,可以实现任意任意异构数据源之间的数据同步,同时也具备统一的集群管理、监控能力及配置化搭建数据管道搭建能力,开发者或者用户只需要专注于数据拷贝,简单配置就可以得到一个具备配置化、低代码、低延时、高可用,支持故障处理和动态扩缩容数据集成平台。


那么, RocketMQ Connect 是如何实现的呢?


03 RocketMQ Connect 实现原理


在介绍实现原理前,先来了解两个概念。


概念一,什么是 Connector(连接器)?它定义数据从哪复制到哪,是从源数据系统读取数据写入RocketMQ,这种是 SourceConnector,或从 RocketMQ 读数据写入到目标系统,这种是 SinkConnector。Connector 决定需要创建任务的数量,从 Worker 接收配置传递给任务。

image.png

概念二,什么是 Task ?Task 是 Connector 任务分片的最小分配单位,是实际将源数据源数据复制到 RocketMQ(SourceTask),或者将数据从 RocketMQ 读出写入到目标系统(SinkTask)真正的执行者,Task 是无状态的,可以动态的启停任务,多个 Task 可以并行执行,Connector 复制数据的并行度主要体现在 Task 上。一个 Task 任务可以理解为一个线程,多个 Task 则以多线程的方式运行。

image.png

通过 Connect 的 API 也可以看到 Connector 和 Task 各自的职责,Connector 实现时就已经确定数据复制的流向,Connector 接收数据源相关的配置,taskClass 获取需要创建的任务类型,通过 taskConfigs 的数量确定任务数量,并且为 Task 分配好配置。Task 拿到配置以后数据源建立连接并获取数据写入到目标存储。通过下面的两张图可以清楚的看到,Connector 和 Task 处理基本流程。

image.png

一个 RocketMQ Connect 集群中会有多个 Connector ,每个 Connector 会对应一个或多个 Task,这些任务运行在 Worker(进程)中。Worker 进程是 Connector 和 Task 运行环境,它提供 RESTFull 能力,接收 HTTP 请求,将获取到的配置传递给 Connector 和Task,它还负责启动 Connector 和 Task,保存 Connector 配置信息,保存 Task 同步数据的位点信息,除此以外,Worker 还提供负载均衡能力,Connect 集群高可用、扩缩容、故障处理主要依赖 Worker 的负责均衡能力实现的。Worker 提供服务的流程如下:

image.png

Worker 提供的服务发现及负载均衡的实现原理如下:


服务发现:

image.png

用过 RocketMQ 的开发者应该知道,它的使用很简单,就是发送和接收消息。消费模式分为集群模式和广播模式两种,集群消费模式下一个 Topic 可以有多个 Consumer 消费消息,任意一个 Consumer 的上线或下线 RocketMQ 服务端都有感知,并且还可以将客户端上下线信息通知给其它节点,利用 RocketMQ 这个特性就实现了 Worker 的服务发现。


配置/Offset同步:

image.png

Connector 的配置 /Offset 信息同步通过每个 Worker 订阅相同的 Topic,不同 Worker 使用不同的 Consumer Group 实现的, Worker 节点可以通过这种方式消费到相同Topic的所有数据,即 Connector 配置 /Offset 信息,这类似于广播消费模式,这种数据同步模式可以保证任何一个 Worker 挂掉,该Worker上的任务依旧可以在存活的 Worker 正常拉起运行 ,并且可以获取到任务对应的 Offset 信息实现断点续传, 这是故障转移以及高可用能力的基础。


负载均衡:

RocketMQ 消费场景中,消费客户端 与Topic Queue 之间有负载均衡能力,Connector 在这一部分也是类似的,只不过它负载均衡的对象不一样,Connector 是 Worker 节点和 Task 之间的负载均衡,与 RocketMQ 客户端负载均衡一样,可以根据使用场景选择不同负载均衡算法。

image.png

上文提到过 RocketMQ Connect 提供 RESTFull API能力。通过 RESTFull AP 可以创建 Connector,管理 Connector 以及查看 Connector 状态,简单列举:


  • POST /connectors/{connector name}
  • GET /connectors/{connector name}/config
  • GET /connectors/{connector name}/status
  • POST /connectors/{connector name}/stop


目前 Connector 支持单机、集群两种部署模式。集群模式至少要有两个节点,才能保证它的高可用。并且集群可以动态增加或者减少,做到了动态控制提升集群性能和节省成本节省的能力。单机模式更多方便了开发者开发测试 Connector 。


如何如何实现一个 Connector 呢?还是结合一个具体的场景看一看,例如业务数据当前是写入 MySQL 数据库中的,希望将 MySQL 中数据实时同步到数据湖 Hudi 当中。只要实现 MySQL Source Connector 、Hudi Sink Connector 这两个 Connector 即可。

image.png

下面就以 MySQLSource Connector 为例,来看一下具体的如何实现。


实现 Connector 最主要的就是实现两个 API 。第一个是 Connector API ,除了实现它生命周期相关的 API 外,还有任务如何分配,是通过 Topic、Table 还是通过数据库的维度去分。第二个API 是需要创建的 Task,Connector 通过任务分配将相关的配置信息传递给 Task, Task 拿到这些信息,例如数据库账号,密码,IP,端口后就会创建数据库连接,再通过 MySQL 提供的 BINLOG 机智获取到表的数据,将这些数据写到一个阻塞队列中。Task 有个 Poll 方法,实现 Connector 时只要调用到 Poll 方法时可以获取到数据即可,这样 Connector 就基本写完了。然后打包以 Jar 包的形式提供出来,将它加载到 Worker 的节点中。

image.png

创建 Connector 任务后, Worker 中会创建一个或者多个线程,不停的轮询 Poll 方法,从而获取到 MySQL 表中的数据,再通过 RocketMQ Producer 发送到 RocketMQ Broker中,这就是 Connector 从实现到运行的整体过程(见下图)。

04image.png

04 RocketMQ Connect 现状与未来


RocketMQ Connect 的发展历程分为三个阶段。


第一阶段:Preview 阶段

RocketMQ Connect 发展的初期也即 Preview 阶段,实现了 Open Messaging Connect API 1.0 版本,基于该版本实现了 RocketMQ Connect Runtime ,同时提供了 10+ Connector 实现(MySQL,Redis,Kafka,Jms,MongoDB……)。在该阶段,RocketMQ Connect 可以简单实现端到端的数据源同步,但功能还不够完善,不支持数据转换,序列化等能力,生态相对还比较贫乏。


第二阶段:1.0 阶段

在 1.0 阶段,Open Messaging Connect API 进行了升级,支持 Schema、Transform,Converter 等能力,在此基础上对 Connect Runtime 也进行了重大升级,对数据转换,序列化做了支持,复杂Schema也做了完善的支持。该阶段的 API、Runtime 能力已经基本完善,在此基础上,还有30+ Connecotor 实现,覆盖了 CDC、JDBC、SFTP、NoSQL、缓存 Redis、HTTP、AMQP、JMS、数据湖、实时数仓、Replicator、等 Connector 实现,还做了 Kafka Connector Adaptor 可以运行 Kafka 生态的 Connector。


第三阶段:2.0 阶段

RocketMQ Connect 当前处于这个阶段,重点发展 Connector 生态,当 RocketMQ 的 Connector 生态达到 100 + 时,RocketMQ 基本上可以与任意的一个数据系统去做连接。


目前 RocketMQ 社区正在和 OceanBase 社区合作,进行 OceanBase 到 RocketMQ Connect 的研发工作,提供 JDBC 和 CDC 两种模式接入模式,后续会在社区中发布,欢迎感兴趣的同学试用。


05 总结


RocketMQ 是一个可靠的数据集成组件,具备分布式、伸缩性、故障容错等能力,可以实现 RocketMQ 与其他数据系统之间的数据流入与流出。通过 RocketMQ Connect 可以实现 CDC,构建数据湖,结合流计算可实现数据价值。


加入 Apache RocketMQ 社区

十年铸剑,Apache RocketMQ 的成长离不开全球 800+ 位开发者的积极参与贡献,相信在下个版本你就是 Apache RocketMQ 的贡献者,在社区不仅可以结识社区大牛,提升技术水平,也可以提升个人影响力,促进自身成长。


社区 5.x 版本正在进行着如火如荼的开发,以及 30 +个 SIG(兴趣小组)等你加入,欢迎立志打造世界级分布式系统的同学通过以下方式加入社区:


  • 微信搜索:「rocketmq666」,添加开发者微信,即可加入微信群;
  • 钉钉搜索:「21982288」,即可加入钉群。

image.png

微信扫码添加小火箭进群

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 安全 物联网
MQTT常见问题之新增自定义主题后平台侧收不到发布的数据如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2月前
|
消息中间件 物联网 网络性能优化
MQTT常见问题之mqtt 连接一直显示 Not authorized to connect如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2月前
|
物联网 网络性能优化 API
MQTT常见问题之单个消息发送数据不能超过64k如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
3月前
|
消息中间件 存储 监控
|
3月前
|
机器学习/深度学习 开发工具
DP活动:HMI-Board以太网数据监视器(二)MQTT和LVGL
DP活动:HMI-Board以太网数据监视器(二)MQTT和LVGL
32 1
|
2月前
|
消息中间件 物联网 关系型数据库
MQTT常见问题之消息对列mqtt的历史数据查看失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2月前
|
安全 物联网 测试技术
C++ 构建通用的MQTT接口:从理论到实践
C++ 构建通用的MQTT接口:从理论到实践
192 2
|
2月前
|
消息中间件 Kubernetes Docker
KubeSphere 核心实战之三【在kubesphere平台上部署ElasticSearch、应用商店部署RabbitMQ和应用市场部署Zookeeper】(实操篇 3/4)
KubeSphere 核心实战之三【在kubesphere平台上部署ElasticSearch、应用商店部署RabbitMQ和应用市场部署Zookeeper】(实操篇 3/4)
41 0
|
3月前
|
Java 物联网 网络安全
mqtt问题之STM32F103GPRS模组如何接入物理网平台
MQTT接入是指将设备或应用通过MQTT协议接入到消息服务器,以实现数据的发布和订阅;本合集着眼于MQTT接入的流程、配置指导以及常见接入问题的解决方法,帮助用户实现稳定可靠的消息交换。
75 2
|
3月前
|
消息中间件 Web App开发 监控
mqtt数据问题之如何实现webRTC 协议的监控视频压测
MQTT协议是一个轻量级的消息传输协议,设计用于物联网(IoT)环境中设备间的通信;本合集将详细阐述MQTT协议的基本原理、特性以及各种实际应用场景,供用户学习和参考。
64 0

相关产品

  • 云消息队列 MQ