defaultZone: http://peer1:1010/eureka/,http://peer2:1011/eureka/,http://peer3:1012/eureka/ instance: prefer-ip-address: true #使用ip注册到Eureka instance-id: stream-consumer-server:1090 #指定客户端实例的ID spring: application: name: stream-consumer-server cloud: stream: bindings: input: #消息输入配置,我们这里是消费者,如果是生成者就用output content-type: application/json #内容类型 destination: myStream #消息目的地 , 会在RabbitMQ创建一个名字叫mySteram的交换机 rabbitmq: #RabbitMQ的链接配置 host: localhost port: 5672 username: guest password: guest server: port: 1090
这里除了要注册到Eureak Server之外,通过spring.cloud.stream.bindings.input 来定义了消息的输入配置。
- content-type :用来指定消息的类型
- destination :用来指定消息的目的地,其实就是指定MQ中的交换机
spring.rabbitmq是用来配置RabbitMQ的链接参数。
Stream的消息通道绑定接口
public interface StreamClient { //对应配置中的bindings String INPUT = “input”; //用来输入消息 @Input(StreamClient.INPUT) SubscribableChannel input(); }
创建 StreamClient 接口,通过 @Input注解定义输入通道,另外,@Input 还有一个 value 属性,该属性可以用来设置消息通道的名称,这里指定的消息通道名称是 input对应yml配置。如果直接使用两个注解而没有指定具体的 value 值,则会默认使用方法名作为消息通道的名称,定义输入通道时,需要返回 SubscribableChannel 接口对象,该接口集成自 MessageChannel 接口,它定义了维护消息通道订阅者的方法。简而言之,就是用来定义消息接收的。
消息接收
@Component //绑定消息接口 @EnableBinding(value = {StreamClient.class}) public class StreamConsumer { private Logger logger = LoggerFactory.getLogger(StreamConsumer .class); @StreamListener(StreamClient.INPUT) public void receive(String message) { logger.info(“收到消息: {}”, message); } }
@EnableBinding 注解用来指定一个或多个定义了 @Input 注解的接口,以此实现对消息通道(Channel)的绑定。上面我们通过 @EnableBinding(value = {StreamClient.class}) 绑定了 StreamClient 接口,该接口是我们自己实现的对输入输出消息通道绑定的定义
@StreamListener,主要定义在方法上,作用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。上面我们将 receive 方法注册为StreamClient.INPUT 消息通道的监听处理器,当我们往这个消息通道发送信息的时候,receiver 方法会执行。
简单理解就是当我们往 StreamClient.INPUT 这个输入通道发消息的时候 ,应用程序通过@StreamListener 标签监听到消息,然后调用receive方法接收数据进行消费。
启动测试
启动消费者程序,打开RabbitMQ控制界面,可以看到这里创建了一个交换机 , 类型默认是Topic定向 ,routingkey是“#”
2.3.提供者服务
搭建工程
创建工程
springcloud-stream-provider-server-1100 ,导入依赖,和消费者服务一样。
org.springframework.cloud
spring-cloud-starter-netflix-eureka-client
org.springframework.boot
spring-boot-starter-web
org.springframework.cloud
spring-cloud-starter-stream-rabbit
yml配置文件
eureka: client: serviceUrl: defaultZone: http://peer1:1010/eureka/,http://peer2:1011/eureka/,http://peer3:1012/eureka/ instance: prefer-ip-address: true #使用ip注册到Eureka instance-id: stream-consumer-server:1100 #指定客户端实例的ID spring: application: name: stream-provider-server cloud: stream: bindings: output: #消息输出配置 content-type: application/json destination: myStream #消息目的地 rabbitmq: host: localhost port: 5672 username: guest password: guest server: port: 1100
主配置类
//服务注册与发现 @SpringBootApplication @EnableDiscoveryClient @EnableBinding(StreamClient.class) //绑定stream的API接口 public class StreamProviderServerApplication1100 { public static void main(String[] args) { SpringApplication.run(StreamProviderServerApplication1100.class); } }
这里的主配置通过 @EnableBinding(StreamClient.class) 绑定stream的接口实现对消息通道的绑定。下面是Stream消息通道绑定接口:
Stream的消息通道绑定接口
public interface StreamClient { //对应配置中的bindings String OUTPUT = “output”; //用来输出消息 @Output(StreamClient.OUTPUT) MessageChannel output(); }
这里和消费者的绑定输入通道差不多,这里在提供者方绑定的是输出通道,“output”对应了yml配置中的“bindings.output”配置。应用程序通过 MessageChannel 来发送消息,通过 @Output(StreamClient.OUTPUT)绑定的通道把消息发送到RabbitMQ中。
编写消息发送测试controller
@RestController public class StreamProvider { @Autowired StreamClient streamClient ; @RequestMapping(“/send”) public void send(){ streamClient.output().send(MessageBuilder.withPayload(“我是消息我是消息”).build()); } }
注入 streamClient,通过output()得到消息输出通道(MessageChannel ),通过send方法去发送消息到MQ
访问测试
启动消费者,再启动提供者,访问提供者:http://localhost:1100/send ,观察消费者的控制台应该收到了消息,并且RabbitMQ产生了一个的队列
修改消费者 的端口为 1091 ,再启动一个消费者,然后访问 http://localhost:1100/send 观察两个消费者应该都受到消息了
2.4.消息分组
上面的案例Stream是通过topic方式进行消息广播 ,有的时候我们希望一个消息只是被一个消费者收到,因为有些消息不能被重复消费,我们可以使用消组。通过配置消息分组的方式来达到如上效果,下面修改下配置文件,修改如下:注意,我这里修改的是消费者
eureka:
client:
serviceUrl:
defaultZone: http://peer1:1010/eureka/,http://peer2:1011/eureka/,http://peer3:1012/eureka/
instance:
prefer-ip-address: true #使用ip注册到Eureka
instance-id: stream-consumer-server:1090 #指定客户端实例的ID
spring:
application:
name: stream-consumer-server
cloud:
stream:
bindings:
input: #消息输入配置,我们这里是消费者,如果是生成者就用output
content-type: application/json #内容类型
destination: myStream #消息目的地 , 会在RabbitMQ创建一个名字叫mySteram的交换机
group: stream #指定组,多个消费者在同一个组,那么一个消息就只会给到一个消费者
rabbitmq: #RabbitMQ的链接配置
host: localhost
port: 5672
username: guest
password: guest
server:
port: 1090
group: stream #指定组,多个消费者在同一个组,那么一个消息就只会给到一个消费者 , 重启多个消费者,查看MQ,这里多个一个:myStream.stream的队列,这个stream的后缀就是组名。
我们启动测试,多次发送消息,同一个消息只会被一个消息者获取。
2.5.消息分区
有的时候我们可能不满足于一个消息被一个消费者消费,对于特殊业务情况,除了要保证单个消费者消费之外,还希望有相同特点的消费都可以同一个消费者消费,这里就可以使用 Spring Cloud Stream 提供的消息分区功能。
消费者
Spring Cloud Stream 实现消息分区只需要在配置文件里进行相应的配置即可,消费者修改配置文件如下:
eureka:
client:
serviceUrl:
defaultZone: http://peer1:1010/eureka/,http://peer2:1011/eureka/,http://peer3:1012/eureka/
instance:
prefer-ip-address: true #使用ip注册到Eureka
instance-id: stream-consumer-server:1090 #指定客户端实例的ID
spring:
application:
name: stream-consumer-server
cloud:
stream:
bindings:
input: #消息输入配置,我们这里是消费者,如果是生成者就用output
content-type: application/json #内容类型
destination: myStream #消息目的地 , 会在RabbitMQ创建一个名字叫mySteram的交换机
group: stream #指定组,多个消费者在同一个组,那么一个消息就只会给到一个消费者
#通过该参数开启消费者分区功能
partitioned: true
#配置总共有多少个消费者
instance-count: 2
#当前消费者是第几个,从 0 开始,最大值为instance-count 减 1 。
instance-index: 0
rabbitmq: #RabbitMQ的链接配置
host: localhost
port: 5672
username: guest
password: guest
server:
port: 1090
启动两个消费者 ,注意修改端口,第二个消费者需要将 instance-index: 0 改为1 , 观察MQ
提供者
修改提供者springcloud-stream-provider-1100工程,修改配置文件,指定分区个数
eureka: client: serviceUrl: defaultZone: http://peer1:1010/eureka/,http://peer2:1011/eureka/,http://peer3:1012/eureka/ instance: prefer-ip-address: true #使用ip注册到Eureka instance-id: stream-consumer-server:1100 #指定客户端实例的ID spring: application: name: stream-provider-server cloud: stream: bindings: output: #消息输出配置 content-type: application/json destination: myStream #消息目的地 partitionCount: 2 #分区个数 #分区规则表达式配置,从header中获取分区索引 partitionKeyExpression: headers[‘partitionKey’] rabbitmq: host: localhost port: 5672 username: guest password: guest server: