Kafka集群的安装(传统方式&Docker方式)&Springboot整合Kafka

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Kafka集群的安装(传统方式&Docker方式)&Springboot整合Kafka

正文


一、什么是kafka


       Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。——来自百度百科


二、卡夫卡安装


传统方式


下载地址:Apache Download Mirrors


安装环境:


1、Java8+ ,参考Linux系统下安装jdk17&jdk8安装


2、安装ZK,参考搭建ZooKeeper3.7.0集群(传统方式&Docker方式)


3、解压文件


[root@localhost ~]# tar -zxvf kafka_2.13-3.0.0.tgz


4、移动到/usr/local/kafka


[root@localhost ~]# mv kafka_2.13-3.0.0 /usr/local/kafka


5、修改kafka配置文件


[root@localhost config]# vim server.properties


broker1

broker.id=0
#监听
listeners=PLAINTEXT://192.168.139.155:9092
#zk地址
zookeeper.connect=192.168.139.155:2181, 192.168.139.155:2182, 192.168.139.155:2183


broker2


broker.id=1
#监听
listeners=PLAINTEXT://192.168.139.156:9092
#zk地址
zookeeper.connect=192.168.139.155:2181, 192.168.139.155:2182, 192.168.139.155:2183


broker3


broker.id=2
#监听
listeners=PLAINTEXT://192.168.139.157:9094
#zk地址
zookeeper.connect=192.168.139.155:2181, 192.168.139.155:2182, 192.168.139.155:2183


6、分别启动kafka


[root@localhost kafka]# ./bin/kafka-server-start.sh -daemon config/server.properties


7、在其中一台创建topic


[root@localhost kafka]# ./bin/kafka-topics.sh --bootstrap-server 192.168.139.155:9092 --create --topic test-topic --partitions 3   --replication-factor 3 


通过zk的可视化工具可知,分区已经创建完成。


111.png


8、测试


发送消息


[root@localhost kafka]# ./bin/kafka-console-producer.sh --topic test-topic --bootstrap-server 192.168.139.155:9092


消费消息 在另一台broker上接收消息


[root@localhost kafka]# ./bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server 192.168.139.156:9092


docker方式安装


1、拉取镜像


[root@localhost ~]# docker pull wurstmeister/kafka


2、安装


Broker1


docker run -d --name kafka1 \
-p 9092:9092 \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.139.155:2181,192.168.139.155:2182,192.168.139.155:2183 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.139.155:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

Broker2


docker run -d --name kafka2 \
-p 9093:9093 \
-e KAFKA_BROKER_ID=2 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.139.155:2181,192.168.139.155:2182,192.168.139.155:2183 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.139.155:9093 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093 -t wurstmeister/kafka


Broker3


docker run -d --name kafka3 \
-p 9094:9094 \
-e KAFKA_BROKER_ID=3 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.139.155:2181,192.168.139.155:2182,192.168.139.155:2183 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.139.155:9094 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094 -t wurstmeister/kafka


3、测试


进入容器


[root@bogon ~]# docker container exec -it 2d4be3823f16 /bin/bash 
进入/opt/kafka_2.13-2.7.1/bin目录


创建topic


bash-5.1# ./kafka-topics.sh --bootstrap-server 192.168.139.155:9092 --create --topic my-topic --partitions 3   --replication-factor 3


创建消息


bash-5.1# ./kafka-console-producer.sh --topic my-topic --bootstrap-server 192.168.139.155:9092 

消费者 ,进入另一个容器进行消费


bash-5.1# ./kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server 192.168.139.155:9093


三、整合SpringBoot


一般模式消费


默认情况下是自动提交offset值。可通过consumer下的属性配置


enable-auto-commit: false


生产者


   public void sendMSg(){
        System.out.println(">>>>>>>>>>>>>>>>>");
        for (int i=0;i<5;i++){
            kafkaTemplate.send("xiaojie-topic","test message>>>>>>>>>>>>>>>>>>>>>>"+i);
        }
    }


消费者


  @KafkaListener(groupId = "xiaojie_group",topics = {"xiaojie-topic"})
    public void onMessage(ConsumerRecord<?, ?> record) {
        log.info("消费主题>>>>>>{},消费分区>>>>>>>>{},消费偏移量>>>>>{},消息内容>>>>>{}",
                record.topic(), record.partition(), record.offset(), record.value());
    }

