基于commons-pool2实现KafkaProducer池来提升kafka发送消息性能

简介: 基于commons-pool2实现KafkaProducer池来提升kafka发送消息性能

业务场景

Spark用fileStream实时从NFS获取一批文件,将文件中JSON结构里面的大小图二进制数据上传云存储获取url然后再将url以string回写到json中发送kafka,最早使用多线程并行发送内个线程创建一个KafkaProducer速度慢的惊人无法满足现场,接着在线程中共用一个KafkaProducer再测试发现没有改观。

问题分析

分析kafka日志发现,每次发送数据大部分时间在0-1ms,出现时延的情况时发现都是连续出现的,由于发送端只有一个producer实例,这样当一个message发送阻塞了,将会瞬间导致TPS急剧下降,正常情况下一个kafka实例在1秒内能够处理上千个发送请求(由于我们的消息每个在323970B左右,千兆网用尽上行io差不多也只能发送11710241024/323970 = 378),但出现1秒的时延将会导致1秒只能处理1个发送请求,这样会阻塞后面数据的处理。

问题原因

由于producer是线程安全的,所以采用单实例,但一次发送阻塞(因为使用同步发送,每次发送都会等待结果,这个过程是同步的),将会影响到后续的数据处理,那就只能缓存producer实例了。

实现方案

对象池工厂实现的代码实现

public class KafkaProducerPooledObjectFactory implements PooledObjectFactory<KafkaProducer<String, String>>, Serializable {

    Properties props;
    public KafkaProducerPooledObjectFactory(Properties props) {
        this.props = props;
    }

    @Override
    public PooledObject<KafkaProducer<String, String>> makeObject() throws Exception {
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        return new DefaultPooledObject<KafkaProducer<String, String>>(kafkaProducer);
    }

    @Override
    public void destroyObject(PooledObject<KafkaProducer<String, String>> p) throws Exception {
        KafkaProducer<String, String> o = p.getObject();
        o = null;
    }

    @Override
    public boolean validateObject(PooledObject<KafkaProducer<String, String>> p) {
        return false;
    }

    @Override
    public void activateObject(PooledObject<KafkaProducer<String, String>> p) throws Exception {
        //System.out.println("activateObject");
    }

    @Override
    public void passivateObject(PooledObject<KafkaProducer<String, String>> p) throws Exception {
        //System.out.println("passivateObject");
    }

}

对象池工对外的代码实现

public class KafkaProducerPool implements Serializable {

    private GenericObjectPool<KafkaProducer<String, String>> objectPool;

    public KafkaProducerPool(Properties props) {

        KafkaProducerPooledObjectFactory kafkaProducerPooledObjectFactory = new KafkaProducerPooledObjectFactory(props);

        GenericObjectPoolConfig config = new GenericObjectPoolConfig(); // 池子配置文件
        config.setMaxTotal(100);                                        // 整个池最大值
        config.setMaxIdle(10);                                          // 最大空闲
        config.setMinIdle(0);                                           // 最小空闲
        config.setMaxWaitMillis(5000);                                  // 最大等待时间,-1表示一直等
        config.setBlockWhenExhausted(true);                             // 当对象池没有空闲对象时,新的获取对象的请求是否阻塞。true阻塞。默认值是true
        config.setTestOnBorrow(false);                                  // 在从对象池获取对象时是否检测对象有效,true是;默认值是false
        config.setTestOnReturn(false);                                  // 在向对象池中归还对象时是否检测对象有效,true是,默认值是false
        config.setTestWhileIdle(false);                                 // 在检测空闲对象线程检测到对象不需要移除时,是否检测对象的有效性。true是,默认值是false
        config.setMinEvictableIdleTimeMillis(60000L);                   // 可发呆的时间,10mins
        config.setTestWhileIdle(true);                                  // 发呆过长移除的时候是否test一下先
        config.setTimeBetweenEvictionRunsMillis(3000);                  // 回收资源线程的执行周期 3s
        config.setNumTestsPerEvictionRun(10);

        objectPool = new GenericObjectPool<>(kafkaProducerPooledObjectFactory, config);
    }

    public static Properties getConfig(String hosts) {
        Properties props = new Properties();
        props.put("bootstrap.servers", hosts);
        // procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
        // acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
        // acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
        // acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
        // 可以设置的值为:all, -1, 0, 1
        props.put("acks", "1");
        props.put("retries", 0);
        props.put("batch.size", 10000);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public KafkaProducer<String, String> getProducer() {
        try {
            KafkaProducer<String, String> producer = objectPool.borrowObject();
            return producer;
        } catch (Exception e) {
            throw new RuntimeException("获取KafkaProducer连接异常", e);
        }
    }

    public void returnProducer(KafkaProducer<String, String> producer) {
        try {
            objectPool.returnObject(producer);// 将对象放回对象池
        } catch (Exception e) {
            throw new RuntimeException("释放KafkaProducer连接异常", e);
        }
    }


    public static void main(String[] args) {
        String topic = "TEST_1";
        String hosts = "hdh109:9092";
        Properties props = getConfig(hosts);

        KafkaProducerPool pool = new KafkaProducerPool(props);
        for (int i = 0; i < 10000; i++) {
            KafkaProducer<String, String> producer = pool.getProducer();
            pool.returnProducer(producer);
        }
    }
}

调用代码片段1

val pool = new KafkaProducerPool(KafkaProducerPool.getConfig(hosts))
val executors:ExecutorService = Executors.newFixedThreadPool(40)
while (i.hasNext) {
    val item = i.next().toString
    val exec = new Exec(ACCESS_KEY, SECRET_KEY, gateHost, gatePort, serialId, poolId, token, topic, hosts, item, pool)
    val future = executors.submit(exec)
    future.get()
}
executors.shutdown()

Exec中的代码片段2

public class Exec implements Runnable {

