Kafka 消息队列 Java版

简介: 消费者 apache kafka工具类,消费者Consumer类 public class Consumer { private ConsumerHandler handler; private ConsumerConfig config; private KafkaConsu...

消费者

apache kafka工具类,消费者Consumer类

public class Consumer {

private ConsumerHandler handler;

private ConsumerConfig config;

private KafkaConsumer<String, String> consumer;

private boolean startFlag = false;

/**
 * 创建消费者
 * 
 * @param handler
 *            消费者处理类
 * @param config
 *            消费者处理配置
 */
public Consumer(ConsumerHandler handler, ConsumerConfig config) {
    this.handler = handler;
    this.config = config;
    init();
}

/**
 * 初始化接收器
 */
private void init() {
    Properties props = new Properties();
    props.put("bootstrap.servers", config.getBootstrapServers());// 服务器ip:端口号,集群用逗号分隔
    props.put("group.id", config.getGroupID());
    /* 是否自动确认offset */
    props.put("enable.auto.commit", "true");
    /* 自动确认offset的时间间隔 */
    props.put("auto.commit.interval.ms", config.getAutoCommitInterVal());
    props.put("session.timeout.ms", config.getSessionTimeOut());
    /* key的序列化类 */
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    /* value的序列化类 */
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumer = new KafkaConsumer<>(props);
    if (config.isProcessBeforeData()) {
        /* 消费者订阅的topic, 可同时订阅多个 */
        consumer.subscribe(config.getTopicList(), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                for (TopicPartition partition : partitions) {
                    long offset = handler.getSeek(partition.topic(), partition.partition());
                    if (offset >= 0) {
                        if (consumer != null) {
                            consumer.seek(partition, offset + 1);
                        }
                    } else {
                        consumer.seekToBeginning(partitions);
                    }

                }
            }
        });
        start();
    } else {
        consumer.subscribe(config.getTopicList());
    }
}

