Spring整合kafka

简介: Spring整合kafka

方式1

只用spring-kafka依赖就行

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.0.RELEASE</version>
</dependency>

注入KafkaTemplate模板

/**
 * Spring configuration for the String-keyed / String-valued Kafka example:
 * declares the consumer factory, the listener container factory, the producer
 * factory and the {@code kafkaTemplate} bean.
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    /** Topic name shared with the other example classes. */
    public final static String TOPIC_NAME = "yd-kf-topic";

    /** Consumer group id written into the consumer properties. */
    private final static String CONSUMER_GROUP_ID = "yd-group";

    /** Broker address used by both the consumer and the producer properties. */
    private final static String BOOTSTRAP_SERVERS = "PLAINTEXT://192.168.81.200:9092";

    /**
     * Consumer factory.
     *
     * @return a factory built from the consumer properties assembled below
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> consumerProps = new HashMap<>(8);
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        return new DefaultKafkaConsumerFactory<>(consumerProps);
    }

    /**
     * Listener container factory backed by {@link #consumerFactory()}.
     *
     * @return the container factory
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        containerFactory.setConsumerFactory(consumerFactory());
        return containerFactory;
    }

    /**
     * Producer factory.
     *
     * @return a factory built from the producer properties assembled below
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> producerProps = new HashMap<>(8);
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        producerProps.put(ProducerConfig.RETRIES_CONFIG, 0);
        producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        producerProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    /**
     * Kafka template.
     *
     * @return a template that sends through {@link #producerFactory()}
     */
    @Bean("kafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }
}

消息发送

@Slf4j
@Service
public class KafkaProducer {
 
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    public  String  sendSyncMessage(String key,String  msg){
        String s;
        try {
            ListenableFuture<SendResult<String, String>> tagA = kafkaTemplate.send(KafkaConfig.TOPIC_NAME, key, msg);
            s = tagA.get().toString();
            log.info("生产kafka消息 {}",s);
            return  s;
        } catch (InterruptedException|ExecutionException e) {
            e.printStackTrace();
            s=e.getMessage();
            log.error("sendSyncMessage-->发送消息异常{}",e.getMessage());
        }
        return s;
    }
}

监听消息消费

/**
 * Annotation-driven listener for {@code KafkaConfig.TOPIC_NAME}.
 */
@Slf4j
@Component
public class CustomKafkaListener {

    /**
     * Logs every message received from the topic.
     *
     * <p>{@code idIsGroup = false} keeps the listener id from replacing the
     * {@code group.id} configured on the consumer factory; by default the id
     * would be used as the consumer group.
     *
     * @param msg record value
     */
    @KafkaListener(topics = {KafkaConfig.TOPIC_NAME}, id = KafkaConfig.TOPIC_NAME, idIsGroup = false)
    public void onMessage1(String msg) {
        log.info("onMessage1消费kafka消息 {} ", msg);
    }

}

测试发送

/**
 * REST endpoint used to try out message sending through {@code KafkaProducer}.
 */
@RestController
public class KafkaSendController {

    @Autowired
    private KafkaProducer kafkaProducer;

    /**
     * Forwards the request parameters to {@code KafkaProducer.sendSyncMessage}.
     *
     * @param key record key
     * @param msg record value
     * @return whatever the producer service returns
     */
    @GetMapping("/kafka/sendMsg")
    public String sendMsg(String key, String msg) {
        final String sendResult = kafkaProducer.sendSyncMessage(key, msg);
        return sendResult;
    }

}

方式2:

spring-kafka和kafka-clients结合使用(推荐)

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.0.RELEASE</version>
</dependency>
 
<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.6.0</version>
</dependency>

消费者组件

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ClassUtils;
 
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
 
/**
 * Consumer configuration for the SASL-secured Kafka cluster.
 *
 * <p>Builds the raw kafka-clients consumer properties from externalized settings
 * and exposes a {@link KafkaConsumer} bean that is already subscribed to the
 * configured topic.
 */
