kafka-Java-SpringBoot-consumer API开发

简介:

ConsumerAPI的开发逻辑和Product是一样的,只不过多了一项必填选项group_id.
属性:

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.List;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1315:58
 */
@ConfigurationProperties(prefix = "Riven.kafka.consumer")
public class ConsumerConfiguration {
    //kafka服务器列表
    private String bootstrapServers;

    /**
     * 如果设置成true,偏移量由auto.commit.interval.ms控制自动提交的频率。
     * <p>
     * 如果设置成false,不需要定时的提交offset,可以自己控制offset,当消息认为已消费过了,这个时候再去提交它们的偏移量。
     * 这个很有用的,当消费的消息结合了一些处理逻辑,这个消息就不应该认为是已经消费的,直到它完成了整个处理。
     */
    private Boolean enableAutoCommit = false;

    /**
     * 提交延迟毫秒数
     */
    private int autoCommitIntervalMs = 100;

    /**
     * 执行超时时间
     */
    private int sessionTimeoutMs = 15000;

    /**
     * 每次最少拉取多少数据
     */
    private int fetchMinBytes = 1;

    /**
     * 在单次调用中的最大返回
     */
    private int maxPollRecords = 300;

    /**
     * 该Consumer属于的组
     */
    private String groupId ;

    /**
     * 在consumter端配置文件中(或者是ConsumerConfig类参数)有个"autooffset.reset"(在kafka 0.8版本中为auto.offset.reset),
     * 有2个合法的值"largest"/"smallest",默认为"largest",此配置参数表示当此groupId下的消费者,在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),
     * consumer应该从哪个offset开始消费.largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的开始位置消费所有消息.
     */
    private String autoOffseReset = "latest";

    /**
     * 同一个组下 启动几个consumer来获取kafka的消息
     */
    private int consumerAmount = 3;

    /**
     * 设置启动的consumer多久超时
     */
    private int pollTimeout = 5000;

    private List<String> topics;

    private String keySerializer = StringDeserializer.class.getName();
    private String valueSerializer = StringDeserializer.class.getName();

}
AI 代码解读

配置类:

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import riven.kafka.api.configuration.ConsumerConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1411:16
 * 配置Consumer选项
 * 初始化consumer_S
 */
@Configuration
@EnableKafka
@EnableConfigurationProperties(ConsumerConfiguration.class)
@ConditionalOnProperty(name = {"Riven.kafka.consumer.bootstrapServers", "Riven.kafka.consumer.groupId"}, matchIfMissing = false)
public class ConsumerInitialize {


    private Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * 初始化参数
     *
     * @param config
     * @return
     */
    private Map<String, Object> assembleProducer(ConsumerConfiguration config) {
        Map<String, Object> propsMap = new HashMap<>();
        if (StringUtils.isBlank(config.getBootstrapServers()))
            throw new RuntimeException("缺失kafka集群列表,初始化失败");
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());

        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
        //提交延迟毫秒数
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, config.getAutoCommitIntervalMs());
        //执行超时时间
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getSessionTimeoutMs());
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeySerializer());
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueSerializer());
        propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, config.getFetchMinBytes());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getMaxPollRecords());
        //组ID
        if (StringUtils.isBlank(config.getGroupId()))
            throw new RuntimeException("缺失Consumer组信息,初始化失败");
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());

        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, config.getAutoOffseReset());
        return propsMap;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory
            (ConsumerConfiguration ver) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        try {
            factory.setConsumerFactory(consumerFactory(ver));
            factory.setConcurrency(ver.getConsumerAmount());//启动的consumer个数
            factory.getContainerProperties().setPollTimeout(ver.getPollTimeout());//consumer;连接超时时间ms
            logger.info("初始化Consumer_S完成,共启动 {} 个Consumer", ver.getConsumerAmount());
        } catch (Exception e) {
            logger.info("初始化Consumer_S失败!");
            e.printStackTrace();
        }
        return factory;
    }

    @org.jetbrains.annotations.NotNull
    private ConsumerFactory<String, String> consumerFactory(ConsumerConfiguration ver) {
        return new DefaultKafkaConsumerFactory<>(assembleProducer(ver));
    }

AI 代码解读

最后,在配置文件根目录下创建Spring监听器:
spring.factories文件
并添加需要Spring监听初始化的类路径(多个使用,逗号隔开):

