SSM(十七) MQ应用(下)

简介: 写这篇文章的起因是由于之前的一篇关于Kafka异常消费,当时为了解决问题不得不使用临时的方案。 总结起来归根结底还是对Kafka不熟悉导致的,加上平时工作的需要,之后就花些时间看了Kafka相关的资料。

多线程消费


针对于单线程消费实现起来自然是比较简单,但是效率也是要大打折扣的。


为此我做了一个测试,使用之前的单线程消费120009条数据的结果如下:



总共花了12450毫秒。


那么换成多线程消费怎么实现呢?


我们可以利用partition的分区特性来提高消费能力,单线程的时候等于是一个线程要把所有分区里的数据都消费一遍,如果换成多线程就可以让一个线程只消费一个分区,这样效率自然就提高了,所以线程数coreSize<=partition


首先来看下入口:


public class ConsumerThreadMain {
    private static String brokerList = "localhost:9094";
    private static String groupId = "group1";
    private static String topic = "test";
    /**
     * 线程数量
     */
    private static int threadNum = 3;
    public static void main(String[] args) {
        ConsumerGroup consumerGroup = new ConsumerGroup(threadNum, groupId, topic, brokerList);
        consumerGroup.execute();
    }
}


其中的ConsumerGroup类:


public class ConsumerGroup {
    private static Logger LOGGER = LoggerFactory.getLogger(ConsumerGroup.class);
    /**
     * 线程池
     */
    private ExecutorService threadPool;
    private List<ConsumerCallable> consumers ;
    public ConsumerGroup(int threadNum, String groupId, String topic, String brokerList) {
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("consumer-pool-%d").build();
        threadPool = new ThreadPoolExecutor(threadNum, threadNum,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
        consumers = new ArrayList<ConsumerCallable>(threadNum);
        for (int i = 0; i < threadNum; i++) {
            ConsumerCallable consumerThread = new ConsumerCallable(brokerList, groupId, topic);
            consumers.add(consumerThread);
        }
    }
    /**
     * 执行任务
     */
    public void execute() {
        long startTime = System.currentTimeMillis() ;
        for (ConsumerCallable runnable : consumers) {
            Future<ConsumerFuture> future = threadPool.submit(runnable) ;
        }
        if (threadPool.isShutdown()){
            long endTime = System.currentTimeMillis() ;
            LOGGER.info("main thread use {} Millis" ,endTime -startTime) ;
        }
        threadPool.shutdown();
    }
}


最后真正的执行逻辑ConsumerCallable:


public class ConsumerCallable implements Callable<ConsumerFuture> {
    private static Logger LOGGER = LoggerFactory.getLogger(ConsumerCallable.class);
    private AtomicInteger totalCount = new AtomicInteger() ;
    private AtomicLong totalTime = new AtomicLong() ;
    private AtomicInteger count = new AtomicInteger() ;
    /**
     * 每个线程维护KafkaConsumer实例
     */
    private final KafkaConsumer<String, String> consumer;
    public ConsumerCallable(String brokerList, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        //自动提交位移
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
    }
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    @Override
    public ConsumerFuture call() throws Exception {
        boolean flag = true;
        int failPollTimes = 0 ;
        long startTime = System.currentTimeMillis() ;
        while (flag) {
            // 使用200ms作为获取超时时间
            ConsumerRecords<String, String> records = consumer.poll(200);
            if (records.count() <= 0){
                failPollTimes ++ ;
                if (failPollTimes >= 20){
                    LOGGER.debug("达到{}次数,退出 ",failPollTimes);
                    flag = false ;
                }
                continue ;
            }
            //获取到之后则清零
            failPollTimes = 0 ;
            LOGGER.debug("本次获取:"+records.count());
            count.addAndGet(records.count()) ;
            totalCount.addAndGet(count.get()) ;
            long endTime = System.currentTimeMillis() ;
            if (count.get() >= 10000 ){
                LOGGER.info("this consumer {} record,use {} milliseconds",count,endTime-startTime);
                totalTime.addAndGet(endTime-startTime) ;
                startTime = System.currentTimeMillis() ;
                count = new AtomicInteger();
            }
            LOGGER.debug("end totalCount={},min={}",totalCount,totalTime);
            /*for (ConsumerRecord<String, String> record : records) {
                // 简单地打印消息
                LOGGER.debug(record.value() + " consumed " + record.partition() +
                        " message with offset: " + record.offset());
            }*/
        }
        ConsumerFuture consumerFuture = new ConsumerFuture(totalCount.get(),totalTime.get()) ;
        return consumerFuture ;
    }
}


理一下逻辑:


其实就是初始化出三个消费者实例,用于三个线程消费。其中加入了一些统计,最后也是消费120009条数据结果如下。



由于是并行运行,可见消费120009条数据可以提高2秒左右,当数据以更高的数量级提升后效果会更加明显。


但这也有一些弊端:


