kafka-Java-SpringBoot-consumer API开发

简介:

ConsumerAPI的开发逻辑和Product是一样的,只不过多了一项必填选项group_id.
属性:

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.List;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1315:58
 */
@ConfigurationProperties(prefix = "Riven.kafka.consumer")
public class ConsumerConfiguration {
    //kafka服务器列表
    private String bootstrapServers;

    /**
     * 如果设置成true,偏移量由auto.commit.interval.ms控制自动提交的频率。
     * <p>
     * 如果设置成false,不需要定时的提交offset,可以自己控制offset,当消息认为已消费过了,这个时候再去提交它们的偏移量。
     * 这个很有用的,当消费的消息结合了一些处理逻辑,这个消息就不应该认为是已经消费的,直到它完成了整个处理。
     */
    private Boolean enableAutoCommit = false;

    /**
     * 提交延迟毫秒数
     */
    private int autoCommitIntervalMs = 100;

    /**
     * 执行超时时间
     */
    private int sessionTimeoutMs = 15000;

    /**
     * 每次最少拉取多少数据
     */
    private int fetchMinBytes = 1;

    /**
     * 在单次调用中的最大返回
     */
    private int maxPollRecords = 300;

    /**
     * 该Consumer属于的组
     */
    private String groupId ;

    /**
     * 在consumter端配置文件中(或者是ConsumerConfig类参数)有个"autooffset.reset"(在kafka 0.8版本中为auto.offset.reset),
     * 有2个合法的值"largest"/"smallest",默认为"largest",此配置参数表示当此groupId下的消费者,在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),
     * consumer应该从哪个offset开始消费.largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的开始位置消费所有消息.
     */
    private String autoOffseReset = "latest";

    /**
     * 同一个组下 启动几个consumer来获取kafka的消息
     */
    private int consumerAmount = 3;

    /**
     * 设置启动的consumer多久超时
     */
    private int pollTimeout = 5000;

    private List<String> topics;

    private String keySerializer = StringDeserializer.class.getName();
    private String valueSerializer = StringDeserializer.class.getName();

}

配置类:

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import riven.kafka.api.configuration.ConsumerConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1411:16
 * 配置Consumer选项
 * 初始化consumer_S
 */
@Configuration
@EnableKafka
@EnableConfigurationProperties(ConsumerConfiguration.class)
@ConditionalOnProperty(name = {"Riven.kafka.consumer.bootstrapServers", "Riven.kafka.consumer.groupId"}, matchIfMissing = false)
public class ConsumerInitialize {


    private Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * 初始化参数
     *
     * @param config
     * @return
     */
    private Map<String, Object> assembleProducer(ConsumerConfiguration config) {
        Map<String, Object> propsMap = new HashMap<>();
        if (StringUtils.isBlank(config.getBootstrapServers()))
            throw new RuntimeException("缺失kafka集群列表,初始化失败");
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());

        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
        //提交延迟毫秒数
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, config.getAutoCommitIntervalMs());
        //执行超时时间
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getSessionTimeoutMs());
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeySerializer());
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueSerializer());
        propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, config.getFetchMinBytes());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getMaxPollRecords());
        //组ID
        if (StringUtils.isBlank(config.getGroupId()))
            throw new RuntimeException("缺失Consumer组信息,初始化失败");
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());

        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, config.getAutoOffseReset());
        return propsMap;
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory
            (ConsumerConfiguration ver) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        try {
            factory.setConsumerFactory(consumerFactory(ver));
            factory.setConcurrency(ver.getConsumerAmount());//启动的consumer个数
            factory.getContainerProperties().setPollTimeout(ver.getPollTimeout());//consumer;连接超时时间ms
            logger.info("初始化Consumer_S完成,共启动 {} 个Consumer", ver.getConsumerAmount());
        } catch (Exception e) {
            logger.info("初始化Consumer_S失败!");
            e.printStackTrace();
        }
        return factory;
    }

    @org.jetbrains.annotations.NotNull
    private ConsumerFactory<String, String> consumerFactory(ConsumerConfiguration ver) {
        return new DefaultKafkaConsumerFactory<>(assembleProducer(ver));
    }

最后,在配置文件根目录下创建Spring监听器:
spring.factories文件
并添加需要Spring监听初始化的类路径(多个使用,逗号隔开):

