RocketMQ Streams:将轻量级实时计算引擎融合进消息系统

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 随着各行各业移动互联和云计算技术的普及发展,大数据计算已深入人心,最常见的比如 flink、spark 等。这些大数据框架,采用中心化的 Master-Slave 架构,依赖和部署比较重,每个任务也有较大开销,有较大的使用成本。RocketMQ Streams 着重打造轻量计算引擎,除了消息队列,无额外依赖,对过滤场景做了大量优化,性能提升 3-5 倍,资源节省 50%-80%。

作者 | 袁小栋、程君杰


随着各行各业移动互联和云计算技术的普及发展,大数据计算已深入人心,最常见的比如 flink、spark 等。这些大数据框架,采用中心化的 Master-Slave 架构,依赖和部署比较重,每个任务也有较大开销,有较大的使用成本。RocketMQ Streams 着重打造轻量计算引擎,除了消息队列,无额外依赖,对过滤场景做了大量优化,性能提升 3-5 倍,资源节省 50%-80%。


RocketMQ Streams 适合大数据量->高过滤->轻窗口计算的场景,核心打造轻资源,高性能优势,在资源敏感场景中有很大优势,最低 1core,1g 可部署,建议的应用场景(安全,风控,边缘计算,消息队列流计算)。


RocketMQ Streams 兼容 Blink(Flink 的阿里内部版本) 的 SQL,UDF/UDTF/UDAF,多数 Blink 任务可以直接迁移成 RocketMQ Streams 任务。将来还会发布和 Flink 的融合版本,RocketMQ Streams 可以直接发布成 Flink 任务,既可以享有 RocketMQ Streams 带来的高性能、轻资源,还可以和现有的 Flink 任务统一运维和管理。

什么是 RocketMQ Streams?


本章节从基础简介、设计思路和特点三方面对 RocketMQ Streams 进行整体介绍。


1、RocketMQ Streams 简介


1)它是一个 Lib 包,启动即运行,和业务直接集成;2)它具备 SQL 引擎能力,兼容 Blink SQL 语法,兼容 Blink UDF/UDTF/UDAF;3)它包含 ETL 引擎,可以无编码实现数据的 ETL、过滤和转存;4)它基于数据开发 SDK,大量实用组件可直接使用,如:Source、sink、script、filter、lease、scheduler、configurable 不局限流的场景。


2、RocketMQ Streams 的特点


RocketMQ streams 基于上述的实现思路,可以看到它有以下几个特点:

  • 轻量

1 核 1g 就可以部署,依赖较轻,在测试场景下用 Jar 包直接写个 main 方法就可以运行,在正式环境下最多依赖消息队列和存储(其中存储是可选的,主要是为了分片切换时的容错)。


  • 高性能

实现高过滤优化器,包括前置指纹过滤,同源规则自动归并,hyperscan 加速,表达式指纹等,比优化前性能提升 3-5 倍,资源节省 50%以上。


  • 维表 JOIN(千万数据量维表支持)

设计高压缩内存存储数据,无 java 头部和对齐的开销,存储接近原始数据大小,纯内存操作,性能最大化,同时对于 Mysql 提供了多线程并发加载,提高加载维表的速度。


  • 高扩展的能力

1)Source 可按需扩展,已实现:RocketMQ,File,Kafka;2)Sink 可按需扩展,已实现:RocketMQ,File,Kafka,Mysql,ES;3)可按 Blink 规范扩展 UDF/UDTF/UDAF;4)提供了更轻的 UDF/UDTF 扩展能力,不需要任何依赖就可以完成函数的扩展。

  • 提供了丰富的大数据的能力

包括精确计算一次灵活的窗口,双流 join,统计,开窗,各种转换过滤,满足大数据开发的各种场景,支持弹性容错的能力。


RocketMQ Streams的使用


RocketMQ Streams 对外提供两种 SDK,一种是 DSL SDK,一种是 SQL SDK,用户可以按需选择;DSL SDK 支持实时场景 DSL 语义;SQL SDK 兼容 Blink(Flink 的阿里内部版本) SQL 的语法,多数 Blink SQL 可以通过 RocketMQ Streams 运行。
接下来,我们详细的介绍一下这两种 SDK。
0

1、环境要求


