【SpringCloud Stream消息驱动、设计思想以及整合rabbitmq消息队列案例--学习笔记】

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 【SpringCloud Stream消息驱动、设计思想以及整合rabbitmq消息队列案例--学习笔记】

一,RabbitMQ的安装和配置 并启动

参见 RabbitMQ的安装和配置https://blog.csdn.net/weixin_43025151/article/details/123186641

RabbitMQ启动成功:

二,SpringCloud Stream消息驱动

1,设计思想

一个标准的消息队列MQ,如下图:

为什么用CloudStream?

一句话:CloudStream屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型!

通过定义绑定器Binder作为中间件,实现了应用程序与消息中间件细节之间的隔离。

绑定器Binder:INPUT适用于消费者 OUTPUT适用于生产者

Stream中的消息通讯方式遵循了发布-订阅模式 Topic主题进行广播 在RabbitMQ就是Exchange 在Kafka中就是Topic。

2,Springcloud Stream标准流程套路

Binder:很方便的连接中间件,屏蔽差异
Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过channel对队列进行配置
Source和Sink:简单的可理解为参照对象是Springcloud
Stream自身,从Stream发布消息就是输出,接受消息就是输入

3,编码API和常用注解

三,详细案例(整合rabbitmq消息队列)

1,新建父工程 springcloud-tigerhhzz

pom文件代码

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.tigerhhzz.springcloud-tigerhhzz</groupId>
    <artifactId>springcloud-tigerhhzz</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>cloud-api-commons</module>
        <module>cloud-eureka-server7001</module>
        <module>cloud-eureka-server7002</module>
        <module>cloud-comsumer-order80</module>
        <module>cloud-provider-payment8001</module>
        <module>cloud-provider-payment8002</module>
        <module>cloud-comsumer-feign-order80</module>
        <module>cloud-provider-hystrix-payment8001</module>
        <module>cloud-comsumer-feign-hystrix-order80</module>
        <module>cloud-comsumer-hystrix-dashboard9001</module>
        <module>cloud-gateway-gateway9527</module>
        <module>cloud-config-center-3344</module>
        <module>cloud-config-client-3355</module>
        <module>cloud-config-client-3366</module>
        <module>cloud-stream-rabbitmq-provider8801</module>
        <module>cloud-stream-rabbitmq-consumer8802</module>
        <module>cloud-stream-rabbitmq-consumer8803</module>
    </modules>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <junit.version>4.12</junit.version>
        <log4j.version>1.2.17</log4j.version>
        <lombok.version>1.18.0</lombok.version>
        <mysql.version>5.1.47</mysql.version>
        <druid.version>1.1.16</druid.version>
        <mybatis.spring.boot.version>1.3.2</mybatis.spring.boot.version>
    </properties>
    <!-- 子模块继承之后,提供作用:锁定版本+子模块不用写groupId和version  -->
    <dependencyManagement>
        <dependencies>
            <!--  springboot 2.2.2    -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.2.2.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!--  spring cloud Hoxton.SR1   -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Hoxton.SR1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <!--  spring cloud alibaba 2.1.0.RELEASE    -->
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-alibaba-dependencies</artifactId>
                <version>2.1.0.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>${mysql.version}</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid</artifactId>
                <version>${druid.version}</version>
            </dependency>
            <dependency>
                <groupId>org.mybatis.spring.boot</groupId>
                <artifactId>mybatis-spring-boot-starter</artifactId>
                <version>${mybatis.spring.boot.version}</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>${log4j.version}</version>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>${junit.version}</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>${lombok.version}</version>
                <optional>true</optional>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>

2,新建注册中心模块(本文使用erueka作为注册中心)

cloud-eureka-server7001

项目结构:

项目main入口方法:

package com.tigerhhzz.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
/**
 * @author tigerhhzz
 * @date 2022/6/13 10:33
 */
