Kafka源码分析之RecordBatch

简介:         RecordBatch是Kafka中Producer中对批量记录的一个封装,它表示正在或将要被发送的一批记录。这个类不是线程安全的,当修改它时必须使用外部同步。RecordBatch中的成员变量如下: // 记录数目recordCount public i...

        RecordBatch是Kafka中Producer中对批量记录的一个封装,它表示正在或将要被发送的一批记录。这个类不是线程安全的,当修改它时必须使用外部同步。RecordBatch中的成员变量如下:

    // 记录数目recordCount
    public int recordCount = 0;
    
    // 最大记录大小maxRecordSize
    public int maxRecordSize = 0;
    
    // 尝试次数attempts
    public volatile int attempts = 0;
    
    // RecordBatch创建时间createdMs
    public final long createdMs;
    
    
    public long drainedMs;
    
    // 上次尝试时间lastAttemptMs
    public long lastAttemptMs;
    
    // 内存记录MemoryRecords,在内存中存储Record
    public final MemoryRecords records;
    
    // 主题和分区的复合体topicPartition
    public final TopicPartition topicPartition;
    
    // Produce请求结果ProduceRequestResult实例produceFuture
    public final ProduceRequestResult produceFuture;
    
    // 上次添加记录时间lastAppendTime
    public long lastAppendTime;
    
    // 回调函数结构体Thunk列表thunks
    private final List<Thunk> thunks;
    
    // 重试标志位retry
    private boolean retry;
        RecordBatch中最主要的一个成员变量是MemoryRecords类型的records,它是Producer发送的记录record在内存中的一个数据集,另外还有一些记录数目recordCount、最大记录大小maxRecordSize、RecordBatch创建时间createdMs、上次尝试时间lastAttemptMs、主题和分区的复合体topicPartition、Produce请求结果ProduceRequestResult实例produceFuture、回调函数结构体列表thunks等重要变量。

        Thunk是RecordBatch的一个静态内部类,它是对Produce记录回调函数Callback和其相关参数FutureRecordMetadata的一个封装,其定义如下:

    /**
     * A callback and the associated FutureRecordMetadata argument to pass to it.
     */
    final private static class Thunk {
        final Callback callback;
        final FutureRecordMetadata future;

        public Thunk(Callback callback, FutureRecordMetadata future) {
            this.callback = callback;
            this.future = future;
        }
    }

         再看它的构造函数,如下:

    // 构造函数
    public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
        
    	// RecordBatch创建时间createdMs赋值为now
    	this.createdMs = now;
    	
    	// 上次尝试时间lastAttemptMs赋值为now
        this.lastAttemptMs = now;
        
        // 记录records赋值为records
        this.records = records;
        
        // 主题分区topicPartition赋值为tp
        this.topicPartition = tp;
        
        // 构造Produce请求结果ProduceRequestResult实例produceFuture
        this.produceFuture = new ProduceRequestResult();
        
        // 构造回调函数结构体列表thunks
        this.thunks = new ArrayList<Thunk>();
        
        // 上次添加记录时间lastAppendTime赋值为创建时间createdMs,也就是now
        this.lastAppendTime = createdMs;
        
        // 重试标志位retry默认为false
        this.retry = false;
    }
        既然为批量记录,那么RecordBatch的最主要一个功能就是添加记录,而记录又不可能无限制添加,所以tryAppend()方法就是完成这个功能的。它尝试添加记录,如果添加成功,则返回FutureRecordMetadata实例,其中包含了记录在记录集中的相对偏移量,否则返回null,代码如下:

    /**
     * Append the record to the current record set and return the relative offset within that record set
     * 添加记录到当前记录集合,返回记录在记录集中的相对偏移量
     * 
     * @return The RecordSend corresponding to this record or null if there isn't sufficient room.
     */
    public FutureRecordMetadata tryAppend(byte[] key, byte[] value, Callback callback, long now) {
        
    	
    	if (!this.records.hasRoomFor(key, value)) {// 如果内存记录MemoryRecords实例records没有余地
    		// 直接返回null
            return null;
        } else {
        	
        	// 将key、value添加进内存记录MemoryRecords实例records中
            this.records.append(0L, key, value);
            
            // 如果需要,更新最大记录大小maxRecordSize
            this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
            
            // 上次添加记录时间lastAppendTime赋值为now
            this.lastAppendTime = now;
            
            // 构造FutureRecordMetadata实例future
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount);
            
            // 将回调函数callback构造成Thunk对象添加到thunks列表
            if (callback != null)
                thunks.add(new Thunk(callback, future));
            
            // 记录数目recordCount加1
            this.recordCount++;
            
            // 返回FutureRecordMetadata实例future
            return future;
        }
    }
        tryAppend()方法需要四个参数,键key、值value、回调函数callback、添加时间now,其处理逻辑如下:

        1、首先判断内存记录MemoryRecords实例records中有没有余地存储该key、value,如果没有,直接返回null,否则继续;

        2、调用MemoryRecords的append()方法,将key、value添加进内存记录MemoryRecords实例records中;

        3、如果key、value对应大小超过当前maxRecordSize,更新最大记录大小maxRecordSize;

        4、更新上次添加记录时间lastAppendTime为now;

        5、根据produceFuture、recordCount构造FutureRecordMetadata实例future;

        6、利用future将回调函数callback构造成Thunk对象添加到thunks列表;

        7、记录数目recordCount加1;

        8、返回FutureRecordMetadata实例future。

        