1)JDK 1.8 版本以上;2)Maven 3.2 版本以上。

02

2、DSL SDK


利用 DSL SDK 开发实时任务时,需要做如下的一些准备工作:


  • 依赖准备
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-streams-clients</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>


准备工作完成后,就可以直接开发自己的实时程序。


  • 代码开发
DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
source.fromFile("~/admin/data/text.txt",false)
    .map(message->message + "--")
    .toPrint(1)
    .start();


其中:

1)Namespace 是业务隔离的,相同的业务可以写成相同的Namespace。相同的Namespace 在任务调度里可以跑在进程里,也可以共享一些配置;

2)pipelineName 可以理解成就是 job name ,唯一区分 job;

3)DataStreamSource 主要是创建 Source,然后这个程序运行起来,最终的结果就是在原始的消息里面会加"--",然后把它打印出来。


  • 丰富的算子

RocketMQ streams 提供了丰富的算子, 包括:

1)source 算子:包括 fromFile、fromRocketMQ、fromKafka 以及可以自定义 source 来源的 from 算子;

2)sink 算子: 包括 toFile、toRocketMQ、toKafka、toDB、toPrint、toES 以及可以自定义 sink 的 to 算子;

3)action 算子:包括 Filter、Expression、Script、selectFields、Union、forEach、Split、Select、Join、Window 等多个算子。


  • 部署执行

基于 DSL SDK 完成开发,通过下面命令打成 jar 包,执行 jar,或直接执行任务的 main 方法。

mvn -Prelease-all -DskipTests clean install -U
java -jar jarName mainClass &


03

3、SQL SDK


  • 依赖准备
  <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>rsqldb-clients</artifactId>
      <version>1.0.0-SNAPSHOT</version>
</dependency>

  • 代码开发

首先开发业务逻辑代码, 可以保存为文件也可以直接使用文本:

CREATE FUNCTION json_concat as 'xxx.xxx.JsonConcat';
CREATE TABLE `table_name` (
    `scan_time` VARCHAR,
    `file_name` VARCHAR,
    `cmdline` VARCHAR,
) WITH (
     type='file',
     filePath='/tmp/file.txt',
     isJsonData='true',
     msgIsJsonArray='false'
);
-- 数据标准化
create view data_filter as
select
     *
from (
    select
        scan_time as logtime
        , lower(cmdline) as lower_cmdline
        , file_name as proc_name
    from
        table_name
)x
where
    (
        lower(proc_name) like '%.xxxxxx'
        or lower_cmdline  like 'xxxxx%'
        or lower_cmdline like 'xxxxxxx%'
        or lower_cmdline like 'xxxx'
        or lower_cmdline like 'xxxxxx'
    )
;
CREATE TABLE `output` (
     `logtime` VARCHAR
    , `lower_cmdline` VARCHAR
    , `proc_name` VARCHAR
) WITH (
    type = 'print'
);
insert into output
select
    *
from
    aegis_log_proc_format_raw
;


其中:

1)CREATE FUNCTION:引入外部的函数来支持业务逻辑, 包括 flink 以及系统函数;

2)CREATE Table:创建 source/sink;

3)CREATE VIEW:执行字段转化,拆分,过滤;

4)INSERT INTO:数据写入 sink;

5)函数:内置函数,udf 函数。


  • SQL 扩展

RocketMQ streams 支持三种 SQL 扩展能力,具体实现细节请看:

1)通过 Blink UDF/UDTF/UDAF 扩展 SQL 能力;

2)通过 RocketMQ streams 扩展 SQL 能力,只要实现函数名是 eval 的 java bean 即可;

3)通过现有 java 代码扩展 SQL 能力,create function 函数名就是 java 类的方法名。


  • SQL 执行

你可以从下载最新的 RocketMQ Streams 代码并构建。

cd rsqldb/
mvn -Prelease-all -DskipTests clean install -U
cp rsqldb-runner/target/rocketmq-streams-sql-{版本号}-distribution.tar.gz 部署的目录

解压 tar.gz 包, 进入目录结构

tar -xvf rocketmq-streams-{版本号}-distribution.tar.gz
cd rocketmq-streams-{版本号

其目录结构如下:

1)bin 指令目录,包括启动和停止指令

