Step By Step
1、kafka控制台创建公网类型实例
2、创建SpringBoot项目集成阿里云Kafka
3、发送接收测试
一、kafka控制台创建公网类型实例
1.1 Kafka控制台创建实例
1.2 获取认证参数
二、创建SpringBoot项目集成阿里云Kafka
2.1 创建Spring Boot(2.5.2)项目
2.2 依赖
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2.3 Sender.class
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class Sender {
@Autowired
private KafkaTemplate<String, String> template;
public void send(String msg) {
this.template.sendDefault("my_msg", msg);
System.out.println("send message:" + msg);
}
}
2.4 Receiver.class
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class Receiver {
@KafkaListener(topics = { "taro_topic" }) // 参数配置要监听的Topic
public void receiveMessage(ConsumerRecord<String, String> record) {
System.out.println("Receive Message");
System.out.println("【*** Message: ***】key = " + record.key() + "、value = " + record.value());
}
}
2.5 KafkaController.class
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
@Autowired
private Sender sender;
@PostMapping("/send/{msg}") // 发送消息测试,注意此处为Post
public String send(@PathVariable("msg") String msg) {
sender.send(msg);
return msg;
}
}
2.6 application.yml
spring:
kafka:
template:
default-topic: <topic>
bootstrap-servers: <SSL接入点>
jaas:
enabled: true
loginModule: org.apache.kafka.common.security.plain.PlainLoginModule
options:
username: <用户名>
password: <密码>
consumer:
ssl:
truststoreLocation: file:/kafka.client.truststore.jks
properties:
sasl.mechanism: PLAIN
security.protocol: SASL_SSL
ssl.endpoint.identification.algorithm:
group-id: <group>
max-poll-records: 2
producer:
ssl:
truststoreLocation: file:/kafka.client.truststore.jks
retries: 3
acks: 1
compression-type: lz4
buffer-memory: 33554432
batch-size: 51200
properties:
send.buffer.bytes: 262144
sasl.mechanism: PLAIN
security.protocol: SASL_SSL
ssl.endpoint.identification.algorithm:
kafka.client.truststore.jks 下载 地址,证书下载后直接放在C盘根目录下。
2.7 项目结构
三、发送接收测试
3.1 启动项目,使用PostMan发送Post请求
3.2 项目日志
3.3 控制台消息监控查看