@SpringBootApplication
@EnableEurekaServer
public class EurekaMain7001 {
    public static void main(String[] args){
        SpringApplication.run(EurekaMain7001.class,args);
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloud-tigerhhzz</artifactId>
        <groupId>com.tigerhhzz.springcloud-tigerhhzz</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>cloud-eureka-server7001</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>com.tigerhhzz.springcloud-tigerhhzz</groupId>
            <artifactId>cloud-api-commons</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>

application.yml

server:
  port: 7001
spring:
  application:
    name: cloud-eureka-server7001
eureka:
  instance:
    hostname: eureka7001.com
  client:
    register-with-eureka: false # 表示不向注册中心注册
    fetch-registry: false       # 由于注册中心的职责就是维护服务实例,所以它不需要去检索服务
    service-url:
      #defaultZone: http://eureka7002.com:7002/eureka/ #集群 指向其他eureka
      defaultZone: http://eureka7001.com:7001/eureka/  #单机 指向自己
  server:
    enable-self-preservation: false   #关闭自我保护机制 ,保证不可用服务被及时剔除

3,新建三个子模块

cloud-stream-rabbitmq-provider8801,作为生产者进行消息模块
cloud-stream-rabbitmq-consumer8802,作为消费者接受模块
cloud-stream-rabbitmq-consumer8803,作为消费者接受模块

目录结构:

module:cloud-stream-rabbitmq-provider8801

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springcloud-tigerhhzz</artifactId>
        <groupId>com.tigerhhzz.springcloud-tigerhhzz</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>cloud-stream-rabbitmq-provider8801</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.tigerhhzz.springcloud-tigerhhzz</groupId>
            <artifactId>cloud-api-commons</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
</project>

application.yml

server:
  port: 8801
spring:
  application:
    name: cloud-stream-privider
  cloud:
    stream:
      binders:   #自此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: #表示定义的名称,用于binding整合
          type: rabbit #消息组件类型
          environment:  # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:   #服务的整合处理
          output: #这个名字是一个通道的名称
            destination: studyExchange #表示要使用的exchange名称定义
            content-type: application/json #设置消息类型,本次为json
            binder: defaultRabbit  #设置要绑定的消息服务的具体设置
eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka/ #http://localhost:7001/eureka/
  instance:
    lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔
    lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔
    instance-id: send-8801.com
    prefer-ip-address: true #访问的路径变为IP地址

主启动类

package com.tigerhhzz.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * @author tigerhhzz
 * @date 2022/6/18 10:00
 */
@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class,args);
    }
}

业务类–接口

package com.tigerhhzz.springcloud.service;
/**
 * @author tigerhhzz
 * @date 2022/6/18 10:01
 */
public interface IMessageService {
    public String send();
}

业务类–实现类

package com.tigerhhzz.springcloud.service.impl;
import cn.hutool.core.lang.UUID;
import com.tigerhhzz.springcloud.service.IMessageService;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
/**
 * @author tigerhhzz
 * @date 2022/6/18 10:01
 */
@EnableBinding(Source.class)  //定义消息推送通道
public class MessageServiceImpl implements IMessageService {
    //消息发送通道
    @Resource
    private MessageChannel output;
    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("***********serial:"+serial);
        return serial;
    }
}

controller层

package com.tigerhhzz.springcloud.controller;
import com.tigerhhzz.springcloud.service.IMessageService;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
 * @author tigerhhzz
 * @date 2022/6/18 10:05
 */
@RestController
public class SendMessageController {
    @Resource
    private IMessageService messageService;
    @RequestMapping("/sendMessage")
    public String sendMessage() {
        String send = messageService.send();
        return send;
    }
}

8802基本和生产者8801的module一样

8802的配置:

server:
  port: 8802
