五、发送消息到输出通道
/** * kafka消息发送器 * @author dbq * @date 2019/9/26 17:50 */ @Component public class EsKafkaMessageSender { @Autowired private EsChannel channel; /** * 消息发送到默认通道:缺省通道对应缺省主题 * @param message */ public void sendToDefaultChannel(String message){ channel.sendEsDefaultMessage().send(MessageBuilder.withPayload(message).build()); } /** * 消息发送到告警通道:告警通道对应告警主题 * @param message */ public void sendToAlarmChannel(String message){ channel.sendEsAlarmMessage().send(MessageBuilder.withPayload(message).build()); } }
注入先前定义的通道EsChannel,sendToDefaultChannel、sendToAlarmChannel分别为我们自定义的两个发送方法,可将消息发送到不同的通道中,每个通道对应一个kafka的主题。
六、从输入通道订阅消息
@EnableBinding(value = EsChannel.class) public class EsStreamListener { /** * 从缺省通道接收消息 * @param message */ @StreamListener(EsChannel.ES_DEFAULT_INPUT) public void receive(Message<String> message){ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); System.out.println(sdf.format(new Date())+"------start--------安全用电默认消息:" + message); try { Thread.sleep(1000*10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(sdf.format(new Date())+"------end--------安全用电默认消息"); } /** * 从告警通道接收消息 * @param message */ @StreamListener(EsChannel.ES_ALARM_INPUT) public void receiveAlarm(Message<String> message){ System.out.println("订阅告警消息:" + message); } }
从不同的通道实现消息的订阅。
七、这样完整的消息系统就搭建好了,定义Controller发送消息测试
@ApiOperation(value = "test1", httpMethod = "POST") @PostMapping(value = "/test1", produces = "application/json;charset=UTF-8") public void test1(String message, HttpServletRequest request, HttpServletResponse response) { sender.sendToDefaultChannel(message); sender.sendToDefaultChannel(message); sender.sendToDefaultChannel(message); sender.sendToDefaultChannel(message); } @ApiOperation(value = "test", httpMethod = "POST") @PostMapping(value = "/test2", produces = "application/json;charset=UTF-8") public void test2(String message, HttpServletRequest request, HttpServletResponse response) { sender.sendToAlarmChannel(message); }
test1:发送消息的缺省消息通道
test2:发送消息到告警消息通道
八、并发性测试
如七中所示,一次发送4条消息到缺省消息通道中,并启动两个实例(即两个微服务组成一个小型集群),在并发性配置为1的情况下,即spring.cloud.stream.bindings.es_default_input.consumer.concurrency=1
实例1
2019-09-30 11:13:14------start--------默认消息... 2019-09-30 11:13:24------end--------默认消息
实例2
2019-09-30 11:13:14------start--------默认消息:... 2019-09-30 11:13:24------end--------默认消息 2019-09-30 11:13:24------start--------默认消息:... 2019-09-30 11:13:34------end--------默认消息 2019-09-30 11:13:34------start--------默认消息:... 2019-09-30 11:13:44------end--------默认消息
通过打印日志(日志做了简化处理)可以看出,两个实例之间是做到了并发消费,但是在1个实例内部,并没有并发消费。
如果将concurrency修改为2.
日志如下
实例1
2019-09-30 11:31:13------start--------:... 2019-09-30 11:31:13------start--------默认消息:... 2019-09-30 11:31:23------end--------默认消息 2019-09-30 11:31:23------end--------默认消息 2019-09-30 11:31:23------start--------默认消息:... 2019-09-30 11:31:33------end--------默认消息
实例2
2019-09-30 11:31:13------start--------默认消息:... 2019-09-30 11:31:23------end--------
从日志可以看出,实例1中实现了两个线程的并发消费。