消息队列 Kafka学习总结

简介:

分享的目的

  1. 更深入了解消息中间件Kafka的系统架构;
  2. 更好的使用消息中间件Kafka解决实际项目中的问题;
  3. 通过Kafka的设计架构原理,和使用场景,能够更快速掌握研究其它类似的消息中间件,如RocketMQ, Notify, ActiviteMQ, 能够在实际的业务中更好使用这些消息中间件

分享大纲

Kafka系统架构;
Kafka开发;
Kafka参数调优;

Kafka系统架构

Kafka介绍

Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:

  1. 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以海量的消息存储也能够保持长时间的稳定性能(高性能);
  2. 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息(高并发);
  3. 支持通过Kafka服务器和消费机集群来分区消息(高可靠);
  4. 支持Hadoop并行数据加载;
  5. 支持各种语言丰富的客户端(java, C++, python, erlang, .Net, go, Clojure, Scala);
    image.png

Kafka架构

Kafka基本概念介绍

Broker:

Kafka集群包含一个或多个服务器,这种服务器被称为broker;

Topic

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,实际开发中通过称为队列, topic名称,即叫做Kafka队名称。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处);

Partition

Partition是物理上的概念,每个Topic包含一个或多个Partition;

Segment

partition物理上由多个segment组成;

Producer

负责发布消息到Kafka broker;

Consumer

消息消费者,向Kafka broker读取消息的客户端;

Consumer Group

每个Consumer属于一个特定的Consumer Group可为每个Consumer指定group name,若不指定group name则属于默认的group, 例如 在电商中发货中心、交易中心分别是两个Kafka consumer group。

Kafka系统核心架构

image.png

Kafka topic在服务端的结构

image.png

  1. 一般情况下,一个topic下包含一个或多个Partition, 2-3个复本,这个在创建topic的时候可以指定;
  2. 多个Partition可以提高读写的吞吐量, 多个副本提高Kafka的可靠性;
  3. 在数据不需要保序的情况下,创建多个Partition最好; 在局部保序的情况下,同一个Partition消费保序即可; 在合局有消费保序的情况下,一个topic对应一个Partition, 如数据库实时同步场景;
  4. 每一个Partition理论上是一个无长的队列,包含多个文件,文件以时间戳+最小的offset命名,不能修改,只能以只能是以append的方式写入,会通过NIO内存映射文件的方持久化到硬盘上, 文件的大小固定,大小达到阈值以后,关闭旧文件 ,重新新建一个内存映射文件 ;在 flush硬盘的时候是顺序IO,因此写入Partition的速度会非常非常快, 过期的文件 根据保存时间,后台异步线程定期清除,释放磁盘空间;

Partition offset

image.png

  1. 一个消费集群,即group 对应一个topic下Partition的一个消费offset;
  2. 两个不同的消费集群对应的同一个Partition 下的消费offset互不影响;
  3. 可以通过时间戳定位Partition的offset, 这个一般在特殊的情况下才会这样这样做,大部分情况下,zk上都会有记录;
  4. 同一个Partition 只能严格顺序消费;

Kafka事物

数据传输的事务定义通常有以下三种级别

1.

最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输;

2.

最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输;

3.

精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的,但是很难做到;

Kafka的事物解决方案:

1.

当发布消息时,Kafka有一个committed的概念,一旦消息被提交了,只要消息被写入的分区的所在的副本broker是活动的,数据就不会丢失,但是如果producer发布消息时发生了网络错误, Kafka现在也没一个完美的解决方案;

2.

如果consumer崩溃了,会有另外一个consumer接着消费消息,它需要从一个合适的 offset 继续处理。这种情况下可以有以下选择:

2. 1

consumer可以先读取消息,然后将offset写入日志文件中,然后再处理消息。这存在一种可能就是在存储offset后还没处理消息就crash了,新的consumer继续从这个offset处理那么就会有些消息永远不会被处理,这就是上面说的“最多一次”。

