kafka-Java-SpringBoot-consumer API开发

简介:

ConsumerAPI的开发逻辑和Product是一样的,只不过多了一项必填选项group_id.
属性:

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.List;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1315:58
 */
@ConfigurationProperties(prefix = "Riven.kafka.consumer")
public class ConsumerConfiguration {
    //kafka服务器列表
    private String bootstrapServers;

    /**
     * 如果设置成true,偏移量由auto.commit.interval.ms控制自动提交的频率。
     * <p>
     * 如果设置成false,不需要定时的提交offset,可以自己控制offset,当消息认为已消费过了,这个时候再去提交它们的偏移量。
     * 这个很有用的,当消费的消息结合了一些处理逻辑,这个消息就不应该认为是已经消费的,直到它完成了整个处理。
     */
    private Boolean enableAutoCommit = false;

    /**
     * 提交延迟毫秒数
     */
    private int autoCommitIntervalMs = 100;

    /**
     * 执行超时时间
     */
    private int sessionTimeoutMs = 15000;

    /**
     * 每次最少拉取多少数据
     */
    private int fetchMinBytes = 1;

    /**
     * 在单次调用中的最大返回
     */
    private int maxPollRecords = 300;

    /**
     * 该Consumer属于的组
     */
    private String groupId ;

    /**
     * 在consumter端配置文件中(或者是ConsumerConfig类参数)有个"autooffset.reset"(在kafka 0.8版本中为auto.offset.reset),
     * 有2个合法的值"largest"/"smallest",默认为"largest",此配置参数表示当此groupId下的消费者,在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),
     * consumer应该从哪个offset开始消费.largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的开始位置消费所有消息.
     */
    private String autoOffseReset = "latest";

    /**
     * 同一个组下 启动几个consumer来获取kafka的消息
     */
    private int consumerAmount = 3;

    /**
     * 设置启动的consumer多久超时
     */
    private int pollTimeout = 5000;

    private List<String> topics;

    private String keySerializer = StringDeserializer.class.getName();
    private String valueSerializer = StringDeserializer.class.getName();

}

配置类:

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import riven.kafka.api.configuration.ConsumerConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1411:16
 * 配置Consumer选项
 * 初始化consumer_S
 */
@Configuration
@EnableKafka
@EnableConfigurationProperties(ConsumerConfiguration.class)
@ConditionalOnProperty(name = {"Riven.kafka.consumer.bootstrapServers", "Riven.kafka.consumer.groupId"}, matchIfMissing = false)
public class ConsumerInitialize {


    private Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * 初始化参数
     *
     * @param config
     * @return
     */
    private Map<String, Object> assembleProducer(ConsumerConfiguration config) {
        Map<String, Object> propsMap = new HashMap<>();
        if (StringUtils.isBlank(config.getBootstrapServers()))
            throw new RuntimeException("缺失kafka集群列表,初始化失败");
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());

        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
        //提交延迟毫秒数
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, config.getAutoCommitIntervalMs());
        //执行超时时间
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getSessionTimeoutMs());
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeySerializer());
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueSerializer());
        propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, config.getFetchMinBytes());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getMaxPollRecords());
        //组ID
        if (StringUtils.isBlank(config.getGroupId()))
            throw new RuntimeException("缺失Consumer组信息,初始化失败");
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());

        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, config.getAutoOffseReset());
        return propsMap;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory
            (ConsumerConfiguration ver) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        try {
            factory.setConsumerFactory(consumerFactory(ver));
            factory.setConcurrency(ver.getConsumerAmount());//启动的consumer个数
            factory.getContainerProperties().setPollTimeout(ver.getPollTimeout());//consumer;连接超时时间ms
            logger.info("初始化Consumer_S完成,共启动 {} 个Consumer", ver.getConsumerAmount());
        } catch (Exception e) {
            logger.info("初始化Consumer_S失败!");
            e.printStackTrace();
        }
        return factory;
    }

    @org.jetbrains.annotations.NotNull
    private ConsumerFactory<String, String> consumerFactory(ConsumerConfiguration ver) {
        return new DefaultKafkaConsumerFactory<>(assembleProducer(ver));
    }

