Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka

简介: Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka


操作步骤

Maven依赖

核心依赖 kafka-clients

<dependency>
          <groupId>org.apache.kafka</groupId>
           <artifactId>kafka-clients</artifactId>
           <version>1.1.0</version>
       </dependency>

生产者

package com.artisan.kafkademo.producer;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * @author 小工匠
 * @version v1.0
 * @create 2019-11-18 0:17
 * @motto show me the code ,change the word
 * @blog https://artisan.blog.csdn.net/
 * @description
 **/
public class MsgProducer {
    public static void main(String[] args) throws InterruptedException {
        // ---------------参数设置---------------BEGIN
        Properties properties = new Properties();
        // broker 信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.18.130:9092,192.168.18.131:9092,192.168.18.132:9092");
        /*
         发出消息持久化机制参数
        (1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。
                     性能最高,但是最容易丢消息。
        (2)acks=1: 至少要等待leader已经成功将数据写入本地log,
                     但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。
                     这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
        (3)acks=‐1或all: 这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,
                这种策略会保证只要有一个备份存活就不会丢失数据。
                这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。
         */
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        // 发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,
        // 但是也可能造成消息重复发送,比如网络抖动,所以需要在接收者那边做好消息接收的幂等性处理
        properties.put(ProducerConfig.RETRIES_CONFIG,3);
        // 重试间隔设置
        properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,300);
        // /设置发送消息的本地缓冲区,
        // 如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是33554432,即32MB
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
        // kafka本地线程会从缓冲区取数据,批量发送到broker,
        // 设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        //默认值是0,意思就是消息必须立即被发送,但这样会影响性能
        //一般设置100毫秒左右,就是说这个消息发送完后会进入本地的一个batch,
        // 如果100毫秒内,这个batch满了16kb就会随batch一起被发送出去
        //如果100毫秒内,batch没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长
        properties.put(ProducerConfig.LINGER_MS_CONFIG,100);
        // 把发送的key从字符串序列化为字节数组
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        //把发送消息value从字符串序列化为字节数组
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        // ---------------参数设置---------------END
        // 使用properties实例化KafkaProducer
        Producer producer = new KafkaProducer(properties);
        final int messageNum = 5 ;
        final CountDownLatch countDownLatch = new CountDownLatch(messageNum);
        // 发送5条消息
        for (int i = 1; i <= messageNum; i++) {
            Order order = new Order(i, 100,66,987.99 + i);
            // 指定发送分区
            ProducerRecord<String,String> record = new ProducerRecord<String, String>("artisan-replicated-topic",
                    0,String.valueOf(order.getOrderId()), JSON.toJSONString(order));
            // 异步方式发送消息
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null){
                        countDownLatch.countDown();
                        System.err.println("发送消息失败:" + exception.getStackTrace());
                    }
                    if (metadata != null){
                        countDownLatch.countDown();
                        System.out.println("异步方式发送消息结果: topic=" + metadata.topic()
                                + " , partition=" + metadata.partition()
                                + " , offset=" + metadata.offset());
                    }
                }
            });
        }
        // 等5秒钟,5秒钟后,执行后续的代码
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.close();
    }
   static class Order {
        private int orderId ;
        private int productId ;
        private int productNum;
        private double orderAmount ;
        public Order() {
        }
        public Order(int orderId, int productId, int productNum, double orderAmount) {
            this.orderId = orderId;
            this.productId = productId;
            this.productNum = productNum;
            this.orderAmount = orderAmount;
        }
        public int getOrderId() {
            return orderId;
        }
        public void setOrderId(int orderId) {
            this.orderId = orderId;
        }
        public int getProductId() {
            return productId;
        }
        public void setProductId(int productId) {
            this.productId = productId;
        }
        public int getProductNum() {
            return productNum;
        }
        public void setProductNum(int productNum) {
            this.productNum = productNum;
        }
        public double getOrderAmount() {
            return orderAmount;
        }
        public void setOrderAmount(double orderAmount) {
            this.orderAmount = orderAmount;
        }
    }
}

消费者

package com.artisan.kafkademo.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;
/**
 * @author 小工匠
 * @version v1.0
 * @create 2019-11-18 23:51
 * @motto show me the code ,change the word
 * @blog https://artisan.blog.csdn.net/
 * @description
 **/
