kafka:基础篇

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
日志服务 SLS,月写入数据量 50GB 1个月
云原生网关 MSE Higress,422元/月
简介: **Kafka传统定义**:Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。**发布/订阅**:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息 分为不同的类别,订阅者只接收感兴趣的消息。

1.kafka是什么

Kafka传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息 分为不同的类别,订阅者只接收感兴趣的消息。

Kafka最新定义:Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

消息队列的应用场景无外乎是:削峰填谷、应用解耦、异步处理等等,具体使用案例我们在之前讲rabbitmq基础篇已经详述过,这里不在做讲述,这里说一下消息队列的两种模型:

  • 点对点模型:也叫消息队列模型。如果拿上面那个“民间版”的定义来说,那么系统 A 发送的消息只能被系统 B 接收,其他任何系统都不能读取 A 发送的消息。日常生活的例子比如电话客服就属于这种模型:同一个客户呼入电话只能被一位客服人员处理,第二个客服人员不能为该客户服务。
  • 发布 / 订阅模型:与上面不同的是,它有一个主题(Topic)的概念,你可以理解成逻辑语义相近的消息容器。该模型也有发送方和接收方,只不过提法不同。发送方也称为发布者(Publisher),接收方称为订阅者(Subscriber)。和点对点模型不同的是,这个模型可能存在多个发布者向相同的主题发送消息,而订阅者也可能存在多个,它们都能接收到相同主题的消息。生活中的报纸订阅就是一种典型的发布 / 订阅模型。

2.kafka基础架构和核心概念

在 Kafka 中,发布订阅的对象是主题(Topic),你可以为每个业务、每个应用甚至是每类数据都创建专属的主题。

生产者(Producer):消息生产者,就是向 kafka broker 发消息的客户端,生产者程序通常持续不断地向一个或多个主题发送消息。

消费者(Consumer):消息消费者,向 kafka broker 取消息的客户端,消费者就是订阅这些主题消息的客户端应用程序。

和生产者类似,消费者也能够同时订阅多个主题的消息。我们把生产者和消费者统称为客户端(Clients)。你可以同时运行多个生产者和消费者实例,这些实例会不断地向 Kafka 集群中的多个主题生产和消费消息。

消费者组Consumer Group (CG):由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。

主题(topic) :可以理解为一个队列,生产者和消费者面向的都是一个 topic;

分区(Partition):为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上, 一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;

副本(Replica):副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本, 一个 leader 和若干个 follower

leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对 象都是 leader。

follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据 的同步。leader 发生故障时,某个 follower 会成为新的 follower

副本的工作机制也很简单:生产者总是向领导者副本写消息;而消费者总是从领导者副本读消息。至于追随者副本,它只做一件事:向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的同步。

Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,故避免了缓慢的随机 I/O 操作,改为性能较好的顺序 I/O 写操作,这也是实现 Kafka 高吞吐量特性的一个重要手段。不过如果你不停地向一个日志写入消息,最终也会耗尽所有的磁盘空间,因此 Kafka 必然要定期地删除消息以回收磁盘。怎么删除呢?简单来说就是通过日志段(Log Segment)机制。在 Kafka 底层,一个日志又近一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka 在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的

项目推荐:基于SpringBoot2.x、SpringCloud和SpringCloudAlibaba企业级系统架构底层框架封装,解决业务开发时常见的非功能性需求,防止重复造轮子,方便业务快速开发和企业技术栈框架统一管理。引入组件化的思想实现高内聚低耦合并且高度可配置化,做到可插拔。严格控制包依赖和统一版本管理,做到最少化依赖。注重代码规范和注释,非常适合个人学习和企业使用

Github地址https://github.com/plasticene/plasticene-boot-starter-parent

Gitee地址https://gitee.com/plasticene3/plasticene-boot-starter-parent

微信公众号Shepherd进阶笔记

交流探讨群:Shepherd_126

3.kafka的命令操作

在我们部署好kafka环境之后,我可以查看kafka的bin目录的文件,提供许多操作执行脚本,包括服务端、生产者、消费者、主题等等。

[root@shepherd-master bin]# ls
connect-distributed.sh        kafka-consumer-offset-checker.sh     kafka-replica-verification.sh       kafka-verifiable-producer.sh
connect-standalone.sh         kafka-consumer-perf-test.sh          kafka-run-class.sh                  windows
kafka-acls.sh                 kafka-delete-records.sh              kafka-server-start.sh               zookeeper-security-migration.sh
kafka-broker-api-versions.sh  kafka-mirror-maker.sh                kafka-server-stop.sh                zookeeper-server-start.sh
kafka-configs.sh              kafka-preferred-replica-election.sh  kafka-simple-consumer-shell.sh      zookeeper-server-stop.sh
kafka-console-consumer.sh     kafka-producer-perf-test.sh          kafka-streams-application-reset.sh  zookeeper-shell.sh
kafka-console-producer.sh     kafka-reassign-partitions.sh         kafka-topics.sh
kafka-consumer-groups.sh      kafka-replay-log-producer.sh         kafka-verifiable-consumer.sh