2.2

consumer可以先读取消息,处理消息,最后记录offset,当然如果在记录offset之前就crash了,新的consumer会重复的消费一些消息,这就是上面说的“最少一次”;

2.3

“精确一次”可以通过将提交分为两个阶段来解决:保存了offset后提交一次,消息处理成功之后再提交一次。但是还有个更简单的做法:将消息的offset和消息被处理后的结果保存在一起;

Kafka一次消息的生命周期

image.png

Kafka zk结点介绍

broker注册结点

broker/ids/[0…N]

topic结点

broker/topics/[topic]

Broker/topics/[topic]/3->2

broker id 为3的broker为某个topic提供了2个分区进行消息的存储

消费分区与消费者的关系

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]

消息消费进度Offset记录

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]

消费者注册

/consumers/[group_id]/ids/[consumer_id]

消费者监听:

/consumers/[group_id]/ids
/brokers/ids/[0…N]

Kafka日志文件

Kafka所有日志文件均存储在server.properties文件配置参数log.dirs下,假设 log.dirs配置为/export/kafka/log/, 同一个topic下有多个不同partition,每个partition为一个目录,目录的命名规范为: topic名称+有序序号,第一个partition序号从0开始,序号最大值为partitions数量减1,例如: 一个名为trade为的topic, 有4个partition, 则在/export/kafka/log/目录下有下面的信息:
trade-0 trade-1 trade-2 trade-3
partition中文件存储方式:每个partion目录相当于一个巨型文件被平均分配到多个大小相等segment数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。这样做的好处就是能快速删除无用文件,有效提高磁盘利用率, 下面的partition中文件存储方式:
image.png

partition中segment文件存储结构

  1. segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件.
  2. segment文件命名规则:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充,示例如下。
    image.png

partition中通过offset查找message

第一步查找segment file;
第二步通过segment file查找message;
示例: 查找为offset=368776的message, 如下图所示;
image.png

  1. 定位segment file: 其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.

    第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.第三个文件
    00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起
    始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。当
    offset=368776时定位到00000000000000368769.index|log
  2. 在segment file中查找:当offset=368776时,依次定位到00000000000000368769.index的元数据物理

    位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序
    查找直到offset=368776为止;
    

Kafka 负载均衡

生产者的负载均衡

四层的负载均衡,不常用, 使用zookeeper进行负载均衡, 常 用, 其中partitioner.class 的配置决定了Kafka生产者的负载均衡, 有三种情况:

  1. kafka.producer.DefaultPartitioner (Hash模式)
  2. kafka.producer.ByteArrayPartitioner (Hash模式)
  3. 不指定随机轮询模式

消费者负载均衡

partition和consumer 对应好, 平均
1: 5个partition, 5个consumer, (1, 1, 1, 1, 1)
2: 8个 partition, 5个consumer, (2, 2, 2, 1, 1)
3: 5个 partition, 8个consumer, (1, 1, 1, 1, 1, 0, 0, 0)

Kafka线上部署

image.png

1.一般情况下,根据并发量,一个Kafka集群有3-5台broker机器,阿里是一条业务线7-8台物理机,内存是192G,为了减少消息的堆积,一个topic下128+个Partition, 默认是消费集群机器数量的2-3倍, 例如线上消费集群是64台,partition建议设置为128 – 192台之间, Note however that there cannot be more consumer instances in a consumer group than partitions.
2.一主多备部署:一个备的broker在和主的broker在同一个机房,另一个备的broker部署在同城的另一个机房, 进一步增加高可靠性.

  1. 为防止消息堆积,建议同一个topic的消费集群的消费能力不能小于生产的集群.
  2. 为了提高网络的利用率,建议一次性发送的消息尽可能的大,避免小包网络传输.

