Java实现Kafka生产者与消费者

本文涉及的产品
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Java实现Kafka生产者与消费者

Kafka生产者

import org.apache.kafka.clients.producer.*;
import org.apache.log4j.Logger;

import java.util.Properties;

/**
 * @author liqifeng
 * 此类使用Holder单例模式实现了kafka生产者
 */
public class TestProducer {
    private static Producer<String, String> producer;
    private static Logger log =Logger.getLogger(TestProducer.class);
    private static class TestProducerHolder{
        private static TestProducer TestProducer = new TestProducer();
    }

    private TestProducer(){
        log.info("Init Class TestProducer...");
        Properties props = new Properties();
        props.setProperty("retries","0");
        props.setProperty("zookeeper.connect","master:2181,slaver:2181");//1:2181,slaver2:2181
        props.setProperty("bootstrap.servers","master:9092,slaver:9092");//1:9092,slaver2:9092
        props.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("linger.ms","1");
        props.setProperty("acks","all");
        props.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("buffer.memory","33554432");
        producer = new KafkaProducer<>(props);
        log.info("Init Class TestProducer success");
    }

    public static TestProducer getInstance(){
        return TestProducerHolder.TestProducer;
    }

    /**
     * 调用此方法发送消息,
     * @param msg 待发送的用户行为数据,格式为JSON格式,使用时需将JSON对象转化为String对象
     */
    public void send(String msg){
        ProducerRecord<String, String>record = new ProducerRecord<String, String>("xiapu_test2",msg);
        //发送消息,并且调用回调函数,并对返回的偏移量信息进行操作
        producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        /*
                         * 如果一分钟后未返回偏移量,则会报超时错误。错误如下
                         * org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
                         * 此错误多是由于生产者代码连接kafka失败所导致
                         * 第一,查看kafka是否启动,如果未启动,则启动kafka即可解决
                         * 第二,如果kafka正在运行,则检查上述配置项中zookeeper.connect和value.serializer是否配置正确
                         */
                        if(e != null){
                            e.printStackTrace();
                        } else{
                            log.info(String.format("record-info:%s-%d-%d, send successfully, value is %s",recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), record.value()));
                        }
                    }
                });
        //此步骤非常重要,如果省略,会造成消息阻塞,消费者无法接受到消息
        producer.flush();
    }
    
}

上述代码为生产者代码,建议每次调用send方法发送kafka消息的时候,新建一个回调函数。此回调函数接收消费者返回的消费信息。
消费信息被封装为RecordMetadata对象 ,此对象包含消息分区,偏移量,topic等信息,可输出到日志或写入数据库中作后续操作。
如果超过一分钟后消费信息未被接收,则会报超时错误。解决方法见下述代码注释:

 producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        /*
                         * 如果一分钟后未返回偏移量,则会报超时错误。错误如下
                         * org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
                         * 此错误多是由于生产者代码连接kafka失败所导致
                         * 第一,查看kafka是否启动,如果未启动,则启动kafka即可解决
                         * 第二,如果kafka正在运行,则检查上述配置项中zookeeper.connect和value.serializer是否配置正确
                         */
                        if(e != null){
                            e.printStackTrace();
                        } else{
                            log.info(String.format("record-info:%s-%d-%d, send successfully, value is %s",recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), record.value()));
                        }
                    }
                });

Kafka消费者

package xiapu.kafka;

import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.*;
import org.apache.log4j.Logger;
import xiapu.hbase.XiapuHbase;

import java.util.*;

public class TestConsumer {
    private Logger log = Logger.getLogger(XiapuHbase.class);
    private static class TestConsumerHolder{
        private static TestConsumer TestConsumer = new TestConsumer();
    }

    private TestConsumer(){
        log.info("Init Class TestConsumer...");
        XiapuHbase xiapuHbase = XiapuHbase.getInstance();
        log.info("Init Class TestConsumer success");
        Properties props = new Properties();
        props.setProperty("zookeeper.connect","master:2181,slaver:2181");
        props.setProperty("bootstrap.servers","master:9092,slaver:9092");
        props.setProperty("enable.auto.commit","false");
        props.setProperty("auto.offset.reset","earliest");
        props.setProperty("group.id","xiapu");
        props.setProperty("session.timeout.ms","30000");
        props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //指定topic信息
        consumer.subscribe(Collections.singletonList("xiapu_test2"));
        while(true){
            //接收消息,poll参数为连接超时时间
            ConsumerRecords<String, String> records = consumer.poll(6000);
            for(ConsumerRecord<String, String> record:records){
                JSONObject jsonObject = JSONObject.fromObject(record.value());
                boolean writeResult = xiapuHbase.writeByJson(jsonObject);
                if(writeResult) { //如果写入Hbase成功,则手动提交偏移量
                    consumer.commitAsync();
                    log.info(String.format("record-info:%s-%d-%d, writes to Hbase successfully, value is %s", record.topic(),record.partition(), record.offset(), record.value()));
                } else{
                    log.error(String.format("record-info:%s-%d-%d, writes to Hbase Failed, value is %s", record.topic(),record.partition(), record.offset(), record.value()));
                }
            }

        }
    }

