使用 Spring Cloud Stream 发送和接收消息
1. 引入依赖 spring-cloud-starter-stream-rabbit
3. 配置 RabbitMQ
3. 新建接口 StreamClient,分别定义 input() 和 output()
4. 接收端
5. 发送端
Order
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
package com.imooc.order.message;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * Declares the Spring Cloud Stream channels used by the order service.
 *
 * <p>Two binding names are defined, and each one is exposed both as an
 * input (subscribable) channel and as an output (sendable) channel.
 *
 * <p>NOTE(review): every binding name here is shared by an {@code @Input}
 * and an {@code @Output} method of the same interface. Confirm that the
 * Spring Cloud Stream version in use accepts two bindings with one name.
 */
public interface StreamClient {

    /** Binding name used by {@link #input()} and {@link #output()}. */
    String INPUT = "myMessage";

    /** Binding name used by {@link #input2()} and {@link #output2()}. */
    String INPUT2 = "myMessage2";

    /**
     * @return the channel from which messages bound to {@link #INPUT} are consumed
     */
    @Input(INPUT)
    SubscribableChannel input();

    /**
     * @return the channel on which messages bound to {@link #INPUT} are sent
     */
    @Output(INPUT)
    MessageChannel output();

    /**
     * @return the channel from which messages bound to {@link #INPUT2} are consumed
     */
    @Input(INPUT2)
    SubscribableChannel input2();

    /**
     * @return the channel on which messages bound to {@link #INPUT2} are sent
     */
    @Output(INPUT2)
    MessageChannel output2();
}
package com.imooc.order.controller;

import com.imooc.order.dto.OrderDTO;
import com.imooc.order.message.StreamClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * REST endpoint that publishes a test message through {@link StreamClient}.
 */
@RestController
public class SendMessageController {

    @Autowired
    private StreamClient streamClient;

    // Earlier variant, kept for reference: sends a plain string payload
    // (the current time) instead of an OrderDTO.
    //
    // @GetMapping("/sendMessage")
    // public void process() {
    //     String message = "now " + new Date();
    //     streamClient.output().send(MessageBuilder.withPayload(message).build());
    // }

    /**
     * Handles {@code GET /sendMessage}: creates an {@link OrderDTO} whose
     * order id is {@code "123456"} and sends it as the message payload on
     * {@link StreamClient#output()}.
     */
    @GetMapping("/sendMessage")
    public void process() {
        OrderDTO payload = new OrderDTO();
        payload.setOrderId("123456");

        // Wrap the DTO in a message, then hand it to the output channel.
        MessageBuilder<OrderDTO> builder = MessageBuilder.withPayload(payload);
        streamClient.output().send(builder.build());
    }
}
package com.imooc.order.message; import com.imooc.order.dto.OrderDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component; @Component @EnableBinding(StreamClient.class) @Slf4j public class StreamReceiver { // @StreamListener(value = StreamClient.INPUT) // public void process(Object message) { // log.info("StreamReceiver: {}", message); // } /** * 接收orderDTO对象 消息 * @param message */ @StreamListener(value = StreamClient.INPUT) @SendTo(StreamClient.INPUT2) // 接收消息后,回复消息,或间接转发消息 public String process(OrderDTO message) { log.info("StreamReceiver: {}", message); return "received."; } @StreamListener(value = StreamClient.INPUT2) public void process2(String message) { log.info("StreamReceiver2: {}", message); } }
- @SendTo(StreamClient.INPUT2):接收消息后,回复消息,或间接转发消息。
同一个服务启动多个实例时,如何做到一条消息只被其中一个实例接收?
Spring Cloud Stream 提供了消费组(group)机制,为绑定配置分组即可:
spring:
  cloud:
    stream:
      bindings:
        myMessage:
          group: order
          content-type: application/json
- 要支持复杂对象消息,需要配置 content-type: application/json。配置前后的效果对比如下图。
- 使用前
- 使用后