Kafka集群的安装(传统方式&Docker方式)&Springboot整合Kafka

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
容器服务 Serverless 版 ACK Serverless,317元额度 多规格
简介: Kafka集群的安装(传统方式&Docker方式)&Springboot整合Kafka

正文


一、什么是kafka


       Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。——来自百度百科


二、卡夫卡安装


传统方式


下载地址:Apache Download Mirrors


安装环境:


1、Java8+ ,参考Linux系统下安装jdk17&jdk8安装


2、安装ZK,参考搭建ZooKeeper3.7.0集群(传统方式&Docker方式)


3、解压文件


[root@localhost ~]# tar -zxvf kafka_2.13-3.0.0.tgz


4、移动到/usr/local/kafka


[root@localhost ~]# mv kafka_2.13-3.0.0 /usr/local/kafka


5、修改kafka配置文件


[root@localhost config]# vim server.properties


broker1

broker.id=0
#监听
listeners=PLAINTEXT://192.168.139.155:9092
#zk地址
zookeeper.connect=192.168.139.155:2181, 192.168.139.155:2182, 192.168.139.155:2183


broker2


broker.id=1
#监听
listeners=PLAINTEXT://192.168.139.156:9092
#zk地址
zookeeper.connect=192.168.139.155:2181, 192.168.139.155:2182, 192.168.139.155:2183


broker3


broker.id=2
#监听
listeners=PLAINTEXT://192.168.139.157:9094
#zk地址
zookeeper.connect=192.168.139.155:2181, 192.168.139.155:2182, 192.168.139.155:2183


6、分别启动kafka


[root@localhost kafka]# ./bin/kafka-server-start.sh -daemon config/server.properties


7、在其中一台创建topic


[root@localhost kafka]# ./bin/kafka-topics.sh --bootstrap-server 192.168.139.155:9092 --create --topic test-topic --partitions 3   --replication-factor 3 


通过zk的可视化工具可知,分区已经创建完成。


111.png


8、测试


发送消息


[root@localhost kafka]# ./bin/kafka-console-producer.sh --topic test-topic --bootstrap-server 192.168.139.155:9092


消费消息 在另一台broker上接收消息


[root@localhost kafka]# ./bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server 192.168.139.156:9092


docker方式安装


1、拉取镜像


[root@localhost ~]# docker pull wurstmeister/kafka


2、安装


Broker1


docker run -d --name kafka1 \
-p 9092:9092 \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.139.155:2181,192.168.139.155:2182,192.168.139.155:2183 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.139.155:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

Broker2


docker run -d --name kafka2 \
-p 9093:9093 \
-e KAFKA_BROKER_ID=2 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.139.155:2181,192.168.139.155:2182,192.168.139.155:2183 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.139.155:9093 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093 -t wurstmeister/kafka


Broker3


docker run -d --name kafka3 \
-p 9094:9094 \
-e KAFKA_BROKER_ID=3 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.139.155:2181,192.168.139.155:2182,192.168.139.155:2183 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.139.155:9094 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094 -t wurstmeister/kafka


3、测试


进入容器


[root@bogon ~]# docker container exec -it 2d4be3823f16 /bin/bash 
进入/opt/kafka_2.13-2.7.1/bin目录


创建topic


bash-5.1# ./kafka-topics.sh --bootstrap-server 192.168.139.155:9092 --create --topic my-topic --partitions 3   --replication-factor 3


创建消息


bash-5.1# ./kafka-console-producer.sh --topic my-topic --bootstrap-server 192.168.139.155:9092 

消费者 ,进入另一个容器进行消费


bash-5.1# ./kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server 192.168.139.155:9093


三、整合SpringBoot


一般模式消费


默认情况下是自动提交offset值。可通过consumer下的属性配置


enable-auto-commit: false


生产者


   public void sendMSg(){
        System.out.println(">>>>>>>>>>>>>>>>>");
        for (int i=0;i<5;i++){
            kafkaTemplate.send("xiaojie-topic","test message>>>>>>>>>>>>>>>>>>>>>>"+i);
        }
    }


