引入依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.6.RELEASE</version> </dependency>
添加配置
server port5896spring kafka bootstrap-servers 192.168.16.239092# SASL认证,如果kafka有增加SASL认证需要的相关配置# properties:# security:# protocol: SASL_PLAINTEXT# sasl:# mechanism: PLAIN# jaas:# config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="xxx" password="xxx";'# 生产者配置 producer retries 0 # 重试次数 acks 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1) batch-size 16384 # 一次最多发送数据量 buffer-memory 33554432 # 生产端缓冲区大小# 序列化类 key-serializer org.apache.kafka.common.serialization.StringSerializer value-serializer org.springframework.kafka.support.serializer.JsonSerializer # 消费者配置 consumer group-id test # 设置手动提交offset enable-auto-commitfalse# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest auto-offset-reset latest # 异常处理,处理毒丸(Poison Pill)消息 key-deserializer org.springframework.kafka.support.serializer.ErrorHandlingDeserializer value-deserializer org.springframework.kafka.support.serializer.ErrorHandlingDeserializer properties spring.json.trusted.packages'*'# 序列化、反序列化必须一致,否则会出现无法序列化 spring.deserializer.key.delegate.class org.apache.kafka.common.serialization.StringDeserializer spring.deserializer.value.delegate.class org.apache.kafka.common.serialization.StringDeserializer # 监听 listener# record:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交# batch:当每一批poll()的数据被ListenerConsumer处理之后提交# time:当每一批poll()的数据被ListenerConsumer处理之后,距离上次提交时间大于TIME时提交# count:当每一批poll()的数据被ListenerConsumer处理之后,被处理record数量大于等于COUNT时提交# count_time:TIME或COUNT中有一个条件满足时提交# manual:当每一批poll()的数据被ListenerConsumer处理之后, 手动调用Acknowledgment.acknowledge()后提交# manual_immediate:手动调用Acknowledgment.acknowledge()后立即提交,一般推荐使用这种 ack-mode manual_immediate
简单消息发送
packagecom.example.demokafka.service; importorg.springframework.kafka.core.KafkaTemplate; importorg.springframework.stereotype.Service; importjavax.annotation.Resource; /*** @Author: CQL* @Date: 2023/8/29 17:41*/publicclassMessageServiceKafka { privateKafkaTemplate<String,String>kafkaTemplate; publicvoidsendMessage(Stringmsg) { System.out.println("待发送的信息已纳入处理队列(kafka),msg:"+msg); //使用send方法发送消息,需要传入topic名称kafkaTemplate.send("test", msg); } }
简单消息接收
packagecom.example.demokafka.listener; importcom.alibaba.fastjson.JSONObject; importlombok.extern.slf4j.Slf4j; importorg.apache.kafka.clients.consumer.ConsumerRecord; importorg.springframework.kafka.annotation.KafkaListener; importorg.springframework.kafka.support.Acknowledgment; importorg.springframework.stereotype.Component; /*** @Author: CQL* @Date: 2023/8/29 17:43*/publicclassMessageListener { topics="test", groupId="test") (publicvoidkafkaListener(ConsumerRecord<?, ?>item, Acknowledgmentack) { System.out.printf("topic主题=%s, offset偏移量=%d,partition分区=%s, 内容=%s \n", item.topic(), item.offset(), item.partition(), item.value()); log.info("topic主题={}, offset偏移量={},partition分区={}, 内容={}", item.topic(), item.offset(), item.partition(), item.value()); // 设置手动提交需要确认ack.acknowledge(); } }