org.springframework.boot.autoconfigure.EnableAutoConfiguration=riven.kafka.api.producer.ProducerInitialize,riven.kafka.api.consumer.ConsumerInitialize
AI 代码解读
ritit
+关注
目录
打赏
0
0
0
0
43
分享
相关文章
1688商品数据实战:API搜索接口开发与供应链分析应用
本文详细介绍了如何通过1688开放API实现商品数据的获取与应用,涵盖接入准备、签名流程、数据解析存储及商业化场景。开发者可完成智能选品、价格监控和供应商评级等功能,同时提供代码示例与问题解决方案,确保法律合规与数据安全。适合企业开发者快速构建供应链管理系统。
|
5天前
|
如何在苹果内购开发中获取App Store Connect API密钥-共享密钥理解内购安全-优雅草卓伊凡
如何在苹果内购开发中获取App Store Connect API密钥-共享密钥理解内购安全-优雅草卓伊凡
46 15
如何在苹果内购开发中获取App Store Connect API密钥-共享密钥理解内购安全-优雅草卓伊凡
JAVA接入DeepSeek大模型接口开发---阿里云的百炼模型
随着大模型的越来越盛行,现在很多企业开始接入大模型的接口,今天我从java开发角度来写一个demo的示例,用于接入DeepSeek大模型,国内的大模型有很多的接入渠道,今天主要介绍下阿里云的百炼模型,因为这个模型是免费的,只要注册一个账户,就会免费送百万的token进行学习,今天就从一个简单的可以执行的示例开始进行介绍,希望可以分享给各位正在学习的同学们。
109 3
JAVA接入DeepSeek大模型接口开发---阿里云的百炼模型
前后端分离开发:如何高效调试API?有工具 vs 无工具全解析
在前后端分离开发中,API调试至关重要。本文探讨有无调试工具时如何高效调试API,重点分析Postman、Swagger等工具优势及无工具代码调试方法。通过实际场景如用户登录接口,对比两者特性。同时介绍Apipost-Hepler(IDEA插件),将可视化与代码调试结合,提供全局请求头配置、历史记录保存等功能,优化团队协作与开发效率,助力API调试进入全新阶段。
鸿蒙相机开发实战:从设备适配到性能调优 —— 我的 ArkTS 录像功能落地手记(API 15)
本文分享鸿蒙相机开发经验,从环境准备到核心逻辑实现,涵盖权限声明、模块导入、Surface关联与分辨率匹配,再到录制控制及设备适配法则。通过实战案例解析,如旋转补偿、动态帧率调节和编解码优化,帮助开发者掌握功能实现、设备适配与体验设计三大要点,减少开发坑点。适合鸿蒙新手及希望深化硬件交互能力的工程师参考收藏。
30 2
|
15天前
|
《从头开始学java,一天一个知识点》之:字符串处理:String类的核心API
🌱 **《字符串处理:String类的核心API》一分钟速通!** 本文快速介绍Java中String类的3个高频API:`substring`、`indexOf`和`split`,并通过代码示例展示其用法。重点提示:`substring`的结束索引不包含该位置,`split`支持正则表达式。进一步探讨了String不可变性的高效设计原理及企业级编码规范,如避免使用`new String()`、拼接时使用`StringBuilder`等。最后通过互动解密游戏帮助读者巩固知识。 (上一篇:《多维数组与常见操作》 | 下一篇预告:《输入与输出:Scanner与System类》)
44 11
Java||Springboot读取本地目录的文件和文件结构,读取服务器文档目录数据供前端渲染的API实现
博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
Java||Springboot读取本地目录的文件和文件结构,读取服务器文档目录数据供前端渲染的API实现
微店API开发全攻略:解锁电商数据与业务自动化的核心能力
微店开放平台提供覆盖商品、订单、用户、营销、物流五大核心模块的API接口,支持企业快速构建电商中台系统。其API体系具备模块化设计、双重认证机制、高并发支持和数据隔离等特性。文档详细解析了商品管理、订单处理、营销工具等核心接口功能,并提供实战代码示例。同时,介绍了企业级整合方案设计,如订单全链路自动化和商品数据中台架构,以及性能优化与稳定性保障措施。最后,针对高频问题提供了排查指南,帮助开发者高效利用API实现电商数智化转型。适合中高级开发者阅读。
前后端分离开发:如何高效调试API?有工具 vs 无工具全解析
在前后端分离的开发模式中,API 调试的效率直接影响项目的质量和交付速度。通过本文的对比分析,我们可以看到无工具调试模式虽具备灵活性和代码复用能力,但在操作便利性和团队协作上稍显不足。而传统的外部调试工具带来了可视化、高效协作与扩展性,却可能存在工具切换带来的开发链路断层问题。Apipost-Hepler 融合了两者的优势,让开发者无需离开熟悉的 IDEA 环境,就能享受可视化调试工具的强大功能。
44 5
课时146:使用JDT开发Java程序
在 Eclipse 之中提供有 JDT环境可以实现java 程序的开发,下面就通过一些功能进行演示。 项目开发流程

热门文章

最新文章