操作步骤
Maven依赖
核心依赖 kafka-clients
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.1.0</version> </dependency>
生产者
package com.artisan.kafkademo.producer; import com.alibaba.fastjson.JSON; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * @author 小工匠 * @version v1.0 * @create 2019-11-18 0:17 * @motto show me the code ,change the word * @blog https://artisan.blog.csdn.net/ * @description **/ public class MsgProducer { public static void main(String[] args) throws InterruptedException { // ---------------参数设置---------------BEGIN Properties properties = new Properties(); // broker 信息 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.18.130:9092,192.168.18.131:9092,192.168.18.132:9092"); /* 发出消息持久化机制参数 (1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。 性能最高,但是最容易丢消息。 (2)acks=1: 至少要等待leader已经成功将数据写入本地log, 但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。 这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。 (3)acks=‐1或all: 这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志, 这种策略会保证只要有一个备份存活就不会丢失数据。 这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。 */ properties.put(ProducerConfig.ACKS_CONFIG, "1"); // 发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性, // 但是也可能造成消息重复发送,比如网络抖动,所以需要在接收者那边做好消息接收的幂等性处理 properties.put(ProducerConfig.RETRIES_CONFIG,3); // 重试间隔设置 properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,300); // /设置发送消息的本地缓冲区, // 如果设置了该缓冲区,消息会先发送到本地缓冲区,可以提高消息发送性能,默认值是33554432,即32MB properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432); // kafka本地线程会从缓冲区取数据,批量发送到broker, // 设置批量发送消息的大小,默认值是16384,即16kb,就是说一个batch满了16kb就发送出去 properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384); //默认值是0,意思就是消息必须立即被发送,但这样会影响性能 //一般设置100毫秒左右,就是说这个消息发送完后会进入本地的一个batch, // 如果100毫秒内,这个batch满了16kb就会随batch一起被发送出去 //如果100毫秒内,batch没满,那么也必须把消息发送出去,不能让消息的发送延迟时间太长 properties.put(ProducerConfig.LINGER_MS_CONFIG,100); // 把发送的key从字符串序列化为字节数组 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); //把发送消息value从字符串序列化为字节数组 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // ---------------参数设置---------------END // 使用properties实例化KafkaProducer Producer producer = new KafkaProducer(properties); final int messageNum = 5 ; final CountDownLatch countDownLatch = new CountDownLatch(messageNum); // 发送5条消息 for (int i = 1; i <= messageNum; i++) { Order order = new Order(i, 100,66,987.99 + i); // 指定发送分区 ProducerRecord<String,String> record = new ProducerRecord<String, String>("artisan-replicated-topic", 0,String.valueOf(order.getOrderId()), JSON.toJSONString(order)); // 异步方式发送消息 producer.send(record, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null){ countDownLatch.countDown(); System.err.println("发送消息失败:" + exception.getStackTrace()); } if (metadata != null){ countDownLatch.countDown(); System.out.println("异步方式发送消息结果: topic=" + metadata.topic() + " , partition=" + metadata.partition() + " , offset=" + metadata.offset()); } } }); } // 等5秒钟,5秒钟后,执行后续的代码 countDownLatch.await(5, TimeUnit.SECONDS); producer.close(); } static class Order { private int orderId ; private int productId ; private int productNum; private double orderAmount ; public Order() { } public Order(int orderId, int productId, int productNum, double orderAmount) { this.orderId = orderId; this.productId = productId; this.productNum = productNum; this.orderAmount = orderAmount; } public int getOrderId() { return orderId; } public void setOrderId(int orderId) { this.orderId = orderId; } public int getProductId() { return productId; } public void setProductId(int productId) { this.productId = productId; } public int getProductNum() { return productNum; } public void setProductNum(int productNum) { this.productNum = productNum; } public double getOrderAmount() { return orderAmount; } public void setOrderAmount(double orderAmount) { this.orderAmount = orderAmount; } } }
消费者
package com.artisan.kafkademo.consumer; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.*; /** * @author 小工匠 * @version v1.0 * @create 2019-11-18 23:51 * @motto show me the code ,change the word * @blog https://artisan.blog.csdn.net/ * @description **/ public class MsgConsumer { public static void main(String[] args) { // ---------------参数设置---------------BEGIN Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.18.130:9092,192.168.18.131:9092,192.168.18.132:9092"); // 消费分组名 props.put(ConsumerConfig.GROUP_ID_CONFIG,"testGroup"); // 是否自动提交offset /*props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自动提交offset的间隔时间 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG , "1000");*/ //props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); /* 心跳时间,服务端broker通过心跳确认consumer是否故障,如果发现故障,就会通过心跳下发 rebalance的指令给其他的consumer通知他们进行rebalance操作,这个时间可以稍微短一点 */ props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000); //服务端broker多久感知不到一个consumer心跳就认为他故障了,默认是10秒 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000); /* 如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱, 会将其踢出消费组,将分区分配给别的consumer消费 */ props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); // 消费主题 String topicName = "artisan-replicated-topic"; consumer.subscribe(Arrays.asList(topicName)); // 消费指定分区 //consumer.assign(Arrays.asList(new TopicPartition(topicName, 0))); //消息回溯消费 /*consumer.assign(Arrays.asList(new TopicPartition(topicName, 0))); *//*consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));*//* //指定offset消费 consumer.seek(new TopicPartition(topicName, 0), 10); //从指定时间点开始消费 Map<TopicPartition, Long> map = new HashMap<TopicPartition, Long>(); List<PartitionInfo> topicPartitions = consumer.partitionsFor(topicName); //从半小时前开始消费 long fetchDataTime = new Date().getTime() - 1000 * 60 * 60; for (PartitionInfo par : topicPartitions) { map.put(new TopicPartition(topicName, par.partition()), fetchDataTime); } Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map); for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) { TopicPartition key = entry.getKey(); OffsetAndTimestamp value = entry.getValue(); if (key == null || value == null) continue; Long offset = value.offset(); System.out.println("partition-" + key.partition() + "|offset-" + offset); System.out.println(); //根据消费里的timestamp确定offset if (value != null) { //没有这行代码会导致下面的报错信息 consumer.assign(Arrays.asList(key)); consumer.seek(key, offset); } } */ while (true) { /* * poll() API 是拉取消息的长轮询,主要是判断consumer是否还活着,只要我们持续调用poll(), * 消费者就会存活在自己所在的group中,并且持续的消费指定partition的消息。 * 底层是这么做的:消费者向server持续发送心跳,如果一个时间段(session. * timeout.ms)consumer挂掉或是不能发送心跳,这个消费者会被认为是挂掉了, * 这个Partition也会被重新分配给其他consumer */ ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { System.out.printf("收到消息:offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } if (records.count() > 0) { // 提交offset consumer.commitSync(); } } } }