@Configuration
public class TestKafkaConsumerConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(TestKafkaConsumerConfig.class);

    /** Value of {@code outer.kafka.kafkaUrlType} that selects the SASL_PLAINTEXT (no JKS) endpoint. */
    private static final int URL_TYPE_NO_JKS = 1;

    // Kafka settings read from configuration
    @Value("#{configproperties_leconf['outer.kafka.bootstrap.servers']}")
    private String bootStrapServers;
    @Value("#{configproperties_leconf['outer.kafka.bootstrap.bootStrapServersNoJks']}")
    private String bootStrapServersNoJks;
    @Value("#{configproperties_leconf['outer.kafka.enable.auto.commit']}")
    private String enableAutoCommit;
    @Value("#{configproperties_leconf['outer.kafka.auto.commit.interval']}")
    private String autoCommitInterval;
    @Value("#{configproperties_leconf['outer.kafka.session.timeout']}")
    private String sessionTimeout;
    @Value("#{configproperties_leconf['outer.kafka.auto.offset.reset']}")
    private String autoOffsetReset;

    @Value("#{configproperties_leconf['outer.kafka.sasl.mechanism']}")
    private String saslMechanism;
    @Value("#{configproperties_leconf['outer.kafka.sasl.jaas.config']}")
    private String saslJaasConfig;
    @Value("#{configproperties_leconf['outer.kafka.ssl.password']}")
    private String sslPassword;
    @Value("#{configproperties_leconf['outer.kafka.sasl.truststore']}")
    private String kafkaJks;


    @Value("#{configproperties_leconf['outer.kafka.kafkaUrlType']}")
    private Integer kafkaUrlType;

    @Value("#{configproperties_leconf['outer.kafka.consumer.service.product.consumer.group']}")
    private String groupId;
    @Value("#{configproperties_leconf['outer.kafka.consumer.service.product.topic']}")
    private String serviceProductTopic;

    // Consumer group for forwarding SSL Kafka messages to the price Kafka
    @Value("#{configproperties_leconf['sslkafkaToPriceKafka.group']}")
    private String sslkafkaToPriceKafkaGroup;

    // Mapping between SSL Kafka topics and price Kafka topics
    @Value("#{configproperties_leconf['sslkafka2PriceKafkaTopics']}")
    private String sslkafka2PriceKafkaTopics;


    /**
     * Assembles the consumer properties.
     *
     * <p>The security section depends on {@code outer.kafka.kafkaUrlType}: value 1
     * uses SASL_PLAINTEXT without a truststore, any other value uses SASL_SSL with
     * a JKS truststore. Drop the branch if your cluster needs neither.
     *
     * @return a mutable map of consumer properties
     * @throws IllegalStateException if {@code outer.kafka.kafkaUrlType} is not configured
     */
    public Map<String, Object> kafkaConfigs() {
        if (kafkaUrlType == null) {
            // Fail with a readable message instead of a bare NullPointerException.
            throw new IllegalStateException("outer.kafka.kafkaUrlType must be configured");
        }
        Map<String, Object> configMap = new HashMap<>();
        if (kafkaUrlType.intValue() == URL_TYPE_NO_JKS) {
            // Kafka endpoint without JKS
            configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServersNoJks);
            configMap.put("security.protocol", "SASL_PLAINTEXT");
        } else {
            // Kafka endpoint secured with a JKS truststore
            configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
            configMap.put("security.protocol", "SASL_SSL");
            configMap.put("ssl.endpoint.identification.algorithm", "");
            // The JKS file is expected under the project's resources directory.
            // NOTE(review): the location is built from a URL path relative to the classpath root;
            // presumably this only works when resources live on the file system — confirm for packaged deployments.
            configMap.put("ssl.truststore.location", ClassUtils.getDefaultClassLoader().getResource("").getPath() + kafkaJks);
            configMap.put("ssl.truststore.password", sslPassword);
        }
        configMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        configMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        configMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        configMap.put("sasl.mechanism", saslMechanism);
        configMap.put("sasl.jaas.config", saslJaasConfig);
        configMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return configMap;
    }


    /**
     * Creates the raw consumer and subscribes it to {@code serviceProductTopic}.
     *
     * @return the subscribed consumer
     */
    @Bean
    public KafkaConsumer<String, String> testKafkaConsumer() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConfigs());
        consumer.subscribe(Arrays.asList(serviceProductTopic));
        return consumer;
    }
}

生产者组件

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.util.ClassUtils;
 
import java.util.HashMap;
import java.util.Map;
 
/**
 * Producer configuration for the outer (SASL-secured) Kafka cluster.
 *
 * <p>Declares the {@code sslProducerFactory} and {@code testKafkaTemplate} beans.
 */
@Configuration
public class TestKafkaProducerConfig {

    /** Value of {@code outer.kafka.kafkaUrlType} that selects the SASL_PLAINTEXT (no JKS) endpoint. */
    private static final int URL_TYPE_NO_JKS = 1;