    public static TestConsumer getInstance(){
        return TestConsumerHolder.TestConsumer;
    }

    public static void main(String[] args) {
        TestConsumer TestConsumer = new TestConsumer();
    }

}

此消费者采用手动提交偏移量的方式,确保消息被成功写入到Hbase中才提交偏移量
手动提交偏移量需要将“enable.auto.commit”选项设置为“false”

目录
相关文章
|
2月前
|
消息中间件 负载均衡 大数据
揭秘Kafka背后的秘密!再均衡如何上演一场消费者组的‘权力游戏’,让消息处理秒变高能剧情?
【8月更文挑战第24天】Kafka是一款在大数据处理领域备受推崇的产品,以其出色的性能和可扩展性著称。本文通过一个具体案例介绍其核心机制之一——再均衡(Rebalancing)。案例中,“user_activity”主题下10个分区被3个消费者均衡消费。当新消费者加入或原有消费者离开时,Kafka将自动触发再均衡过程,确保所有消费者能有效处理分配给它们的分区。
125 62
|
2月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
114 58
|
8天前
|
消息中间件 Kafka
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
|
2月前
|
消息中间件 Kafka 测试技术
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
【Azure 事件中心】使用Kafka的性能测试工具(kafka-producer-perf-test)测试生产者发送消息到Azure Event Hub的性能
|
2月前
|
消息中间件 负载均衡 Kafka
【Kafka消费秘籍】深入了解消费者组与独立模式,掌握消息消费的两种超能力!
【8月更文挑战第24天】Apache Kafka是一款高性能的分布式消息系统,支持灵活多样的消费模型以适应不同的应用场景。消息按主题组织,每个主题可划分为多个分区,确保消息顺序性。本文深入探讨了Kafka中的两大核心消费模式:消费者组(Consumer Group)和独立消费者(Standalone Consumer)。消费者组允许多个消费者协同工作,实现负载均衡及故障恢复,是最常用的消费模式。独立消费者模式则适用于需要高度定制化处理逻辑的场景,如消息重放等。通过对比这两种模式的特点和提供的示例代码,开发者可以根据具体需求选择最合适的消费策略,从而更好地利用Kafka构建高效的数据流应用程序。
39 3
|
2月前
|
消息中间件 Java Kafka
【Azure 事件中心】在微软云中国区 (Mooncake) 上实验以Apache Kafka协议方式发送/接受Event Hubs消息 (Java版)
【Azure 事件中心】在微软云中国区 (Mooncake) 上实验以Apache Kafka协议方式发送/接受Event Hubs消息 (Java版)
|
2月前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
70 7
|
2月前
|
安全 Java
Java模拟生产者-消费者问题。生产者不断的往仓库中存放产品,消费者从仓库中消费产品。其中生产者和消费者都可以有若干个。在这里,生产者是一个线程,消费者是一个线程。仓库容量有限,只有库满时生产者不能存
该博客文章通过Java代码示例演示了生产者-消费者问题,其中生产者在仓库未满时生产产品,消费者在仓库有产品时消费产品,通过同步机制确保多线程环境下的线程安全和有效通信。
|
2月前
|
图形学 C# 开发者
全面掌握Unity游戏开发核心技术:C#脚本编程从入门到精通——详解生命周期方法、事件处理与面向对象设计,助你打造高效稳定的互动娱乐体验
【8月更文挑战第31天】Unity 是一款强大的游戏开发平台,支持多种编程语言,其中 C# 最为常用。本文介绍 C# 在 Unity 中的应用,涵盖脚本生命周期、常用函数、事件处理及面向对象编程等核心概念。通过具体示例,展示如何编写有效的 C# 脚本,包括 Start、Update 和 LateUpdate 等生命周期方法,以及碰撞检测和类继承等高级技巧,帮助开发者掌握 Unity 脚本编程基础,提升游戏开发效率。
44 0
下一篇
无影云桌面