相关文章
|
消息中间件 Kafka
Kafka Producer整体架构概述及源码分析(下)
Kafka Producer整体架构概述及源码分析
105 0
|
消息中间件 缓存 Kafka
Kafka Producer整体架构概述及源码分析(上)
Kafka Producer整体架构概述及源码分析
159 0
Kafka Producer整体架构概述及源码分析(上)
|
消息中间件 存储 缓存
源码分析Kafka 消息拉取流程(文末两张流程图)
源码分析Kafka 消息拉取流程(文末两张流程图)
源码分析Kafka 消息拉取流程(文末两张流程图)
|
消息中间件 存储 设计模式
源码分析 Kafka 消息发送流程(文末附流程图)
源码分析 Kafka 消息发送流程(文末附流程图)
源码分析 Kafka 消息发送流程(文末附流程图)
|
消息中间件 存储 Java
深度剖析 Kafka Producer 的缓冲池机制【图解 + 源码分析】
上次跟大家分享的文章「Kafka Producer 异步发送消息居然也会阻塞?」中提到了缓冲池,后面再经过一番阅读源码后,发现了这个缓冲池设计的很棒,被它的设计思想优雅到了,所以忍不住跟大家继续分享一波。
324 0
深度剖析 Kafka Producer 的缓冲池机制【图解 + 源码分析】
|
消息中间件 缓存 算法
从源码分析如何优雅的使用 Kafka 生产者(下)
在上文 设计一个百万级的消息推送系统 中提到消息流转采用的是 Kafka 作为中间件。 其中有朋友咨询在大量消息的情况下 Kakfa 是如何保证消息的高效及一致性呢? 正好以这个问题结合 Kakfa 的源码讨论下如何正确、高效的发送消息。
|
消息中间件 缓存 Java
从源码分析如何优雅的使用 Kafka 生产者(上)
在上文 设计一个百万级的消息推送系统 中提到消息流转采用的是 Kafka 作为中间件。 其中有朋友咨询在大量消息的情况下 Kakfa 是如何保证消息的高效及一致性呢? 正好以这个问题结合 Kakfa 的源码讨论下如何正确、高效的发送消息。
|
消息中间件 Kafka Shell
【kafka源码】Topic的创建源码分析(附视频)
【kafka源码】Topic的创建源码分析(附视频)
【kafka源码】Topic的创建源码分析(附视频)
|
消息中间件 Kafka
从源码分析如何优雅的使用 Kafka 生产者
前言 在上文 设计一个百万级的消息推送系统 中提到消息流转采用的是 Kafka 作为中间件。 其中有朋友咨询在大量消息的情况下 Kakfa 是如何保证消息的高效及一致性呢? 正好以这个问题结合 Kakfa 的源码讨论下如何正确、高效的发送消息。
752 0

热门文章

最新文章