2)conf 配置目录,包括日志配置以及应用的相关配置文件

3)jobs 存放 sql,可以两级目录存储

4)ext 存放扩展的 UDF/UDTF/UDAF/Source/Sink

5)lib 依赖包目录

6)log 日志目录


  • 执行 SQL
#指定 sql 的路径,启动实时任务
bin/start-sql.sh sql_file_path

  • 执行多个 SQL

如果想批量执行一批 SQL,可以把 SQL 放到 jobs 目录,最多可以有两层,把 sql 放到对应目录中,通过 start 指定子目录或 sql 执行任务。


  • 任务停止
# 停止过程不加任何参数,则会将目前所有运行的任务同时停止
bin/stop.sh
# 停止过程添加了任务名称, 则会将目前运行的所有同名的任务都全部停止
bin/stop.sh sqlname

  • 日志查看

目前所有的运行日志都会存储在 log/catalina.out 文件中。


架构设计及原理分析


1、RocketMQ Streams设计思路


在了解完  RocketMQ Streams 的基本简介,接下来,我们看下 RocketMQ Streams 的设计思路,设计思路主要从设计目标和策略两个方面来介绍:


  • 设计目标

1)依赖少,部署简单,1 核 1g 单实例可部署,可随意扩展规模;

2)打造场景优势,重点打造大数据量->高过滤->轻窗口计算的场景,功能覆盖度要全,实现需要的大数据特性:Exactly-ONCE、灵活的窗口(滚动、滑动、会话窗口);

3)要在保持低资源的前提下,对高过滤有性能突破,打造性能优势;

4)兼容 Blink SQL,UDF/UDTF/UDAF,让非技术人员更容易上手。


  • 策略(适配场景:大数据量>高过滤/ETL>低窗口计算)

1)采用 shared-nothing 的分布式架构设计,依赖消息队列做负载均衡和容错机制,单实例可启动,增加实例实现能力扩展,并发能力取决于分片数;

2)利用消息队列的分片做 shuffle,利用消息队列负载均衡实现容错;

3)利用存储实现状态备份,实现 Exactly-ONCE 的语义。用结构化远程存储实现快速启动,不等本地存储恢复。

4)重力打造过滤优化器,通过前置指纹过滤,同源规则自动归并,hyperscan 加速,表达式指纹提高过滤性能

image.gif

2.png


2、RocketMQ Streams Source 的实现


1)Source 要求实现最少消费一次的语义,系统通过 checkpoint 系统消息实现,在提交 offset 前发送 checkpoint 消息,通知所有算子刷新内存。2)Source 支持分片的自动负载和容错

  • 数据源在分片移除时,发送移除系统消息,让算子完成分片清理工作;
  • 当有新分片时,发送新增分片消息,让算子完成分片初始化。

3)数据源通过 start 方法,启动 consuemr 获取消息;4)原始消息经过编码,附加头部信息包装成 Message 投递给后续算子。


3.pngimage.gif


3、RocketMQ Streams Sink 的实现


1)Sink 是实时性和吞吐的一个结合;

2)实现一个 sink 只要继承 AbstractSink 类实现 batchInsert 方法即可。batchInsert 的含义是一批数据写入存储,需要子类调用存储接口实现,尽量应用存储的批处理接口,提高吞吐;

3)常规的使用方式是写 message->cache->flush->存储的方式,系统会严格保证每次批次写入存储的量不超过 batchsize 的量,如果超过了,会拆分成多批写入;


4.pngimage.gif

4)Sink 有一个 cache,数据默认写 cache,批次写入存储,提高吞吐(一个分片一个 cache);

5)可以开启自动刷新,每个分片会有一个线程,定时刷新 cache 数据到存储,提高实时性。实现类:DataSourceAutoFlushTask;

6)通过调用 flush 方法刷新 cache 到存储;

7)Sink 的 cache 会有内存保护,当 cache 的消息条数>batchSize,会强制刷新,释放内存。

04


4、RocketMQ Streams Exactly-ONCE 实现


1)Source 确保在 commit offset 时,会发送 checkpoint 系统消息,收到消息的组件会完成存盘操作,消息至少消费一次;

2)每条消息会有消息头部,里面封装了 queueld 和 offset;