消费者


  @KafkaListener(groupId = "xiaojie_group",topics = {"xiaojie-topic"})
    public void onMessage(ConsumerRecord<?, ?> record) {
        log.info("消费主题>>>>>>{},消费分区>>>>>>>>{},消费偏移量>>>>>{},消息内容>>>>>{}",
                record.topic(), record.partition(), record.offset(), record.value());
    }

生产者回调模式


生产者回调函数,可以确认消息是否成功发送到broker,发送失败,进行重试或者人工补偿措施,确保消息投递到broker。有以下两种方式


方式1:


public void sendMsgCallback(String callbackMessage){
        kafkaTemplate.send("callback-topic","xiaojie_key",callbackMessage).addCallback(success -> {
        //当消息发送成功的回调函数
            // 消息发送到的topic
            String topic = success.getRecordMetadata().topic();
            // 消息发送到的分区
            int partition = success.getRecordMetadata().partition();
            // 消息在分区内的offset
            long offset = success.getRecordMetadata().offset();
            System.out.println("发送消息成功>>>>>>>>>>>>>>>>>>>>>>>>>" + topic + "-" + partition + "-" + offset);
        }, failure -> {
            //消息发送失败的回调函数
            System.out.println("消息发送失败,可以进行人工补偿");
        });
    }


方式2


public void sendMsgCallback1(String callbackMessage){
        kafkaTemplate.send("callback-topic","xiaojie_key",callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                //发送失败
                System.out.println("发送失败。。。。。。。。。。。");
            }
            @Override
            public void onSuccess(SendResult<String, String> result) {
                //分区信息
                Integer partition = result.getRecordMetadata().partition();
                //主题
                String topic=result.getProducerRecord().topic();
                String key=result.getProducerRecord().key();
                //发送成功
                System.out.println("发送成功。。。。。。。。。。。分区为:"+partition+",主题topic:"+topic+",key:"+key);
            }
        });
    }


Kafka事务


应用场景


最简单的需求是producer发的多条消息组成一个事务这些消息需要对consumer同时可见或者同时不可见 。

producer可能会给多个topic,多个partition发消息,这些消息也需要能放在一个事务里面,这就形成了一个典型的分布式事务。

kafka的应用场景经常是应用先消费一个topic,然后做处理再发到另一个topic,这个consume-transform-produce过程需要放到一个事务里面,比如在消息处理或者发送的过程中如果失败了,消费位点也不能提交。

producer或者producer所在的应用可能会挂掉,新的producer启动以后需要知道怎么处理之前未完成的事务 。

流式处理的拓扑可能会比较深,如果下游只有等上游消息事务提交以后才能读到,可能会导致rt非常长吞吐量也随之下降很多,所以需要实现read committed和read uncommitted两种事务隔离级别

在spring.kafka.producer.transaction-id-prefix: tx #开启事务管理


注意:此时重试retries不能为0,acks=-1或者all


    /**
     * @description: kafak事务提交 本地事务不需要事务管理器
     * @param:
     * @return: void
     * @author xiaojie
     * @date: 2021/10/14 21:35
     */
    public void sendTx(){
        kafkaTemplate.executeInTransaction(kafkaOperations -> {
            String msg="这是一条测试事务的数据......";
            kafkaOperations.send("tx-topic",msg);
            int i=1/0; //报错之后,由于事务存在,消息并不会发送到broker
            return null;
        });
    }


消费者批量消费消息


消费者批量消费消息,如果此时开启批量消费模式,那么同样的topic,消费者将会进行批量消费,不再进行逐条消费。


 
         


消费者手动确认


Kafak并不会像rabbitmq那样,消息消费之后,会将消息从队列中删除,Kafka通常根据时间决定数据可以保留多久。默认使用log.retention.hours参数配置时间,默认值是168小时,也就是一周。除此之外,还有其他两个参数,log.retention.minutes和log.retention.ms,这三个参数作用是一样的,都是决定消息多久以会被删除,不过还是推荐使用log.retention.ms,如果指定了不止一个参数,Kafka会优先使用最小值的那个参数。卡夫卡是以offset的位置进行消费,如果不进行,确认那么消费者下次消费的时候,还会从上次消费的位置进行消费。


修改消费者自动提交为false:enable-auto-commit: false


配置工厂类


