application.yml
# Spring Boot Kafka client configuration (producer + consumer over SASL_SSL).
# Reference: https://developer.aliyun.com/article/784990
spring:
  kafka:
    bootstrap-servers: XXXX

    # Producer settings
    producer:
      retries: 1  # number of retries
      # Acknowledgement level: how many partition replicas must have
      # persisted the record before an ack is sent to the producer
      # (allowed values: 0, 1, all / -1).
      acks: "1"
      batch-size: 51200  # batch size
      buffer-memory: 33554432  # size of the producer-side buffer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # value-serializer: com.itheima.demo.config.MySerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      compression-type: none
      properties:
        send.buffer.bytes: 262144
        security.protocol: SASL_SSL
        # ssl.ca.location: D:\KafkaCertificate\ca-cert.pem
        sasl.mechanism: PLAIN
        # The trailing ';' is mandatory. The credentials are placeholders;
        # prefer injecting real ones from the environment or a secret store
        # instead of committing them.
        sasl.jaas.config: >-
          org.apache.kafka.common.security.plain.PlainLoginModule required
          username="xxx" password="xxx";
        ssl.truststore.location: 'D:/KafkaCertificate/mix.4096.client.truststore.jks'
        ssl.truststore.password: KafkaOnsClient
        # Deliberately an empty string (not null): written explicitly so the
        # value does not depend on how the loader treats a bare key.
        ssl.endpoint.identification.algorithm: ""

    # Consumer settings
    consumer:
      group-id: vipsoft-group  # default consumer group id
      enable-auto-commit: true  # whether offsets are committed automatically
      # Offset commit delay (how long after a message is received its
      # offset is committed).
      auto-commit-interval: 100
      # earliest: if a partition has a committed offset, resume from it;
      #           otherwise consume from the beginning.
      # latest:   if a partition has a committed offset, resume from it;
      #           otherwise consume only newly produced records.
      # none:     resume from the committed offsets when every partition has
      #           one; throw an exception if any partition has none.
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # value-deserializer: com.itheima.demo.config.MyDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        send.buffer.bytes: 262144
        security.protocol: SASL_SSL
        # ssl.ca.location: D:\KafkaCertificate\ca-cert.pem
        sasl.mechanism: PLAIN
        # The trailing ';' is mandatory (placeholder credentials, see the
        # producer section of this file's own values: username/password xxx).
        sasl.jaas.config: >-
          org.apache.kafka.common.security.plain.PlainLoginModule required
          username="xxx" password="xxx";
        ssl.truststore.location: 'D:/KafkaCertificate/mix.4096.client.truststore.jks'
        ssl.truststore.password: KafkaOnsClient
        # Deliberately an empty string (not null).
        ssl.endpoint.identification.algorithm: ""
KafkaController.java
package com.vipsoft.controller; import com.vipsoft.kafka.Sender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @RestController @RequestMapping("/kafka") public class KafkaController { @Autowired private Sender sender; @GetMapping("/send/{msg}") // 发送消息测试,注意此处为Post public String send(@PathVariable("msg") String msg) { sender.send(msg); return msg; } }
Sender.java
package com.vipsoft.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Spring component that hands string messages to the injected
 * {@link KafkaTemplate}.
 */
@Component
public class Sender {

    /** Topic every message is sent to. */
    private static final String TOPIC = "vipsoft-topic";

    /** Fixed record key used for every message. */
    private static final String MESSAGE_KEY = "my_msg";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends {@code msg} to the topic with the fixed key, then prints a line
     * to standard output.
     *
     * <p>The value returned by {@code KafkaTemplate.send} is not inspected
     * here, so the printed line does not confirm delivery.
     *
     * @param msg message payload
     */
    public void send(String msg) {
        kafkaTemplate.send(TOPIC, MESSAGE_KEY, msg);
        System.out.println("send message:" + msg);
    }
}
Receiver.java
package com.vipsoft.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class Receiver { @KafkaListener(topics = { "vipsoft-topic" }) // 参数配置要监听的Topic public void receiveMessage(ConsumerRecord<String, String> record) { System.out.println("Receive Message"); System.out.println("【*** Message: ***】key = " + record.key() + "、value = " + record.value()); } }