Integrating Kafka with Spring Boot
1. Resolve the dependency
We will skip the usual Spring Boot dependencies; for Kafka, the only extra dependency needed is the spring-kafka integration package:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.0.RELEASE</version>
</dependency>

Next, here is the configuration file (application.properties):

#============== kafka producer ===================
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.retries=1
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.properties.max.request.size=2097152
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== kafka consumer ===================
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=0
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
# set consumer max partition fetch bytes to 2*1024*1024
spring.kafka.consumer.properties.max.partition.fetch.bytes=2097152
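Spring Boot builds a KafkaTemplate from these properties automatically, so the properties file alone is enough. If you prefer to configure the producer in Java code instead, the same values can be declared as beans. The following is only a minimal sketch under that assumption; the class name KafkaProducerConfig is illustrative, not part of the original example:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    // Mirrors the spring.kafka.producer.* properties above; adjust the broker address as needed.
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 2097152);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}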
2. Kafka producer
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Order(value = 1)
@Component
@Slf4j
public class Producer implements CommandLineRunner {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Override
    public void run(String... strings) throws Exception {
        // Send a message to the "test" topic every 5 seconds.
        while (true) {
            log.info("sending a Kafka message");
            kafkaTemplate.send("test", "zhugezifang");
            log.info("message sent to Kafka successfully.");
            Thread.sleep(5000);
        }
    }
}
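The send() call above is fire-and-forget. KafkaTemplate.send() actually returns a ListenableFuture, so the outcome can be checked asynchronously. Below is a minimal sketch of such a callback, assuming the method is added inside the Producer class above; the method name sendWithCallback is illustrative:

import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

    // Send a message and log the broker-assigned partition/offset, or the failure cause.
    public void sendWithCallback(String message) {
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                log.info("sent to partition {} at offset {}",
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                log.error("failed to send message", ex);
            }
        });
    }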
3. Kafka consumer
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class Consumer {

    // Listen on the "test" topic and log each record that arrives.
    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<?, ?> record) {
        log.info("topic:{}, offset:{}, value:{}", record.topic(), record.offset(), record.value());
    }
}
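With enable-auto-commit=true as configured above, offsets are committed automatically on an interval. If you later want to commit offsets only after a record has been processed, you would set spring.kafka.consumer.enable-auto-commit=false and spring.kafka.listener.ack-mode=manual_immediate, then acknowledge each record explicitly. The sketch below assumes those two property changes; the class name ManualAckConsumer is illustrative:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class ManualAckConsumer {

    // Requires spring.kafka.consumer.enable-auto-commit=false
    // and spring.kafka.listener.ack-mode=manual_immediate.
    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
        log.info("topic:{}, offset:{}, value:{}", record.topic(), record.offset(), record.value());
        ack.acknowledge(); // commit the offset only after processing succeeds
    }
}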