kafka-Java-SpringBoot-product API开发

简介: 前面讨论过如何安装kafka集群群及优化配置的问题,现在需要使用kafka集群,由于我们项目使用的是SpingBoot,故做一个inject到IOC容器的kafka-Java-SpringBoot-API,废话补多少,直接上代码:第一步,制定初始化类属性内容,最好赋初值,这样在使用的时候就不需要进行判空类:ProducerConfiguration import org.

前面讨论过如何安装kafka集群群及优化配置的问题,现在需要使用kafka集群,由于我们项目使用的是SpingBoot,故做一个inject到IOC容器的kafka-Java-SpringBoot-API,废话补多少,直接上代码:
第一步,制定初始化类属性内容,最好赋初值,这样在使用的时候就不需要进行判空
类:ProducerConfiguration

import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1315:58
 * 配置类
 */
@ConfigurationProperties(prefix = "Riven.kafka.producer")
public class ProducerConfiguration {

    //kafka服务器列表
    private String bootstrapServers;

    // 如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性
    private int retries = 0;

    /**
     * Server完成 producer request 前需要确认的数量。 acks=0时,producer不会等待确认,直接添加到socket等待发送;
     * acks=1时,等待leader写到local log就行; acks=all或acks=-1时,等待isr中所有副本确认 (注意:确认都是 broker
     * 接收到消息放入内存就直接返回确认,不是需要等待数据写入磁盘后才返回确认,这也是kafka快的原因)
     */
    private String acks = "-1";

    /**
     * Producer可以将发往同一个Partition的数据做成一个Produce
     * Request发送请求,即Batch批处理,以减少请求次数,该值即为每次批处理的大小。
     * 另外每个Request请求包含多个Batch,每个Batch对应一个Partition,且一个Request发送的目的Broker均为这些partition的leader副本。
     * 若将该值设为0,则不会进行批处理
     */
    private int batchSize = 4096;

    /**
     * 默认缓冲可立即发送,即遍缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置linger.ms大于0。
     * 这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。这类似于TCP的算法,例如上面的代码段,
     * 可能100条消息在一个请求发送,因为我们设置了linger(逗留)时间为1毫秒,然后,如果我们没有填满缓冲区,
     * 这个设置将增加1毫秒的延迟请求以等待更多的消息。 需要注意的是,在高负载下,相近的时间一般也会组成批,即使是
     * linger.ms=0。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。
     */
    private int lingerMs = 1;

    /**
     * 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。
     * 当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定, 之后它将抛出一个TimeoutException。
     */

    private int bufferMemory = 40960;

    /**
     * 序列化方式
     */
    private String keySerializer = StringSerializer.class.getName();
    private String valueSerializer = StringSerializer.class.getName();

  省略gettersetter
}

配置类:ProducerInitialize


import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import riven.kafka.api.configuration.ProducerConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1314:21
 *
 */
@Configuration
@ConditionalOnClass({KafkaTemplate.class})
@ConditionalOnProperty(name = "Riven.kafka.producer.bootstrapServers", matchIfMissing = false)//某一个值存在着才初始化和个BEAN
@EnableConfigurationProperties(ProducerConfiguration.class)//检查ConfigurationProperties注解标记的配置类是否初始化
@EnableKafka
public class ProducerInitialize {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * 初始化producer参数
     *
     * @param config 参数
     * @return 初始化map
     */
    private Map<String, Object> assembleProducer(ProducerConfiguration config) {
        Map<String, Object> props = new HashMap<>();
        if (StringUtils.isNoneBlank(config.getBootstrapServers()))
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
        if (StringUtils.isNoneBlank(config.getAcks()))
            props.put(ProducerConfig.ACKS_CONFIG, config.getAcks());
        props.put(ProducerConfig.RETRIES_CONFIG, config.getRetries());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize());
        props.put(ProducerConfig.LINGER_MS_CONFIG, config.getLingerMs());
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.getBufferMemory());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getKeySerializer());
        return props;
    }

    private ProducerFactory<String, String> producerFactory(ProducerConfiguration config) {
        return new DefaultKafkaProducerFactory<>(assembleProducer(config));
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerConfiguration config) {
        KafkaTemplate<String, String> stringStringKafkaTemplate = new KafkaTemplate<>(producerFactory(config));
        stringStringKafkaTemplate.setProducerListener(new SimpleProducerListener());
        logger.info("kafka Producer 初始化完成");
        return stringStringKafkaTemplate;
    }