    String msg;
    String topic, hosts;
    KafkaProducerPool pool;

    public Exec(String topic, String hosts, String msg, KafkaProducerPool pool) {

        this.msg = msg;

        this.topic = topic;
        this.hosts = hosts;
        this.pool = pool;
    }

    public Exec(String topic, String hosts, String msg, KafkaProducerExample prod) {
        this.msg = msg;

        this.topic = topic;
        this.hosts = hosts;
        this.prod = prod;
    }

    @Override
    public void run() {

            //省略一些消息处理相关代码

            KafkaProducer<String, String> producer = pool.getProducer();
            ProducerRecord rec = new ProducerRecord<>(topic, UUID.randomUUID().toString(), jo.toJSONString());
            producer.send(rec);
            pool.returnProducer(producer);

        } catch (IOException exec) {
            exec.printStackTrace();
        }

    }
}
目录
相关文章
|
8月前
|
消息中间件 存储 网络协议
【Kafka】Kafka 性能高的原因分析
【4月更文挑战第5天】【Kafka】Kafka 性能高的原因分析
|
8月前
|
消息中间件 监控 Java
Kafka Producer异步发送消息技巧大揭秘
Kafka Producer异步发送消息技巧大揭秘
591 0
|
8月前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
513 4
|
2月前
|
消息中间件 监控 大数据
优化Apache Kafka性能:最佳实践与调优策略
【10月更文挑战第24天】作为一名已经对Apache Kafka有所了解并有实际使用经验的开发者,我深知在大数据处理和实时数据流传输中,Kafka的重要性不言而喻。然而,在面对日益增长的数据量和业务需求时,如何保证系统的高性能和稳定性成为了摆在我们面前的一个挑战。本文将从我的个人视角出发,分享一些关于如何通过合理的配置和调优来提高Kafka性能的经验和建议。
114 4
|
3月前
|
消息中间件 存储 监控
说说如何解决RocketMq消息积压?为什么Kafka性能比RocketMq高?它们区别是什么?
【10月更文挑战第8天】在分布式系统中,消息队列扮演着至关重要的角色,它不仅能够解耦系统组件,还能提供异步处理、流量削峰和消息持久化等功能。在众多的消息队列产品中,RocketMQ和Kafka无疑是其中的佼佼者。本文将围绕如何解决RocketMQ消息积压、为什么Kafka性能比RocketMQ高以及它们之间的区别进行深入探讨。
135 1
|
5月前
|
图形学 人工智能 C#
从零起步,到亲手实现:一步步教你用Unity引擎搭建出令人惊叹的3D游戏世界,绝不错过的初学者友好型超详细指南 ——兼探索游戏设计奥秘与实践编程技巧的完美结合之旅
【8月更文挑战第31天】本文介绍如何使用Unity引擎从零开始创建简单的3D游戏世界,涵盖游戏对象创建、物理模拟、用户输入处理及动画效果。Unity是一款强大的跨平台游戏开发工具,支持多种编程语言,具有直观编辑器和丰富文档。文章指导读者创建新项目、添加立方体对象、编写移动脚本,并引入基础动画,帮助初学者快速掌握Unity开发核心概念,迈出游戏制作的第一步。
334 1
|
5月前
|
消息中间件 传感器 缓存
为什么Kafka能秒杀众多消息队列?揭秘它背后的五大性能神器,让你秒懂Kafka的极速之道!
【8月更文挑战第24天】Apache Kafka作为分布式流处理平台的领先者,凭借其出色的性能和扩展能力广受好评。本文通过案例分析,深入探讨Kafka实现高性能的关键因素:分区与并行处理显著提升吞吐量;批量发送结合压缩算法减少网络I/O次数及数据量;顺序写盘与页缓存机制提高写入效率;Zero-Copy技术降低CPU消耗;集群扩展与负载均衡确保系统稳定性和可靠性。这些机制共同作用,使Kafka能够在处理大规模数据流时表现出色。
77 3
|
5月前
|
消息中间件 Kafka 测试技术
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
|
6月前
|
消息中间件 存储 缓存
面试题Kafka问题之Kafka的生产消费基本流程如何解决
面试题Kafka问题之Kafka的生产消费基本流程如何解决
56 1
|
5月前
|
消息中间件 存储 Kafka
现代消息队列与云存储问题之Kafka在海量队列场景下存在性能的问题如何解决
现代消息队列与云存储问题之Kafka在海量队列场景下存在性能的问题如何解决

热门文章

最新文章