1、版本选择
springboot整合kafka的时候一定要根据自己springboot版本选择对应版本的kafka,两者版本对应关系可以直接查看官网
https://spring.io/projects/spring-kafka#overview
2、低版本springboot整合kafka
这里我SpringBoot版本是1.4.2.RELEASE版本,版本很低,官网显示的SpringBoot版本最低是1.5.x,可以使用1.3.x的版本,很明显我的这个不在官网给的范围内,然后我的spring版本是4.3.9.RELEASE,这里我在上面这个maven仓库spring-kafka地址里面看了一个1.3.0版本,如下:
直到我往下继续找,终于发现1.2.2.RELEASE这个版本是与我项目对应的。
刚好这个版本对应的spring版本是4.3.9.RELEASE与我项目的spring版本一致,于是我就使用了这个spring-kafka版本
这里之所以是在Java类里面写生产者和消费者配置,是因为springboot和kafka集成版本太低,不支持直接在application.yml里面配置,好像springboot高版本至少2.几的版本可以直接在application.yml里面配置,至于2.几的版本才支持我给忘记了
kafka生产者配置
这里是带用户名密码协议配置,最下面三个就是,协议类型为:SASL/SCRAM-SHA-256,如果你们那里的kafka配置没有设置这个,可以不需要配置最下面三个。企业开发一般需要进行认证才能发送消息。
@Configuration @EnableKafka public class KafkaProductConfig { //指定kafka 代理地址,多个地址用英文逗号隔开 private String bootstrapServers="192.168.11.111:9092,192.168.11.112:9093";//本地测试kafka使用 //消息重发次数,如果配置了事务,则不能为0,改为1 private int retries=0; //每次批量发送消息的数量 private String batchSize="16384"; //默认值为0,意思就是说消息必须立即被发送,但这样会影响性能 //一般设置10毫秒左右,这个消息发送完后会进入本地的一个batch,如果10毫秒内这个batch满了16kb就会随batch一起发送出去 private String lingerMs="10"; //生产者最大可发送的消息大小,内有多个batch,一旦满了,只有发送到kafka后才能空出位置,否则阻塞接收新消息 private String bufferMemory="33554432"; //指定消息key和消息体的编解码方式 private String keySerializer="org.apache.kafka.common.serialization.StringSerializer"; private String valueSerializer="org.apache.kafka.common.serialization.StringSerializer"; //确认等级ack,kafka生产端最重要的选项,如果配置了事务,那必须是-1或者all //acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应 //acks=1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应 //acks=-1,表示分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为product请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的 private String acks="1"; //协议类型,为SASL类型 private String securityProtocol="SASL_PLAINTEXT"; //协议 private String saslMechanism="SCRAM-SHA-256"; //用户名密码配置 private String saslJaas="org.apache.kafka.common.security.scram.ScramLoginModule required username=root password=123456;"; @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.RETRIES_CONFIG,retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); props.put(ProducerConfig.ACKS_CONFIG, acks); //如果kafka配置文件没有设置用户名密码协议,注释掉(ps:有些企业会使用jks加密文件通讯,那kafka 配置还的有其他配置 可参考kakfa 专栏spring整合kakfa) props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); props.put(SaslConfigs.SASL_MECHANISM, saslMechanism); props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaas); return new DefaultKafkaProducerFactory<>(props); } @Bean public KafkaTemplate<String, String> kafkaTestTemplate() { return new KafkaTemplate<>(producerFactory()); } }
kafka消费者配置
如果kafka配置文件没有配置用户名密码协议,认证后才能消费消息,可以将最下面的三个注释掉不使用。
import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.*; import org.springframework.util.ClassUtils; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaConsumerConfig { //指定kafka 代理地址,多个地址用英文逗号隔开 private String bootstrapServers="192.168.11.111:9092,192.168.11.112:9093";//本地测试kafka使用 //指定默认消费者group id,消费者监听到的也是这个 private String groupId="test-consumer-group";//本地测试使用 //消费者在读取一个没有offset的分区或者offset无效时的策略,默认earliest是从头读,latest不是从头读 private String autoOffsetReset="earliest"; //是否自动提交偏移量offset,默认为true,一般是false,如果为false,则auto-commit-interval属性就会无效 private boolean enableAutoCommit=true; //自动提交间隔时间,接收到消息后多久会提交offset,前提需要开启自动提交,也就是enable-auto-commit设置为true,默认单位是毫秒(ms),如果写10s,最后加载的显示值为10000ms,需要符合特定时间格式:1000ms,1S,1M,1H,1D(毫秒,秒,分,小时,天) private String autoCommitInterval="1000"; //指定消息key和消息体的编解码方式 private String keyDeserializerClass="org.apache.kafka.common.serialization.StringDeserializer"; private String valueDeserializerClass ="org.apache.kafka.common.serialization.StringDeserializer"; //批量消费每次最多消费多少条信息 private String maxPollRecords="50"; //协议类型,为SASL类型 private String securityProtocol="SASL_PLAINTEXT"; //协议 private String saslMechanism="SCRAM-SHA-256"; //用户名密码配置 private String saslJaas="org.apache.kafka.common.security.scram.ScramLoginModule required username=root password=123456;"; @Bean ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); //设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG factory.setBatchListener(false);//这里为true的时候,KafkaConsumer那里需要使用批量消费方法,不然报错 return factory; } @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); props.put(SaslConfigs.SASL_MECHANISM, saslMechanism); props.put(SaslConfigs.SASL_JAAS_CONFIG,saslJaas); return new DefaultKafkaConsumerFactory<>(props); } }
发送消息给kafka的Controller代码
这里使用addCallback这个方法,是可以在生产者发送消息给kafka时,如果生产者配置有问题或者服务有问题,我可以直接看到接口返回结果,所以没有直接这样kafkaTemplate.send(“first”,data);写。
package com.gmcc.project.controllers.kafka; import com.gmcc.project.core.utils.StringUtils; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; //kafka生产者 @RestController @RequestMapping("kafkaProducer") public class KafkaProducerController { @Resource private KafkaTemplate<String,String> kafkaTestTemplate; //向kafka发送消息 @RequestMapping(value = "/sendFileMd5", method = RequestMethod.POST) public Map<String, Object> sendFileMd5(@RequestParam(value = "fileMd5", required = false) String fileMd5, @RequestParam(value = "uuid", required = false) String uuid){ Map<String, Object> returnMap = new HashMap<>(); //写在success里面只会返回一次,第二次就给你返回一个空map对象 returnMap.put("message", "发送消息成功!"); returnMap.put("result", null); returnMap.put("status", "200"); //非空判断 if(StringUtils.isBlank(fileMd5)) { returnMap.put("message", "fileMd5不能为空!"); returnMap.put("result", ""); returnMap.put("status", "999"); return returnMap; } if(StringUtils.isBlank(uuid)) { returnMap.put("message", "uuid不能为空!"); returnMap.put("result", ""); returnMap.put("status", "999"); return returnMap; } try{ //需要发送的消息 String data="{\"file_md5\":\""+fileMd5+"\",\"uuid\":\""+uuid+"\",\"vendor\":\"etone\",\"model\":\"5g信令回放\"}"; //pro环境使用topic为test_sample_get //本地测试使用,向topic为first发送消息 kafkaTestTemplate.send("first",data).addCallback(success -> { // 消息发送到的topic String topic = success.getRecordMetadata().topic(); // 消息发送到的分区 int partition = success.getRecordMetadata().partition(); // 消息在分区内的offset long offset = success.getRecordMetadata().offset(); System.out.println("发送消息成功:"+data+",主题:"+topic+",分区:"+partition+",偏移量:"+offset); }, failure -> { returnMap.put("message", "发送消息失败:" + failure.getMessage()); returnMap.put("result", null); returnMap.put("status", "500"); }); }catch (Exception e){ returnMap.put("message", e.getMessage()); returnMap.put("result", null); returnMap.put("status", "500"); } return returnMap; } }
消费者消费代码
package com.gmcc.project.controllers.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class KafkaConsumer { //逐条消费 @KafkaListener(topics = "first") //@KafkaListener(topics = "test_sample_return") public void onMessage(ConsumerRecord<?,?> record){ try{ //消费的哪个topic、partition的消息,打印出消息内容 System.out.println("消费:"+record.topic()+"-"+record.partition()+"-"+record.value()); }catch (Exception e){ e.printStackTrace(); } } //批量消费方法 /*@KafkaListener(topics = "first") public void onMessage(List<ConsumerRecord<?,?>> records){ System.out.println("消费数量="+records.size()); for(ConsumerRecord<?,?> record:records){ //消费的哪个topic、partition的消息,打印出消息内容 System.out.println("消费:"+record.topic()+"-"+record.partition()+"-"+record.value()); } }*/ }
3、高版本springboot整合kafka
这里我的SpringBoot版本是2.6.2版本,spring-kafka版本是2.8.1版本。符合官网给的版本推荐
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
application.yml
这里之所以可以在application.yml直接配置kafka,是因为springboot和spring-kafka版本很高。这里生产者配置和消费者配置都在里面
server: port: 8080 spring: kafka: # 指定kafka 代理地址,多个地址用英文逗号隔开 bootstrap-servers: 192.168.11.111:9092 #初始化生产者配置 producer: #消息重发次数,如果配置了事务,则不能为0,改为1 retries: 0 # 每次批量发送消息的数量 batch-size: 16384 #生产者最大可发送的消息大小,内有多个batch,一旦满了,只有发送到kafka后才能空出位置,否则阻塞接收新消息 buffer-memory: 33554432 # 指定消息key和消息体的编解码方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer #确认等级ack,kafka生产端最重要的选项,如果配置了事务,那必须是-1或者all #acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应 #acks=1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应 #acks=-1,表示分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为product请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的 acks: all #配置事务,名字随便起 #transaction-id-prefix: hbz-transaction- #初始化消费者配置 consumer: # 指定默认消费者group id,消费者监听到的也是这个 group-id: test-consumer-group #消费者在读取一个没有offset的分区或者offset无效时的策略,默认earliest是从头读,latest不是从头读 auto-offset-reset: earliest #是否自动提交偏移量offset,默认为true,一般是false,如果为false,则auto-commit-interval属性就会无效 enable-auto-commit: true #自动提交间隔时间,接收到消息后多久会提交offset,前提需要开启自动提交,也就是enable-auto-commit设置为true,默认单位是毫秒(ms),如果写10s,最后加载的显示值为10000ms,需要符合特定时间格式:1000ms,1S,1M,1H,1D(毫秒,秒,分,小时,天) auto-commit-interval: 1000 # 指定消息key和消息体的编解码方式 key-serializer: org.apache.kafka.common.serialization.StringDeserializer value-serializer: org.apache.kafka.common.serialization.StringDeserializer #批量消费每次最多消费多少条信息 max-poll-records: 50 #监听器设置 listener: #消费端监听的topic不存在时,项目启动会报错(关掉) missing-topics-fatal: false #设置消费类型 批量消费batch,单条消费single type: batch #指定容器的线程数,提高并发量,默认为1 #concurrency: 3 #手动提交偏移量,当enable-auto-commit为true自动提交时,不需要设置改属性 #ack-mode: manual
其他配置参考
###########【Kafka集群】########### spring.kafka.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093 ###########【初始化生产者配置】########### # 重试次数 spring.kafka.producer.retries=0 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1) spring.kafka.producer.acks=1 # 批量大小 spring.kafka.producer.batch-size=16384 # 提交延时 spring.kafka.producer.properties.linger.ms=0 # 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka # linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了 # 生产端缓冲区大小 spring.kafka.producer.buffer-memory = 33554432 # Kafka提供的序列化和反序列化类 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer # 自定义分区器 # spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner ###########【初始化消费者配置】########### # 默认的消费组ID spring.kafka.consumer.properties.group.id=defaultConsumerGroup # 是否自动提交offset spring.kafka.consumer.enable-auto-commit=true # 提交offset延时(接收到消息后多久提交offset) spring.kafka.consumer.auto.commit.interval.ms=1000 # 当kafka中没有初始offset或offset超出范围时将自动重置offset # earliest:重置为分区中最小的offset; # latest:重置为分区中最新的offset(消费分区中新产生的数据); # none:只要有一个分区不存在已提交的offset,就抛出异常; spring.kafka.consumer.auto-offset-reset=latest # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作) spring.kafka.consumer.properties.session.timeout.ms=120000 # 消费请求超时时间 spring.kafka.consumer.properties.request.timeout.ms=180000 # Kafka提供的序列化和反序列化类 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 消费端监听的topic不存在时,项目启动会报错(关掉) spring.kafka.listener.missing-topics-fatal=false # 设置批量消费 # spring.kafka.listener.type=batch # 批量消费每次最多消费多少条消息 # spring.kafka.consumer.max-poll-records=50
生成者
@RestController public class KafkaProducer { String topic = "first"; @Autowired private KafkaTemplate<String, Object> kafkaTemplate; // 发送消息 @GetMapping("/kafka/normal/{message}") public void sendMessage1(@PathVariable("message") String normalMessage) { kafkaTemplate.send(topic , normalMessage); } }
消费者
package com.project.kafkademo.kafkaconsumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.List; @Component public class KafkaConsumer { //消费监听,topics=监听的主题名,groupId=分组,consumer.properties里面的group.id配置 //如果在这里直接写groupId="test-consumer-group"会导致application.yml里面设置的group-id不起效 //最终会被这里的设置直接覆盖掉,所以这里不应该加groupId="test-consumer-group"这个属性 //@KafkaListener(topics = "first",groupId="test-consumer-group") //这样写的话,application.yml里面设置的group-id就会生效,监控的就是application.yml里面的了 //逐条消费 /*@KafkaListener(topics = "first") public void onMessage(ConsumerRecord<?,?> record){ //消费的哪个topic、partition的消息,打印出消息内容 System.out.println("消费:"+record.topic()+"-"+record.partition()+"-"+record.value()); }*/ //批量消费,用List批量接收消息,ConsumerRecord<?,?>只能单条消费消息 /*@KafkaListener(topics = "first") public void onMessage(List<ConsumerRecord<?,?>> records){ System.out.println("消费数量="+records.size()); for(ConsumerRecord<?,?> record:records){ //消费的哪个topic、partition的消息,打印出消息内容 System.out.println("消费:"+record.topic()+"-"+record.partition()+"-"+record.value()); } }*/ //批量消费,ConsumerRecords<?,?>用于批量消费消息 @KafkaListener(topics = "first") public void onMessage(ConsumerRecords<?,?> records){ System.out.println("消费数量="+records.count()); for(ConsumerRecord<?,?> record:records){ //消费的哪个topic、partition(哪个分区)的消息,打印出消息内容 System.out.println("消费:"+record.topic()+"-"+record.partition()+"-"+record.key()+"-"+record.value()); } } }