配置参考
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.3.8.RELEASE</version> </dependency> 复制代码
#-----------------kafka---------------------------- spring.kafka.producer.bootstrap-servers=172.29.43.121:9092,172.29.43.121:9093,172.29.43.121:9094 #----------------生产者配置------------------------ # 重试次数,如果设置可能改变消息顺序。例如消息1重试2成功 spring.kafka.producer.retries=0 #可选0,1,-1 默认1 #0:只要发送过去就返回ack 吞吐量高,可以容忍一些数据丢失 #1:当有个一个副本(leader的)写成功就算成功,如果leader挂了其他broker没有同步则可能出现消息丢失 #-1:保证isr写入数不小于min.insync.replicas(在server.properties 或者创建主题的时候设置)才算成功 spring.kafka.producer.acks=1 # 批量大小 16K=16*1024 spring.kafka.producer.batch-size=16384 # 提交延时 # 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka # linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了 spring.kafka.producer.properties.linger.ms=100 # 生产端缓冲区大小 32M=1024*1024*32 spring.kafka.producer.buffer-memory = 33554432 # Kafka提供的序列化和反序列化类 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #请求最大字节数,防止数据过大 1M=1024*1024 spring.kafka.producer.properties.max.request.size=1048576 # 自定义分区器 默认org.apache.kafka.clients.producer.internals.DefaultPartitioner #spring.kafka.producer.properties.partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner #----------------初始化消费者配置------------------------ spring.kafka.consumer.bootstrap-servers=172.29.43.121:9092,172.29.43.121:9093,172.29.43.121:9094 # 默认的消费组ID #spring.kafka.consumer.properties.group.id=defaultConsumerGroup # 是否自动提交offset spring.kafka.consumer.enable-auto-commit=false # 提交offset延时(接收到消息后多久提交offset) spring.kafka.consumer.auto.commit.interval.ms=1000 # 当kafka中没有初始offset或offset超出范围时将自动重置offset # earliest:消费当前所有的数据(重置为分区中最小的offset); # latest:重置为分区中最新的offset(消费分区中新产生的数据); # none:只要有一个分区不存在已提交的offset,就抛出异常; spring.kafka.consumer.auto-offset-reset=latest # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作) spring.kafka.consumer.properties.session.timeout.ms=10000 # 客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求;超过重试次数将抛异常 spring.kafka.consumer.properties.request.timeout.ms=30000 # 消费端监听的topic不存在时,项目启动会报错(关掉) spring.kafka.listener.missing-topics-fatal=false # 批量消费每次最多消费多少条消息 spring.kafka.consumer.max-poll-records=50 # Kafka提供的序列化和反序列化类 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 设置批量消费 #spring.kafka.listener.type=batch #manual: 表示手动提交,但是测试下来发现是批量提交 #manual_immediate: 表示手动提交,当调用 Acknowledgment#acknowledge之后立马提交。 #spring.kafka.listener.ack-mode=manual spring.kafka.listener.ack-mode=manual_immediate # 经测试也是批量提交的ack , 当消费完 spring.kafka.consumer.max-poll-records 这么多的数据时候,提交 spring.kafka.listener.poll-timeout=50S 复制代码
demo
@Service public class KafkaService { private Logger logger = LoggerFactory.getLogger(KafkaService.class); @Resource private KafkaTemplate<Object, Object> template; public void sendInfo(String topic,String data) { template.send(topic, data); logger.info("{}:{}",topic,data); } public void sendProducerInfo(String topic,String data) { ProducerRecord<Object, Object> pr = new ProducerRecord<>(topic, data); template.send(pr); logger.info("{}:{}",topic,data); } @KafkaListener(id = "webGroup", topics = "topic_input") public void listen(String input) { logger.info("input value: {}" , input); } @KafkaListener(id = "webGroup1", topics = "hello") public void onMessage(ConsumerRecord<Object, Object> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { System.out.println("单条消费消息:" + record.topic() + "----" + record.partition() + "----" + record.value()); ack.acknowledge(); } @KafkaListener(id = "webGroup2", topics = "hello") public void onMessageButch(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) { for(ConsumerRecord<?, ?> record:records) { System.out.println("批量消费消息:" + record.topic() + "----" + record.partition() + "----" + record.value()); } ack.acknowledge(); } }