3)组件在存储数据时,会把 queueld 和处理的最大 offset 存储下来,当有消息重复时,根据 maxoffset 去重;

4)内存保护,一个 checkpoint 周期可能有多次 flush(条数触发),保障内存占用可控。


5.png


5、RocketMQ Streams Window 实现方式


1)支持滚动、滑动和会话窗口,支持事件时间和自然时间(消息进入算子的时间);

2)支持 Emit 语法,可以在触发前或触发后,每隔 n 段时间,更新一次数据;比如1小时窗口,窗口触发前希望每分钟看到最新结果,窗口触发后希望不丢失迟到一天内的数据,且每 10 分钟更新数据。

3)支持高性能模式和高可靠模式,高性能模式不依赖远程存储,但在分片切换时,有丢失窗数据的风险;

4)快速启动,无需等待本地存储恢复,在发生错误或分片切换时,异步从远程存储恢复数据,同时直接访问远程存储计算;

5)利用消息队列负载均衡,实现扩容缩容容,每个 queue 是一份组,一个分组同一刻只被一台机器消费;

6)正常计算依赖本地存储,具备 flink 相似的计算性能。

image.gif

6.png


RocketMQ Streams 在安全场景的最佳实践

01


1、背景


从公共云转战专有云,遇到了新的问题。因为专有云像大数据这种 SaaS 服务是非必须输出的,且最小输出规模也比较大,用户成本会增加很多,难落地,导致安全能力无法快速同步到专有云。

image.gif

7.png

02


2、解决办法


  • RocketMQ Streams 在云安全的应用-流计算

1)基于安全场景打造轻量级计算引擎,基于安全高过滤的场景特点,可以针对高过滤场景优化,然后再做较重的统计、窗口、join 操作,因为过滤率比较高,可以用更轻的方案实现统计和 join 操作;

2)SQL 和引擎都可热升级。


  • 业务结果

1)规则覆盖:自建引擎,覆盖 100%规则(正则、join、统计);

2)轻资源,内存是公共云引擎的 1/24,cpu 是 1/6,依赖过滤优化器,资源不随规则线性增加,新增规则无资源压力,通过高压缩表,支持千万情报;

3)SQL 发布,通过 c/s 部署模式,SQL 引擎热发布,尤其护网场景,可快速上线规则;

4)性能优化,对核心组件进行专题性能优化,保持高性能,每实例(2g,4 核,41 规则)5000qps 以上。

RocketMQ Streams 的未来规划


1、打造 RocketMQ 一体化计算能力


1)和 RocketMQ 整合,去除 DB 依赖,融合 RocketMQ KV;

2)和 RocketMQ 混部,支持本地计算,利用本地特点,打造高性能;

3)打造边缘计算最佳实践02


2、Connector 增强


1)支持 pull 消费方式,checkpoint 异步刷新;

2)兼容 blink/flink connector。03


3、ETL 能力建设


1)增加文件,syslog 的数据接入能力;

2)兼容 Grok 解析,增加常用日志的解析能力;

3)打造日志 ETL 的最佳实践04


4、稳定性和易用性打造


1)Window 多场景测试,提升稳定性,性能优化;

2)补充测试用例,文档,应用场景。


开源地址



以上是本次对 RocketMQ Stream 的整体介绍,希望对大家有所帮助和启发。

