一,RabbitMQ的安装和配置 并启动
参见 RabbitMQ的安装和配置https://blog.csdn.net/weixin_43025151/article/details/123186641
RabbitMQ启动成功:
二,SpringCloud Stream消息驱动
1,设计思想
一个标准的消息队列MQ,如下图:
为什么用CloudStream?
一句话:CloudStream屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型!
通过定义绑定器Binder作为中间件,实现了应用程序与消息中间件细节之间的隔离。
绑定器Binder:INPUT适用于消费者 OUTPUT适用于生产者
Stream中的消息通讯方式遵循了发布-订阅模式 Topic主题进行广播 在RabbitMQ就是Exchange 在Kafka中就是Topic。
2,Springcloud Stream标准流程套路
Binder:很方便的连接中间件,屏蔽差异
Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过channel对队列进行配置
Source和Sink:简单的可理解为参照对象是Springcloud
Stream自身,从Stream发布消息就是输出,接受消息就是输入
3,编码API和常用注解
三,详细案例(整合rabbitmq消息队列)
1,新建父工程 springcloud-tigerhhzz
pom文件代码
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.tigerhhzz.springcloud-tigerhhzz</groupId> <artifactId>springcloud-tigerhhzz</artifactId> <packaging>pom</packaging> <version>1.0-SNAPSHOT</version> <modules> <module>cloud-api-commons</module> <module>cloud-eureka-server7001</module> <module>cloud-eureka-server7002</module> <module>cloud-comsumer-order80</module> <module>cloud-provider-payment8001</module> <module>cloud-provider-payment8002</module> <module>cloud-comsumer-feign-order80</module> <module>cloud-provider-hystrix-payment8001</module> <module>cloud-comsumer-feign-hystrix-order80</module> <module>cloud-comsumer-hystrix-dashboard9001</module> <module>cloud-gateway-gateway9527</module> <module>cloud-config-center-3344</module> <module>cloud-config-client-3355</module> <module>cloud-config-client-3366</module> <module>cloud-stream-rabbitmq-provider8801</module> <module>cloud-stream-rabbitmq-consumer8802</module> <module>cloud-stream-rabbitmq-consumer8803</module> </modules> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <junit.version>4.12</junit.version> <log4j.version>1.2.17</log4j.version> <lombok.version>1.18.0</lombok.version> <mysql.version>5.1.47</mysql.version> <druid.version>1.1.16</druid.version> <mybatis.spring.boot.version>1.3.2</mybatis.spring.boot.version> </properties> <!-- 子模块继承之后,提供作用:锁定版本+子模块不用写groupId和version --> <dependencyManagement> <dependencies> <!-- springboot 2.2.2 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>2.2.2.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> <!-- spring cloud Hoxton.SR1 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Hoxton.SR1</version> <type>pom</type> <scope>import</scope> </dependency> <!-- spring cloud alibaba 2.1.0.RELEASE --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>2.1.0.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>${druid.version}</version> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>${mybatis.spring.boot.version}</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>${log4j.version}</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${lombok.version}</version> <optional>true</optional> </dependency> </dependencies> </dependencyManagement> </project>
2,新建注册中心模块(本文使用erueka作为注册中心)
cloud-eureka-server7001
项目结构:
项目main入口方法:
package com.tigerhhzz.springcloud; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer; /** * @author tigerhhzz * @date 2022/6/13 10:33 */ @SpringBootApplication @EnableEurekaServer public class EurekaMain7001 { public static void main(String[] args){ SpringApplication.run(EurekaMain7001.class,args); } }
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>springcloud-tigerhhzz</artifactId> <groupId>com.tigerhhzz.springcloud-tigerhhzz</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>cloud-eureka-server7001</artifactId> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>com.tigerhhzz.springcloud-tigerhhzz</groupId> <artifactId>cloud-api-commons</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies> </project>
application.yml
server: port: 7001 spring: application: name: cloud-eureka-server7001 eureka: instance: hostname: eureka7001.com client: register-with-eureka: false # 表示不向注册中心注册 fetch-registry: false # 由于注册中心的职责就是维护服务实例,所以它不需要去检索服务 service-url: #defaultZone: http://eureka7002.com:7002/eureka/ #集群 指向其他eureka defaultZone: http://eureka7001.com:7001/eureka/ #单机 指向自己 server: enable-self-preservation: false #关闭自我保护机制 ,保证不可用服务被及时剔除
3,新建三个子模块
cloud-stream-rabbitmq-provider8801,作为生产者进行消息模块
cloud-stream-rabbitmq-consumer8802,作为消费者接受模块
cloud-stream-rabbitmq-consumer8803,作为消费者接受模块
目录结构:
module:cloud-stream-rabbitmq-provider8801
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>springcloud-tigerhhzz</artifactId> <groupId>com.tigerhhzz.springcloud-tigerhhzz</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>cloud-stream-rabbitmq-provider8801</artifactId> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>com.tigerhhzz.springcloud-tigerhhzz</groupId> <artifactId>cloud-api-commons</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies> </project>
application.yml
server: port: 8801 spring: application: name: cloud-stream-privider cloud: stream: binders: #自此处配置要绑定的rabbitmq的服务信息 defaultRabbit: #表示定义的名称,用于binding整合 type: rabbit #消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: #服务的整合处理 output: #这个名字是一个通道的名称 destination: studyExchange #表示要使用的exchange名称定义 content-type: application/json #设置消息类型,本次为json binder: defaultRabbit #设置要绑定的消息服务的具体设置 eureka: client: service-url: defaultZone: http://eureka7001.com:7001/eureka/ #http://localhost:7001/eureka/ instance: lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔 lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔 instance-id: send-8801.com prefer-ip-address: true #访问的路径变为IP地址
主启动类
package com.tigerhhzz.springcloud; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @author tigerhhzz * @date 2022/6/18 10:00 */ @SpringBootApplication public class StreamMQMain8801 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8801.class,args); } }
业务类–接口
package com.tigerhhzz.springcloud.service; /** * @author tigerhhzz * @date 2022/6/18 10:01 */ public interface IMessageService { public String send(); }
业务类–实现类
package com.tigerhhzz.springcloud.service.impl; import cn.hutool.core.lang.UUID; import com.tigerhhzz.springcloud.service.IMessageService; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import javax.annotation.Resource; /** * @author tigerhhzz * @date 2022/6/18 10:01 */ @EnableBinding(Source.class) //定义消息推送通道 public class MessageServiceImpl implements IMessageService { //消息发送通道 @Resource private MessageChannel output; @Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); System.out.println("***********serial:"+serial); return serial; } }
controller层
package com.tigerhhzz.springcloud.controller; import com.tigerhhzz.springcloud.service.IMessageService; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * @author tigerhhzz * @date 2022/6/18 10:05 */ @RestController public class SendMessageController { @Resource private IMessageService messageService; @RequestMapping("/sendMessage") public String sendMessage() { String send = messageService.send(); return send; } }
8802基本和生产者8801的module一样
8802的配置:
server: port: 8802 spring: application: name: cloud-stream-message-consumer cloud: stream: binders: #自此处配置要绑定的rabbitmq的服务信息 defaultRabbit: #表示定义的名称,用于binding整合 type: rabbit #消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: #服务的整合处理 input: #这个名字是一个通道的名称 destination: studyExchange #表示要使用的exchange名称定义 content-type: application/json #设置消息类型,本次为json binder: defaultRabbit #设置要绑定的消息服务的具体设置 eureka: client: service-url: defaultZone: http://eureka7001.com:7001/eureka/ #http://localhost:7001/eureka/ instance: lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔 lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔 instance-id: receiver-8802.com prefer-ip-address: true #访问的路径变为IP地址
8802的控制层:
package com.tigerhhzz.springcloud.controller; import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; /** * @author tigerhhzz * @date 2022/6/18 10:32 */ @Component @EnableBinding(Sink.class) public class ReceiveMessageListenerController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message){ System.out.println("我是消费者8802,-----》接受到的消息是:"+message.getPayload()+"\t"+serverPort); } }
依照8802,clone出来一份运行8803
4,测试
依次启动rabbitmq
7001注册中心
8801消息生产模块
8802和8803消息消费模块
eureka7001注册中心
测试: 8801发送两条消息
http://localhost:8801/sendMessage
***********serial:76f1e833-100b-4d30-9423-4de0fad7e71e ***********serial:cfbcbed8-ba27-4036-8c5b-264a44cffea4
8802和8803分别接收到两条消息
我是消费者8802,-----》接受到的消息是:76f1e833-100b-4d30-9423-4de0fad7e71e 8802 我是消费者8802,-----》接受到的消息是:cfbcbed8-ba27-4036-8c5b-264a44cffea4 8802
我是消费者8803,-----》接受到的消息是:76f1e833-100b-4d30-9423-4de0fad7e71e 8803 我是消费者8803,-----》接受到的消息是:cfbcbed8-ba27-4036-8c5b-264a44cffea4 8803
5,分组消费与持久化
微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。
8803/8802都变成相同组,group两个相同
8802/8803实现了轮询分组,每次只有一个消费者。8801模块的发的消息只能被8802或者8803其中一个接受到,这样避免了重复消费
加上分组group属性,可以实现消息队列的持久化,group属性是持久化消息队列中很重要的一个属性。
rabiitmq界面: