*本文基于SpringBoot整合Kafka,通过简单配置实现生产及消费,包括生产消费的配置说明、消费者偏移设置方式等。更多功能细节可参考
spring kafka 文档:https: //docs.spring.io/spring-kafka/docs/current/reference/html
前提条件
- 搭建Kafka环境,参考Kafka集群环境搭建及使用
- Java环境:JDK1.8
- Maven版本:apache-maven-3.6.3
- 开发工具:IntelliJ IDEA
项目环境
- 创建Springboot项目。
- pom.xml文件中引入kafka依赖。
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
创建Topic
创建topic命名为testtopic并指定2个分区。
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic testtopic --partitions 2
配置信息
application.yml配置文件信息
spring:
application:
name: kafka_springboot
kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
#ACK机制,默认为1 (0,1,-1)
acks: -1
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
# 自定义分区策略
partitioner:
class: org.bg.kafka.PartitionPolicy
consumer:
#设置是否自动提交,默认为true
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#当一个新的消费组或者消费信息丢失后,在哪里开始进行消费。earliest:消费最早的消息。latest(默认):消费最近可用的消息。none:没有找到消费组消费数据时报异常。
auto-offset-reset: latest
#批量消费时每次poll的数量
#max-poll-records: 5
listener:
# 当每一条记录被消费者监听器处理之后提交
# RECORD,
# 当每一批数据被消费者监听器处理之后提交
# BATCH,
# 当每一批数据被消费者监听器处理之后,距离上次提交时间大于TIME时提交
# TIME,
# 当每一批数据被消费者监听器处理之后,被处理record数量大于等于COUNT时提交
# COUNT,
# #TIME | COUNT 有一个条件满足时提交
# COUNT_TIME,
# #当每一批数据被消费者监听器处理之后,手动调用Acknowledgment.acknowledge()后提交:
# MANUAL,
# # 手动调用Acknowledgment.acknowledge()后立即提交
# MANUAL_IMMEDIATE;
ack-mode: manual
#批量消费
type: batch
更多配置信息查看KafkaProperties
生产消息
@Component
public class Producer {
@Autowired
private KafkaTemplate kafkaTemplate;
public void send(String msg) {
kafkaTemplate.send(new ProducerRecord<String, String>("testtopic", "key111", msg));
}
}
生产自定义分区策略
package org.bg.kafka;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
public class PartitionPolicy implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = this.nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
生产到指定分区
ProducerRecord有指定分区的构造方法,设置分区号public ProducerRecord(String topic, Integer partition, K key, V value)
。
kafkaTemplate.send(new ProducerRecord<String, String>("testtopic",1, "key111", msg));
消费消息
/**
* 自定义seek参考
* https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek
*/
@Component
public class Consumer implements ConsumerSeekAware{
@KafkaListener(topics = {"testtopic"},groupId = "test_group",clientIdPrefix = "bg",id = "testconsumer")
public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment ack){
System.out.println(records.size());
System.out.println(records.toString());
ack.acknowledge();
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
//按照时间戳设置偏移
callback.seekToTimestamp(assignments.keySet(),1670233826705L);
//设置偏移到最近
callback.seekToEnd(assignments.keySet());
//设置偏移到最开始
callback.seekToBeginning(assignments.keySet());
//指定 offset
for (TopicPartition topicPartition : assignments.keySet()) {
callback.seek(topicPartition.topic(),topicPartition.partition(),0L);
}
}
}
offset设置方式
如代码所示,实现ConsumerSeekAware接口,设置offset几种方式:
- 指定 offset,需要自己维护 offset,方便重试。
- 指定从头开始消费。
- 指定 offset 为最近可用的 offset (默认)。
- 根据时间戳获取 offset,设置 offset。