RocketMQ Streams 1.1.0: 轻量级流处理再出发

简介: 作为一套全新的流式处理框架,RocketMQ Streams 如何实现流计算拓扑图构建呢?一起来了解下它的实现原理以及数据流转过程和过程中的状态变化吧~

本文作者:倪泽,Apache RocketMQ committer、RSQLDB/RocketMQ Streams Maintainer

01 背景

RocketMQ Streams是一款基于RocketMQ为基础的轻量级流计算引擎,具有资源消耗少、部署简单、功能全面的特点,目前已经在社区开源。RocketMQ Streams在阿里云内部被使用在对资源比较敏感,同时又强烈需要流计算的场景,比如在自建机房的云安全场景下。

自RocketMQ Streams开源以来,吸引了大量用户调研和试用。但是也存在一些问题,在RocketMQ Streams 1.1.0中,主要针对以下问题做出了改进和优化。

1、面向用户API不够友好,不能使用泛型,不支持自定义序列化/反序列化;

2、代码冗余,在RocketMQ Streams中存在将流处理拓扑序列化反序列化模块,RocketMQ Streams作为轻量级流处理SDK,构建好流处理节点之后应该可以直接处理数据,不存在将流处理拓扑图本地保存或者网络传输需求。

3、流处理过程不容易理解,含有大量缓存、刷新逻辑;

4、存在大量支持SQL的代码,这部分和SDK方式运行流处理任务的逻辑无关;

在RocketMQ Streams 1.1.0中,对上述问题做出了改进,期望能带来更好的使用体验。同时,重新设计了流处理拓扑构建过程、去掉冗余代码,使得代码更容易被理解。

从今天起,将推出系列文章介绍RocketMQ Streams 1.1.0版本,本次文章主要介绍RocketMQ Streams 1.1.0的API如何使用,如何利用RocketMQ Streams快速构建流处理应用。

02 典型使用示例

本地运行下列示例的步骤:

1、部署RocketMQ 5.0;

2、使用mqAdmin创建topic;

3、构建示例工程,添加依赖,启动示例。RocketMQ Streams 坐标:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-streams</artifactId>
    <version>1.1.0</version>
</dependency>

4、向topic中写入相应数据,并观察结果。

更详细文档请参考:https://github.com/apache/rocketmq-streams

WordCount

public class WordCount {
    public static void main(String[] args) {
        StreamBuilder builder = new StreamBuilder("wordCount");


        builder.source("sourceTopic", total -> {
                    String value = new String(total, StandardCharsets.UTF_8);
                    return new Pair<>(null, value);
                })
                .flatMap((ValueMapperAction<String, List<String>>) value -> {
                    String[] splits = value.toLowerCase().split("\W+");
                    return Arrays.asList(splits);
                })
                .keyBy(value -> value)
                .count()
                .toRStream()
                .print();


        TopologyBuilder topologyBuilder = builder.build();


        Properties properties = new Properties();
        properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");


        RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);


        final CountDownLatch latch = new CountDownLatch(1);


        Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") {
            @Override
            public void run() {
                rocketMQStream.stop();
                latch.countDown();
            }
        });


        try {
            rocketMQStream.start();
            latch.await();
        } catch (final Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

WordCount示例要点:

1、JobId wordCount唯一标识流处理任务;

2、自定义的反序列化;

3、一对多转化;

4、lambda形式从数据中指定Key;

5、支持有状态计算;

窗口聚合

public class WindowCount {
    public static void main(String[] args) {
        StreamBuilder builder = new StreamBuilder("windowCountUser");


        AggregateAction<String, User, Num> aggregateAction = (key, value, accumulator) -> new Num(value.getName(), 100);


        builder.source("user", source -> {
                    User user1 = JSON.parseObject(source, User.class);
                    return new Pair<>(null, user1);
                })
                .selectTimestamp(User::getTimestamp)
                .filter(value -> value.getAge() > 0)
                .keyBy(value -> "key")
                .window(WindowBuilder.tumblingWindow(Time.seconds(15)))
                .aggregate(aggregateAction)
                .toRStream()
                .print();


        TopologyBuilder topologyBuilder = builder.build();


        Properties properties = new Properties();
        properties.putIfAbsent(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");
        properties.put(Constant.TIME_TYPE, TimeType.EVENT_TIME);
        properties.put(Constant.ALLOW_LATENESS_MILLISECOND, 2000);


        RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);


        rocketMQStream.start();
    }
}

窗口聚合示例要点:

1、支持指定时间字段;

2、支持滑动、滚动、会话多种类型window;

3、支持自定义UDAF类型聚合;

4、支持自定义时间类型和数据最大迟到时间;

双流JOIN

public class JoinWindow {
    public static void main(String[] args) {
        StreamBuilder builder = new StreamBuilder("joinWindow");


        //左流
        RStream<User> user = builder.source("user", total -> {
            User user1 = JSON.parseObject(total, User.class);
            return new Pair<>(null, user1);
        });


        //右流
        RStream<Num> num = builder.source("num", source -> {
            Num user12 = JSON.parseObject(source, Num.class);
            return new Pair<>(null, user12);
        });


        //自定义join后的运算
        ValueJoinAction<User, Num, Union> action = new ValueJoinAction<User, Num, Union>() {
            @Override
            public Union apply(User value1, Num value2) {
                ...
            }
        };


        user.join(num)
                .where(User::getName)
                .equalTo(Num::getName)
                .window(WindowBuilder.tumblingWindow(Time.seconds(30)))
                .apply(action)
                .print();


        TopologyBuilder topologyBuilder = builder.build();


        Properties properties = new Properties();
        properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");


        RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);


        rocketMQStream.start();
    }
}