相关文章
|
7月前
|
存储 消息中间件 Kafka
基于 Flink 的中国电信星海时空数据多引擎实时改造
本文整理自中国电信集团大数据架构师李新虎老师在Flink Forward Asia 2024的分享,围绕星海时空智能系统展开,涵盖四个核心部分:时空数据现状、实时场景多引擎化、典型应用及未来展望。系统日处理8000亿条数据,具备亚米级定位能力,通过Flink多引擎架构解决数据膨胀与响应时效等问题,优化资源利用并提升计算效率。应用场景包括运动状态识别、个体行为分析和群智感知,未来将推进湖仓一体改造与三维时空服务体系建设,助力数字化转型与智慧城市建设。
810 3
基于 Flink 的中国电信星海时空数据多引擎实时改造
|
7月前
|
消息中间件 Java 数据库
RocketMQ实战—9.营销系统代码初版
本文主要介绍了实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—9.营销系统代码初版
|
7月前
|
消息中间件 搜索推荐 调度
RocketMQ实战—8.营销系统业务和方案介绍
本文详细介绍了电商营销系统的业务流程、技术架构及挑战解决方案。涵盖核心交易与支付后履约流程,优惠券和促销活动的发券、领券、用券、销券机制,以及会员与推送的数据库设计。技术架构基于Nacos服务注册中心、Dubbo RPC框架、RocketMQ消息中间件和XXLJob分布式调度工具,实现系统间高效通信与任务管理。针对千万级用户量下的推送和发券场景,提出异步化、分片处理与惰性发券等优化方案,解决高并发压力。同时,通过RocketMQ实现系统解耦,提升扩展性,并利用XXLJob完成爆款商品推荐的分布式调度推送。整体设计确保系统在大规模用户场景下的性能与稳定性。
RocketMQ实战—8.营销系统业务和方案介绍
|
5月前
|
Java 调度 流计算
基于Java 17 + Spring Boot 3.2 + Flink 1.18的智慧实验室管理系统核心代码
这是一套基于Java 17、Spring Boot 3.2和Flink 1.18开发的智慧实验室管理系统核心代码。系统涵盖多协议设备接入(支持OPC UA、MQTT等12种工业协议)、实时异常检测(Flink流处理引擎实现设备状态监控)、强化学习调度(Q-Learning算法优化资源分配)、三维可视化(JavaFX与WebGL渲染实验室空间)、微服务架构(Spring Cloud构建分布式体系)及数据湖建设(Spark构建实验室数据仓库)。实际应用中,该系统显著提升了设备调度效率(响应时间从46分钟降至9秒)、设备利用率(从41%提升至89%),并大幅减少实验准备时间和维护成本。
344 0
|
7月前
|
消息中间件 NoSQL Java
RocketMQ实战—10.营销系统代码优化
本文主要介绍了如何对营销系统的四大促销场景的代码进行优化,包括:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
1054 2
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
3596 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
10月前
|
SQL 存储 大数据
Flink 基础详解:大数据处理的强大引擎
Apache Flink 是一个分布式流批一体化的开源平台,专为大规模数据处理设计。它支持实时流处理和批处理,具有高吞吐量、低延迟特性。Flink 提供统一的编程抽象,简化大数据应用开发,并在流处理方面表现卓越,广泛应用于实时监控、金融交易分析等场景。其架构包括 JobManager、TaskManager 和 Client,支持并行度、水位线、时间语义等基础属性。Flink 还提供了丰富的算子、状态管理和容错机制,如检查点和 Savepoint,确保作业的可靠性和一致性。此外,Flink 支持 SQL 查询和 CDC 功能,实现实时数据捕获与同步,广泛应用于数据仓库和实时数据分析领域。
6985 32
|
存储 运维 监控
实时计算Flink版在稳定性、性能、开发运维、安全能力等等跟其他引擎及自建Flink集群比较。
实时计算Flink版在稳定性、性能、开发运维和安全能力等方面表现出色。其自研的高性能状态存储引擎GeminiStateBackend显著提升了作业稳定性,状态管理优化使性能提升40%以上。核心性能较开源Flink提升2-3倍,资源利用率提高100%。提供一站式开发管理、自动化运维和丰富的监控告警功能,支持多语言开发和智能调优。安全方面,具备访问控制、高可用保障和全链路容错能力,确保企业级应用的安全与稳定。
226 0
|
消息中间件 存储 负载均衡
"RabbitMQ集群大揭秘!让你的消息传递系统秒变超级英雄,轻松应对亿级并发挑战!"
【8月更文挑战第24天】RabbitMQ是一款基于AMQP的开源消息中间件,以其高可靠性、扩展性和易用性闻名。面对高并发和大数据挑战时,可通过构建集群提升性能。本文深入探讨RabbitMQ集群配置、工作原理,并提供示例代码。集群由多个通过网络连接的节点组成,共享消息队列,确保高可用性和负载均衡。搭建集群需准备多台服务器,安装Erlang和RabbitMQ,并确保节点间通信顺畅。核心步骤包括配置.erlang.cookie文件、使用rabbitmqctl命令加入集群。消息发布至任一节点时,通过集群机制同步至其他节点;消费者可从任一节点获取消息。
235 2

热门文章

最新文章