public class MsgConsumer {
    public static void main(String[] args) {
        // ---------------参数设置---------------BEGIN
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.18.130:9092,192.168.18.131:9092,192.168.18.132:9092");
        // 消费分组名
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"testGroup");
        // 是否自动提交offset
    /*props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    // 自动提交offset的间隔时间
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG , "1000");*/
        //props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    /*
    心跳时间,服务端broker通过心跳确认consumer是否故障,如果发现故障,就会通过心跳下发
    rebalance的指令给其他的consumer通知他们进行rebalance操作,这个时间可以稍微短一点
    */
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
        //服务端broker多久感知不到一个consumer心跳就认为他故障了,默认是10秒
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
        /*
        如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,
        会将其踢出消费组,将分区分配给别的consumer消费
        */
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // 消费主题
        String topicName = "artisan-replicated-topic";
        consumer.subscribe(Arrays.asList(topicName));
        // 消费指定分区
        //consumer.assign(Arrays.asList(new TopicPartition(topicName, 0)));
        //消息回溯消费
        /*consumer.assign(Arrays.asList(new TopicPartition(topicName, 0)));
        *//*consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));*//*
        //指定offset消费
        consumer.seek(new TopicPartition(topicName, 0), 10);
        //从指定时间点开始消费
        Map<TopicPartition, Long> map = new HashMap<TopicPartition, Long>();
        List<PartitionInfo> topicPartitions = consumer.partitionsFor(topicName);
        //从半小时前开始消费
        long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
        for (PartitionInfo par : topicPartitions) {
            map.put(new TopicPartition(topicName, par.partition()), fetchDataTime);
        }
        Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
            TopicPartition key = entry.getKey();
            OffsetAndTimestamp value = entry.getValue();
            if (key == null || value == null) continue;
            Long offset = value.offset();
            System.out.println("partition-" + key.partition() + "|offset-" + offset);
            System.out.println();
            //根据消费里的timestamp确定offset
            if (value != null) {
                //没有这行代码会导致下面的报错信息
                consumer.assign(Arrays.asList(key));
                consumer.seek(key, offset);
            }
        }
*/
        while (true) {
            /*
             * poll() API 是拉取消息的长轮询,主要是判断consumer是否还活着,只要我们持续调用poll(),
             * 消费者就会存活在自己所在的group中,并且持续的消费指定partition的消息。
             * 底层是这么做的:消费者向server持续发送心跳,如果一个时间段(session.
             * timeout.ms)consumer挂掉或是不能发送心跳,这个消费者会被认为是挂掉了,
             * 这个Partition也会被重新分配给其他consumer
             */
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("收到消息:offset = %d, key = %s, value = %s%n", record.offset(), record.key(),
                        record.value());
            }
            if (records.count() > 0) {
                // 提交offset
                consumer.commitSync();
            }
        }
    }
}


相关文章
|
1月前
|
人工智能 Java API
MCP客户端调用看这一篇就够了(Java版)
本文详细介绍了MCP(Model Context Protocol)客户端的开发方法,包括在没有MCP时的痛点、MCP的作用以及如何通过Spring-AI框架和原生SDK调用MCP服务。文章首先分析了MCP协议的必要性,接着分别讲解了Spring-AI框架和自研SDK的使用方式,涵盖配置LLM接口、工具注入、动态封装工具等步骤,并提供了代码示例。此外,还记录了开发过程中遇到的问题及解决办法,如版本冲突、服务连接超时等。最后,文章探讨了框架与原生SDK的选择,认为框架适合快速构建应用,而原生SDK更适合平台级开发,强调了两者结合使用的价值。
1829 27
MCP客户端调用看这一篇就够了(Java版)
|
1月前
|
存储 网络协议 Java
Java获取客户端IP问题:返回127.0.0.1
总结:要解决Java获取客户端IP返回127.0.0.1的问题,首先要找出原因,再采取合适的解决方案。请参考上述方案来改进代码,确保在各种网络环境下都能正确获取客户端IP地址。希望本文对您有所帮助。
144 25
|
7月前
|
Java Apache C++
别再手写RPC了,Apache Thrift帮你自动生成RPC客户端及服务端代码
Thrift 是一个轻量级、跨语言的远程服务调用框架,由 Facebook 开发并贡献给 Apache。它通过 IDL 生成多种语言的 RPC 服务端和客户端代码,支持 C++、Java、Python 等。Thrift 的主要特点包括开发速度快、接口维护简单、学习成本低和多语言支持。广泛应用于 Cassandra、Hadoop 等开源项目及 Facebook、百度等公司。
别再手写RPC了,Apache Thrift帮你自动生成RPC客户端及服务端代码
|
7月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
378 5
|
7月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
168 1
|
7月前
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
|
7月前
|
存储 Java API
Java实现导出多个excel表打包到zip文件中,供客户端另存为窗口下载
Java实现导出多个excel表打包到zip文件中,供客户端另存为窗口下载
578 4
|
7月前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
7月前
|
消息中间件 监控 Kafka
Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面
随着大数据技术的发展,Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件的修改、启动命令、API 示例代码等,帮助你快速上手并有效管理 Kafka 集群。
126 0
|
6月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
564 33
The Past, Present and Future of Apache Flink

推荐镜像

更多