/*
    RECORD,当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
    BATCH,当每一批记录被消费者监听器(ListenerConsumer)处理之后提交
    TIME, 每隔多长时间提交,超过该时间会自动提交
    COUNT, 每次提交的数量,超过该数量自动提交
    COUNT_TIME, 满足时间和数量的任何一个条件提交
    MANUAL_IMMEDIATE
    MANUAL
 */ 
@Bean("manualListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> manualListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setPollTimeout(1500);
        factory.setBatchListener(true); //设置批量为true,那么消费端就要一批量的形式接收信息
        //配置手动提交offset
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }


消费者


    /*
     *
     * @param message
     * @param ack
     * @手动提交ack
     * containerFactory  手动提交消息ack
     * errorHandler 消费端异常处理器
     * @author xiaojie
     * @date 2021/10/14
     * @return void
     */
    @KafkaListener(containerFactory = "manualListenerContainerFactory", topics = "xiaojie-topic",
            errorHandler = "consumerAwareListenerErrorHandler"
    )
    public void onMessageManual(List<ConsumerRecord<?, ?>> record, Acknowledgment ack) {
        for (int i=0;i<record.size();i++){
            System.out.println(record.get(i).value());
        }
        ack.acknowledge();//直接提交offset
    }


指定消费

  /**
     * @description: id:消费者ID;
     * groupId:消费组ID;
     * topics:监听的topic,可监听多个; topics不能和topicPartitions同时使用
     * topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。
     * @param:
     * @param: record
     * @return: void
     * @author xiaojie
     * @date: 2021/10/14 21:50
     */
    @KafkaListener(groupId = "xiaojie_group",topicPartitions = {
            @TopicPartition(topic = "test-topic", partitions = {"1"}),
            @TopicPartition(topic = "xiaojie-test-topic", partitions = {"1"},
                    partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "15"))
    })
    public void onMessage1(ConsumerRecord<?, ?> record) {
        //指定消费某个topic,的某个分区,指定消费位置
        //执行消费xiaojie-test-topic的1号分区,和xiaojie-test-topic的1和2号分区,并且2号分区从15开始消费
        log.info("消费主题>>>>>>:{},消费分区>>>>>>>>:{},消费偏移量>>>>>:{},消息内容>>>>>:{}",
                record.topic(), record.partition(), record.offset(), record.value());
    }


指定自定义分区器


我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:

1、若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;

2、若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;这种方式可以解决消息顺序消费


3、patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;


package com.xiaojie.config;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * @Description:自定义分区器 我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:
 * 若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;
 * 若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,
 * 这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;这种方式可以解决消息顺序消费
 * patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;
 * @author: xiaojie
 * @date: 2021.10.14
 */
@Component
public class CustomizePartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //计算分区器
        System.out.println("key>>>>>>>>>>>>>"+key);
        if ("weixin".equals(key)&&"test-topic".equals(topic)){
            return 1;
        }
        return 0;
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}


消费端异常处理


package com.xiaojie.config;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.stereotype.Component;
/**
 * @author xiaojie
 * @version 1.0
 * @description:通过异常处理器,我们可以处理consumer在消费时发生的异常。
 * 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
 * @date 2021/10/14 21:56
 */
@Component
public class MyErrorHandler {
    @Bean
    ConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler(){
        return (message, e, consumer) -> {
            System.out.println("消息消费异常"+message.getPayload());
            System.out.println("异常信息>>>>>>>>>>>>>>>>>"+e);
            return null;
        };
    }
}


使用方法


/*
     *
     * @param message
     * @param ack
     * @手动提交ack
     * containerFactory  手动提交消息ack
     * errorHandler 消费端异常处理器
     * @author xiaojie
     * @date 2021/10/14
     * @return void
     */
    @KafkaListener(containerFactory = "manualListenerContainerFactory", topics = "xiaojie-topic",
            errorHandler = "consumerAwareListenerErrorHandler"
    )
    public void onMessageManual(List<ConsumerRecord<?, ?>> record, Acknowledgment ack) {
        for (int i=0;i<record.size();i++){
            System.out.println(record.get(i).value());
        }
        ack.acknowledge();//直接提交offset
    }