主题命令操作

[root@shepherd-master bin]# kafka-topics.sh

对应参数如下:

参数 描述
--bootstrap-server 连接的 Kafka Broker 主机名称和端口号。 操作的 topic 名称。这个参数新版kafka中添加的,从v2.8版本开始,Kafka可以不再依赖ZooKeeper,以前老版是使用zookeeper地址进行连接
--topic 操作的 topic 名称
--create 创建主题
--delete 删除主题
--alter 修改主题
--list 查看所有主题
--describe 查看主题详细描述
--partitions 设置主题分区数
-replication-factor 设置主题分区副本数
--config 更新主题默认的系统配置,比如该主题的最大保存时长,一条消息最大大小等

示例:

# 查看主题列表
bin/kafka-topics.sh --zookeeper 10.10.0.18:2181 --list

# 创建一个名为zfj-topic的主题,3个分区,1个副本
bin/kafka-topics.sh --zookeeper 10.10.0.18:2181 --create --replication-factor 1 --partitions 3 --topic zfj-topic

# 生产者,往主题发送消息
bin/kafka-console-producer.sh --broker-list 10.10.0.18:9092 --topic zfj-topic

# 消费者 消费主题列表的消息
bin/kafka-console-consumer.sh --bootstrap-server 10.10.0.18:9092 --topic zfj-topic

# 查看该主题的详细信息
bin/kafka-topics.sh --zookeeper 10.10.0.18:2181 --describe --topic zfj-topic

上面包含了对生产者、消费者的命令操作,所以这里不在做单独陈述

4.项目整合kafka

springboot项目整合kafka相当简单,引入依赖:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

生产者:发送消息

package com.shepherd.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @author fjzheng
 * @version 1.0
 * @date 2022/1/24 14:48
 */

public class CustomProducer {
   
   
    public static void main(String[] args) {
   
   
        Properties props = new Properties();
        // kafka 集群,broker-list
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.0.18:9092");
        props.put("acks", "all");
        // 重试次数
        props.put("retries", 1);
        // 批次大小
        props.put("batch.size", 16384);
        // 等待时间
        props.put("linger.ms", 1);
        // RecordAccumulator 缓冲区大小
        props.put("buffer.memory", 33554432);
        // 序列化
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.shepherd.kafka.producer.MyPartitioner");
        // 创建消息生产者
        Producer<String, String> producer = new KafkaProducer<>(props);
        // 发送消息
        /**
         * {@link ProducerRecord的构建函数
         *     public ProducerRecord(String topic, Integer partition, K key, V value) {
         *         this(topic, partition, (Long)null, key, value, (Iterable)null);
         *     }
         *     public ProducerRecord(String topic, K key, V value) {
         *         this(topic, (Integer)null, (Long)null, key, value, (Iterable)null);
         *     }
         *     public ProducerRecord(String topic, V value) {
         *         this(topic, (Integer)null, (Long)null, (Object)null, value, (Iterable)null);
         *     }
         *
         * 消息分区策略
         * 1、支持指定分区;
         * 2、传入key,然后hash,再对分区数取余
         * 3、随机选择分区,下次选择的分区和上次的为不同分区。
         * }
         */
        for (int i = 0; i < 10; i++) {
   
   
            // 指定分区
         //   producer.send(new ProducerRecord<>("topic1", 1, Integer.toString(i), "val" + Integer.toString(i)));
            // 通过传入key计算分区
            producer.send(new ProducerRecord<String, String>("topic1",
                    Integer.toString(i), "val"+Integer.toString(i)));
        }
            producer.close();
    }

}

通过上面代码我们可知消息发送往主题的哪个分区,有不同策略。同时我也必须知道kafka的消息只有在同一分区才能保证消费严格有序的,所以有时候我们必须吧某一批消息发往同一个分区,这时候我们可以自定义分区策略,只需要实现Partitioner接口即可

package com.shepherd.kafka.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * @author fjzheng
 * @version 1.0
 * @date 2022/4/11 11:58
 */

/**
 * 1、实现{@link Partitioner}接口
 * 2、重写partition()方法
 */
public class MyPartitioner implements Partitioner {
   
   
    /**
     *
     * @param s 主题topic
     * @param o key
     * @param bytes key序列化后字节数组
     * @param o1 value
     * @param bytes1 value序列化后的字节数组
     * @param cluster 集群元数据信息可以查看分区
     * @return
     */
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
   
   
        // 获取消息的key,这里假设只处理key为整数的
        Integer key = Integer.valueOf(o.toString());
        // 获取topic的分区总数
        Integer count = cluster.partitionCountForTopic(s);
        return key % count;
    }

    @Override
    public void close() {
   
   

    }

    @Override
    public void configure(Map<String, ?> map) {
   
   

    }
}

