rocketmq整合SpringCloudStream

简介: rocketmq整合SpringCloudStream

rocketmq整合SpringCloudStream

发送消息

引入包:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>

添加配置文件

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        output:
          destination: TopicTest
          group: PRODUCER_GROUP_TOPIC_TEST

代码配置:

@SpringBootApplication
@EnableBinding({ Source.class })
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
@Component
public class ProduceController {

    @Autowired
    private Source source;

    @PostConstruct
    private void init() throws InterruptedException {
        MessageBuilder builder = MessageBuilder.withPayload("init...");
        Message message = builder.build();
        source.output().send(message);
        System.out.println("init...");
    }
}

@EnableBinding({ Source.class })表示绑定配置文件中名称为output的消息通道Binding,Source类中定义的消息通道名称为output。

消费消息:

配置文件:

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        input:
          destination: TopicTest2
          group: CONSUER_GROUP_DEMO_1

name-server是RocketMq的NameServer地址,destination指定Topic名称,指定名称为input的Binding接收TopicTest的消息

消息监听:

@EnableBinding({ Sink.class})
@SpringBootApplication
public class Application {

    @StreamListener(value = InputChannel.ORDER_INPUT)
    public void receive(String receiveMsg) {
        System.out.println("receive: " + receiveMsg);
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

@EnableBinding({ Sink.class})表示绑定配置文件名称为input的消息通道Binding,Sink类中定义的消息通道名称为input,@StreamListener表示定义一个消息监听器,接收RocketMQ中的消息。

Spring Cloud Stream

Spring Cloud Stream是构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务,目的是简化消息业务在Spring Cloud应用程序中的开发。

通过Spring Cloud Stream注入的输入通道inputs和输出通道outputs与消息中间件Middleware通信,消息通道通过特定中间件绑定器Binder实现连接到外部代理。

Spring Cloud Stream实现基于发布/订阅机制,核心四个部分组成:Spring Framework中的Spring Messaging和Spring Integration,Spring Cloud Stream中的Binders和Bindings。

Spring Messaging:Spring Framework中的统一消息编程模型

  • Message:消息对象,包含消息头Header和消息体Payload
  • MessageChannel:消息通道接口,用于接收消息,提供send方法将消息发送至消息通道。
  • MessageHandler:消息处理器接口,用于处理消息逻辑。

Spring Integration:支持企业集成的扩展机制,提供简单的模型来构建企业集成解决方案,对Spring Messaging进行扩展。

  • MessageDispatcher:消息分发接口,用于分发消息和添加删除消息处理器
  • MessageRouter:消息路由接口,定义默认的输出消息通道。
  • Filter:消息过滤注解,用于配置消息过滤表达式
  • Aggregator:消息的聚合注解,用于将一条消息拆分成多条。
  • Splitter:消息分割,用于将一条消息拆分成多条。

Binders:目标绑定器,负责与外部消息中间件系统集成的组件。

  • doBindProducer:绑定消息中间件客户端发送消息模块。
  • doBindConsumer:绑定消息中间件客户端接收消息模块。

Bindings:外部消息中间件系统与应用程序提供的消息生产者和消费者之间的桥梁。


Spring Cloud Alibaba RocketMQ架构图

  • MessageChannel(output):消息通道,用于发送消息,Spring Cloud Stream的标准接口
  • MessageChannel(input):消息通道,用于订阅消息,Spring Cloud Stream的标准接口
  • Binder bindProducer:目标绑定器,将发送通道发过来的消息发送到RocketMQ消息服务器
  • Binder bindConsumer:目标绑定器,将接收到RocketMQ消息服务器的消息推送给订阅通道

spring-cloud-stream官网:

https://docs.spring.io/spring-cloud-stream/docs/3.1.4/reference/html/spring-cloud-stream.html#spring-cloud-stream-reference

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
消息中间件 Java RocketMQ
【RocketMQ系列九】SpringCloudStream整合RocketMQ
【RocketMQ系列九】SpringCloudStream整合RocketMQ
262 1
|
3月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
17天前
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
48 15
|
17天前
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
43 9
|
12天前
|
消息中间件 监控 数据处理
解决方案 | 云消息队列RabbitMQ实践
解决方案 | 云消息队列RabbitMQ实践
26 1
|
13天前
|
消息中间件 弹性计算 运维
云消息队列RabbitMQ实践
本评测报告详细分析了阿里云云消息队列 RabbitMQ 版的实践原理、部署体验及核心优势。报告认为其在解决消息积压、脑裂难题及弹性伸缩方面表现优秀,但建议进一步细化架构优化策略和技术细节描述。部署文档详尽,对初学者友好,但仍需加强网络配置和版本兼容性说明。实际部署展示了其高可用性和成本优化能力,适用于高并发消息处理和分布式系统数据同步。为进一步提升方案,建议增加安全性配置指导、性能调优建议及监控告警系统设置。
|
1天前
|
消息中间件 监控 测试技术
云消息队列RabbitMQ实践 - 评测
根据反馈,对本解决方案的实践原理已有一定理解,描述整体清晰但需在消息队列配置与使用上增加更多示例和说明以助理解。部署体验中获得了一定的引导和文档支持,尽管文档仍有待完善;期间出现的配置文件错误及依赖库缺失等问题已通过查阅资料解决。设计验证展示了云消息队列RabbitMQ的核心优势,包括高可用性和灵活性,未来可通过增加自动化测试来提高系统稳定性。实践后,用户对方案解决问题的能力及适用场景有了明确认识,认为其具有实际生产价值,不过仍需在性能优化、安全性增强及监控功能上进行改进以适应高并发和大数据量环境。
8 0
|
26天前
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
25 0
手撸MQ消息队列——循环数组
|
2月前
|
消息中间件 存储 Java
【揭秘】RocketMQ内部运作大揭秘:一探究竟,原来消息队列是这样工作的!
【8月更文挑战第19天】RocketMQ是一款高性能、高可用的消息中间件,在分布式系统中至关重要。它采用发布/订阅模式,支持高吞吐量的消息传递。核心组件包括管理元数据的NameServer、存储消息的Broker以及Producer和Consumer。RocketMQ支持发布/订阅与点对点两种模型,并具备复杂的消息持久化和路由机制。通过Java API示例,可轻松实现消息的发送与接收。RocketMQ凭借其出色的特性和可靠性,成为大型分布式系统首选的消息解决方案。
58 5