    @Value("#{configproperties_leconf['outer.kafka.bootstrap.servers']}")
    private String bootStrapServers;
    @Value("#{configproperties_leconf['outer.kafka.bootstrap.bootStrapServersNoJks']}")
    private String bootStrapServersNoJks;
    @Value("#{configproperties_leconf['outer.kafka.request.timeout']}")
    private String requestTimeout;
    @Value("${searchengine.MQ.kafka.key.serializer:org.apache.kafka.common.serialization.StringSerializer}")
    private String keySerializer;
    @Value("${searchengine.MQ.kafka.value.serializer:org.apache.kafka.common.serialization.StringSerializer}")
    private String valueSerializer;


    @Value("#{configproperties_leconf['outer.kafka.kafkaUrlType']}")
    private Integer kafkaUrlType;
    @Value("#{configproperties_leconf['outer.kafka.sasl.mechanism']}")
    private String saslMechanism;
    @Value("#{configproperties_leconf['outer.kafka.sasl.jaas.config']}")
    private String saslJaasConfig;
    @Value("#{configproperties_leconf['outer.kafka.ssl.password']}")
    private String sslPassword;
    @Value("#{configproperties_leconf['outer.kafka.sasl.truststore']}")
    private String kafkaJks;


    /**
     * Builds the producer factory.
     *
     * <p>The security section depends on {@code outer.kafka.kafkaUrlType}: value 1
     * uses SASL_PLAINTEXT without a truststore, any other value uses SASL_SSL with
     * a JKS truststore.
     *
     * @return the producer factory
     * @throws IllegalStateException if {@code outer.kafka.kafkaUrlType} is not configured
     */
    @Bean
    public ProducerFactory<String, String> sslProducerFactory() {
        if (kafkaUrlType == null) {
            // Fail with a readable message instead of a bare NullPointerException.
            throw new IllegalStateException("outer.kafka.kafkaUrlType must be configured");
        }
        Map<String, Object> properties = new HashMap<>();
        if (kafkaUrlType.intValue() == URL_TYPE_NO_JKS) {
            // Kafka endpoint without JKS
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServersNoJks);
            properties.put("security.protocol", "SASL_PLAINTEXT");
        } else {
            // Kafka endpoint secured with a JKS truststore
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
            properties.put("security.protocol", "SASL_SSL");
            properties.put("ssl.endpoint.identification.algorithm", "");
            // NOTE(review): the location is built from a URL path relative to the classpath root;
            // presumably this only works when resources live on the file system — confirm for packaged deployments.
            properties.put("ssl.truststore.location", ClassUtils.getDefaultClassLoader().getResource("").getPath() + kafkaJks);
            properties.put("ssl.truststore.password", sslPassword);
        }

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30000);
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        // Producer retry count
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Batch size in bytes
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // A batch is sent when it is full or after waiting LINGER_MS_CONFIG milliseconds
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // Producer buffer memory in bytes
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // Compression
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        // Maximum request size: 10 MB
        properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
        properties.put("sasl.mechanism", saslMechanism);
        properties.put("sasl.jaas.config", saslJaasConfig);
        return new DefaultKafkaProducerFactory<>(properties);
    }

    /**
     * Kafka template that sends through {@code sslProducerFactory}.
     *
     * @param sslProducerFactory the producer factory bean declared in this class
     * @return the template
     */
    @Bean
    public KafkaTemplate<String, String> testKafkaTemplate(ProducerFactory<String, String> sslProducerFactory) {
        return new KafkaTemplate<>(sslProducerFactory);
    }
}

生产消息和消费消息

/**
 * Consumes records with the raw {@code KafkaConsumer} API on a dedicated thread
 * and forwards each record to other topics through a {@code KafkaTemplate}.
 */