Kafka 保证

  1. 消息通过生产者被发送出去,将会按消息发送时的顺序追加到到一个指定的topic partition,也是说,记录M1和记录M2被同一个生产者发送,并且M1要先于M2发送,那么在日志文件中M1将会有比M2更小的offset(生产保序)
  2. 一个消费者实例顺序消费存储在日志中的记录(消费保序)
  3. 对于一个有N个复本的topic,最多情况下N-1个broker server丢失数据,也可以保证在记录被提交的情况下,不会丢数据(高可用);

Kafka开发

Kafka应用开发

kafka java客户端maven依赖

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.9.2</artifactId>
  <version>0.8.2.2</version>
</dependency>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.0.0</version>
</dependency>

Kafka producer代码示例

public class KafkaProducerTest {  
      
    private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerTest.class);  
      
    private static Properties properties = null;  
      
    //  kafka连续配置 项
    static {  
        properties = new Properties();  
       properties.put("bootstrap.servers", "centos.master:9092,centos.slave1:9092,centos.slave2:9092");
         properties.put("producer.type", "sync");  
         properties.put("request.required.acks", "1");  
         properties.put("serializer.class", "kafka.serializer.DefaultEncoder");  
         properties.put("partitioner.class", "kafka.producer.DefaultPartitioner");  
         properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");  
         properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");  
    }
public void produce() {  
       KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<byte[],byte[]>(properties);  
        ProducerRecord<byte[],byte[]> kafkaRecord = new ProducerRecord<byte[],byte[]>(  
                "test", "kkk".getBytes(), "vvv".getBytes());  
        kafkaProducer.send(kafkaRecord, new Callback() {  
            public void onCompletion(RecordMetadata metadata, Exception e) {  
                if(null != e) {  
                    LOG.info("the offset of the send record is {}", metadata.offset());  
                    LOG.error(e.getMessage(), e);  
                }  
                LOG.info("complete!");  
            }  
        });  
        kafkaProducer.close();  
    }  
  
    public static void main(String[] args) {  
        KafkaProducerTest kafkaProducerTest = new KafkaProducerTest();  
        for (int i = 0; i < 10; i++) {  
            kafkaProducerTest.produce();  
        }  
    }  
}  

Kafka Consumer代码示例

public class ConsumerSample {

   public static void main(String[] args) {
                
       Properties props = new Properties();
      props.put("zk.connect", "localhost:2181");
     props.put("zk.connectiontimeout.ms", "1000000");
      props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");  
      props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");  
     props.put("groupid", "test_group");

     ConsumerConfig consumerConfig = new ConsumerConfig(props);
     ConsumerConnector consumerConnector = 
               Consumer.createJavaConsumerConnector(consumerConfig);

      HashMap<String, Integer> map = new HashMap<String, Integer>();
      map.put("test-topic", 4);
      Map<String, List<KafkaStream<Message>>> topicMessageStreams = 
   consumerConnector.createMessageStreams(map);
      List<KafkaStream<Message>> streams = topicMessageStreams.get("test-topic");
         ExecutorService executor = Executors.newFixedThreadPool(4); 
     for (final KafkaStream<Message> stream : streams) {
             executor.submit(new Runnable() {
            public void run() {
                  for (MessageAndMetadata msgAndMetadata : stream) {
                  System.out.println("topic: " + msgAndMetadata.topic());
                  Message message = (Message) msgAndMetadata.message();
                  ByteBuffer buffer = message.payload();
                  buffer.get(bytes);
                  String tmp = new String(bytes);
                  System.out.println("message content: " + tmp);
                 }
            }
       });
      }
  }
}

Kafka开发注意事项

  1. 生产端注意设置好producer.type和partitioner.class 这两个参数,第一个参数 对写入吞吐量影响巨大,要结合实际的业务场景来设置,第二个参数关系到消费的结果是不是正常的,也要结合实际的业务场景来设置.
  2. 生产端和消费端的key.serializer 和 value.serializer分别设置一样,否则消费端可能会产生乱码.
  3. 根据实际的业务场景,设置好生产端和消费端的其它配置参数.

