spring-cloud-stream整合kafka

简介: 我这边的spring-boot-starter-parent版本是2.1.5

1.在项目的pom中引入

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

2.配置消息通道

public interface Demo {
    /**
     * 发消息的通道名称
     */
    String DEMO_OUTPUT = "demo_output";
    /**
     * 消息的订阅通道名称
     */
    String DEMO_INPUT = "demo_input";
    /**
     * 发消息的通道
     *
     * @return
     */
    @Output(DEMO_OUTPUT)
    MessageChannel sendDemoMessage();
    /**
     * 收消息的通道
     *
     * @return
     */
    @Input(DEMO_INPUT)
    SubscribableChannel recieveDemoMessage();
}
  1. 使带注释组件的结合Input和Output根据作为值给注释传递接口的列表到代理
@EnableBinding(value = {Demo.class})

4.链接kafka配置

spring.cloud.stream.bindings.demo_input.destination=demo
spring.cloud.stream.bindings.demo_input.group=demo
spring.cloud.stream.bindings.demo_output.destination=demo
spring.cloud.stream.bindings.demo_output.group=demo
spring.cloud.stream.default-binder=kafka
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer

5.发送消息

@Resource(name = Demo.DEMO_OUTPUT)
    private MessageChannel sendDemoMessageChannel;
    @Test
    public void Demo() {
        boolean isSendSuccess = sendDemoMessageChannel.
                send(MessageBuilder.withPayload("OK").build());
            System.out.println(isSendSuccess);
    }

6.接收消息

 @StreamListener(Demo. DEMO_INPUT)
    public void insertQuotationK(Message<String> message) {
        if (StringUtils.isEmpty(message.getPayload())) {
            System.out.println("receiver data is empty !");
            System.out.println(400 + "failed");
        }
        System.out.println("kafka收到"+message.getPayload());
    }

7.结束咯,如果出现异常,请留言。

相关文章
|
11天前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
28 6
|
11天前
|
Java 关系型数据库 MySQL
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
32 5
|
11天前
|
缓存 监控 Java
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
23 5
|
13天前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
44 5
|
15天前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
23 1
|
4月前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
149 3
|
4月前
|
消息中间件 Java Kafka
|
4月前
|
消息中间件 Java Kafka
|
5月前
|
负载均衡 Java Spring
Spring cloud gateway 如何在路由时进行负载均衡
Spring cloud gateway 如何在路由时进行负载均衡
548 15
|
4月前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
167 4