@Service
public class KafkaServiceImpl implements KafkaService, ApplicationListener<ContextRefreshedEvent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaServiceImpl.class);

    /** Poll timeout in milliseconds. */
    private static final long POLL_TIMEOUT_MS = 5000;
    /** Maximum time in milliseconds to wait for a synchronous send. */
    private static final long SEND_TIMEOUT_MS = 1000 * 15;

    // The template bean in TestKafkaProducerConfig is registered as "testKafkaTemplate".
    @Autowired
    @Qualifier("testKafkaTemplate")
    private KafkaTemplate<String, String> sslKafkaTemplate;
    @Autowired
    @Qualifier("testKafkaConsumer")
    private KafkaConsumer<String, String> testKafkaConsumer;

    // NOTE(review): the snippet used this executor without declaring it; assumed to be a
    // Spring-managed thread pool bean — confirm the bean name/type in your project.
    // Fully qualified because this snippet carries no import section.
    @Autowired
    private java.util.concurrent.Executor threadPoolTaskExecutor;

    /** Ensures the poll thread is started only once. */
    private final java.util.concurrent.atomic.AtomicBoolean pollerStarted =
            new java.util.concurrent.atomic.AtomicBoolean(false);

    /**
     * Starts the thread that listens to the topic with the raw Kafka consumer API.
     *
     * <p>A context refresh event can be delivered more than once (for example with
     * parent/child contexts), and {@code KafkaConsumer} must not be used by several
     * threads at once, so only the first event starts the poll thread.
     *
     * @param event the refresh event (not inspected)
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (!pollerStarted.compareAndSet(false, true)) {
            return;
        }
        new Thread(this::pollLoop, "ssl Kafka ").start();
    }

    /** Polls forever and hands every non-empty batch to the thread pool. */
    private void pollLoop() {
        while (true) {
            ConsumerRecords<String, String> records = null;
            try {
                records = testKafkaConsumer.poll(POLL_TIMEOUT_MS);
                // NOTE(review): offsets are committed right after poll, before the batch is
                // processed asynchronously, so a crash can drop the in-flight batch —
                // confirm this at-most-once behavior is intended.
                testKafkaConsumer.commitSync();
            } catch (ConcurrentModificationException e) {
                // Thrown by the consumer when it detects access from more than one thread.
                LOGGER.warn("ssl kafka consumer was accessed concurrently", e);
            } catch (Exception e) {
                LOGGER.error("消费ssl kafka 消息失败。", e);
            }
            if (records == null || records.isEmpty()) {
                continue;
            }

            final ConsumerRecords<String, String> batch = records;
            // Large volumes: process the batch on the thread pool
            threadPoolTaskExecutor.execute(() -> handleBatch(batch));
        }
    }

    /** Processes one polled batch; a failure on one record does not stop the others. */
    private void handleBatch(ConsumerRecords<String, String> batch) {
        for (ConsumerRecord<String, String> record : batch) {
            if (record == null) {
                continue;
            }
            if (record.value() == null) {
                // Avoids a NullPointerException on value().length() below.
                LOGGER.warn("skip record with null value, partition={}, offset={}", record.partition(), record.offset());
                continue;
            }
            try {
                LOGGER.info("records.count={},record.length={} partition= {}, offset = {}", batch.count(), record.value().length(), record.partition(), record.offset());

                // Placeholder for the real business handling of the message
                System.out.println(record.value());

                // Scenario 1: forward the message as-is to another topic
                doSend(sslKafkaTemplate, "test_topic", null, record.value());

                // Scenario 2: forward the message together with selected headers
                doSend(sslKafkaTemplate, "test_topic2", record.value(), regionHeaders(record));

            } catch (Exception e) {
                LOGGER.error("Kafka  error: ", e);
            }
        }
    }

    /** Copies the {@code region} headers of the consumed record into a new list. */
    private List<Header> regionHeaders(ConsumerRecord<String, String> record) {
        List<Header> headers = new ArrayList<Header>();
        if (record.headers() != null) {
            for (Header header : record.headers()) {
                if ("region".equals(header.key())) {
                    headers.add(new RecordHeader("region", header.value()));
                }
            }
        }
        return headers;
    }

    /**
     * Sends a message and waits up to {@code SEND_TIMEOUT_MS} for the result.
     *
     * @param kafkaTemplate template used for sending
     * @param topic         destination topic
     * @param key           record key, may be {@code null}
     * @param message       record value
     * @return the completed send future, or {@code null} when the send failed or timed out
     */
    private ListenableFuture<SendResult<String, String>> doSend(KafkaTemplate<String, String> kafkaTemplate, String topic, String key, String message) {
        try {
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message);
            future.get(SEND_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            LOGGER.info("send success,topic:" + topic);
            return future;
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the pool thread running this task.
            Thread.currentThread().interrupt();
            LOGGER.error("+++++++++++++++++++++++: ", e);
            LOGGER.error("not_send_topic:={}", topic);
            LOGGER.error("not_send_message:={}", message);
        } catch (Exception e) {
            LOGGER.error("+++++++++++++++++++++++: ", e);
            LOGGER.error("not_send_topic:={}", topic);
            LOGGER.error("not_send_message:={}", message);
        }
        return null;
    }

    /**
     * Sends a message with custom headers without waiting for the result.
     *
     * @param kafkaTemplate template used for sending
     * @param topic         destination topic
     * @param message       record value
     * @param headers       headers attached to the outgoing record
     * @return the pending send future, or {@code null} when the send could not be started
     */
    private ListenableFuture<SendResult<String, String>> doSend(KafkaTemplate<String, String> kafkaTemplate, String topic, String message, List<Header> headers) {
        try {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, null, null, message, headers);
            ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);
            // The send is asynchronous; log failures so they are not lost silently.
            future.addCallback(result -> { }, throwable -> {
                LOGGER.error("ssl exception+++++++++++++++++++++++: ", throwable);
                LOGGER.error("not_send_ssl_topic:={}", topic);
                LOGGER.error("not_send_ssl_message:={}", message);
            });
            return future;
        } catch (Exception e) {
            LOGGER.error("ssl exception+++++++++++++++++++++++: ", e);
            LOGGER.error("not_send_ssl_topic:={}", topic);
            LOGGER.error("not_send_ssl_message:={}", message);
        }
        return null;
    }
}