最后,在配置文件根目录下创建Spring监听器:
spring.factories文件
并添加需要Spring监听初始化的类路径(多个使用,逗号隔开):

org.springframework.boot.autoconfigure.EnableAutoConfiguration=riven.kafka.api.producer.ProducerInitialize,riven.kafka.api.consumer.ConsumerInitialize
目录
相关文章
|
3天前
|
Java
Java开发实现图片URL地址检验,如何编码?
【10月更文挑战第14天】Java开发实现图片URL地址检验,如何编码?
17 4
|
3天前
|
Java
Java开发实现图片地址检验,如果无法找到资源则使用默认图片,如何编码?
【10月更文挑战第14天】Java开发实现图片地址检验,如果无法找到资源则使用默认图片,如何编码?
18 2
|
6天前
|
Java API 数据库
构建RESTful API已经成为现代Web开发的标准做法之一。Spring Boot框架因其简洁的配置、快速的启动特性及丰富的功能集而备受开发者青睐。
【10月更文挑战第11天】本文介绍如何使用Spring Boot构建在线图书管理系统的RESTful API。通过创建Spring Boot项目,定义`Book`实体类、`BookRepository`接口和`BookService`服务类,最后实现`BookController`控制器来处理HTTP请求,展示了从基础环境搭建到API测试的完整过程。
22 4
|
7天前
|
IDE Java API
基于Spring Boot REST API设计指南
【10月更文挑战第4天】 在现代的软件开发中,RESTful API已经成为了构建网络应用的标准之一。它通过HTTP协议提供了与资源交互的方式,使得不同的应用程序能够进行数据交互。Spring Boot作为一个功能强大的框架,它简化了配置和开发流程,成为了构建RESTful API的理想选择。本文将详细介绍如何在Spring Boot中设计和实现高质量的RESTful API,并提供一些最佳实践。
24 1
|
5天前
|
缓存 Java API
基于Spring Boot REST API设计指南
【10月更文挑战第11天】 在构建现代Web应用程序时,RESTful API已成为一种标准,使得不同的应用程序能够通过HTTP协议进行通信,实现资源的创建、读取、更新和删除等操作。Spring Boot作为一个功能强大的框架,能够轻松创建RESTful API。本文将详细介绍如何在Spring Boot中设计和实现高质量的RESTful API。
100 61
|
2天前
|
Java 关系型数据库 API
介绍一款Java开发的企业接口管理系统和开放平台
YesApi接口管理平台Java版,基于Spring Boot、Vue.js等技术,提供API接口的快速研发、管理、开放及收费等功能,支持多数据库、Docker部署,适用于企业级PaaS和SaaS平台的二次开发与搭建。
|
2天前
|
监控 Java 测试技术
Java开发现在比较缺少什么工具?
【10月更文挑战第15天】Java开发现在比较缺少什么工具?
13 1
|
4天前
|
Java
Java开发如何实现文件的移动,但是在移动结束后才进行读取?
【10月更文挑战第13天】Java开发如何实现文件的移动,但是在移动结束后才进行读取?
15 2
|
6天前
|
缓存 监控 前端开发
利用GraphQL提升API开发效率
【10月更文挑战第10天】本文介绍了GraphQL的核心概念、优势及其实现步骤,探讨了其在现代开发中的应用,包括动态数据需求、单页应用和微服务架构。通过缓存策略、批处理、安全性和监控等实战技巧,提升API开发效率和用户体验。
|
8天前
|
运维 Java Linux
【运维基础知识】掌握VI编辑器:提升你的Java开发效率
本文详细介绍了VI编辑器的常用命令,包括模式切换、文本编辑、搜索替换及退出操作,帮助Java开发者提高在Linux环境下的编码效率。掌握这些命令,将使你在开发过程中更加得心应手。
11 2