Kafka 消息队列 Java版

简介: 消费者 apache kafka工具类,消费者Consumer类 public class Consumer { private ConsumerHandler handler; private ConsumerConfig config; private KafkaConsu...

消费者

apache kafka工具类,消费者Consumer类

public class Consumer {

private ConsumerHandler handler;

private ConsumerConfig config;

private KafkaConsumer<String, String> consumer;

private boolean startFlag = false;

/**
 * 创建消费者
 * 
 * @param handler
 *            消费者处理类
 * @param config
 *            消费者处理配置
 */
public Consumer(ConsumerHandler handler, ConsumerConfig config) {
    this.handler = handler;
    this.config = config;
    init();
}

/**
 * 初始化接收器
 */
private void init() {
    Properties props = new Properties();
    props.put("bootstrap.servers", config.getBootstrapServers());// 服务器ip:端口号,集群用逗号分隔
    props.put("group.id", config.getGroupID());
    /* 是否自动确认offset */
    props.put("enable.auto.commit", "true");
    /* 自动确认offset的时间间隔 */
    props.put("auto.commit.interval.ms", config.getAutoCommitInterVal());
    props.put("session.timeout.ms", config.getSessionTimeOut());
    /* key的序列化类 */
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    /* value的序列化类 */
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumer = new KafkaConsumer<>(props);
    if (config.isProcessBeforeData()) {
        /* 消费者订阅的topic, 可同时订阅多个 */
        consumer.subscribe(config.getTopicList(), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                for (TopicPartition partition : partitions) {
                    long offset = handler.getSeek(partition.topic(), partition.partition());
                    if (offset >= 0) {
                        if (consumer != null) {
                            consumer.seek(partition, offset + 1);
                        }
                    } else {
                        consumer.seekToBeginning(partitions);
                    }

                }
            }
        });
        start();
    } else {
        consumer.subscribe(config.getTopicList());
    }
}