public void start() {

    startFlag = true;

    while (startFlag) {
        /* 读取数据,读取超时时间为XXms */
        ConsumerRecords<String, String> records = consumer.poll(config.getPollTime());

        if (records.count() > 0) {
            long offset = 0;
            int partition = 0;
            for (ConsumerRecord<String, String> record : records) {
                if (record != null) {
                    offset = record.offset();
                    partition = record.partition();
                    try {
                        handler.processObject(record.topic(), record.partition(), record.offset(), record.value());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        try {
            Thread.currentThread();
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    consumer.close();
}

public void stop() {
    startFlag = false;
}

}

消费者配置ConsumerConfig类

public class ConsumerConfig {

private String bootstrapServers;
private String groupID;
private int autoCommitInterVal =1000;
private int sessionTimeOut = 30000;
private List<String> topicList;
private boolean processBeforeData;
private long pollTime = 100;


public ConsumerConfig() {
    super();
}

/**
 * 创建消费者配置
 * @param bootstrapServers     服务器配合 格式为服务器ip:端口号,集群用逗号分隔 例如 192.168.1.1:9092,192.168.1.2:9092
 * @param groupID               groupID
 * @param autoCommitInterVal  自动提交时间单位毫秒, 默认1000
 * @param sessionTimeOut       超时时间单位毫秒 , 默认30000
 * @param topicList             topicList列表
 * @param processBeforeData   是否处理启动之前的数据,该开关需要配置consumerHandler的跨步存储使用
 * @param pollTime              每次获取数据等待时间单位毫秒,默认100毫秒
 */
public ConsumerConfig(String bootstrapServers, String groupID, int autoCommitInterVal, int sessionTimeOut
        ,List<String> topicList,boolean processBeforeData,long pollTime) {
    this.bootstrapServers = bootstrapServers;
    this.groupID = groupID;
    this.autoCommitInterVal = autoCommitInterVal;
    this.sessionTimeOut = sessionTimeOut;
    this.topicList = topicList;
    this.processBeforeData = processBeforeData;
    this.pollTime = pollTime;
}

public String getBootstrapServers() {
    return bootstrapServers;
}

public void setBootstrapServers(String bootstrapServers) {
    this.bootstrapServers = bootstrapServers;
}

public String getGroupID() {
    return groupID;
}

public void setGroupID(String groupID) {
    this.groupID = groupID;
}

public int getAutoCommitInterVal() {
    return autoCommitInterVal;
}

public void setAutoCommitInterVal(int autoCommitInterVal) {
    this.autoCommitInterVal = autoCommitInterVal;
}

public int getSessionTimeOut() {
    return sessionTimeOut;
}

public void setSessionTimeOut(int sessionTimeOut) {
    this.sessionTimeOut = sessionTimeOut;
}

public List<String> getTopicList() {
    return topicList;
}

public void setTopicList(List<String> topicList) {
    this.topicList = topicList;
}
public boolean isProcessBeforeData() {
    return processBeforeData;
}

public void setProcessBeforeData(boolean processBeforeData) {
    this.processBeforeData = processBeforeData;
}

public long getPollTime() {
    return pollTime;
}

public void setPollTime(long pollTime) {
    this.pollTime = pollTime;
}
}

消费者处理ConsumerHandler类

public interface ConsumerHandler {
/**
 * 处理收到的消息
 * @param topic             收到消息的topic名称
 * @param partition         收到消息的partition内容
 * @param offset            收到消息在队列中的编号
 * @param value             收到的消息
 */
void processObject(String topic,int partition,long offset,String value);

/**
 * 获取跨步
 * @param topic             接受消息的topic
 * @param partition         接受消息的partition
 * @return                  当前topic,partition下的seek
 */
long getSeek(String topic , int partition);
}

生产者

kafka生产者,工具Producer类

public class Producer {

private ProducerConfig  config ;
private org.apache.kafka.clients.producer.Producer<String,String> producer;

public Producer(ProducerConfig config){
    this.config = config;
    init();
}

private void init(){
    Properties props = new Properties();
    props.put("bootstrap.servers",config.getBootstrapServers());
    props.put("acks", "all");
    props.put("retries", config.getRetries());
    props.put("batch.size", config.getBatchSize());
    props.put("linger.ms", config.getLingerMs());
    props.put("buffer.memory", config.getBufferMemory());
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    producer = new KafkaProducer<>(props);
}
/**
 * 发送消息
 * @param topic             要发送的topic
 * @param msg
 */
public void sendMessage(String topic,String msg){
    try {
        producer.send(new ProducerRecord<String, String>(config.getTopic(), String.valueOf(new Date().getTime()), msg)).get();
    } catch (InterruptedException e) {
        // TODO 自动生成的 catch 块
        e.printStackTrace();
    } catch (ExecutionException e) {
        // TODO 自动生成的 catch 块
        e.printStackTrace();
    }
    producer.flush();
}

public void close(){
    producer.close();
}

}

kafka生产者配置ProducerConfig类

public class ProducerConfig {

private String bootstrapServers;
private String topic;
private int retries = 0;
private int batchSize = 16384;
private int lingerMs=1;
private int bufferMemory=33554432;

public ProducerConfig() {
    super();
}

/**
 * 创建生产者配置文件
 * @param bootstrapServers          服务器配合 格式为服务器ip:端口号,集群用逗号分隔 例如 192.168.1.1:9092,192.168.1.2:9092
 * @param retries
 * @param batchSize
 * @param lingerMs
 * @param bufferMemory
 */
public ProducerConfig(String bootstrapServers,int retries, int batchSize, int lingerMs, int bufferMemory) {
    this.bootstrapServers = bootstrapServers;
    this.retries = retries;
    this.batchSize = batchSize;
    this.lingerMs = lingerMs;
    this.bufferMemory = bufferMemory;
}

public String getBootstrapServers() {
    return bootstrapServers;
}

public void setBootstrapServers(String bootstrapServers) {
    this.bootstrapServers = bootstrapServers;
}

public String getTopic() {
    return topic;
}

public void setTopic(String topic) {
    this.topic = topic;
}

public int getRetries() {
    return retries;
}

public void setRetries(int retries) {
    this.retries = retries;
}

public int getBatchSize() {
    return batchSize;
}

public void setBatchSize(int batchSize) {
    this.batchSize = batchSize;
}

public int getLingerMs() {
    return lingerMs;
}

public void setLingerMs(int lingerMs) {
    this.lingerMs = lingerMs;
}

public int getBufferMemory() {
    return bufferMemory;
}

public void setBufferMemory(int bufferMemory) {
    this.bufferMemory = bufferMemory;
}
}

测试

消费者处理实现ConsumerHandlerImpl类

public class ConsumerHandlerImpl implements ConsumerHandler{
/**
 * 处理收到的消息
 * @param topic             收到消息的topic名称
 * @param partition         收到消息的partition内容
 * @param offset            收到消息在队列中的编号
 * @param value             收到的消息
 */
public void processObject(String topic,int partition,long offset,String value) {
    System.out.println(topic+"从kafka接收"+partition+"到"+offset+"的消息是:"+value);
}

/**
 * 获取跨步
 * @param topic             接受消息的topic
 * @param partition         接受消息的partition
 * @return                  当前topic,partition下的seek
 */
public long getSeek(String topic , int partition) {
    return 1;
    
}
}

main方法类

public class AppResourceTest{  
public static void main(String[] args){  
      BeanDefinitionRegistry reg=new DefaultListableBeanFactory();  
      PropertiesBeanDefinitionReader reader=new PropertiesBeanDefinitionReader(reg);  
      reader.loadBeanDefinitions(new ClassPathResource("resources/kafka-consumer.properties")); 
      reader.loadBeanDefinitions(new ClassPathResource("resources/kafka-producer.properties")); 
      BeanFactory factory=(BeanFactory)reg;  
      ConsumerConfig consumerConfig=(ConsumerConfig)factory.getBean("consumerConfig");  
      System.out.println(consumerConfig.getPollTime());  
      ProducerConfig producerConfig=(ProducerConfig)factory.getBean("producerConfig");  
      System.out.println(producerConfig.getBatchSize());  
      
      Producer producer = new Producer(producerConfig);
      producer.sendMessage(producerConfig.getTopic(),"s4335453453454");
      producer.close();
      System.out.println("consumer"); 
      Consumer consumer = new Consumer(new ConsumerHandlerImpl(),consumerConfig);
      try{
          Thread.currentThread();
          Thread.sleep(10000);
      }catch(Exception e){
          e.printStackTrace();
      }
      
}  
}  

运行结果

QQ_20190122175655

目录
相关文章
|
2天前
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
11 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
23天前
|
消息中间件 Java Kafka
Flink-07 Flink Java 3分钟上手 滚动窗口 事件驱动 Kafka TumblingWindow GlobalWindow CountWindow
Flink-07 Flink Java 3分钟上手 滚动窗口 事件驱动 Kafka TumblingWindow GlobalWindow CountWindow
26 7
|
22天前
|
消息中间件 分布式计算 Java
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
23 2
|
22天前
|
消息中间件 存储 Java
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
30 3
|
23天前
|
消息中间件 NoSQL Kafka
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
31 4
|
22天前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
55 2
|
6天前
|
消息中间件 中间件 Kafka
解锁Kafka等消息队列中间件的测试之道
在这个数字化时代,分布式系统和消息队列中间件(如Kafka、RabbitMQ)已成为日常工作的核心组件。本次公开课由前字节跳动资深专家KK老师主讲,深入解析消息队列的基本原理、架构及测试要点,涵盖功能、性能、可靠性、安全性和兼容性测试,并探讨其主要应用场景,如应用解耦、异步处理和限流削峰。课程最后设有互动答疑环节,助你全面掌握消息队列的测试方法。
8 0
|
消息中间件 Java 测试技术
JAVA应用开发MQ实战最佳实践——Series2:消息队列RocketMQ性能测试案例
JAVA应用开发MQ实战最佳实践——Series2:消息队列RocketMQ性能测试案例
JAVA应用开发MQ实战最佳实践——Series2:消息队列RocketMQ性能测试案例
|
3天前
|
监控 安全 Java
在 Java 中使用线程池监控以及动态调整线程池时需要注意什么?
【10月更文挑战第22天】在进行线程池的监控和动态调整时,要综合考虑多方面的因素,谨慎操作,以确保线程池能够高效、稳定地运行,满足业务的需求。
71 38
|
1天前
|
安全 Java
java 中 i++ 到底是否线程安全?
本文通过实例探讨了 `i++` 在多线程环境下的线程安全性问题。首先,使用 100 个线程分别执行 10000 次 `i++` 操作,发现最终结果小于预期的 1000000,证明 `i++` 是线程不安全的。接着,介绍了两种解决方法:使用 `synchronized` 关键字加锁和使用 `AtomicInteger` 类。其中,`AtomicInteger` 通过 `CAS` 操作实现了高效的线程安全。最后,通过分析字节码和源码,解释了 `i++` 为何线程不安全以及 `AtomicInteger` 如何保证线程安全。
java 中 i++ 到底是否线程安全?

相关产品

  • 云消息队列 Kafka 版