消息过滤器


  @Bean("filterFactory")
    public ConcurrentKafkaListenerContainerFactory filterFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        factory.setAckDiscarded(true);
        factory.setRecordFilterStrategy(consumerRecord -> {
            String value = (String) consumerRecord.value();
            if (value.contains("hello")) {
                //返回false消息没有被过滤继续消费
                return false;
            }
            System.out.println("....................");
            //返回true 消息被过滤掉了
            return true;
        });
        return factory;
    }


使用方法


  /** 
     * @description: 消费者过滤器
     * @param: 
     * @param: record
     * @return: void
     * @author xiaojie
     * @date: 2021/10/16 1:04
     */
    @KafkaListener(topics = "filter-topic",containerFactory = "filterFactory")
    public void filterOnmessage(ConsumerRecord<?,?> record){
        log.info("消费到的消息是:》》》》》》》》》》》{}",record.value());
    }


完整代码请参考,kafka部分:spring-boot: Springboot整合redis、消息中间件等相关代码


参考:SpringBoot集成kafka全面实战_Felix-CSDN博客

相关文章
|
23天前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
57 4
|
20天前
|
消息中间件 存储 Prometheus
Kafka集群如何配置高可用性
Kafka集群如何配置高可用性
|
22天前
|
Prometheus 监控 Cloud Native
基于Docker安装Grafana和Prometheus
Grafana 是一款用 Go 语言开发的开源数据可视化工具,支持数据监控和统计,并具备告警功能。通过 Docker 部署 Grafana 和 Prometheus,可实现系统数据的采集、展示和告警。默认登录用户名和密码均为 admin。配置 Prometheus 数据源后,可导入主机监控模板(ID 8919)进行数据展示。
59 2
|
24天前
|
消息中间件 Linux RocketMQ
在Red Hat Enterprise Linux 9上使用Docker快速安装并部署
通过以上步骤,你可以在Red Hat Enterprise Linux 9上使用Docker快速安装并部署RocketMQ。这种方法不仅简化了安装过程,还提供了一个灵活的环境来管理和扩展消息队列系统。RocketMQ作为一款高性能的分布式消息系统,通过Docker可以实现快速部署和高效管理。
55 2
|
25天前
|
消息中间件 Linux RocketMQ
在Red Hat Enterprise Linux 9上使用Docker快速安装并部署
通过以上步骤,你可以在Red Hat Enterprise Linux 9上使用Docker快速安装并部署RocketMQ。这种方法不仅简化了安装过程,还提供了一个灵活的环境来管理和扩展消息队列系统。RocketMQ作为一款高性能的分布式消息系统,通过Docker可以实现快速部署和高效管理。
32 3
|
27天前
|
关系型数据库 MySQL Linux
基于阿里云服务器Linux系统安装Docker完整图文教程(附部署开源项目)
基于阿里云服务器Linux系统安装Docker完整图文教程(附部署开源项目)
224 3
|
11天前
|
API Docker 容器
【赵渝强老师】构建Docker Swarm集群
本文介绍了如何使用三台虚拟主机构建Docker Swarm集群。首先在master节点上初始化集群,然后通过特定命令将node1和node2作为worker节点加入集群。最后,在master节点上查看集群的节点信息,确认集群构建成功。文中还提供了相关图片和视频教程,帮助读者更好地理解和操作。
|
11天前
|
调度 Docker 容器
【赵渝强老师】Docker Swarm集群的体系架构
Docker Swarm自1.12.0版本起集成至Docker引擎,无需单独安装。它内置服务发现功能,支持跨多服务器或宿主机创建容器,形成集群提供服务。相比之下,Docker Compose仅限于单个宿主机。Docker Swarm采用主从架构,Swarm Manager负责管理和调度集群中的容器资源,用户通过其接口发送指令,Swarm Node根据指令创建容器运行应用。
|
11天前
|
Docker 容器
【赵渝强老师】使用二进制包方式安装Docker
本文介绍了在企业生产环境中无法直接访问外网时,如何使用Docker官方提供的二进制包进行Docker的离线安装。文章详细列出了从安装wget、下载Docker安装包、解压、复制命令到启动Docker服务的具体步骤,并提供了相关命令和示例图片。最后,还介绍了如何设置Docker为开机自启模式。
|
11天前
|
缓存 Ubuntu Linux
如何安装Docker
如何安装Docker
83 0
下一篇
无影云桌面