生产者发送记录:SimpleProducerListener

import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.util.ObjectUtils;

/**
 * @Author dw07-Riven770[wudonghua@gznb.com]
 * @Date 2017/12/1411:05
 * simple implements interface {@link ProducerListener} to logging producer send result info
 * 做Producer发送消息给kafka之前和之后的一些记录
 */
public class SimpleProducerListener implements ProducerListener<String,String> {

    private static final Logger logger = LoggerFactory.getLogger(SimpleProducerListener.class);

    private int maxContentLogged = 500;

    /**
     * Invoked after the successful send of a message (that is, after it has been acknowledged by the broker).
     * @param topic the destination topic
     * @param partition the destination partition (could be null)
     * @param key the key of the outbound message
     * @param value the payload of the outbound message
     * @param recordMetadata the result of the successful send operation
     */
    @Override
    public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) {
        StringBuilder logOutput = new StringBuilder();
        logOutput.append("消息发送成功! \n");
        logOutput.append(" with key=【").append(toDisplayString(ObjectUtils.nullSafeToString(key), this.maxContentLogged)).append("】");
        logOutput.append(" and value=【").append(toDisplayString(ObjectUtils.nullSafeToString(value), this.maxContentLogged)).append("】");
        logOutput.append(" to topic 【").append(topic).append("】");
        String[] resultArr = recordMetadata.toString().split("@");
        logOutput.append(" send result: topicPartition【").append(resultArr[0]).append("】 offset 【").append(resultArr[1]).append("】");
        logger.info(logOutput.toString());
    }

    /**
     * Invoked after an attempt to send a message has failed.
     * @param topic the destination topic
     * @param partition the destination partition (could be null)
     * @param key the key of the outbound message
     * @param value the payload of the outbound message
     * @param exception the exception thrown
     */
    @Override
    public void onError(String topic, Integer partition, String key, String value, Exception exception) {
        StringBuilder logOutput = new StringBuilder();
        logOutput.append("消息发送失败!\n");
        logOutput.append("Exception thrown when sending a message");
        logOutput.append(" with key=【").append(toDisplayString(ObjectUtils.nullSafeToString(key), this.maxContentLogged)).append("】");
        logOutput.append(" and value=【").append(toDisplayString(ObjectUtils.nullSafeToString(value), this.maxContentLogged)).append("】");
        logOutput.append(" to topic 【").append(topic).append("】");
        if (partition != null) {
            logOutput.append(" and partition 【" + partition + "】");
        }
        logOutput.append(":");
        logger.error(logOutput.toString(), exception);
    }

    /**
     * Return true if this listener is interested in success as well as failure.
     * @return true to express interest in successful sends.
     */
    @Override
    public boolean isInterestedInSuccess() {
        return true;
    }

    private String toDisplayString(String original, int maxCharacters) {
        if (original.length() <= maxCharacters) {
            return original;
        }
        return original.substring(0, maxCharacters) + "...";
    }
}

最后,在配置文件根目录下创建Spring监听器:
spring.factories文件
并添加需要Spring监听初始化的类路径(多个使用,逗号隔开):

org.springframework.boot.autoconfigure.EnableAutoConfiguration=riven.kafka.api.producer.ProducerInitialize

整个系列需要使用的jar包

        <dependency>
            <groupId>org.jetbrains</groupId>
            <artifactId>annotations</artifactId>
            <version>15.0</version>
        </dependency>
        <dependency>
            <groupId>javassist</groupId>
            <artifactId>javassist</artifactId>
            <version>3.11.0.GA</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.6</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
