Java实现Kafka生产者与消费者

本文涉及的产品
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: Java实现Kafka生产者与消费者

Kafka生产者

import org.apache.kafka.clients.producer.*;
import org.apache.log4j.Logger;

import java.util.Properties;

/**
 * @author liqifeng
 * 此类使用Holder单例模式实现了kafka生产者
 */
public class TestProducer {
    private static Producer<String, String> producer;
    private static Logger log =Logger.getLogger(TestProducer.class);
    private static class TestProducerHolder{
        private static TestProducer TestProducer = new TestProducer();
    }

    private TestProducer(){
        log.info("Init Class TestProducer...");
        Properties props = new Properties();
        props.setProperty("retries","0");
        props.setProperty("zookeeper.connect","master:2181,slaver:2181");//1:2181,slaver2:2181
        props.setProperty("bootstrap.servers","master:9092,slaver:9092");//1:9092,slaver2:9092
        props.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("linger.ms","1");
        props.setProperty("acks","all");
        props.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("buffer.memory","33554432");
        producer = new KafkaProducer<>(props);
        log.info("Init Class TestProducer success");
    }

    public static TestProducer getInstance(){
        return TestProducerHolder.TestProducer;
    }

    /**
     * 调用此方法发送消息,
     * @param msg 待发送的用户行为数据,格式为JSON格式,使用时需将JSON对象转化为String对象
     */
    public void send(String msg){
        ProducerRecord<String, String>record = new ProducerRecord<String, String>("xiapu_test2",msg);
        //发送消息,并且调用回调函数,并对返回的偏移量信息进行操作
        producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        /*
                         * 如果一分钟后未返回偏移量,则会报超时错误。错误如下
                         * org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
                         * 此错误多是由于生产者代码连接kafka失败所导致
                         * 第一,查看kafka是否启动,如果未启动,则启动kafka即可解决
                         * 第二,如果kafka正在运行,则检查上述配置项中zookeeper.connect和value.serializer是否配置正确
                         */
                        if(e != null){
                            e.printStackTrace();
                        } else{
                            log.info(String.format("record-info:%s-%d-%d, send successfully, value is %s",recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), record.value()));
                        }
                    }
                });
        //此步骤非常重要,如果省略,会造成消息阻塞,消费者无法接受到消息
        producer.flush();
    }
    
}

上述代码为生产者代码,建议每次调用send方法发送kafka消息的时候,新建一个回调函数。此回调函数接收消费者返回的消费信息。
消费信息被封装为RecordMetadata对象 ,此对象包含消息分区,偏移量,topic等信息,可输出到日志或写入数据库中作后续操作。
如果超过一分钟后消费信息未被接收,则会报超时错误。解决方法见下述代码注释:

 producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        /*
                         * 如果一分钟后未返回偏移量,则会报超时错误。错误如下
                         * org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
                         * 此错误多是由于生产者代码连接kafka失败所导致
                         * 第一,查看kafka是否启动,如果未启动,则启动kafka即可解决
                         * 第二,如果kafka正在运行,则检查上述配置项中zookeeper.connect和value.serializer是否配置正确
                         */
                        if(e != null){
                            e.printStackTrace();
                        } else{
                            log.info(String.format("record-info:%s-%d-%d, send successfully, value is %s",recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), record.value()));
                        }
                    }
                });

Kafka消费者

package xiapu.kafka;

import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.*;
import org.apache.log4j.Logger;
import xiapu.hbase.XiapuHbase;

import java.util.*;

public class TestConsumer {
    private Logger log = Logger.getLogger(XiapuHbase.class);
    private static class TestConsumerHolder{
        private static TestConsumer TestConsumer = new TestConsumer();
    }

    private TestConsumer(){
        log.info("Init Class TestConsumer...");
        XiapuHbase xiapuHbase = XiapuHbase.getInstance();
        log.info("Init Class TestConsumer success");
        Properties props = new Properties();
        props.setProperty("zookeeper.connect","master:2181,slaver:2181");
        props.setProperty("bootstrap.servers","master:9092,slaver:9092");
        props.setProperty("enable.auto.commit","false");
        props.setProperty("auto.offset.reset","earliest");
        props.setProperty("group.id","xiapu");
        props.setProperty("session.timeout.ms","30000");
        props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //指定topic信息
        consumer.subscribe(Collections.singletonList("xiapu_test2"));
        while(true){
            //接收消息,poll参数为连接超时时间
            ConsumerRecords<String, String> records = consumer.poll(6000);
            for(ConsumerRecord<String, String> record:records){
                JSONObject jsonObject = JSONObject.fromObject(record.value());
                boolean writeResult = xiapuHbase.writeByJson(jsonObject);
                if(writeResult) { //如果写入Hbase成功,则手动提交偏移量
                    consumer.commitAsync();
                    log.info(String.format("record-info:%s-%d-%d, writes to Hbase successfully, value is %s", record.topic(),record.partition(), record.offset(), record.value()));
                } else{
                    log.error(String.format("record-info:%s-%d-%d, writes to Hbase Failed, value is %s", record.topic(),record.partition(), record.offset(), record.value()));
                }
            }

        }
    }

    public static TestConsumer getInstance(){
        return TestConsumerHolder.TestConsumer;
    }

    public static void main(String[] args) {
        TestConsumer TestConsumer = new TestConsumer();
    }

}

此消费者采用手动提交偏移量的方式,确保消息被成功写入到Hbase中才提交偏移量
手动提交偏移量需要将“enable.auto.commit”选项设置为“false”

目录
相关文章
|
1月前
|
消息中间件 Kafka
【赵渝强老师】Kafka生产者的执行过程
Kafka生产者(Producer)将消息序列化后发送到指定主题的分区。整个过程由主线程和Sender线程协调完成。主线程创建KafkaProducer对象及ProducerRecord,经过拦截器、序列化器和分区器处理后,消息进入累加器。Sender线程负责从累加器获取消息并发送至KafkaBroker,Broker返回响应或错误信息,生产者根据反馈决定是否重发。视频和图片详细展示了这一流程。
130 61
|
11天前
|
消息中间件 Java Kafka
SpringBoot使用Kafka生产者、消费者
SpringBoot使用Kafka生产者、消费者
44 10
|
1月前
|
消息中间件 Kafka
【赵渝强老师】Kafka生产者的消息发送方式
Kafka生产者支持三种消息发送方式:1. **fire-and-forget**:发送后不关心结果,适用于允许消息丢失的场景;2. **同步发送**:通过Future对象确保消息成功送达,适用于高可靠性需求场景;3. **异步发送**:使用回调函数处理结果,吞吐量较高但牺牲部分可靠性。视频和代码示例详细讲解了这三种方式的具体实现。
|
1月前
|
消息中间件 Kafka
【赵渝强老师】Kafka的消费者与消费者组
Kafka消费者是从Kafka集群中消费数据的客户端。单消费者模型在数据生产速度超过消费速度时会导致数据堆积。为解决此问题,Kafka引入了消费者组的概念,允许多个消费者共同消费同一主题的消息。消费者组由一个或多个消费者组成,它们动态分配和重新分配主题分区,确保消息处理的高效性和可靠性。视频讲解及示意图详细展示了这一机制。
|
3月前
|
Java
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
|
4月前
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
|
4月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
148 2
|
5月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
54 1
|
5月前
|
消息中间件 分布式计算 Java
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
67 2
|
5月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
87 1

热门文章

最新文章