public void start() {

    startFlag = true;

    while (startFlag) {
        /* 读取数据,读取超时时间为XXms */
        ConsumerRecords<String, String> records = consumer.poll(config.getPollTime());

        if (records.count() > 0) {
            long offset = 0;
            int partition = 0;
            for (ConsumerRecord<String, String> record : records) {
                if (record != null) {
                    offset = record.offset();
                    partition = record.partition();
                    try {
                        handler.processObject(record.topic(), record.partition(), record.offset(), record.value());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        try {
            Thread.currentThread();
            Thread.sleep(1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    consumer.close();
}

public void stop() {
    startFlag = false;
}

}

消费者配置ConsumerConfig类

public class ConsumerConfig {

private String bootstrapServers;
private String groupID;
private int autoCommitInterVal =1000;
private int sessionTimeOut = 30000;
private List<String> topicList;
private boolean processBeforeData;
private long pollTime = 100;


public ConsumerConfig() {
    super();
}

/**
 * 创建消费者配置
 * @param bootstrapServers     服务器配合 格式为服务器ip:端口号,集群用逗号分隔 例如 192.168.1.1:9092,192.168.1.2:9092
 * @param groupID               groupID
 * @param autoCommitInterVal  自动提交时间单位毫秒, 默认1000
 * @param sessionTimeOut       超时时间单位毫秒 , 默认30000
 * @param topicList             topicList列表
 * @param processBeforeData   是否处理启动之前的数据,该开关需要配置consumerHandler的跨步存储使用
 * @param pollTime              每次获取数据等待时间单位毫秒,默认100毫秒
 */
public ConsumerConfig(String bootstrapServers, String groupID, int autoCommitInterVal, int sessionTimeOut
        ,List<String> topicList,boolean processBeforeData,long pollTime) {
    this.bootstrapServers = bootstrapServers;
    this.groupID = groupID;
    this.autoCommitInterVal = autoCommitInterVal;
    this.sessionTimeOut = sessionTimeOut;
    this.topicList = topicList;
    this.processBeforeData = processBeforeData;
    this.pollTime = pollTime;
}

public String getBootstrapServers() {
    return bootstrapServers;
}

public void setBootstrapServers(String bootstrapServers) {
    this.bootstrapServers = bootstrapServers;
}

public String getGroupID() {
    return groupID;
}

public void setGroupID(String groupID) {
    this.groupID = groupID;
}

public int getAutoCommitInterVal() {
    return autoCommitInterVal;
}

public void setAutoCommitInterVal(int autoCommitInterVal) {
    this.autoCommitInterVal = autoCommitInterVal;
}

public int getSessionTimeOut() {
    return sessionTimeOut;
}

public void setSessionTimeOut(int sessionTimeOut) {
    this.sessionTimeOut = sessionTimeOut;
}

public List<String> getTopicList() {
    return topicList;
}

public void setTopicList(List<String> topicList) {
    this.topicList = topicList;
}
public boolean isProcessBeforeData() {
    return processBeforeData;
}

public void setProcessBeforeData(boolean processBeforeData) {
    this.processBeforeData = processBeforeData;
}

public long getPollTime() {
    return pollTime;
}

public void setPollTime(long pollTime) {
    this.pollTime = pollTime;
}
}

消费者处理ConsumerHandler类

public interface ConsumerHandler {
/**
 * 处理收到的消息
 * @param topic             收到消息的topic名称
 * @param partition         收到消息的partition内容
 * @param offset            收到消息在队列中的编号
 * @param value             收到的消息
 */
void processObject(String topic,int partition,long offset,String value);

/**
 * 获取跨步
 * @param topic             接受消息的topic
 * @param partition         接受消息的partition
 * @return                  当前topic,partition下的seek
 */
long getSeek(String topic , int partition);
}

生产者

kafka生产者,工具Producer类

public class Producer {

private ProducerConfig  config ;
private org.apache.kafka.clients.producer.Producer<String,String> producer;

public Producer(ProducerConfig config){
    this.config = config;
    init();
}

private void init(){
    Properties props = new Properties();
    props.put("bootstrap.servers",config.getBootstrapServers());
    props.put("acks", "all");
    props.put("retries", config.getRetries());
    props.put("batch.size", config.getBatchSize());
    props.put("linger.ms", config.getLingerMs());
    props.put("buffer.memory", config.getBufferMemory());
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    producer = new KafkaProducer<>(props);
}
/**
 * 发送消息
 * @param topic             要发送的topic
 * @param msg
 */
public void sendMessage(String topic,String msg){
    try {
        producer.send(new ProducerRecord<String, String>(config.getTopic(), String.valueOf(new Date().getTime()), msg)).get();
    } catch (InterruptedException e) {
        // TODO 自动生成的 catch 块
        e.printStackTrace();
    } catch (ExecutionException e) {
        // TODO 自动生成的 catch 块
        e.printStackTrace();
    }
    producer.flush();
}

public void close(){
    producer.close();
}

}

kafka生产者配置ProducerConfig类

public class ProducerConfig {

private String bootstrapServers;
private String topic;
private int retries = 0;
private int batchSize = 16384;
private int lingerMs=1;
private int bufferMemory=33554432;

public ProducerConfig() {
    super();
}

/**
 * 创建生产者配置文件
 * @param bootstrapServers          服务器配合 格式为服务器ip:端口号,集群用逗号分隔 例如 192.168.1.1:9092,192.168.1.2:9092
 * @param retries
 * @param batchSize
 * @param lingerMs
 * @param bufferMemory
 */
public ProducerConfig(String bootstrapServers,int retries, int batchSize, int lingerMs, int bufferMemory) {
    this.bootstrapServers = bootstrapServers;
    this.retries = retries;
    this.batchSize = batchSize;
    this.lingerMs = lingerMs;
    this.bufferMemory = bufferMemory;
}

public String getBootstrapServers() {
    return bootstrapServers;
}

public void setBootstrapServers(String bootstrapServers) {
    this.bootstrapServers = bootstrapServers;
}

public String getTopic() {
    return topic;
}

public void setTopic(String topic) {
    this.topic = topic;
}

public int getRetries() {
    return retries;
}

public void setRetries(int retries) {
    this.retries = retries;
}

public int getBatchSize() {
    return batchSize;
}

public void setBatchSize(int batchSize) {
    this.batchSize = batchSize;
}

public int getLingerMs() {
    return lingerMs;
}

public void setLingerMs(int lingerMs) {
    this.lingerMs = lingerMs;
}

public int getBufferMemory() {
    return bufferMemory;
}

public void setBufferMemory(int bufferMemory) {
    this.bufferMemory = bufferMemory;
}
}

测试

消费者处理实现ConsumerHandlerImpl类

public class ConsumerHandlerImpl implements ConsumerHandler{
/**
 * 处理收到的消息
 * @param topic             收到消息的topic名称
 * @param partition         收到消息的partition内容
 * @param offset            收到消息在队列中的编号
 * @param value             收到的消息
 */
public void processObject(String topic,int partition,long offset,String value) {
    System.out.println(topic+"从kafka接收"+partition+"到"+offset+"的消息是:"+value);
}

/**
 * 获取跨步
 * @param topic             接受消息的topic
 * @param partition         接受消息的partition
 * @return                  当前topic,partition下的seek
 */
public long getSeek(String topic , int partition) {
    return 1;
    
}
}

main方法类

public class AppResourceTest{  
public static void main(String[] args){  
      BeanDefinitionRegistry reg=new DefaultListableBeanFactory();  
      PropertiesBeanDefinitionReader reader=new PropertiesBeanDefinitionReader(reg);  
      reader.loadBeanDefinitions(new ClassPathResource("resources/kafka-consumer.properties")); 
      reader.loadBeanDefinitions(new ClassPathResource("resources/kafka-producer.properties")); 
      BeanFactory factory=(BeanFactory)reg;  
      ConsumerConfig consumerConfig=(ConsumerConfig)factory.getBean("consumerConfig");  
      System.out.println(consumerConfig.getPollTime());  
      ProducerConfig producerConfig=(ProducerConfig)factory.getBean("producerConfig");  
      System.out.println(producerConfig.getBatchSize());  
      
      Producer producer = new Producer(producerConfig);
      producer.sendMessage(producerConfig.getTopic(),"s4335453453454");
      producer.close();
      System.out.println("consumer"); 
      Consumer consumer = new Consumer(new ConsumerHandlerImpl(),consumerConfig);
      try{
          Thread.currentThread();
          Thread.sleep(10000);
      }catch(Exception e){
          e.printStackTrace();
      }
      
}  
}  

运行结果

QQ_20190122175655

目录
相关文章
|
2月前
|
消息中间件 人工智能 Kafka
AI 时代的数据通道:云消息队列 Kafka 的演进与实践
云消息队列 Kafka 版通过在架构创新、性能优化与生态融合等方面的突破性进展,为企业构建实时数据驱动的应用提供了坚实支撑,持续赋能客户业务创新。
387 31
|
4月前
|
消息中间件 Java Kafka
Java 事件驱动架构设计实战与 Kafka 生态系统组件实操全流程指南
本指南详解Java事件驱动架构与Kafka生态实操,涵盖环境搭建、事件模型定义、生产者与消费者实现、事件测试及高级特性,助你快速构建高可扩展分布式系统。
271 7
|
3月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
253 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
12月前
|
消息中间件 存储 负载均衡
2024消息队列“四大天王”:Rabbit、Rocket、Kafka、Pulsar巅峰对决
本文对比了 RabbitMQ、RocketMQ、Kafka 和 Pulsar 四种消息队列系统,涵盖架构、性能、可用性和适用场景。RabbitMQ 以灵活路由和可靠性著称;RocketMQ 支持高可用和顺序消息;Kafka 专为高吞吐量和低延迟设计;Pulsar 提供多租户支持和高可扩展性。性能方面,吞吐量从高到低依次为
4411 1
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
328 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
|
11月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
487 1
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
361 1

相关产品

  • 云消息队列 Kafka 版