目录
相关文章
|
22天前
|
Java API Maven
如何使用Java开发抖音API接口?
在数字化时代,社交媒体平台如抖音成为生活的重要部分。本文详细介绍了如何用Java开发抖音API接口,从创建开发者账号、申请API权限、准备开发环境,到编写代码、测试运行及注意事项,全面覆盖了整个开发流程。
64 10
|
23天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
60 2
|
2天前
|
算法 Java API
如何使用Java开发获得淘宝商品描述API接口?
本文详细介绍如何使用Java开发调用淘宝商品描述API接口,涵盖从注册淘宝开放平台账号、阅读平台规则、创建应用并申请接口权限,到安装开发工具、配置开发环境、获取访问令牌,以及具体的Java代码实现和注意事项。通过遵循这些步骤,开发者可以高效地获取商品详情、描述及图片等信息,为项目和业务增添价值。
31 10
|
23天前
|
API 开发工具 数据库
开发一份API接口,需要注意这些,看你做到了几项
本文介绍了设计API接口时需注意的关键点,包括数字签名、敏感数据加密与脱敏、限流、参数校验、统一返回与异常处理、请求日志记录、幂等设计、数据量限制、异步处理、参数定义、完整文档及开发者对接SDK等内容,旨在帮助开发者设计出安全、稳定、易维护的API接口。
87 6
开发一份API接口,需要注意这些,看你做到了几项
|
1天前
|
数据可视化 搜索推荐 API
速卖通获得aliexpress商品详情API接口的开发、应用与收益。
速卖通(AliExpress)作为阿里巴巴旗下的跨境电商平台,为全球消费者提供丰富商品。其开放平台提供的API接口支持开发者获取商品详情等信息,本文探讨了速卖通商品详情API的开发流程、应用场景及潜在收益,包括提高运营效率、降低成本、增加收入和提升竞争力等方面。
12 1
|
5天前
|
JavaScript 安全 Java
java版药品不良反应智能监测系统源码,采用SpringBoot、Vue、MySQL技术开发
基于B/S架构,采用Java、SpringBoot、Vue、MySQL等技术自主研发的ADR智能监测系统,适用于三甲医院,支持二次开发。该系统能自动监测全院患者药物不良反应,通过移动端和PC端实时反馈,提升用药安全。系统涵盖规则管理、监测报告、系统管理三大模块,确保精准、高效地处理ADR事件。
|
19天前
|
缓存 前端开发 API
深入浅出:后端开发中的RESTful API设计原则
【10月更文挑战第43天】在数字化浪潮中,后端开发如同搭建梦想的脚手架,而RESTful API则是连接梦想与现实的桥梁。本文将带你领略API设计的哲学之美,探索如何通过简洁明了的设计,提升开发效率与用户体验。从资源定位到接口约束,从状态转换到性能优化,我们将一步步构建高效、易用、可维护的后端服务。无论你是初涉后端的新手,还是寻求进阶的开发者,这篇文章都将为你的开发之路提供指引。让我们一起走进RESTful API的世界,解锁后端开发的新篇章。
|
23天前
|
JSON API 数据格式
如何使用Python开发1688商品详情API接口?
本文介绍了如何使用Python开发1688商品详情API接口,获取商品的标题、价格、销量和评价等详细信息。主要内容包括注册1688开放平台账号、安装必要Python模块、了解API接口、生成签名、编写Python代码、解析返回数据以及错误处理和日志记录。通过这些步骤,开发者可以轻松地集成1688商品数据到自己的应用中。
31 1
|
20天前
|
Web App开发 人工智能 自然语言处理
WebChat:开源的网页内容增强问答 AI 助手,基于 Chrome 扩展的最佳实践开发,支持自定义 API 和本地大模型
WebChat 是一个基于 Chrome 扩展开发的 AI 助手,能够帮助用户理解和分析当前网页的内容,支持自定义 API 和本地大模型。
53 0
|
21天前
|
数据采集 自然语言处理 搜索推荐
淘宝评价API接口的开发与应用
在数字化商业时代,数据成为企业提升竞争力的关键资源。淘宝作为电商巨头,其商品评论数据极具价值。本文详细介绍了淘宝评价API接口的开发流程与应用场景,从注册账号、获取密钥到实际调用和数据解析,再到商品分析、店铺管理、个性化推荐等多个方面,全面解析了技术细节与实践方法,为企业和开发者提供了宝贵的技术支持和数据资源。
84 0