  • 灵活度不高,当分区数量变更之后不能自适应调整。


  • 消费逻辑和处理逻辑在同一个线程,如果处理逻辑较为复杂会影响效率,耦合也较高。当然这个处理逻辑可以再通过一个内部队列发出去由另外的程序来处理也是可以的。


总结


Kafka的知识点还是较多,Kafka的使用也远不这些。之后会继续分享一些关于Kafka监控等相关内容。


项目地址:github.com/crossoverJi…

个人博客:crossoverjie.top


作者:crossoverJie

链接:https://juejin.cn/post/6844903508009811976

来源:稀土掘金

著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
传感器 监控 物联网
golang开源的可嵌入应用程序高性能的MQTT服务
golang开源的可嵌入应用程序高性能的MQTT服务
205 3
|
3月前
|
消息中间件 存储 监控
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
36 1
|
6月前
|
消息中间件 存储 网络协议
我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用
我们一起来学RabbitMQ 二:RabbiMQ 的 6 种模式的基本应用
|
3月前
|
网络协议 Go 数据安全/隐私保护
golang开源的可嵌入应用程序高性能的MQTT服务
golang开源的可嵌入应用程序高性能的MQTT服务
262 2
|
4月前
|
消息中间件 监控 负载均衡
Kafka高级应用:如何配置处理MQ百万级消息队列?
在大数据时代,Apache Kafka作为一款高性能的分布式消息队列系统,广泛应用于处理大规模数据流。本文将深入探讨在Kafka环境中处理百万级消息队列的高级应用技巧。
184 0
|
5月前
|
消息中间件 缓存 NoSQL
[中间件] 秒杀系统秒杀率提高300%?教你如何利用redis和rabbitmq 优化应用!
[中间件] 秒杀系统秒杀率提高300%?教你如何利用redis和rabbitmq 优化应用!
173 0
|
5月前
|
消息中间件 Java
RabbitMQ【应用 01】SpringBoot集成RabbitMQ及设置RabbitMQ启动总开关
RabbitMQ【应用 01】SpringBoot集成RabbitMQ及设置RabbitMQ启动总开关
102 0
|
6月前
|
消息中间件 网络协议 物联网
Golang微服务框架Kratos应用MQTT消息队列
MQTT 协议 是由`IBM`的`Andy Stanford-Clark博士`和`Arcom`(已更名为Eurotech)的`Arlen Nipper博士`于 1999 年发明,用于石油和天然气行业。工程师需要一种协议来实现最小带宽和最小电池损耗,以通过卫星监控石油管道。最初,该协议被称为消息队列遥测传输,得名于首先支持其初始阶段的 IBM 产品 MQ 系列。2010 年,IBM 发布了 MQTT 3.1 作为任何人都可以实施的免费开放协议,然后于 2013 年将其提交给结构化信息标准促进组织 (OASIS) 规范机构进行维护。2019 年,OASIS 发布了升级的 MQTT 版本 5。
49 0
|
6月前
|
消息中间件 存储 中间件
Golang微服务框架Kratos应用RabbitMQ消息队列
RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。
90 1
|
6月前
|
消息中间件 存储 Go
Golang微服务框架Kratos应用RocketMQ消息队列
RocketMQ是由阿里捐赠给Apache的一款低延迟、高并发、高可用、高可靠的分布式消息中间件。经历了淘宝双十一的洗礼。RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
108 0