java 多线程消费kfk队列消息案例

简介: 很久之前老师写的,记录一下,不然找不到了

package org.training.hadoop.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class KafkaConsumerExample
{

//config
public static Properties getConfig()
{
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "testGroup");
    props.put("enable.auto.commit", "true");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    return props;
}

public void consumeMessage()
{
    // launch 3 threads to consume
    int numConsumers = 3;
    final String topic = "test1";
    final ExecutorService executor = Executors.newFixedThreadPool(numConsumers); //创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
    final List<KafkaConsumerRunner> consumers = new ArrayList<KafkaConsumerRunner>();
    for (int i = 0; i < numConsumers; i++) {
        KafkaConsumerRunner consumer = new KafkaConsumerRunner(topic);
        consumers.add(consumer);
        executor.submit(consumer);
    }

    //关闭线程并清理---------------------------
    //当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,清理
    Runtime.getRuntime().addShutdownHook(new Thread()
    {
        @Override
        public void run()
        {
            for (KafkaConsumerRunner consumer : consumers) {
                consumer.shutdown();  //关闭线程
            }
            executor.shutdown();
            try {
                //当前线程阻塞,直到
                //等所有已提交的任务(包括正在跑的和队列中等待的)执行完或者等超时时间到
                //或者线程被中断,抛出InterruptedExcepti
                executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}

// Thread to consume kafka data
public static class KafkaConsumerRunner
        implements Runnable
{
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;
    private final String topic;

    public KafkaConsumerRunner(String topic)
    {
        Properties props = getConfig();
        consumer = new KafkaConsumer<String, String>(props);
        this.topic = topic;
    }

    public void handleRecord(ConsumerRecord record)
    {
        System.out.println("name: " + Thread.currentThread().getName() + " ; topic: " + record.topic() + " ; offset" + record.offset() + " ; key: " + record.key() + " ; value: " + record.value());
    }

    public void run()
    {
        try {
            // subscribe
            consumer.subscribe(Arrays.asList(topic));
            while (!closed.get()) {
                //read data
                ConsumerRecords<String, String> records = consumer.poll(10000); //poll方法消费数据,心跳机制通知broker是否正常
                // Handle new records
                for (ConsumerRecord<String, String> record : records) {
                    handleRecord(record);  //打印消费数据
                }
            }
        }
        catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) {
                throw e;
            }
        }
        finally {
            consumer.close();
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown()
    {
        closed.set(true);
        consumer.wakeup();
    }
}

public static void main(String[] args)
{
    KafkaConsumerExample example = new KafkaConsumerExample();
    example.consumeMessage();
}

}

相关文章
|
6天前
|
Java 程序员 开发者
Java社招面试题:一个线程运行时发生异常会怎样?
大家好,我是小米。今天分享一个经典的 Java 面试题:线程运行时发生异常,程序会怎样处理?此问题考察 Java 线程和异常处理机制的理解。线程发生异常,默认会导致线程终止,但可以通过 try-catch 捕获并处理,避免影响其他线程。未捕获的异常可通过 Thread.UncaughtExceptionHandler 处理。线程池中的异常会被自动处理,不影响任务执行。希望这篇文章能帮助你深入理解 Java 线程异常处理机制,为面试做好准备。如果你觉得有帮助,欢迎收藏、转发!
55 14
|
9天前
|
安全 Java 程序员
Java 面试必问!线程构造方法和静态块的执行线程到底是谁?
大家好,我是小米。今天聊聊Java多线程面试题:线程类的构造方法和静态块是由哪个线程调用的?构造方法由创建线程实例的主线程调用,静态块在类加载时由主线程调用。理解这些细节有助于掌握Java多线程机制。下期再见! 简介: 本文通过一个常见的Java多线程面试题,详细讲解了线程类的构造方法和静态块是由哪个线程调用的。构造方法由创建线程实例的主线程调用,静态块在类加载时由主线程调用。理解这些细节对掌握Java多线程编程至关重要。
39 13
|
10天前
|
安全 Java 开发者
【JAVA】封装多线程原理
Java 中的多线程封装旨在简化使用、提高安全性和增强可维护性。通过抽象和隐藏底层细节,提供简洁接口。常见封装方式包括基于 Runnable 和 Callable 接口的任务封装,以及线程池的封装。Runnable 适用于无返回值任务,Callable 支持有返回值任务。线程池(如 ExecutorService)则用于管理和复用线程,减少性能开销。示例代码展示了如何实现这些封装,使多线程编程更加高效和安全。
|
1月前
|
缓存 安全 算法
Java 多线程 面试题
Java 多线程 相关基础面试题
|
1月前
|
存储 监控 Java
JAVA线程池有哪些队列? 以及它们的适用场景案例
不同的线程池队列有着各自的特点和适用场景,在实际使用线程池时,需要根据具体的业务需求、系统资源状况以及对任务执行顺序、响应时间等方面的要求,合理选择相应的队列来构建线程池,以实现高效的任务处理。
123 12
|
10天前
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
本文详细介绍了如何在Linux中通过在业务线程中注册和处理信号。我们讨论了信号的基本概念,并通过完整的代码示例展示了在业务线程中注册和处理信号的方法。通过正确地使用信号处理机制,可以提高程序的健壮性和响应能力。希望本文能帮助您更好地理解和应用Linux信号处理,提高开发效率和代码质量。
38 17
|
19天前
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
通过本文,您可以了解如何在业务线程中注册和处理Linux信号。正确处理信号可以提高程序的健壮性和稳定性。希望这些内容能帮助您更好地理解和应用Linux信号处理机制。
50 26
|
2月前
|
安全 Java API
【JavaEE】多线程编程引入——认识Thread类
Thread类,Thread中的run方法,在编程中怎么调度多线程
|
2月前
|
存储 安全 Java
Java多线程编程秘籍:各种方案一网打尽,不要错过!
Java 中实现多线程的方式主要有四种:继承 Thread 类、实现 Runnable 接口、实现 Callable 接口和使用线程池。每种方式各有优缺点,适用于不同的场景。继承 Thread 类最简单,实现 Runnable 接口更灵活,Callable 接口支持返回结果,线程池则便于管理和复用线程。实际应用中可根据需求选择合适的方式。此外,还介绍了多线程相关的常见面试问题及答案,涵盖线程概念、线程安全、线程池等知识点。
235 2
|
2月前
|
安全 算法 Java
Java多线程编程中的陷阱与最佳实践####
本文探讨了Java多线程编程中常见的陷阱,并介绍了如何通过最佳实践来避免这些问题。我们将从基础概念入手,逐步深入到具体的代码示例,帮助开发者更好地理解和应用多线程技术。无论是初学者还是有经验的开发者,都能从中获得有价值的见解和建议。 ####