org.springframework.boot.autoconfigure.EnableAutoConfiguration=riven.kafka.api.producer.ProducerInitialize,riven.kafka.api.consumer.ConsumerInitialize
目录
相关文章
|
2月前
|
前端开发 Java 关系型数据库
基于Java+Springboot+Vue开发的鲜花商城管理系统源码+运行
基于Java+Springboot+Vue开发的鲜花商城管理系统(前后端分离),这是一项为大学生课程设计作业而开发的项目。该系统旨在帮助大学生学习并掌握Java编程技能,同时锻炼他们的项目设计与开发能力。通过学习基于Java的鲜花商城管理系统项目,大学生可以在实践中学习和提升自己的能力,为以后的职业发展打下坚实基础。技术学习共同进步
238 7
|
2月前
|
自动驾驶 程序员 API
告别重复繁琐!Apipost参数描述库让API开发效率飙升!
在API开发中,重复录入参数占用了42%的时间,不仅效率低下还易出错。Apipost推出的参数描述库解决了这一痛点,通过智能记忆功能实现参数自动填充,如版本号、分页控制、用户信息等常用字段,大幅减少手动输入。支持Key-Value与Raw-Json格式导入,一键提取响应结果至文档,将创建20参数接口文档时间从18分钟缩短至2分钟。相比Postman需手动搜索变量,Apipost的参数复用响应速度仅0.3秒,且支持跨项目共享与实时纠错,真正实现“一次定义,终身受益”。
|
2月前
|
人工智能 安全 Java
智慧工地源码,Java语言开发,微服务架构,支持分布式和集群部署,多端覆盖
智慧工地是“互联网+建筑工地”的创新模式,基于物联网、移动互联网、BIM、大数据、人工智能等技术,实现对施工现场人员、设备、材料、安全等环节的智能化管理。其解决方案涵盖数据大屏、移动APP和PC管理端,采用高性能Java微服务架构,支持分布式与集群部署,结合Redis、消息队列等技术确保系统稳定高效。通过大数据驱动决策、物联网实时监测预警及AI智能视频监控,消除数据孤岛,提升项目可控性与安全性。智慧工地提供专家级远程管理服务,助力施工质量和安全管理升级,同时依托可扩展平台、多端应用和丰富设备接口,满足多样化需求,推动建筑行业数字化转型。
102 5
|
2月前
|
缓存 安全 API
API 接口开发与合理利用:构建高效、安全、可维护的数字桥梁
本文全面解析API接口的设计、优化与安全维护。API作为系统间交互的标准化契约,核心价值在于解耦系统、提升复用性和构建开放生态。设计时需遵循六大原则:明确输入输出、关注单一职责、实现自我表达、确保功能无重叠、保障幂等性及合理版本化。性能优化从批量处理、异步调用、并行执行等方面入手,同时结合缓存、池化技术和SQL优化提升效率。安全性涵盖加密传输、加签验签、Token认证、防重放攻击及限流熔断等十大要点。最后,通过文档自动生成、日志体系和版本管理确保接口可持续迭代。优秀的API应以契约优先、演进思维和防御心态为核心,成为系统的数字资产,支持内外部高效协作与生态建设。
|
1月前
|
Java 开发者
Java编程实用技巧:提升代码质量与开发效率
Java作为一门成熟且广泛应用的编程语言,掌握一些实用技巧可以显著提高开发效率和代码质量。以下是值得Java开发者掌握的实用技巧:
48 6
|
2月前
|
人工智能 Java 定位技术
Java 开发玩转 MCP:从 Claude 自动化到 Spring AI Alibaba 生态整合
本文详细讲解了Java开发者如何基于Spring AI Alibaba框架玩转MCP(Model Context Protocol),涵盖基础概念、快速体验、服务发布与调用等内容。重点包括将Spring应用发布为MCP Server(支持stdio与SSE模式)、开发MCP Client调用服务,以及在Spring AI Alibaba的OpenManus中使用MCP增强工具能力。通过实际示例,如天气查询与百度地图路线规划,展示了MCP在AI应用中的强大作用。最后总结了MCP对AI开发的意义及其在Spring AI中的实现价值。
964 9
|
2月前
|
前端开发 测试技术 API
2025年API开发必备:10款优秀Postman替代工具大盘点
API测试在现代开发中至关重要,Postman虽为首选,但市场上涌现出许多优秀替代工具。本文精选2025年10款好评如潮的API测试工具:Apifox、Insomnia、Hoppscotch、Paw、Talend API Tester、HTTPie、ARC、Swagger UI、SoapUI和Thunder Client。这些工具各具特色,满足不同需求,如团队协作、开源易用、自动化测试等。无论是简洁轻量还是功能全面,总有一款适合你的团队,助力效率提升。
|
1月前
|
人工智能 自然语言处理 JavaScript
关于API调用速率问题,能否增大一些?另外我想基于其开发实际场景应用,不知是否提供一些相关支持
这是一个关于开源多语言切换项目的简介:作者开发了一款自动为网页提供多语言切换的开源项目,已广泛应用于众多网站和项目。该项目现已对接通义千问(qwen3),但由于接口速度限制成为瓶颈,希望阿里云能提高请求速率。此外,作者询问是否能获得阿里支持,例如提升接口速率、用户推荐分成、以及文档展示支持等,以进一步推广多语言能力至更多应用场景。项目地址:https://github.com/xnx3/translate
71 0
|
2月前
|
人工智能 Java 物联网
没有好的学历,Java开发未来的路应该怎么走?
在数字化时代,Java开发者即使没有高学历,也能通过拥抱新兴技术(如大模型应用与鸿蒙系统开发)、积累实战经验、持续学习新技能等途径实现职业突破。从参与开源项目到关注行业动态,再到规划技术专家或管理路线,建立人脉网络并利用教育平台提升能力,开发者可拓宽技术边界,适应日新月异的技术需求,在未来发展中占据一席之地。