SpringCloud学习笔记(五)-SpringCloudStream集成kafka(下)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Spring Cloud Stream是构建消息驱动的微服务应用程序框架。提供统一的接收发送管道以连接到消息代理。通过@EnableBinding注解开启SpringCloudStream的支持。通过@StreamListener注解,使其接收流处理的时间。

五、发送消息到输出通道


/**
 * kafka消息发送器
 * @author dbq
 * @date 2019/9/26 17:50
 */
@Component
public class EsKafkaMessageSender {
    @Autowired
    private EsChannel channel;
    /**
     * 消息发送到默认通道:缺省通道对应缺省主题
     * @param message
     */
    public void sendToDefaultChannel(String message){
        channel.sendEsDefaultMessage().send(MessageBuilder.withPayload(message).build());
    }
    /**
     * 消息发送到告警通道:告警通道对应告警主题
     * @param message
     */
    public void sendToAlarmChannel(String message){
        channel.sendEsAlarmMessage().send(MessageBuilder.withPayload(message).build());
    }
}


注入先前定义的通道EsChannel,sendToDefaultChannel、sendToAlarmChannel分别为我们自定义的两个发送方法,可将消息发送到不同的通道中,每个通道对应一个kafka的主题。


六、从输入通道订阅消息


@EnableBinding(value = EsChannel.class)
public class EsStreamListener {
    /**
     * 从缺省通道接收消息
     * @param message
     */
    @StreamListener(EsChannel.ES_DEFAULT_INPUT)
    public void receive(Message<String> message){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
        System.out.println(sdf.format(new Date())+"------start--------安全用电默认消息:" + message);
        try {
            Thread.sleep(1000*10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(sdf.format(new Date())+"------end--------安全用电默认消息");
    }
    /**
     * 从告警通道接收消息
     * @param message
     */
    @StreamListener(EsChannel.ES_ALARM_INPUT)
    public void receiveAlarm(Message<String> message){
        System.out.println("订阅告警消息:" + message);
    }
}


从不同的通道实现消息的订阅。


七、这样完整的消息系统就搭建好了,定义Controller发送消息测试


@ApiOperation(value = "test1", httpMethod = "POST")
    @PostMapping(value = "/test1", produces = "application/json;charset=UTF-8")
    public void test1(String message, HttpServletRequest request,
                             HttpServletResponse response) {
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
        sender.sendToDefaultChannel(message);
    }
    @ApiOperation(value = "test", httpMethod = "POST")
    @PostMapping(value = "/test2", produces = "application/json;charset=UTF-8")
    public void test2(String message, HttpServletRequest request,
                      HttpServletResponse response) {
        sender.sendToAlarmChannel(message);
    }


test1:发送消息的缺省消息通道


test2:发送消息到告警消息通道


八、并发性测试


如七中所示,一次发送4条消息到缺省消息通道中,并启动两个实例(即两个微服务组成一个小型集群),在并发性配置为1的情况下,即spring.cloud.stream.bindings.es_default_input.consumer.concurrency=1


实例1

2019-09-30 11:13:14------start--------默认消息...
2019-09-30 11:13:24------end--------默认消息


实例2

2019-09-30 11:13:14------start--------默认消息:...
2019-09-30 11:13:24------end--------默认消息
2019-09-30 11:13:24------start--------默认消息:...
2019-09-30 11:13:34------end--------默认消息
2019-09-30 11:13:34------start--------默认消息:...
2019-09-30 11:13:44------end--------默认消息


通过打印日志(日志做了简化处理)可以看出,两个实例之间是做到了并发消费,但是在1个实例内部,并没有并发消费。


如果将concurrency修改为2.


日志如下


实例1

2019-09-30 11:31:13------start--------:...
2019-09-30 11:31:13------start--------默认消息:...
2019-09-30 11:31:23------end--------默认消息
2019-09-30 11:31:23------end--------默认消息
2019-09-30 11:31:23------start--------默认消息:...
2019-09-30 11:31:33------end--------默认消息


实例2

2019-09-30 11:31:13------start--------默认消息:...
2019-09-30 11:31:23------end--------


从日志可以看出,实例1中实现了两个线程的并发消费。


相关文章
|
17天前
|
存储 数据可视化 Java
基于MicrometerTracing门面和Zipkin实现集成springcloud2023的服务追踪
Sleuth将会停止维护,Sleuth最新版本也只支持springboot2。作为替代可以使用MicrometerTracing在微服务中作为服务追踪的工具。
59 1
|
2月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
定时任务在企业应用中至关重要,常用于异步数据处理、自动化运维等场景。在单体应用中,利用Java的`java.util.Timer`或Spring的`@Scheduled`即可轻松实现。然而,进入微服务架构后,任务可能因多节点并发执行而重复。Spring Cloud Alibaba为此发布了Scheduling模块,提供轻量级、高可用的分布式定时任务解决方案,支持防重复执行、分片运行等功能,并可通过`spring-cloud-starter-alibaba-schedulerx`快速集成。用户可选择基于阿里云SchedulerX托管服务或采用本地开源方案(如ShedLock)
|
3月前
|
NoSQL Java Nacos
SpringCloud集成Seata并使用Nacos做注册中心与配置中心
SpringCloud集成Seata并使用Nacos做注册中心与配置中心
97 3
|
2月前
|
Java jenkins Shell
jenkins学习笔记之五:Maven、Ant、Gradl、Node构建工具集成
jenkins学习笔记之五:Maven、Ant、Gradl、Node构建工具集成
|
2月前
|
jenkins 持续交付
jenkins学习笔记之六:共享库方式集成构建工具
jenkins学习笔记之六:共享库方式集成构建工具
|
3月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
Spring Cloud Alibaba 发布了 Scheduling 任务调度模块 [#3732]提供了一套开源、轻量级、高可用的定时任务解决方案,帮助您快速开发微服务体系下的分布式定时任务。
14777 25
|
2月前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
112 4
|
2月前
|
jenkins 持续交付
jenkins学习笔记之九:jenkins认证集成github
jenkins学习笔记之九:jenkins认证集成github
|
2月前
|
安全 jenkins 持续交付
jenkins学习笔记之八:jenkins认证集成gitlab
jenkins学习笔记之八:jenkins认证集成gitlab
|
2月前
|
jenkins Devops 持续交付
jenkins学习笔记之七:jenkins集成LDAP用户认证
jenkins学习笔记之七:jenkins集成LDAP用户认证