SpringBoot Kafka SSL接入点PLAIN机制收发消息

简介: SpringBoot Kafka SSL接入点PLAIN机制收发消息

applycation.yml

spring: 
  # https://developer.aliyun.com/article/784990
  kafka:
    bootstrap-servers: XXXX
    producer: # producer 生产者
      retries: 1 # 重试次数
      acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      batch-size: 51200 # 批量大小
      buffer-memory: 33554432 # 生产端缓冲区大小
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #value-serializer: com.itheima.demo.config.MySerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      compression-type: none 
      properties:
        send.buffer.bytes: 262144
        security.protocol: SASL_SSL
        #ssl.ca.location: D:\KafkaCertificate\ca-cert.pem
        sasl.mechanism: PLAIN
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";  # ;号不能少
        ssl.truststore.location: D:/KafkaCertificate/mix.4096.client.truststore.jks
        ssl.truststore.password: KafkaOnsClient
        ssl.endpoint.identification.algorithm:
    consumer: # consumer消费者
      group-id: vipsoft-group # 默认的消费组ID
      enable-auto-commit: true # 是否自动提交offset
      auto-commit-interval: 100  # 提交offset延时(接收到消息后多久提交offset)
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #      value-deserializer: com.itheima.demo.config.MyDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        send.buffer.bytes: 262144
        security.protocol: SASL_SSL
        #ssl.ca.location: D:\KafkaCertificate\ca-cert.pem
        sasl.mechanism: PLAIN
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";
        ssl.truststore.location: D:/KafkaCertificate/mix.4096.client.truststore.jks
        ssl.truststore.password: KafkaOnsClient
        ssl.endpoint.identification.algorithm:

KafkaController

package com.vipsoft.controller;
import com.vipsoft.kafka.Sender; 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/kafka")
public class KafkaController {
    @Autowired
    private Sender sender;
    @GetMapping("/send/{msg}") // 发送消息测试,注意此处为Post
    public String send(@PathVariable("msg") String msg) {
        sender.send(msg);
        return msg;
    }
}

Sender.java

package com.vipsoft.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class Sender {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    public void send(String msg) {
        this.kafkaTemplate.send("vipsoft-topic","my_msg", msg);
        System.out.println("send message:" + msg);
    }
}

Receiver.java

package com.vipsoft.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class Receiver {
    @KafkaListener(topics = { "vipsoft-topic" }) // 参数配置要监听的Topic
    public void receiveMessage(ConsumerRecord<String, String> record) {
        System.out.println("Receive Message");
        System.out.println("【*** Message: ***】key = " + record.key() + "、value = " + record.value());
    }
}

 

目录
相关文章
消息中间件 Java Kafka
350 0
|
4月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
311 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
9月前
|
消息中间件 Java Kafka
Spring Boot整合kafka
本文简要记录了Spring Boot与Kafka的整合过程。首先通过Docker搭建Kafka环境,包括Zookeeper和Kafka服务的配置文件。接着引入Spring Kafka依赖,并在`application.properties`中配置生产者和消费者参数。随后创建Kafka配置类,定义Topic及重试机制。最后实现生产者发送消息和消费者监听消息的功能,支持手动ACK确认。此方案适用于快速构建基于Spring Boot的Kafka消息系统。
1574 7
|
10月前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
533 10
|
11月前
|
消息中间件 Java Kafka
【Azure Kafka】使用Spring Cloud Stream Binder Kafka 发送并接收 Event Hub 消息及解决并发报错
reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
209 5
|
NoSQL Java Redis
Spring Boot 自动配置机制:从原理到自定义
Spring Boot 的自动配置机制通过 `spring.factories` 文件和 `@EnableAutoConfiguration` 注解,根据类路径中的依赖和条件注解自动配置所需的 Bean,大大简化了开发过程。本文深入探讨了自动配置的原理、条件化配置、自定义自动配置以及实际应用案例,帮助开发者更好地理解和利用这一强大特性。
2122 15
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
664 5
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
526 1
|
架构师 Java 开发者
得物面试:Springboot自动装配机制是什么?如何控制一个bean 是否加载,使用什么注解?
在40岁老架构师尼恩的读者交流群中,近期多位读者成功获得了知名互联网企业的面试机会,如得物、阿里、滴滴等。然而,面对“Spring Boot自动装配机制”等核心面试题,部分读者因准备不足而未能顺利通过。为此,尼恩团队将系统化梳理和总结这一主题,帮助大家全面提升技术水平,让面试官“爱到不能自已”。
得物面试:Springboot自动装配机制是什么?如何控制一个bean 是否加载,使用什么注解?
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
293 2

热门文章

最新文章