Multi-threaded Consumption
A single-threaded consumer is certainly simple to implement, but throughput takes a big hit.
To quantify this I ran a test: consuming 120009 records with the earlier single-threaded consumer produced the following result:
It took 12450 milliseconds in total.
So how can we switch to multi-threaded consumption?
We can take advantage of Kafka's partition mechanism to increase consumption capacity. With a single thread, that one thread has to work through the data in every partition; with multiple threads, each thread can consume just one partition, which naturally improves efficiency. The thread count should therefore satisfy coreSize <= the number of partitions.
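Since the thread count is capped by the partition count, it is worth reading that count from the cluster before sizing the pool. Below is a minimal sketch, assuming the same broker and topic as the demo code that follows; partitionsFor() is a standard KafkaConsumer method:

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

public class PartitionCountCheck {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9094");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // one thread per partition is the ceiling: extra threads would sit idle
            int partitions = consumer.partitionsFor("test").size();
            System.out.println("partitions=" + partitions + ", so threadNum should be <= " + partitions);
        }
    }
}
```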
Let's start with the entry point:
```java
public class ConsumerThreadMain {

    private static String brokerList = "localhost:9094";
    private static String groupId = "group1";
    private static String topic = "test";

    /**
     * Number of consumer threads
     */
    private static int threadNum = 3;

    public static void main(String[] args) {
        ConsumerGroup consumerGroup = new ConsumerGroup(threadNum, groupId, topic, brokerList);
        consumerGroup.execute();
    }
}
```
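For threadNum = 3 to pay off, the topic needs at least three partitions. The post does not show how the topic was created; here is a hedged sketch using Kafka's AdminClient (available from Kafka 0.11 onward; the replication factor of 1 is an assumption suited to a local single-broker test):

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class TopicSetup {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9094");

        try (AdminClient admin = AdminClient.create(props)) {
            // three partitions so each of the three consumer threads can own one
            NewTopic topic = new NewTopic("test", 3, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```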
The ConsumerGroup class:
```java
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConsumerGroup {

    private static Logger LOGGER = LoggerFactory.getLogger(ConsumerGroup.class);

    /**
     * Thread pool
     */
    private ExecutorService threadPool;

    private List<ConsumerCallable> consumers;

    public ConsumerGroup(int threadNum, String groupId, String topic, String brokerList) {
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("consumer-pool-%d").build();

        threadPool = new ThreadPoolExecutor(threadNum, threadNum,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory,
                new ThreadPoolExecutor.AbortPolicy());

        consumers = new ArrayList<ConsumerCallable>(threadNum);
        for (int i = 0; i < threadNum; i++) {
            ConsumerCallable consumerThread = new ConsumerCallable(brokerList, groupId, topic);
            consumers.add(consumerThread);
        }
    }

    /**
     * Submit the consumer tasks and log the total elapsed time.
     */
    public void execute() {
        long startTime = System.currentTimeMillis();
        for (ConsumerCallable callable : consumers) {
            // the returned Future (and the ConsumerFuture it carries) is discarded here;
            // see the sketch below for one way to actually collect the results
            Future<ConsumerFuture> future = threadPool.submit(callable);
        }
        // the original checked isShutdown() *before* calling shutdown(), so its timing
        // log could never run; shutting down first and awaiting termination fixes that
        threadPool.shutdown();
        try {
            if (threadPool.awaitTermination(1, TimeUnit.HOURS)) {
                long endTime = System.currentTimeMillis();
                LOGGER.info("main thread use {} Millis", endTime - startTime);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```
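As noted in the comment above, the Future results are thrown away, so the per-thread statistics never reach the main thread. A minimal sketch of collecting them, assuming ConsumerFuture exposes getCount() and getTime() getters (that class is not shown in the post; a sketch of it follows ConsumerCallable below):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public class ConsumerResultAggregator {

    /**
     * Submit every callable, block on each Future, and sum the per-thread stats.
     */
    public static void run(ExecutorService threadPool, List<ConsumerCallable> consumers) throws Exception {
        List<Future<ConsumerFuture>> futures = new ArrayList<>(consumers.size());
        for (ConsumerCallable callable : consumers) {
            futures.add(threadPool.submit(callable));
        }
        int totalCount = 0;
        long totalTime = 0;
        for (Future<ConsumerFuture> future : futures) {
            // get() blocks until the corresponding consumer thread finishes
            ConsumerFuture result = future.get();
            totalCount += result.getCount();
            totalTime += result.getTime();
        }
        System.out.println("all threads: " + totalCount + " records, " + totalTime + " ms");
        threadPool.shutdown();
    }
}
```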
And finally the actual consumption logic, ConsumerCallable:
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ConsumerCallable implements Callable<ConsumerFuture> {

    private static Logger LOGGER = LoggerFactory.getLogger(ConsumerCallable.class);

    private AtomicInteger totalCount = new AtomicInteger();
    private AtomicLong totalTime = new AtomicLong();
    private AtomicInteger count = new AtomicInteger();

    /**
     * Each thread maintains its own KafkaConsumer instance.
     */
    private final KafkaConsumer<String, String> consumer;

    public ConsumerCallable(String brokerList, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        // commit offsets automatically
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
    }

    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    @Override
    public ConsumerFuture call() throws Exception {
        boolean flag = true;
        int failPollTimes = 0;
        long startTime = System.currentTimeMillis();
        while (flag) {
            // poll with a 200 ms timeout
            ConsumerRecords<String, String> records = consumer.poll(200);
            if (records.count() <= 0) {
                failPollTimes++;
                if (failPollTimes >= 20) {
                    LOGGER.debug("{} empty polls reached, exiting", failPollTimes);
                    flag = false;
                }
                continue;
            }

            // reset the counter once records arrive again
            failPollTimes = 0;

            LOGGER.debug("fetched this poll: " + records.count());
            count.addAndGet(records.count());
            // the original did totalCount.addAndGet(count.get()), which re-adds the
            // running count on every poll; only this poll's records should be added
            totalCount.addAndGet(records.count());
            long endTime = System.currentTimeMillis();
            if (count.get() >= 10000) {
                LOGGER.info("this consumer consumed {} records in {} milliseconds", count, endTime - startTime);
                totalTime.addAndGet(endTime - startTime);
                startTime = System.currentTimeMillis();
                count.set(0);
            }
            LOGGER.debug("end totalCount={}, totalTime={}", totalCount, totalTime);

            /*for (ConsumerRecord<String, String> record : records) {
                // simply print the message
                LOGGER.debug(record.value() + " consumed " + record.partition()
                        + " message with offset: " + record.offset());
            }*/
        }
        ConsumerFuture consumerFuture = new ConsumerFuture(totalCount.get(), totalTime.get());
        return consumerFuture;
    }
}
```
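The ConsumerFuture class itself never appears in the post. A minimal sketch consistent with how it is constructed and read above (the field and getter names are my own assumptions):

```java
/**
 * Hypothetical result holder returned by ConsumerCallable; not shown in the
 * original post, so the field and method names here are assumptions.
 */
public class ConsumerFuture {

    /** total records consumed by this thread */
    private final int count;

    /** total time spent consuming, in milliseconds */
    private final long time;

    public ConsumerFuture(int count, long time) {
        this.count = count;
        this.time = time;
    }

    public int getCount() {
        return count;
    }

    public long getTime() {
        return time;
    }
}
```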
Let's walk through the logic:
It simply initializes three consumer instances, one per thread, with some statistics gathering mixed in. Consuming the same 120009 records gives the result below.
Because the threads run in parallel, consuming the 120009 records is roughly 2 seconds faster, and the gain becomes more pronounced as the data volume grows by orders of magnitude.
But this approach also has some drawbacks:
- It is not very flexible: it cannot adapt automatically when the number of partitions changes.
- The consumption logic and the processing logic run on the same thread, so complex processing logic hurts throughput and the coupling is high. Of course, the processing could be handed off through an internal queue to a separate component, as the sketch after this list shows.
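To make the second point concrete, here is a minimal sketch of decoupling polling from processing with a BlockingQueue (the class name, queue capacity, and thread layout are my own choices, not from the post):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class DecoupledConsumer {

    // a bounded queue applies back pressure when processing falls behind
    private final BlockingQueue<ConsumerRecord<String, String>> queue = new ArrayBlockingQueue<>(1000);

    public void start(String brokerList, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));

        // poll thread: only fetches and enqueues, never runs business logic
        Thread poller = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, String> records = consumer.poll(200);
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        queue.put(record);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }, "poll-thread");

        // worker thread: runs the (possibly slow) processing logic
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    process(queue.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "worker-thread");

        poller.start();
        worker.start();
    }

    private void process(ConsumerRecord<String, String> record) {
        // placeholder for the real business logic
        System.out.println("processing offset " + record.offset() + ": " + record.value());
    }
}
```

One caveat: with enable.auto.commit=true, an offset can be committed before the worker has actually processed the record, so a real implementation that needs at-least-once semantics would switch to manual commits driven by the worker.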
Summary
There is a lot more to know about Kafka, and its usage goes far beyond what is covered here. I will continue to share more Kafka-related content, such as monitoring, in future posts.
Personal blog: crossoverjie.top.
Author: crossoverJie
Link: https://juejin.cn/post/6844903508009811976
Source: Juejin (稀土掘金)
The copyright belongs to the author. For commercial reuse, please contact the author for authorization; for non-commercial reuse, please credit the source.