前言
https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。
SpringCloud stream通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
Stream让我们不再关注具体MQ的细节我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换,总的来说Stream能够屏蔽底层消息中间件的差异、降低切换成本,是统一消息的编程模型。
1、stream设计思想
- Binder:很方便的连接中间件,屏蔽差异
- Channel:通道是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
- Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。
2、编码常用的注解
组成 | 说明 |
Middleware | 中间件,目前只支持RabbitMQ和Kafka |
Binder | Binder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过BInder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。 |
@Input | 注解标识输入通道,通过该输入通道接收到的消息进入应用程序 |
@Output | 注解标识输出通道,发布的消息将通过该通道离开应用程序 |
@StreamListener | 监听队列,用于消费者的队列的消息接收 |
@EnableBinding | 指信道channel和exchange绑定在一起 |
3、编码步骤
3.1、添加依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
3.2、修改配置文件
server: port: 8088 spring: cloud: stream: binders: #需要绑定的rabbitmq的服务信息 defaultRabbit: #定义的名称,用于bidding整合 type: rabbit #消息组件类型 environment: #配置rabbimq连接环境 spring: rabbitmq: host: localhost #rabbitmq 服务器的地址 port: 5672 #rabbitmq 服务器端口 username: tiger #rabbitmq 用户名 password: tiger #rabbitmq 密码 virtual-host: tiger_vh #虚拟路径 bindings: #服务的整合处理 saveOrderOutput: #这个是消息通道的名称 --->保存订单输出通道 destination: exchange-saveOrder #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。 content-type: application/json #设置消息的类型,本次为json default-binder: defaultRabbit group: saveOrderGroup #分组 saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道 destination: exchange-saveOrder #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。 content-type: application/json #设置消息的类型,本次为json default-binder: defaultRabbit group: saveOrderGroup #分组
3.3、生产
/** * 订单消息输出通道处理器 */ @Component public interface OrderOutputChannelProcesor { @Output("saveOrderOutput") MessageChannel saveOrderOutput(); }
@Slf4j @EnableBinding(OrderOutputChannelProcesor.class) public class OrderMessageProducer { @Autowired @Output("saveOrderOutput") private MessageChannel messageChannel; public void sentMsg(UserInfo userInfo){ messageChannel.send(MessageBuilder.withPayload(userInfo).build()); log.info("消息发送成功:" + userInfo); } }
3.4、消费
/** * 订单消息输入通道处理器 */ @Component public interface OrderInputChannelProcesor { @Input("saveOrderInput") SubscribableChannel saveOrderInput(); }
@Slf4j @EnableBinding(OrderInputChannelProcesor.class) public class OrderMessageConsumer { @StreamListener("saveOrderInput") public void receiveMsg(Message<UserInfo> userInfoMessage){ log.info("接收消息成功:" + userInfoMessage.getPayload()); } }
3.5、延迟队列
安装延迟队列插件:
下载解压,到plugins目录,执行以下的命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3.5.1、修改配置文件
server: port: 8088 spring: cloud: stream: binders: #需要绑定的rabbitmq的服务信息 defaultRabbit: #定义的名称,用于bidding整合 type: rabbit #消息组件类型 environment: #配置rabbimq连接环境 spring: rabbitmq: host: localhost #rabbitmq 服务器的地址 port: 5672 #rabbitmq 服务器端口 username: tiger #rabbitmq 用户名 password: tiger #rabbitmq 密码 virtual-host: tiger_vh #虚拟路径 bindings: #服务的整合处理 saveOrderOutput: #这个是消息通道的名称 --->保存订单输出通道 destination: exchange-saveOrder-delay #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。 content-type: application/json #设置消息的类型,本次为json default-binder: defaultRabbit group: saveOrderGroup #分组 saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道 destination: exchange-saveOrder-delay #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。 content-type: application/json #设置消息的类型,本次为json default-binder: defaultRabbit group: saveOrderGroup #分组 rabbit: bindings: #服务的整合处理 saveOrderOutput: #这个是消息通道的名称 --->保存订单输出通道 producer: delayed-exchange: true saveOrderInput: consumer: delayed-exchange: true
3.5.2、生产端
@Slf4j @EnableBinding(OrderOutputChannelProcesor.class) public class OrderMessageProducer { @Autowired @Output("saveOrderOutput") private MessageChannel messageChannel; public void sentMsg(UserInfo userInfo){ messageChannel.send(MessageBuilder.withPayload(userInfo).setHeader("x-delay", 5000).build()); log.info("消息发送成功:" + userInfo); } }
3.5.2、消息确认机制 消费端
rabbit: bindings: #服务的整合处理 saveOrderInput: consumer: acknowledge-mode: MANUAL #手动确认
@StreamListener("saveOrderInput") public void receiveMsg(Message<UserInfo> userInfoMessage){ log.info("接收消息成功:" + userInfoMessage.getPayload()); Channel channel = (Channel) userInfoMessage.getHeaders().get(AmqpHeaders.CHANNEL); Long delieverTag = (Long) userInfoMessage.getHeaders().get(AmqpHeaders.DELIVERY_TAG); /* * deliveryTag:Channel的消息投递的唯一标识符。 * multiple:是否否定应答多条消息。如果设置为true,则否定应答带指定deliveryTag的消息及该deliveryTag之前的多条消息; * 如果设置为false,则仅否定应答带指定deliveryTag的单条消息。 * requeue:被否定应答的消息是否重入队列。如果设置为true,则消息重入队列; * 如果设置为false,则消息被丢弃或发送到死信Exchange。 */ try { channel.basicAck(delieverTag,true); } catch (IOException e) { e.printStackTrace(); } }
定义交换机类型为direct
rabbit: bindings: #服务的整合处理 saveOrderInput: consumer: bindingRoutingKey: orderRoutingKey bindQueue: true exchangeType: direct saveOrderOutput: producer: routingKeyExpression: '''orderRoutingKey''' exchangeType: direct