SpringCloud学习笔记(五)-SpringCloudStream集成kafka(下)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Spring Cloud Stream是构建消息驱动的微服务应用程序框架。提供统一的接收发送管道以连接到消息代理。通过@EnableBinding注解开启SpringCloudStream的支持。通过@StreamListener注解,使其接收流处理的时间。

五、发送消息到输出通道


/**
 * kafka消息发送器
 * @author dbq
 * @date 2019/9/26 17:50
 */
@Component
public class EsKafkaMessageSender {
    @Autowired
    private EsChannel channel;
    /**
     * 消息发送到默认通道:缺省通道对应缺省主题
     * @param message
     */
    public void sendToDefaultChannel(String message){
        channel.sendEsDefaultMessage().send(MessageBuilder.withPayload(message).build());
    }
    /**
     * 消息发送到告警通道:告警通道对应告警主题
     * @param message
     */
    public void sendToAlarmChannel(String message){
        channel.sendEsAlarmMessage().send(MessageBuilder.withPayload(message).build());
    }
}


注入先前定义的通道EsChannel,sendToDefaultChannel、sendToAlarmChannel分别为我们自定义的两个发送方法,可将消息发送到不同的通道中,每个通道对应一个kafka的主题。


六、从输入通道订阅消息


@EnableBinding(value = EsChannel.class)
public class EsStreamListener {
    /**
     * 从缺省通道接收消息
     * @param message
     */
    @StreamListener(EsChannel.ES_DEFAULT_INPUT)
    public void receive(Message<String> message){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
        System.out.println(sdf.format(new Date())+"------start--------安全用电默认消息:" + message);
        try {
            Thread.sleep(1000*10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(sdf.format(new Date())+"------end--------安全用电默认消息");
    }
    /**
     * 从告警通道接收消息
     * @param message
     */
    @StreamListener(EsChannel.ES_ALARM_INPUT)
    public void receiveAlarm(Message<String> message){
        System.out.println("订阅告警消息:" + message);
    }
}


从不同的通道实现消息的订阅。


七、这样完整的消息系统就搭建好了,定义Controller发送消息测试


@ApiOperation(value = "test1", httpMethod = "POST")
    @PostMapping(value = "/test1", produces = "application/json;charset=UTF-8")
    public void test1(String message, HttpServletRequest request,
                             HttpServletResponse response) {
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
    }
    @ApiOperation(value = "test", httpMethod = "POST")
    @PostMapping(value = "/test2", produces = "application/json;charset=UTF-8")
    public void test2(String message, HttpServletRequest request,
                      HttpServletResponse response) {
        sender.sendToAlarmChannel(message);
    }


test1:发送消息的缺省消息通道


test2:发送消息到告警消息通道


八、并发性测试


如七中所示,一次发送4条消息到缺省消息通道中,并启动两个实例(即两个微服务组成一个小型集群),在并发性配置为1的情况下,即spring.cloud.stream.bindings.es_default_input.consumer.concurrency=1


实例1

2019-09-30 11:13:14------start--------默认消息...
2019-09-30 11:13:24------end--------默认消息


实例2

2019-09-30 11:13:14------start--------默认消息:...
2019-09-30 11:13:24------end--------默认消息
2019-09-30 11:13:24------start--------默认消息:...
2019-09-30 11:13:34------end--------默认消息
2019-09-30 11:13:34------start--------默认消息:...
2019-09-30 11:13:44------end--------默认消息


通过打印日志(日志做了简化处理)可以看出,两个实例之间是做到了并发消费,但是在1个实例内部,并没有并发消费。


如果将concurrency修改为2.


日志如下


实例1

2019-09-30 11:31:13------start--------:...
2019-09-30 11:31:13------start--------默认消息:...
2019-09-30 11:31:23------end--------默认消息
2019-09-30 11:31:23------end--------默认消息
2019-09-30 11:31:23------start--------默认消息:...
2019-09-30 11:31:33------end--------默认消息


实例2

2019-09-30 11:31:13------start--------默认消息:...
2019-09-30 11:31:23------end--------


从日志可以看出,实例1中实现了两个线程的并发消费。


相关文章
|
2月前
|
存储 数据可视化 Java
基于MicrometerTracing门面和Zipkin实现集成springcloud2023的服务追踪
Sleuth将会停止维护,Sleuth最新版本也只支持springboot2。作为替代可以使用MicrometerTracing在微服务中作为服务追踪的工具。
129 1
|
12天前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
43 5
|
14天前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
22 1
|
4月前
|
jenkins 持续交付
jenkins学习笔记之六:共享库方式集成构建工具
jenkins学习笔记之六:共享库方式集成构建工具
|
4月前
|
Java jenkins Shell
jenkins学习笔记之五:Maven、Ant、Gradl、Node构建工具集成
jenkins学习笔记之五:Maven、Ant、Gradl、Node构建工具集成
|
4月前
|
jenkins 持续交付
jenkins学习笔记之九:jenkins认证集成github
jenkins学习笔记之九:jenkins认证集成github
|
4月前
|
安全 jenkins 持续交付
jenkins学习笔记之八:jenkins认证集成gitlab
jenkins学习笔记之八:jenkins认证集成gitlab
|
4月前
|
jenkins Devops 持续交付
jenkins学习笔记之七:jenkins集成LDAP用户认证
jenkins学习笔记之七:jenkins集成LDAP用户认证
|
2月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
2月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
52 1