SSM(十七) MQ应用(下)

简介: 写这篇文章的起因是由于之前的一篇关于Kafka异常消费,当时为了解决问题不得不使用临时的方案。 总结起来归根结底还是对Kafka不熟悉导致的,加上平时工作的需要,之后就花些时间看了Kafka相关的资料。

多线程消费


针对于单线程消费实现起来自然是比较简单,但是效率也是要大打折扣的。


为此我做了一个测试,使用之前的单线程消费120009条数据的结果如下:



总共花了12450毫秒。


那么换成多线程消费怎么实现呢?


我们可以利用partition的分区特性来提高消费能力,单线程的时候等于是一个线程要把所有分区里的数据都消费一遍,如果换成多线程就可以让一个线程只消费一个分区,这样效率自然就提高了,所以线程数coreSize<=partition


首先来看下入口:


public class ConsumerThreadMain {
    private static String brokerList = "localhost:9094";
    private static String groupId = "group1";
    private static String topic = "test";
    /**
     * 线程数量
     */
    private static int threadNum = 3;
    public static void main(String[] args) {
        ConsumerGroup consumerGroup = new ConsumerGroup(threadNum, groupId, topic, brokerList);
        consumerGroup.execute();
    }
}


其中的ConsumerGroup类:


public class ConsumerGroup {
    private static Logger LOGGER = LoggerFactory.getLogger(ConsumerGroup.class);
    /**
     * 线程池
     */
    private ExecutorService threadPool;
    private List<ConsumerCallable> consumers ;
    public ConsumerGroup(int threadNum, String groupId, String topic, String brokerList) {
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("consumer-pool-%d").build();
        threadPool = new ThreadPoolExecutor(threadNum, threadNum,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
        consumers = new ArrayList<ConsumerCallable>(threadNum);
        for (int i = 0; i < threadNum; i++) {
            ConsumerCallable consumerThread = new ConsumerCallable(brokerList, groupId, topic);
            consumers.add(consumerThread);
        }
    }
    /**
     * 执行任务
     */
    public void execute() {
        long startTime = System.currentTimeMillis() ;
        for (ConsumerCallable runnable : consumers) {
            Future<ConsumerFuture> future = threadPool.submit(runnable) ;
        }
        if (threadPool.isShutdown()){
            long endTime = System.currentTimeMillis() ;
            LOGGER.info("main thread use {} Millis" ,endTime -startTime) ;
        }
        threadPool.shutdown();
    }
}


最后真正的执行逻辑ConsumerCallable:


public class ConsumerCallable implements Callable<ConsumerFuture> {
    private static Logger LOGGER = LoggerFactory.getLogger(ConsumerCallable.class);
    private AtomicInteger totalCount = new AtomicInteger() ;
    private AtomicLong totalTime = new AtomicLong() ;
    private AtomicInteger count = new AtomicInteger() ;
    /**
     * 每个线程维护KafkaConsumer实例
     */
    private final KafkaConsumer<String, String> consumer;
    public ConsumerCallable(String brokerList, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        //自动提交位移
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
    }
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    @Override
    public ConsumerFuture call() throws Exception {
        boolean flag = true;
        int failPollTimes = 0 ;
        long startTime = System.currentTimeMillis() ;
        while (flag) {
            // 使用200ms作为获取超时时间
            ConsumerRecords<String, String> records = consumer.poll(200);
            if (records.count() <= 0){
                failPollTimes ++ ;
                if (failPollTimes >= 20){
                    LOGGER.debug("达到{}次数,退出 ",failPollTimes);
                    flag = false ;
                }
                continue ;
            }
            //获取到之后则清零
            failPollTimes = 0 ;
            LOGGER.debug("本次获取:"+records.count());
            count.addAndGet(records.count()) ;
            totalCount.addAndGet(count.get()) ;
            long endTime = System.currentTimeMillis() ;
            if (count.get() >= 10000 ){
                LOGGER.info("this consumer {} record,use {} milliseconds",count,endTime-startTime);
                totalTime.addAndGet(endTime-startTime) ;
                startTime = System.currentTimeMillis() ;
                count = new AtomicInteger();
            }
            LOGGER.debug("end totalCount={},min={}",totalCount,totalTime);
            /*for (ConsumerRecord<String, String> record : records) {
                // 简单地打印消息
                LOGGER.debug(record.value() + " consumed " + record.partition() +
                        " message with offset: " + record.offset());
            }*/
        }
        ConsumerFuture consumerFuture = new ConsumerFuture(totalCount.get(),totalTime.get()) ;
        return consumerFuture ;
    }
}


理一下逻辑:


其实就是初始化出三个消费者实例,用于三个线程消费。其中加入了一些统计,最后也是消费120009条数据结果如下。



由于是并行运行,可见消费120009条数据可以提高2秒左右,当数据以更高的数量级提升后效果会更加明显。


但这也有一些弊端:


  • 灵活度不高,当分区数量变更之后不能自适应调整。


  • 消费逻辑和处理逻辑在同一个线程,如果处理逻辑较为复杂会影响效率,耦合也较高。当然这个处理逻辑可以再通过一个内部队列发出去由另外的程序来处理也是可以的。


总结


Kafka的知识点还是较多,Kafka的使用也远不这些。之后会继续分享一些关于Kafka监控等相关内容。


项目地址:github.com/crossoverJi…

个人博客:crossoverjie.top


作者:crossoverJie

链接:https://juejin.cn/post/6844903508009811976

来源:稀土掘金

著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
5月前
|
消息中间件 Java RocketMQ
消息队列 MQ产品使用合集之当SpringBoot应用因网络不通而启动失败时,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
3月前
|
消息中间件 开发工具
【Azure Event Hub】原生应用中使用RabbitMQ,是否可以不改动代码的情况下直接转换为使用Event Hub呢?
【Azure Event Hub】原生应用中使用RabbitMQ,是否可以不改动代码的情况下直接转换为使用Event Hub呢?
|
5月前
|
数据采集 监控 物联网
MQTT协议在智能制造中的应用案例与效益分析
【6月更文挑战第8天】MQTT协议在智能制造中的应用案例与效益分析
153 1
|
5月前
|
消息中间件 Arthas 监控
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ产品使用合集之每次重置reconsumeTimes就无法达到死信阈值,重试次数是否就要应用方控制
|
5月前
|
JavaScript Java 测试技术
基于ssm+vue.js+uniapp小程序的代驾应用系统附带文章和源代码部署视频讲解等
基于ssm+vue.js+uniapp小程序的代驾应用系统附带文章和源代码部署视频讲解等
201 21
|
5月前
|
消息中间件 监控 数据安全/隐私保护
RabbitMQ 技术详解与应用指南
**RabbitMQ** 是一个开源消息代理,基于 AMQP 实现,用于应用程序间轻量、可靠的消息传递。本文档详细介绍了 RabbitMQ 的基础,包括**消息、队列、交换机、绑定、路由键和消费者**等概念,以及其**高可靠性、高性能、灵活性、可扩展性和易用性**等特性。RabbitMQ 使用生产者-消费者模型,消息通过交换机路由到队列,消费者接收并处理。文中还涵盖了安装配置的基本步骤和常见应用场景,如**异步处理、消息推送、系统解耦、流量削峰和日志收集**。
490 2
|
5月前
|
Java 测试技术 数据安全/隐私保护
基于ssm+vue.js+uniapp小程序的《数据库原理及应用》课程平台附带文章和源代码部署视频讲解等
基于ssm+vue.js+uniapp小程序的《数据库原理及应用》课程平台附带文章和源代码部署视频讲解等
38 0
基于ssm+vue.js+uniapp小程序的《数据库原理及应用》课程平台附带文章和源代码部署视频讲解等
|
5月前
|
消息中间件 Java Spring
Spring Boot与RabbitMQ的集成应用
Spring Boot与RabbitMQ的集成应用
|
5月前
|
消息中间件 缓存 数据库
rabbitmq系列(二)几种常见模式的应用场景及实现
rabbitmq系列(二)几种常见模式的应用场景及实现
|
5月前
|
消息中间件 Serverless Windows
消息队列 MQ产品使用合集之MQTT协议是否可以应用于社交软件的系统通知场景
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。