然后再创建生产者的参数配置添加下面参数就可以啦

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.shepherd.kafka.producer.MyPartitioner");

消费者:消费消息

package com.shepherd.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * @author fjzheng
 * @version 1.0
 * @date 2022/4/11 15:05
 */
public class CustomConsumer {
   
   
    public static void main(String[] args) {
   
   
        // 创建消费者的配置对象
        Properties properties = new Properties();
        // 给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.0.18:9092");
        // 配置序列化 必须
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 设置消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, " test");
        // 创建消费者对象

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // 添加消费者订阅的主题topic,可以添加多个
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        topics.add("topic1");
        kafkaConsumer.subscribe(topics);
        // 拉取数据
        while (true) {
   
   
            // 每隔1s消费一批数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
   
   
                // 打印数据
                System.out.println(consumerRecord);
            }
        }
    }
}

当然以上操作都是自己手写实现kafka生产者、消费者客户端,我们大可不别这样,因为spring给我们封装好了,我们只需要简单配置,再结合注解便可优雅使用kafka

配置如下:

server.port=1008
###########【Kafka集群】###########
spring.kafka.bootstrap-servers=10.10.0.18:9092
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了

# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 设置批量消费
# spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
# spring.kafka.consumer.max-poll-records=50

# 设置消息的自定义分区策略
spring.kafka.producer.properties.partitioner.class=com.shepherd.kafka.partition.CustomizePartitioner

生产者:

package com.shepherd.kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author fjzheng
 * @version 1.0
 * @date 2022/1/24 18:10
 */
@RestController
@RequestMapping("/api/kafka/produce")
public class ProducerController {
   
   
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     *  Kafka Producer 是异步发送消息的,也就是说如果你调用的是 producer.send(msg) 这个 API,那么它通常会立即返回,
     *  所以成功与否不确定,不带回调的发送消息是不能保证消息成功发送的,最终可能导致消息丢失。
     * @param message
     */
    @GetMapping("/{message}")
    public void sendMessageNoCallback(@PathVariable("message") String message) {
   
   
        kafkaTemplate.send("topic1", message);
    }

    /**
     *
     * @param message
     */
    @GetMapping("/callback1/{message}")
    public void sendMessage2(@PathVariable("message") String message) {
   
   
        kafkaTemplate.send("topic1", message).addCallback(success -> {
   
   
            // 消息发送到的topic
            String topic = success.getRecordMetadata().topic();
            // 消息发送到的分区
            int partition = success.getRecordMetadata().partition();
            // 消息在分区内的offset
            long offset = success.getRecordMetadata().offset();
            System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
        }, failure -> {
   
   
            System.out.println("发送消息失败:" + failure.getMessage());
        });
    }

    @GetMapping("/callback2/{message}")
    public void sendMessage3(@PathVariable("message") String message) {
   
   
        kafkaTemplate.send("topic1", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
   
   
            @Override
            public void onFailure(Throwable ex) {
   
   
                System.out.println("发送消息失败:"+ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
   
   
                System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                        + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
            }
        });
    }
}

消费者:

@Component
public class KafkaConsumerConfig {
   
   
    // 消费监听
    @KafkaListener(topics = {
   
   "topic1"})
    public void onMessage1(ConsumerRecord<?, ?> record){
   
   
        // 消费的哪个topic、partition的消息,打印出消息内容
        System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }
}
目录
相关文章
|
8月前
|
消息中间件 存储 Kafka
【Kafka】Kafka 概述分析
【4月更文挑战第5天】【Kafka】Kafka 概述分析
【Kafka】Kafka 概述分析
|
8月前
|
消息中间件 存储 Java
【Kafka】Kafka 组件分析
【4月更文挑战第5天】【Kafka】Kafka 组件分析
|
8月前
|
消息中间件 运维 Kafka
|
消息中间件 监控 关系型数据库
【Kafka系列】(一)Kafka入门(下)
【Kafka系列】(一)Kafka入门(下)
|
消息中间件 存储 Kafka
Kafka最基础使用
Kafka最基础使用
158 0
|
8月前
|
消息中间件 存储 Prometheus
【Kafka】Kafka 提供了哪些系统工具
【4月更文挑战第11天】【Kafka】Kafka 提供了哪些系统工具
|
8月前
|
消息中间件 存储 算法
Kafka基础 (上)
Kafka基础 (上)
50 0
|
8月前
|
消息中间件 Kafka
Kafka - 深入了解Kafka基础架构:Kafka的基本概念
Kafka - 深入了解Kafka基础架构:Kafka的基本概念
114 0
|
消息中间件 存储 Java
【Kafka系列】(一)Kafka入门(上)
【Kafka系列】(一)Kafka入门
|
消息中间件 缓存 负载均衡
Kafka实战(一) : 认识Kafka
Kafka实战(一) : 认识Kafka

热门文章

最新文章

下一篇
开通oss服务