注:

这里记录一下生产发生的问题

关于max.poll.interval.ms配置的问题:请根据自己业务的处理耗时,合理配置两次poll拉取之间允许的最大间隔时间。

我们是用kafka一次性拉取多条数据,然后循环交给线程池处理的。线程池数量有限,而我们配置的线程池拒绝策略是CallerRunsPolicy:当线程池满的时候,任务就会由当前线程(也就是执行poll的线程)来处理,导致下一次poll方法无法立即执行。当两次poll的间隔超过最大时间max.poll.interval.ms时,服务端会认为当前消费者已经无效,就会踢掉该消费者,导致后续不再消费了。

https://blog.csdn.net/qq_34491508/article/details/126029810

springboot整合参考地址

https://blog.csdn.net/qq_34491508/article/details/120831627


相关文章
|
18天前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
51 5
|
20天前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
32 1
|
4月前
|
消息中间件 开发框架 Java
掌握这一招,Spring Boot与Kafka完美融合,顺序消费不再是难题,让你轻松应对业务挑战!
【8月更文挑战第29天】Spring Boot与Kafka集成广泛用于处理分布式消息队列。本文探讨了在Spring Boot中实现Kafka顺序消费的方法,包括使用单个Partition或消息Key确保消息路由到同一Partition,并设置Consumer并发数为1以保证顺序消费。通过示例代码展示了如何配置Kafka Producer和Consumer,并自定义Partitioner。为确保数据正确性,还建议在业务逻辑中增加顺序校验机制。
164 3
|
4月前
|
消息中间件 Java Kafka
|
4月前
|
消息中间件 Java Kafka
|
4月前
|
消息中间件 安全 Java
Spring Boot 基于 SCRAM 认证集成 Kafka 的详解
【8月更文挑战第4天】本文详解Spring Boot结合SCRAM认证集成Kafka的过程。SCRAM为Kafka提供安全身份验证。首先确认Kafka服务已启用SCRAM,并准备认证凭据。接着,在`pom.xml`添加`spring-kafka`依赖,并在`application.properties`中配置Kafka属性,包括SASL_SSL协议与SCRAM-SHA-256机制。创建生产者与消费者类以实现消息的发送与接收功能。最后,通过实际消息传递测试集成效果与认证机制的有效性。
174 4
|
4月前
|
消息中间件 Kafka Java
Spring 框架与 Kafka 联姻,竟引发软件世界的革命风暴!事件驱动架构震撼登场!
【8月更文挑战第31天】《Spring 框架与 Kafka 集成:实现事件驱动架构》介绍如何利用 Spring 框架的强大功能与 Kafka 分布式流平台结合,构建灵活且可扩展的事件驱动系统。通过添加 Spring Kafka 依赖并配置 Kafka 连接信息,可以轻松实现消息的生产和消费。文中详细展示了如何设置 `KafkaTemplate`、`ProducerFactory` 和 `ConsumerFactory`,并通过示例代码说明了生产者发送消息及消费者接收消息的具体实现。这一组合为构建高效可靠的分布式应用程序提供了有力支持。
116 0
|
4月前
|
消息中间件 Java Kafka
SpringBoot Kafka SSL接入点PLAIN机制收发消息
SpringBoot Kafka SSL接入点PLAIN机制收发消息
41 0
|
2月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
102 1
|
2月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
53 1