kafka SpringBoot
启动zookeeper
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
启动kafka
./bin/kafka-server-start.sh ./config/server.properties
创建topic
bin/kafka-topics.sh --create --topic topic1 --bootstrap-server localhost:9092
SpringBoot 引入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
application.yml配置
server:
port: 8080
spring:
application:
name: paw-kafka
#kafka
kafka:
# 连接kafka的地址,多个地址用逗号分隔
bootstrap-servers: localhost:9092
#producer
producer:
#若设置大于0的值,客户端会将发送失败的记录重新发送
retries: 0
#当将多个记录被发送到同一个分区时Producer将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置
batch-size: 16384
#Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
buffer-memory: 33554432
#关键字的序列化类
key-serializer: org.apache.kafka.common.serialization.StringSerializer
#值的序列化类
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
linger.ms: 1
#cousumer
consumer:
enable-auto-commit: false
auto-commit-interval: 100ms
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: test-group-id
properties:
session.timeout.ms: 15000
创建生产者、消费者
生产者通过KafkaTemplate.send发送消息,send可以指定key, 同一key发往同一分区,会发给同一消费者保证时序性。
消费者通过@KafkaListener监听topic,groupId指定分组,kafka将消息发往所有的分组,同一分组会选择一个消费者。
@Service
@Slf4j
public class KfkService {
@Autowired
private KafkaTemplate<Integer,String> kafkaTemplate;
//消费者:监听topic1,groupId1
@KafkaListener(topics = {"topic1"},groupId = "groupId1")
public void consumer1(ConsumerRecord<Integer,String> record){
log.info("consumer1 kfk consume message start...");
log.info("consumer1 kfk consume message topic:{},msg:{}",record.topic(),record.value());
log.info("consumer1 kfk consume message end...");
}
//消费者:监听topic1,groupId2
@KafkaListener(topics = {"topic1"},groupId = "groupId2")
public void consumer3(ConsumerRecord<Integer,String> record){
log.info("consumer3 kfk consume message start...");
log.info("consumer3 kfk consume message topic:{},msg:{}",record.topic(),record.value());
log.info("consumer3 kfk consume message end...");
}
//消费者:监听topic1,groupId2
@KafkaListener(topics = {"topic1"},groupId = "groupId2")
public void consumer2(ConsumerRecord<Integer,String> record){
log.info("consumer2 kfk consume message start...");
log.info("consumer2 kfk consume message topic:{},msg:{}",record.topic(),record.value());
log.info("consumer2 kfk consume message end...");
}
//生产者
public void sendMsg(String topic , String msg){
log.info("开始发送kfk消息,topic:{},msg:{}",topic,msg);
ListenableFuture<SendResult<Integer, String>> sendMsg = kafkaTemplate.send(topic, msg);
//消息确认
sendMsg.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onFailure(Throwable throwable) {
log.error("send error,ex:{},topic:{},msg:{}",throwable,topic,msg);
}
@Override
public void onSuccess(SendResult<Integer, String> stringStringSendResult) {
log.info("send success,topic:{},msg:{}",topic,msg);
}
});
log.info("kfk send end!");
}
}
测试类
@RestController
public class KfkController {
@Autowired
private KfkService kfkService;
@GetMapping("/send")
public String send(){
kfkService.sendMsg("topic1","I am topic msg");
return "success-topic1";
}
}
gitee: https://gitee.com/tg_seahorse/paw-demos paw-kafka项目