双流聚合示例要点:

1、支持window join和非window join,对于非window join,只需要在上述及连表达式中去掉window即可;

2、支持多种窗口类型的window join;

3、支持对join后数据自定义操作;

03 参与贡献

RocketMQ Streams是Apache RocketMQ的子项目,已经在社区开源,参与RocketMQ Streams相关工作,请参考以下资源:

1、试用RocketMQ Streams,并阅读相关文档以了解更多信息;

maven仓库坐标:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-streams</artifactId>
    <version>1.1.0</version>
</dependency>

RocketMQ Streams文档:

https://rocketmq.apache.org/zh/docs/streams/30RocketMQ%20Streams%20Overview

2、参与贡献:如果你有任何功能请求或错误报告,请随时提交 Pull Request 来分享你的反馈和想法;

社区仓库:

https://github.com/apache/rocketmq-streams

3、联系我们:可以在 GitHub上创建 Issue,向 RocketMQ 邮件列表发送电子邮件,或在 RocketMQ Streams SIG 交流群与专家共同探讨,RocketMQ Streams SIG加入方式:添加“小火箭”微信,回复RocketMQ Streams。

邮件列表:

https://lists.apache.org/list.html?dev@rocketmq.apache.org

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 存储 数据库
RocketMQ 流数据库解析:如何实现一体化流处理?
RocketMQ 5.0 是一款云原生的消息中间件,旨在覆盖更多业务场景。它针对国内企业在数字化转型中面临的多场景消息处理需求,提供了一体化的解决方案。
112010 18
|
6月前
|
消息中间件 存储 数据库
深度剖析 RocketMQ 5.0,流数据库:如何实现一体化流处理?
本文将从概念和宏观角度理解什么是流处理。 RocketMQ 5.0,学习 RocketMQ 提供的轻量流处理引擎 RStreams,了解其特性和原理。学习 RocketMQ 的流数据库 RSQLDB,通过流存储和流计算的深度结合,看它如何进一步降低流处理使用门槛。
78876 0
|
消息中间件 存储 Cloud Native
十问 RocketMQ:十年再出发,到底有何不同?
云原生时代,RocketMQ 全新升级背后的原因是什么?我们选取了十大问题,抛给阿里云 RocketMQ 团队,听听他们对于产品发展与决策的思考。
227 0
十问  RocketMQ:十年再出发,到底有何不同?
|
消息中间件 存储 Cloud Native
生于云、长于云,RocketMQ 5.0再出发
万物皆云的时代,RocketMQ 让数字化转型更简单高效,也将消息、事件、流的价值最大程度释放。Apache RocketMQ 将不断推动技术演进与落地实践,帮助企业真正实现高质量数字化转型与创新。
255 0
生于云、长于云,RocketMQ 5.0再出发
|
存储 SQL 消息中间件
RocketMQ-Streams 首个版本发布,轻量级计算的新选择
RocketMQ-Streams 聚焦「大数据量-&gt;高过滤-&gt;轻窗口计算」场景,核心打造轻资源,高性能优势,在资源敏感场景有很大优势,最低 1Core,1G 可部署。通过大量过滤优化,性能比其他大数据提升 2-5 倍性能。广泛应用于安全,风控,边缘计算,消息队列流计算。
417 0
RocketMQ-Streams 首个版本发布,轻量级计算的新选择
|
消息中间件 存储 Java
RocketMQ Streams拓扑构建与数据处理过程
作为一套全新的流式处理框架,RocketMQ Streams 如何实现流计算拓扑图构建呢?一起来了解下它的实现原理以及数据流转过程和过程中的状态变化吧~
155 0
RocketMQ Streams拓扑构建与数据处理过程
|
消息中间件 存储 云安全
RocketMQ Streams 1.1.0:轻量级流处理再出发
一、背景流处理是数据集成领域一个重要话题,他能显著减少数据输入和结果输出之间延迟,在对时间延迟敏感的商业场景,例如安全、智能运维、实时推荐,有大量的需求。RocketMQ作为一款消息中间件,已经在业务集成领域展现出巨大价值,但是在数据集成领域还有较大拓展空间。通过支持流处理可以带动RocketMQ进入数据集成领域,拓展RocketMQ的使用范围。RocketMQ Streams是一款基于Rocke
288 0
RocketMQ Streams 1.1.0:轻量级流处理再出发
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
767 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
消息中间件 uml RocketMQ
3 张图带你彻底理解 RocketMQ 事务消息
3 张图带你彻底理解 RocketMQ 事务消息
67766 2
3 张图带你彻底理解 RocketMQ 事务消息
|
消息中间件 Java uml
5张图带你理解 RocketMQ 顺序消息实现机制
5张图带你理解 RocketMQ 顺序消息实现机制
683 1
5张图带你理解 RocketMQ 顺序消息实现机制