Kafka常见的使用模式

Consumer 消费消息模式

  1. 推模式
  2. 拉模式(常见)

Producer 发布消息模式

局部保序模式(hash映射)
全部保序模式 (只有一个Partition)
不保序模式(轮训模式)

Kafka的使用场景

Messaging, 大规模分布式网站

异步,解耦、削峰

Website Activity Tracking

通过agent定时收集每台主机的syslog信息

Metrics

Log Aggregation

Event Sourcing

Commit Log

异地机房的数据近实时同步(全局保序和局部保序)

Kafka参数调优

Broker 参数调优

  1. log.dirs Kafka数据存放的目录。可以指定多个目录,中间用逗号分隔,当新partition被创建的时会被存放到 当前存放partition最少的目录.
  2. num.io.threads 服务器用来执行读写请求的IO线程数,此参数的数量至少要等于服务器上磁盘的数量.
  3. queued.max.requests I/O线程可以处理请求的队列大小,若实际请求数超过此大小,网络线程将停止接收新的请求,建议500-1000.
  4. num.partitions 默认partition数量,如果topic在创建时没有指定partition数量,默认使用此值,建议修改为consumer 数量的1-3倍.
  5. log.segment.bytes Segment文件的大小,超过此值将会自动新建一个segment,此值可以被topic级别的参数覆盖, 建议1G ~ 5G.
  6. default.replication.factor 默认副本数量,建议改为2.
  7. num.replica.fetchers Leader处理replica fetch消息的线程数量, 建议设置大点2-4.
  8. offsets.topic.num.partitions offset提交主题分区的数量,建议设置为100 ~ 200.

Producer 参数调优

  1. request.required.acks :用来控制一个produce请求怎样才能算完成, 主要是来表示写入数据的持久化的,有三个值(0, 1, -1), 持久化的程度依次增高.
  2. producer.type : 同步异步模式。async表示异步,sync表示同步。如果设置成异步模式,可以允许生产者以batch的形式push数据,这样会极大的提高broker性能,推荐设置为异步.
  3. partitioner.class : Partition类,默认对key进行hash, 即 kafka.producer.DefaultPartitioner.
  4. compression.codec :指定producer消息的压缩格式,可选参数为: “none”, “gzip” and “snappy”.
  5. queue.buffering.max.ms :启用异步模式时,producer缓存消息的时间。比如我们设置成1000时,它会缓存1秒的数据再一次发送出去,这样可以极大的增加broker吞吐量,但也会造成时效性的降低.
  6. queue.buffering.max.messages:采用异步模式时producer buffer 队列里最大缓存的消息数量,如果超过这个数值,producer就会阻塞或者丢掉消息.
  7. batch.num.messages:采用异步模式时,一个batch缓存的消息数量。达到这个数量值时producer才会发送消息.

Consumer参数调优

  1. fetch.message.max.bytes:查询topic-partition时允许的最大消息大小,consumer会为每个partition缓存此大小的消息到内存,因此,这个参数可以控制consumer的内存使用量。这个值应该至少比server允许的最大消息大小大,以免producer发送的消息大于consumer允许的消息.
  2. num.consumer.fetchers:拉数据的线程数量,为了保序,建议一个,用默认值.
  3. auto.commit.enable:如果此值设置为true,consumer会周期性的把当前消费的offset值保存到zookeeper,当consumer失败重启之后将会使用此值作为新开始消费的值.
  4. auto.commit.interval.ms: Consumer提交offset值到zookeeper的周期.

Kafka常见问题总结

客户端消费出现空指针异常

 重新设置消费的partition offset

客户端无法消费

网络不通
jar包冲突(netty, slf4j)

同类产品

RocketMq
JMQ

其它消息产品

notify
ActiveMQ(Apache)
Hornet(Jboss)