生产者回调模式


生产者回调函数,可以确认消息是否成功发送到broker,发送失败,进行重试或者人工补偿措施,确保消息投递到broker。有以下两种方式


方式1:


public void sendMsgCallback(String callbackMessage){
        kafkaTemplate.send("callback-topic","xiaojie_key",callbackMessage).addCallback(success -> {
        //当消息发送成功的回调函数
            // 消息发送到的topic
            String topic = success.getRecordMetadata().topic();
            // 消息发送到的分区
            int partition = success.getRecordMetadata().partition();
            // 消息在分区内的offset
            long offset = success.getRecordMetadata().offset();
            System.out.println("发送消息成功>>>>>>>>>>>>>>>>>>>>>>>>>" + topic + "-" + partition + "-" + offset);
        }, failure -> {
            //消息发送失败的回调函数
            System.out.println("消息发送失败,可以进行人工补偿");
        });
    }


方式2


public void sendMsgCallback1(String callbackMessage){
        kafkaTemplate.send("callback-topic","xiaojie_key",callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                //发送失败
                System.out.println("发送失败。。。。。。。。。。。");
            }
            @Override
            public void onSuccess(SendResult<String, String> result) {
                //分区信息
                Integer partition = result.getRecordMetadata().partition();
                //主题
                String topic=result.getProducerRecord().topic();
                String key=result.getProducerRecord().key();
                //发送成功
                System.out.println("发送成功。。。。。。。。。。。分区为:"+partition+",主题topic:"+topic+",key:"+key);
            }
        });
    }


Kafka事务


应用场景


最简单的需求是producer发的多条消息组成一个事务这些消息需要对consumer同时可见或者同时不可见 。

producer可能会给多个topic,多个partition发消息,这些消息也需要能放在一个事务里面,这就形成了一个典型的分布式事务。

kafka的应用场景经常是应用先消费一个topic,然后做处理再发到另一个topic,这个consume-transform-produce过程需要放到一个事务里面,比如在消息处理或者发送的过程中如果失败了,消费位点也不能提交。

producer或者producer所在的应用可能会挂掉,新的producer启动以后需要知道怎么处理之前未完成的事务 。

流式处理的拓扑可能会比较深,如果下游只有等上游消息事务提交以后才能读到,可能会导致rt非常长吞吐量也随之下降很多,所以需要实现read committed和read uncommitted两种事务隔离级别

在spring.kafka.producer.transaction-id-prefix: tx #开启事务管理


注意:此时重试retries不能为0,acks=-1或者all


    /**
     * @description: kafak事务提交 本地事务不需要事务管理器
     * @param:
     * @return: void
     * @author xiaojie
     * @date: 2021/10/14 21:35
     */
    public void sendTx(){
        kafkaTemplate.executeInTransaction(kafkaOperations -> {
            String msg="这是一条测试事务的数据......";
            kafkaOperations.send("tx-topic",msg);
            int i=1/0; //报错之后,由于事务存在,消息并不会发送到broker
            return null;
        });
    }


消费者批量消费消息


消费者批量消费消息,如果此时开启批量消费模式,那么同样的topic,消费者将会进行批量消费,不再进行逐条消费。


 
         


消费者手动确认


Kafak并不会像rabbitmq那样,消息消费之后,会将消息从队列中删除,Kafka通常根据时间决定数据可以保留多久。默认使用log.retention.hours参数配置时间,默认值是168小时,也就是一周。除此之外,还有其他两个参数,log.retention.minutes和log.retention.ms,这三个参数作用是一样的,都是决定消息多久以会被删除,不过还是推荐使用log.retention.ms,如果指定了不止一个参数,Kafka会优先使用最小值的那个参数。卡夫卡是以offset的位置进行消费,如果不进行,确认那么消费者下次消费的时候,还会从上次消费的位置进行消费。


修改消费者自动提交为false:enable-auto-commit: false


配置工厂类


