SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)

简介: SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)
+关注继续查看

引入依赖

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.2.6.RELEASE</version>
</dependency>

添加配置

server:
  port: 5896

spring:
  kafka:
    bootstrap-servers: 192.168.16.23:9092
      # SASL认证,如果kafka有增加SASL认证需要的相关配置
#    properties:
#      security:
#        protocol: SASL_PLAINTEXT
#      sasl:
#        mechanism: PLAIN
#        jaas:
#          config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="xxx" password="xxx";'
        # 生产者配置
    producer:
      retries: 0 # 重试次数
      acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      batch-size: 16384 # 一次最多发送数据量
      buffer-memory: 33554432 # 生产端缓冲区大小
      # 序列化类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
        # 消费者配置
    consumer:
      group-id: test
      # 设置手动提交offset
      enable-auto-commit: false
      # smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
      auto-offset-reset: latest
#     异常处理,处理毒丸(Poison Pill)消息
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.json.trusted.packages: '*'
#       序列化、反序列化必须一致,否则会出现无法序列化
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

    # 监听
    listener:
      # record:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
      # batch:当每一批poll()的数据被ListenerConsumer处理之后提交
      # time:当每一批poll()的数据被ListenerConsumer处理之后,距离上次提交时间大于TIME时提交
      # count:当每一批poll()的数据被ListenerConsumer处理之后,被处理record数量大于等于COUNT时提交
      # count_time:TIME或COUNT中有一个条件满足时提交
      # manual:当每一批poll()的数据被ListenerConsumer处理之后, 手动调用Acknowledgment.acknowledge()后提交
      # manual_immediate:手动调用Acknowledgment.acknowledge()后立即提交,一般推荐使用这种
      ack-mode: manual_immediate


简单消息发送

package com.example.demokafka.service;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * @Author: CQL
 * @Date: 2023/8/29 17:41
 */
@Service
public class MessageServiceKafka {

    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    public void sendMessage(String msg) {
        System.out.println("待发送的信息已纳入处理队列(kafka),msg:"+msg);
        //使用send方法发送消息,需要传入topic名称
        kafkaTemplate.send("test", msg);
    }
}


简单消息接收

package com.example.demokafka.listener;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;


/**
 * @Author: CQL
 * @Date: 2023/8/29 17:43
 */
@Slf4j
@Component
public class MessageListener {

    @KafkaListener(topics = "test", groupId = "test")
    public void kafkaListener(ConsumerRecord<?, ?> item, Acknowledgment ack) {
        System.out.printf("topic主题=%s, offset偏移量=%d,partition分区=%s, 内容=%s \n", item.topic(), item.offset(), item.partition(), item.value());
        log.info("topic主题={}, offset偏移量={},partition分区={}, 内容={}", item.topic(), item.offset(), item.partition(), item.value());
        // 设置手动提交需要确认
        ack.acknowledge();
    }

}
目录
相关文章
|
19小时前
|
消息中间件 Kafka 流计算
在Flink我从holo读取数据,数据往kafka 写,好像差12个小时, 有没有参数哪里可以配置 ?
在Flink我从holo读取数据,数据往kafka 写,好像差12个小时, 有没有参数哪里可以配置 ?
11 1
|
4月前
|
消息中间件 存储 安全
基于SASL和ACL的Kafka安全性解析
本文主要介绍基于SCRAM进行身份验证,使用Kafka ACL进行授权,SSL进行加密以及使用camel-Kafka连接Kafka群集以使用camel路由生产和消费消息的过程。
151 0
|
8月前
|
消息中间件 Cloud Native 物联网
阿里云消息队列 RocketMQ、Kafka 荣获金融级产品稳定性测评 “先进级” 认证
在混沌工程技术沙龙--金融行业精品专场的分布式系统稳定性评估体系获奖名单中,阿里云分布式消息队列服务成为通过首批消息队列服务稳定性认证,荣获最高级别 “先进级” 认证的消息队列服务。
415 0
阿里云消息队列 RocketMQ、Kafka 荣获金融级产品稳定性测评 “先进级” 认证
|
9月前
|
消息中间件 Ubuntu Java
Kafka安装并配置SASL_PLAINTEXT认证
Kafka安装并配置SASL_PLAINTEXT认证
674 0
|
11月前
|
消息中间件 存储 算法
【Kafka SASL/SCRAM动态认证集群部署
【Kafka SASL/SCRAM动态认证集群部署
611 0
|
11月前
|
消息中间件 安全 Kafka
Kafka SASL集群部署
Kafka SASL集群部署
190 0
|
SQL 消息中间件 Kafka
flink 读取kafka 写入带kerberos认证的hive环境
flink 读取kafka 写入带kerberos认证的hive环境
|
消息中间件 安全 Java
阿里云Kafka SASL认证Quick Start
借助消息队列Kafka版的ACL,用户可以按需为SASL用户赋予不同访问消息队列Kafka版资源的权限,实现权限分割。阿里云专业版Kafka实例目前已经可以支持SASL认证方式,本文主要演示SASL的开通及使用。
1390 0
阿里云Kafka SASL认证Quick Start
|
消息中间件 安全 Java
zookeeper和kafka的SASL认证以及生产实践
一、什么是zookeeper? ZooKeeper是一个集中的服务,用于维护配置信息、命名、提供分布式同步以及提供组服务。所有这些类型的服务都以某种形式被分布式应用程序使用。每次它们被实现时,都有大量的工作需要去修复不可避免的bug和竞争条件。
6954 1
|
消息中间件 Kafka 数据安全/隐私保护
推荐文章
更多