spring:
  application:
    name: cloud-stream-message-consumer
  cloud:
    stream:
      binders:   #自此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: #表示定义的名称,用于binding整合
          type: rabbit #消息组件类型
          environment:  # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings:   #服务的整合处理
        input: #这个名字是一个通道的名称
          destination: studyExchange #表示要使用的exchange名称定义
          content-type: application/json #设置消息类型,本次为json
          binder: defaultRabbit  #设置要绑定的消息服务的具体设置
eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka/ #http://localhost:7001/eureka/
  instance:
    lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔
    lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔
    instance-id: receiver-8802.com
    prefer-ip-address: true #访问的路径变为IP地址

8802的控制层:

package com.tigerhhzz.springcloud.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
 * @author tigerhhzz
 * @date 2022/6/18 10:32
 */
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
    @Value("${server.port}")
    private String serverPort;
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("我是消费者8802,-----》接受到的消息是:"+message.getPayload()+"\t"+serverPort);
    }
}

依照8802,clone出来一份运行8803

4,测试

依次启动rabbitmq
7001注册中心
8801消息生产模块
8802和8803消息消费模块

eureka7001注册中心

测试: 8801发送两条消息

http://localhost:8801/sendMessage

***********serial:76f1e833-100b-4d30-9423-4de0fad7e71e
***********serial:cfbcbed8-ba27-4036-8c5b-264a44cffea4

8802和8803分别接收到两条消息

我是消费者8802,-----》接受到的消息是:76f1e833-100b-4d30-9423-4de0fad7e71e  8802
我是消费者8802,-----》接受到的消息是:cfbcbed8-ba27-4036-8c5b-264a44cffea4  8802
我是消费者8803,-----》接受到的消息是:76f1e833-100b-4d30-9423-4de0fad7e71e  8803
我是消费者8803,-----》接受到的消息是:cfbcbed8-ba27-4036-8c5b-264a44cffea4  8803

5,分组消费与持久化

微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。
8803/8802都变成相同组,group两个相同
8802/8803实现了轮询分组,每次只有一个消费者。8801模块的发的消息只能被8802或者8803其中一个接受到,这样避免了重复消费
加上分组group属性,可以实现消息队列的持久化,group属性是持久化消息队列中很重要的一个属性。

rabiitmq界面:

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
消息中间件 Java 测试技术
消息队列 MQ使用问题之数据流出规则是否支持平台的云RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
63 7
|
21天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
|
29天前
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
1月前
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
64 4
|
2月前
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
80 16
|
2月前
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
73 9
|
2月前
|
消息中间件 监控 数据处理
解决方案 | 云消息队列RabbitMQ实践
解决方案 | 云消息队列RabbitMQ实践
52 1
|
2月前
|
消息中间件 弹性计算 运维
云消息队列RabbitMQ实践
本评测报告详细分析了阿里云云消息队列 RabbitMQ 版的实践原理、部署体验及核心优势。报告认为其在解决消息积压、脑裂难题及弹性伸缩方面表现优秀,但建议进一步细化架构优化策略和技术细节描述。部署文档详尽,对初学者友好,但仍需加强网络配置和版本兼容性说明。实际部署展示了其高可用性和成本优化能力,适用于高并发消息处理和分布式系统数据同步。为进一步提升方案,建议增加安全性配置指导、性能调优建议及监控告警系统设置。
|
1月前
|
消息中间件 监控 测试技术
云消息队列RabbitMQ实践 - 评测
根据反馈,对本解决方案的实践原理已有一定理解,描述整体清晰但需在消息队列配置与使用上增加更多示例和说明以助理解。部署体验中获得了一定的引导和文档支持,尽管文档仍有待完善;期间出现的配置文件错误及依赖库缺失等问题已通过查阅资料解决。设计验证展示了云消息队列RabbitMQ的核心优势,包括高可用性和灵活性,未来可通过增加自动化测试来提高系统稳定性。实践后,用户对方案解决问题的能力及适用场景有了明确认识,认为其具有实际生产价值,不过仍需在性能优化、安全性增强及监控功能上进行改进以适应高并发和大数据量环境。
42 0