/*
    RECORD,当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
    BATCH,当每一批记录被消费者监听器(ListenerConsumer)处理之后提交
    TIME, 每隔多长时间提交,超过该时间会自动提交
    COUNT, 每次提交的数量,超过该数量自动提交
    COUNT_TIME, 满足时间和数量的任何一个条件提交
    MANUAL_IMMEDIATE
    MANUAL
 */ 
@Bean("manualListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> manualListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setPollTimeout(1500);
        factory.setBatchListener(true); //设置批量为true,那么消费端就要一批量的形式接收信息
        //配置手动提交offset
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }


消费者


    /*
     *
     * @param message
     * @param ack
     * @手动提交ack
     * containerFactory  手动提交消息ack
     * errorHandler 消费端异常处理器
     * @author xiaojie
     * @date 2021/10/14
     * @return void
     */
    @KafkaListener(containerFactory = "manualListenerContainerFactory", topics = "xiaojie-topic",
            errorHandler = "consumerAwareListenerErrorHandler"
    )
    public void onMessageManual(List<ConsumerRecord<?, ?>> record, Acknowledgment ack) {
        for (int i=0;i<record.size();i++){
            System.out.println(record.get(i).value());
        }
        ack.acknowledge();//直接提交offset
    }


指定消费

  /**
     * @description: id:消费者ID;
     * groupId:消费组ID;
     * topics:监听的topic,可监听多个; topics不能和topicPartitions同时使用
     * topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。
     * @param:
     * @param: record
     * @return: void
     * @author xiaojie
     * @date: 2021/10/14 21:50
     */
    @KafkaListener(groupId = "xiaojie_group",topicPartitions = {
            @TopicPartition(topic = "test-topic", partitions = {"1"}),
            @TopicPartition(topic = "xiaojie-test-topic", partitions = {"1"},
                    partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "15"))
    })
    public void onMessage1(ConsumerRecord<?, ?> record) {
        //指定消费某个topic,的某个分区,指定消费位置
        //执行消费xiaojie-test-topic的1号分区,和xiaojie-test-topic的1和2号分区,并且2号分区从15开始消费
        log.info("消费主题>>>>>>:{},消费分区>>>>>>>>:{},消费偏移量>>>>>:{},消息内容>>>>>:{}",
                record.topic(), record.partition(), record.offset(), record.value());
    }


指定自定义分区器


我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:

1、若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;

2、若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;这种方式可以解决消息顺序消费


3、patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;


package com.xiaojie.config;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * @Description:自定义分区器 我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:
 * 若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;
 * 若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,
 * 这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;这种方式可以解决消息顺序消费
 * patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;
 * @author: xiaojie
 * @date: 2021.10.14
 */
@Component
public class CustomizePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //计算分区器
        System.out.println("key>>>>>>>>>>>>>"+key);
        if ("weixin".equals(key)&&"test-topic".equals(topic)){
            return 1;
        }
        return 0;
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}


消费端异常处理


package com.xiaojie.config;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.stereotype.Component;
/**
 * @author xiaojie
 * @version 1.0
 * @description:通过异常处理器,我们可以处理consumer在消费时发生的异常。
 * 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
 * @date 2021/10/14 21:56
 */
@Component
public class MyErrorHandler {
    @Bean
    ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler(){
        return (message, e, consumer) -> {
            System.out.println("消息消费异常"+message.getPayload());
            System.out.println("异常信息>>>>>>>>>>>>>>>>>"+e);
            return null;
        };
    }
}


使用方法


/*
     *
     * @param message
     * @param ack
     * @手动提交ack
     * containerFactory  手动提交消息ack
     * errorHandler 消费端异常处理器
     * @author xiaojie
     * @date 2021/10/14
     * @return void
     */
    @KafkaListener(containerFactory = "manualListenerContainerFactory", topics = "xiaojie-topic",
            errorHandler = "consumerAwareListenerErrorHandler"
    )
    public void onMessageManual(List<ConsumerRecord<?, ?>> record, Acknowledgment ack) {
        for (int i=0;i<record.size();i++){
            System.out.println(record.get(i).value());
        }
        ack.acknowledge();//直接提交offset
    }