目录
相关文章
|
2天前
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
10 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
22天前
|
消息中间件 大数据 Kafka
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(二)
22 2
|
22天前
|
消息中间件 NoSQL 大数据
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
大数据-77 Kafka 高级特性-稳定性-延时队列、重试队列 概念学习 JavaAPI实现(一)
28 1
|
5月前
|
消息中间件 负载均衡 NoSQL
Redis系列学习文章分享---第七篇(Redis快速入门之消息队列--List实现消息队列 Pubsub实现消息队列 stream的单消费模式 stream的消费者组模式 基于stream消息队列)
Redis系列学习文章分享---第七篇(Redis快速入门之消息队列--List实现消息队列 Pubsub实现消息队列 stream的单消费模式 stream的消费者组模式 基于stream消息队列)
67 0
|
6天前
|
消息中间件 中间件 Kafka
解锁Kafka等消息队列中间件的测试之道
在这个数字化时代,分布式系统和消息队列中间件(如Kafka、RabbitMQ)已成为日常工作的核心组件。本次公开课由前字节跳动资深专家KK老师主讲,深入解析消息队列的基本原理、架构及测试要点,涵盖功能、性能、可靠性、安全性和兼容性测试,并探讨其主要应用场景,如应用解耦、异步处理和限流削峰。课程最后设有互动答疑环节,助你全面掌握消息队列的测试方法。
8 0
|
3月前
|
图形学 人工智能 C#
从零起步,到亲手实现:一步步教你用Unity引擎搭建出令人惊叹的3D游戏世界,绝不错过的初学者友好型超详细指南 ——兼探索游戏设计奥秘与实践编程技巧的完美结合之旅
【8月更文挑战第31天】本文介绍如何使用Unity引擎从零开始创建简单的3D游戏世界,涵盖游戏对象创建、物理模拟、用户输入处理及动画效果。Unity是一款强大的跨平台游戏开发工具,支持多种编程语言,具有直观编辑器和丰富文档。文章指导读者创建新项目、添加立方体对象、编写移动脚本,并引入基础动画,帮助初学者快速掌握Unity开发核心概念,迈出游戏制作的第一步。
125 1
|
3月前
|
消息中间件 传感器 缓存
为什么Kafka能秒杀众多消息队列?揭秘它背后的五大性能神器,让你秒懂Kafka的极速之道!
【8月更文挑战第24天】Apache Kafka作为分布式流处理平台的领先者,凭借其出色的性能和扩展能力广受好评。本文通过案例分析,深入探讨Kafka实现高性能的关键因素:分区与并行处理显著提升吞吐量;批量发送结合压缩算法减少网络I/O次数及数据量;顺序写盘与页缓存机制提高写入效率;Zero-Copy技术降低CPU消耗;集群扩展与负载均衡确保系统稳定性和可靠性。这些机制共同作用,使Kafka能够在处理大规模数据流时表现出色。
59 3
|
3月前
|
消息中间件 存储 Kafka
ZooKeeper助力Kafka:掌握这四大作用,让你的消息队列系统稳如老狗!
【8月更文挑战第24天】Kafka是一款高性能的分布式消息队列系统,其稳定运行很大程度上依赖于ZooKeeper提供的分布式协调服务。ZooKeeper在Kafka中承担了四大关键职责:集群管理(Broker的注册与选举)、主题与分区管理、领导者选举机制以及消费者组管理。通过具体的代码示例展示了这些功能的具体实现方式。
72 2
|
3月前
|
消息中间件 存储 Kafka
现代消息队列与云存储问题之Kafka在海量队列场景下存在性能的问题如何解决
现代消息队列与云存储问题之Kafka在海量队列场景下存在性能的问题如何解决
|
3月前
|
消息中间件
快来体验 消息队列RabbitMQ版入门训练营 打卡学习领好礼
快来体验 消息队列RabbitMQ版入门训练营 打卡学习领好礼
60 0

相关产品

  • 云消息队列 Kafka 版