Kafka与SpringBoot的整合使用

简介: Kafka与SpringBoot的整合使用

Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

1.今天我们来看下springboot怎么整合kafka使用,首先我们要引入需要的jar包:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>


2.添加基本配置:

###########【Kafka集群】###########
spring.kafka.bootstrap-servers=127.0.0.1:9092
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
###########【初始化消费者配置】###########
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto-commit-interval=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.group-id=test_group
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer


3.写一个发布消息的controller,有简单发送消息和带回调消息的两种写法:

package com.demo.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    // 发送消息
    @GetMapping("/kafka/{message}")
    public void sendMessage1(@PathVariable("message") String normalMessage) {
        kafkaTemplate.send("topic1", normalMessage);
    }
    @GetMapping("/kafkaBack1/{message}")
    public void sendMessage2(@PathVariable("message") String callbackMessage) {
        kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
            // 消息发送到的topic
            String topic = success.getRecordMetadata().topic();
            // 消息发送到的分区
            int partition = success.getRecordMetadata().partition();
            // 消息在分区内的offset
            long offset = success.getRecordMetadata().offset();
            System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
        }, failure -> {
            System.out.println("发送消息失败:" + failure.getMessage());
        });
    }
    @GetMapping("/kafkaBack2/{message}")
    public void sendMessage3(@PathVariable("message") String callbackMessage) {
        kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("发送消息失败:"+ex.getMessage());
            }
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                        + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
            }
        });
    }
}



4.定义一个监听的类,收到消息处理的消费者:

package com.demo.component;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic1"})
    public void onMessage1(ConsumerRecord<?, ?> record){
        // 消费的哪个topic、partition的消息,打印出消息内容
        System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }
}


简单吧,相比于spring的复杂配置,boot把很多东西给自己处理了,我们只需要简单的配置就可以使用了。
源码地址:
https://gitee.com/fengyamin/BootFrameDemo.git

相关文章
|
7天前
|
消息中间件 Java Kafka
Springboot集成高低版本kafka
Springboot集成高低版本kafka
|
9月前
|
消息中间件 存储 NoSQL
kafka整合springboot以及核心参数的使用
kafka整合springboot以及核心参数的使用
306 0
|
10月前
|
消息中间件 缓存 NoSQL
手把手教你云相册项目简易开发 day1 Kafka+IDEA+Springboot+Redis+MySQL+libvips 简单运行和使用
手把手教你云相册项目简易开发 day1 Kafka+IDEA+Springboot+Redis+MySQL+libvips 简单运行和使用
146 0
|
6月前
|
消息中间件 数据可视化 Java
消息中间件系列教程(22) -Kafka- SpringBoot集成Kafka
消息中间件系列教程(22) -Kafka- SpringBoot集成Kafka
72 0
|
7天前
|
消息中间件 SQL druid
最新版 springboot集成kafka
最新版 springboot集成kafka
37 0
|
9月前
|
消息中间件 数据可视化 Java
SpringBoot3集成Kafka
SpringBoot3集成KafkaKafka是一个开源的分布式事件流平台,常被用于高性能数据管道、流分析、数据集成和关键任务应用,基于Zookeeper协调的处理平台,也是一种消息系统,具有更好的吞吐量、内置分区、复制和容错。
216 0
|
8天前
|
消息中间件 Java Kafka
spring boot 集成kafka
spring boot 集成kafka
57 0
spring boot 集成kafka
|
8天前
|
消息中间件 Java 关系型数据库
【Spring Boot+Kafka+Mysql+HBase】实现分布式优惠券后台应用系统(附源码)
【Spring Boot+Kafka+Mysql+HBase】实现分布式优惠券后台应用系统(附源码)
106 2
|
8月前
|
消息中间件 Java Kafka
SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)
SpringBoot整合Kafka(SASL认证配置、处理毒丸消息)
942 0
|
9月前
|
消息中间件 Java Kafka
SpringBoot整合Kafka简单配置实现生产消费
本文基于SpringBoot整合Kafka,通过简单配置实现生产及消费,包括生产消费的配置说明、消费者偏移设置方式等。更多功能细节可参考
273 0