消息过滤器


  @Bean("filterFactory")
    public ConcurrentKafkaListenerContainerFactory filterFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        factory.setAckDiscarded(true);
        factory.setRecordFilterStrategy(consumerRecord -> {
            String value = (String) consumerRecord.value();
            if (value.contains("hello")) {
                //返回false消息没有被过滤继续消费
                return false;
            }
            System.out.println("....................");
            //返回true 消息被过滤掉了
            return true;
        });
        return factory;
    }


使用方法


  /** 
     * @description: 消费者过滤器
     * @param: 
     * @param: record
     * @return: void
     * @author xiaojie
     * @date: 2021/10/16 1:04
     */
    @KafkaListener(topics = "filter-topic",containerFactory = "filterFactory")
    public void filterOnmessage(ConsumerRecord<?,?> record){
        log.info("消费到的消息是:》》》》》》》》》》》{}",record.value());
    }


完整代码请参考,kafka部分:spring-boot: Springboot整合redis、消息中间件等相关代码


参考:SpringBoot集成kafka全面实战_Felix-CSDN博客

目录
打赏
0
0
0
0
9
分享
相关文章
Docker Compose V2 安装常用数据库MySQL+Mongo
以上内容涵盖了使用 Docker Compose 安装和管理 MySQL 和 MongoDB 的详细步骤,希望对您有所帮助。
135 42
阿里云服务器一键安装Docker社区版教程,基于系统运维管理OOS
阿里云服务器一键安装Docker社区版教程,基于系统运维管理OOS自动化部署。支持Ubuntu 22.04/20.04、CentOS 7.7-7.9及Alibaba Cloud Linux 3.2104 LTS。前提条件:ECS实例需运行中且有公网。步骤:选择Docker扩展并安装,验证成功通过命令`docker -v`查看版本号。
322 79
docker环境安装kafka/Flink/clickhouse镜像
通过上述步骤和示例,您可以系统地了解如何使用Docker Compose安装和配置Kafka、Flink和ClickHouse,并进行基本的验证操作。希望这些内容对您的学习和工作有所帮助。
109 28
|
19天前
|
【02】客户端服务端C语言-go语言-web端PHP语言整合内容发布-优雅草网络设备监控系统-2月12日优雅草简化Centos stream8安装zabbix7教程-本搭建教程非docker搭建教程-优雅草solution
【02】客户端服务端C语言-go语言-web端PHP语言整合内容发布-优雅草网络设备监控系统-2月12日优雅草简化Centos stream8安装zabbix7教程-本搭建教程非docker搭建教程-优雅草solution
68 20
安装docker-18.06报错Error: libseccomp conflicts with docker-18.06
通过这些步骤,您可以成功在CentOS上安装Docker 18.06,并解决libseccomp的冲突问题。这些方法确保系统兼容性,并保证Docker的正常运行。
62 27
ubuntu22 编译安装docker,和docker容器方式安装 deepseek
本脚本适用于Ubuntu 22.04,主要功能包括编译安装Docker和安装DeepSeek模型。首先通过Apt源配置安装Docker,确保网络稳定(建议使用VPN)。接着下载并配置Docker二进制文件,创建Docker用户组并设置守护进程。随后拉取Debian 12镜像,安装系统必备工具,配置Ollama模型管理器,并最终部署和运行DeepSeek模型,提供API接口进行交互测试。
278 15
docker compose 安装 kafka
通过本文的步骤,您可以快速在本地使用 Docker Compose 安装并配置 Kafka 和 Zookeeper。Docker Compose 简化了多容器应用的管理,方便快速搭建和测试分布式系统。
62 2
docker安装nginx,前端项目运行
通过上述步骤,你可以轻松地在Docker中部署Nginx并运行前端项目。这种方法不仅简化了部署流程,还确保了环境的一致性,提高了开发和运维的效率。确保按步骤操作,并根据项目的具体需求进行相应的配置调整。
160 25
docker私有仓库harbor安装
通过以上步骤,您可以成功在企业内部安装和配置Harbor私有仓库,方便地管理和分发Docker镜像。Harbor不仅提供了基础的镜像管理功能,还增强了安全性、身份管理和审计功能,使其成为企业级容器镜像管理的理想选择。
112 22
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。