Apache Kafka – KIP 32,33 Time Index

Introduction:

KIP-32 and KIP-33 are both time-related.

KIP-32 - Add timestamps to Kafka message

Introduced in: 0.10.0.0

Adding a timestamp to each Kafka message makes several things more convenient, for example retention, rolling, and fetching messages by time.

Two new configurations are involved:

  1. message.timestamp.type - This topic level configuration defines the type of timestamp in the messages of a topic. The valid values are CreateTime or LogAppendTime.
  2. max.message.time.difference.ms - This configuration only works when message.timestamp.type=CreateTime. The broker will only accept messages whose timestamp differs no more than max.message.time.difference.ms from the broker local time.

message.timestamp.type determines whether the timestamp is the user-supplied event time (CreateTime) or the broker's processing time (LogAppendTime).

A drawback of the user-supplied CreateTime is, first of all, that the timestamps are not necessarily monotonically increasing.

max.message.time.difference.ms defaults to Long.MaxValue; when set, messages with an abnormal timestamp, i.e. too old or too far in the future relative to the broker clock, are rejected.
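
As a hedged sketch of how the topic-level timestamp type might be applied (the broker address and topic name below are made-up; the Java AdminClient used here only ships with later clients, 0.11+, while on 0.10 the same setting can be passed to kafka-topics.sh via --config):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import scala.collection.JavaConverters._

object CreateTopicWithLogAppendTime {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = AdminClient.create(props)
    try {
      // Topic-level override from KIP-32: stamp messages with broker time.
      val topicConfigs = Map("message.timestamp.type" -> "LogAppendTime").asJava
      val topic = new NewTopic("events", 3, 1.toShort).configs(topicConfigs)
      admin.createTopics(Collections.singleton(topic)).all().get()
    } finally {
      admin.close()
    }
  }
}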

 

Next, a timestamp field is added to ProducerRecord and ConsumerRecord.

With CreateTime, the user sets the timestamp when constructing the ProducerRecord.

With LogAppendTime, the broker fills in the timestamp automatically when it receives the message.
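
A minimal sketch of how this surfaces in the client API (broker address, topic name and serializers are assumptions): the producer may pass an explicit CreateTime, and each ConsumerRecord exposes timestamp() and timestampType():

import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object TimestampedProduce {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)

    // The 3rd constructor argument is the CreateTime in epoch millis.
    // Passing null lets the producer fill in the current time; if the topic is
    // configured with LogAppendTime, the broker overwrites it on append anyway.
    val createTime: java.lang.Long = System.currentTimeMillis()
    producer.send(new ProducerRecord[String, String]("events", null, createTime, "key", "value"))
    producer.close()
  }

  // On the consumer side, each ConsumerRecord exposes the timestamp and its
  // type (CREATE_TIME or LOG_APPEND_TIME).
  def printTimestamp(record: ConsumerRecord[String, String]): Unit =
    println(s"offset=${record.offset()} ts=${record.timestamp()} type=${record.timestampType()}")
}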

 

What is the difference between CreateTime and LogAppendTime?

The key decision made in this KIP is whether to use LogAppendTime (broker time) or CreateTime (application time).

The good things about LogAppendTime are:

  1. Broker is more robust.
  2. Monotonically increasing.
  3. Deterministic behavior for log rolling and retention.
  4. If CreateTime is required, it can always be put into the message payload.

The good things about CreateTime are:

  1. More intuitive to users.
  2. User may want to have log retention based on when the message is created instead of when the message enters the pipeline.
  3. Immutable after entering the pipeline.

The use cases differ: CreateTime mainly matters for analytics scenarios. In practice the message payload usually already carries the event time, so for a pure queue LogAppendTime is sufficient and keeps the various pieces of logic much simpler.

 

Good, now that messages carry a timestamp, how do we use it?

Hence the second proposal:

KIP-33 - Add a time based log index

Introduced in: 0.10.1.0

Motivation:

Kafka has a few timestamp based functions, including

  1. Searching message by timestamp
  2. Time based log rolling
  3. Time based log retention.

Currently these operations depend on the create time / modification time of the log segment file. This has a few issues.

  1. Searching offset by timestamp has very coarse granularity (log segment level), it also does not work well when replica is reassigned.
  2. The time based log rolling and retention does not work well when replica is reassigned.

Retention, rolling, and searching for messages all make use of time, and previously that time was the creation/modification time of the log segment file, which causes problems.

In particular, after a replica is reassigned, the segment files get brand-new timestamps, so the times are no longer accurate.

KIP-33 therefore introduces a time-based log index to index messages by time.

So the per-segment files in the log directory go from the original two (the log file and the offset index) to three, with the addition of the time-based index.

The log index works for both LogAppendTime and CreateTime.

Create another index file for each log segment with name SegmentBaseOffset.timeindex. The density of the index is upper bounded by index.interval.bytes configuration.

Format:

Time Index Entry => Timestamp Offset
  Timestamp => int64
  Offset => int32
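
To make the entry layout concrete, here is a small illustration (not Kafka's actual TimeIndex class) that encodes and decodes one 12-byte entry; in the real index file the 4-byte offset is stored relative to the segment's base offset:

import java.nio.ByteBuffer

object TimeIndexEntryLayout {
  // One time index entry: 8-byte timestamp + 4-byte offset relative to the
  // segment base offset = 12 bytes (1.5x the 8-byte offset index entry).
  val EntrySize = 12

  def encode(timestamp: Long, offset: Long, segmentBaseOffset: Long): Array[Byte] = {
    val buf = ByteBuffer.allocate(EntrySize)
    buf.putLong(timestamp)                         // Timestamp => int64
    buf.putInt((offset - segmentBaseOffset).toInt) // Offset    => int32 (relative)
    buf.array()
  }

  def decode(bytes: Array[Byte], segmentBaseOffset: Long): (Long, Long) = {
    val buf = ByteBuffer.wrap(bytes)
    val timestamp = buf.getLong()
    val absoluteOffset = segmentBaseOffset + buf.getInt()
    (timestamp, absoluteOffset)
  }
}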

 

How the time index is built:

  1. When a new log segment is created, the broker will create a time index file for the log segment. 
  2. The default initial / max size of the time index files is the same as the offset index files. (time index entry is 1.5x of the size of offset index entry, user should set the configuration accordingly).
  3. Each log segment maintains the largest timestamp so far in that segment. The initial value of the largest timestamp is -1 for a newly created segment.
  4. When the broker receives a message, if the message is not rejected because its timestamp exceeds the allowed difference, the message will be appended to the log. (The timestamp will be either LogAppendTime or CreateTime depending on the configuration.)
  5. When broker appends the message to the log segment, if an offset index entry is inserted, it will also insert a time index entry if the max timestamp so far is greater than the timestamp in the last time index entry.
    • For message format v0, the timestamp is always -1, so no time index entry will be inserted when message is appended.
  6. When the current active segment is rolled or closed, a time index entry will be inserted into the time index to ensure the last time index entry has the largest timestamp of the log segment.
    1. If the largest timestamp in the segment is non-negative (at least one message has a timestamp), the entry will be (largest_timestamp_in_the_segment -> base_offset_of_the_next_segment).
    2. If the largest timestamp in the segment is -1 (no message in the segment has a timestamp), the time index will be empty and the largest timestamp defaults to the segment's last modification time.

The time index is not monotonically increasing for the segments of a partition. Instead, it is only monotonically increasing within each individual time index file. i.e. It is possible that the time index file for a later log segment contains smaller timestamp than some timestamp in the time index file of an earlier segment.

 

When a new log segment is created, its time index file is created and initialized at the same time.

When the broker appends a message to the log segment, an offset index entry is inserted (index entries are only inserted at intervals), and at the same time a time index entry is inserted if the max timestamp so far is greater than the timestamp of the last entry in the time index, so the time index stays monotonically increasing.

A time index entry is also inserted when the active segment is rolled or closed (since index entries are spaced out, the final record has to be written to the index on roll or close).

Because every insertion checks whether the timestamp exceeds the last entry's timestamp, the timestamps within a single time index file are monotonically increasing.

Across different time index files, however, there is no such guarantee: a newer time index file may contain older timestamps than an earlier one.

If messages use CreateTime, this will happen quite often.
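
The append rules above can be summarized in a rough model (illustrative names only, not Kafka's internal API); the key point is that an entry is only appended when the segment's max timestamp so far exceeds the last recorded entry, which is what keeps each file monotonically increasing:

import scala.collection.mutable.ArrayBuffer

// Illustrative model of a per-segment time index (not Kafka's TimeIndex class).
class SegmentTimeIndex {
  private val entries = ArrayBuffer.empty[(Long, Long)] // (timestamp, offset)
  private var maxTimestampSoFar: Long = -1L             // -1 until a timestamped message arrives

  def onMessageAppended(timestamp: Long, offset: Long, offsetIndexEntryInserted: Boolean): Unit = {
    if (timestamp > maxTimestampSoFar) maxTimestampSoFar = timestamp
    // Only add a time index entry when an offset index entry was also added
    // (density bounded by index.interval.bytes) and the max timestamp advanced
    // past the last recorded entry.
    if (offsetIndexEntryInserted && maxTimestampSoFar > lastEntryTimestamp)
      entries += ((maxTimestampSoFar, offset))
  }

  def onRollOrClose(nextSegmentBaseOffset: Long): Unit = {
    // Ensure the last entry carries the segment's largest timestamp.
    if (maxTimestampSoFar >= 0 && maxTimestampSoFar > lastEntryTimestamp)
      entries += ((maxTimestampSoFar, nextSegmentBaseOffset))
  }

  def lastEntryTimestamp: Long = entries.lastOption.map(_._1).getOrElse(-1L)
  def lastEntry: Option[(Long, Long)] = entries.lastOption
}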

 

With the above in place, rolling and retention now both use this time index.

The procedures are as follows.

Enforce time based log retention

To enforce time based log retention, the broker will check from the oldest segment forward to the latest segment. For each segment, the broker checks the last time index entry of the log segment; its timestamp is the latest timestamp of the messages in that segment. If that timestamp has expired, the broker deletes the log segment. The broker stops at the first segment that has not expired, i.e. it will not delete a later segment even if that segment is expired, unless all older segments have already been expired.

The broker scans from the oldest segment onwards; if a segment's last time index entry has expired, the segment is deleted.
If it has not expired, the scan stops, so even if some later segment is also expired, it will not be removed.

This is why using CreateTime can make log retention somewhat nondeterministic and messy.
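
A sketch of that retention pass (illustrative types, not the broker's actual code): walk from the oldest segment forward and stop at the first one that has not expired:

object TimeBasedRetention {
  // largestTimestamp is the timestamp of the last time index entry of the segment.
  final case class Segment(baseOffset: Long, largestTimestamp: Long)

  // `segments` is assumed to be ordered from oldest to newest. Expired segments
  // are deleted from the head, but the walk stops at the first non-expired one,
  // so later segments are kept even if they happen to be expired.
  def segmentsToDelete(segments: Seq[Segment], retentionMs: Long, now: Long): Seq[Segment] =
    segments.takeWhile(seg => now - seg.largestTimestamp > retentionMs)
}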

 

Enforce time based log rolling

Currently, time based log rolling is based on the creation time of the log segment.
With this KIP, time based rolling is changed to be based only on the message timestamp.
More specifically, if the first message in the log segment has a timestamp, a new log segment will be rolled out if the timestamp of the message about to be appended is greater than the timestamp of the first message in the segment + log.roll.ms.

When message.timestamp.type=CreateTime, user should set max.message.time.difference.ms appropriately together with log.roll.ms to avoid frequent log segment roll out.

During the migration phase, if the first message in a segment does not have a timestamp, the log rolling will still be based on the (current time - create time of the segment).

Log segment rolling is also driven by the message timestamp, so when message.timestamp.type=CreateTime, max.message.time.difference.ms must be set carefully together with log.roll.ms to avoid overly frequent rolling.

The above is easy to follow; the only caveat is that with message.timestamp.type=CreateTime, basing decisions on message timestamps introduces a lot of nondeterminism.
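
A sketch of the roll decision under these rules (illustrative signature, not the broker's actual code), including the migration fallback for segments whose first message has no timestamp:

object TimeBasedRolling {
  // firstMessageTimestamp is -1 when the segment's first message predates the
  // timestamp format (migration case).
  def shouldRoll(incomingTimestamp: Long,
                 firstMessageTimestamp: Long,
                 segmentCreateTimeMs: Long,
                 nowMs: Long,
                 logRollMs: Long): Boolean =
    if (firstMessageTimestamp >= 0)
      incomingTimestamp > firstMessageTimestamp + logRollMs // KIP-33 rule
    else
      nowMs - segmentCreateTimeMs > logRollMs               // fallback: segment create time
}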

 

Search message by timestamp

When searching by timestamp, broker will start from the earliest log segment and check the last time index entry. If the timestamp of the last time index entry is greater than the target timestamp, the broker will do binary search on that time index to find the closest index entry and scan the log from there. Otherwise it will move on to the next log segment.

Searching by timestamp will have better accuracy. The guarantees provided are:

  • Messages whose timestamps are after the searched timestamp will be consumed.
  • Some messages with earlier timestamp might also be consumed.

The OffsetRequest behaves almost the same as before. If timestamp T is set in the OffsetRequest, the first offset in the returned offset sequence is the offset to start from if the user wants to consume from T. The guarantee is that any message whose timestamp is greater than T has a bigger offset, i.e. any message before this offset has a timestamp < T.

The time index granularity does not change the actual timestamp searching granularity. It only affects the time needed for searching.

This feature is great: Kafka can finally replay by time.

Previously replay was only possible at segment granularity.

The process is as follows.

The broker scans from the earliest log segment and checks its last time index entry; if it is smaller than the target time, every message in that segment is earlier than the target, so the segment is skipped.

It keeps going until it finds a segment whose last time index entry is greater than the target time, which means that segment contains the data we want.

Next, it binary-searches that segment's time index for the closest entry and starts reading from the corresponding offset.

The guarantee is that all data with timestamps after the target time will be read, but note that earlier data may be read as well.

First, the search only locates the closest time index entry, so it is not exact and will always read a little extra.

Second, with CreateTime timestamps messages are not monotonically increasing in time, so older messages can still appear later in the log.
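
On the client side this capability is exposed through KafkaConsumer.offsetsForTimes, available since 0.10.1. A minimal sketch, assuming a topic named events and a broker at localhost:9092:

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

object ReplayFromTimestamp {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "replay-demo")
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)
    val consumer = new KafkaConsumer[String, String](props)

    val tp = new TopicPartition("events", 0)
    consumer.assign(java.util.Collections.singletonList(tp))

    // Ask the broker for the earliest offset whose timestamp is >= target.
    val target = System.currentTimeMillis() - 60 * 60 * 1000L // one hour ago
    val query = Map(tp -> java.lang.Long.valueOf(target)).asJava
    val result = consumer.offsetsForTimes(query)

    Option(result.get(tp)).foreach { offsetAndTimestamp =>
      consumer.seek(tp, offsetAndTimestamp.offset()) // start replaying from here
    }
    consumer.close()
  }
}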

In the source code,

KafkaApis.handleOffsetRequestV1

calls:

              fetchOffsetForTimestamp(replicaManager.logManager, topicPartition, timestamp) match {
                case Some(timestampOffset) if allowed(timestampOffset) => timestampOffset
                case _ => TimestampOffset(ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)
              }

The implementation of fetchOffsetForTimestamp is as follows:

private def fetchOffsetForTimestamp(logManager: LogManager, topicPartition: TopicPartition, timestamp: Long) : Option[TimestampOffset] = {
    logManager.getLog(topicPartition) match {
      case Some(log) =>
        log.fetchOffsetsByTimestamp(timestamp)
      case None =>
        throw new UnknownTopicOrPartitionException(s"$topicPartition does not exist on the broker.")
    }
  }

logManager.getLog returns the Log corresponding to the partition; a Log is a sequence of LogSegments.

The implementation of Log.fetchOffsetsByTimestamp:

// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
// constant time access while being safe to use with concurrent collections unlike `toArray`.
val segmentsCopy = logSegments.toBuffer
// For the earliest and latest, we do not need to return the timestamp.
if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
  return Some(TimestampOffset(Record.NO_TIMESTAMP, segmentsCopy.head.baseOffset)) // earliest: return the first segment's baseOffset
else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
  return Some(TimestampOffset(Record.NO_TIMESTAMP, logEndOffset)) // latest: return the logEndOffset

val targetSeg = {
  // Get all the segments whose largest timestamp is smaller than target timestamp
  val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp)
  // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one.
  if (earlierSegs.length < segmentsCopy.length)
    Some(segmentsCopy(earlierSegs.length)) // the first segment whose largest timestamp is not smaller than targetTimestamp
  else
    None
}

targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp))

The largestTimestamp from each segment's time index is compared against targetTimestamp

to find the first segment whose largest timestamp is not smaller than it.

LogSegment.findOffsetByTimestamp

then runs a binary search inside that segment's time index to find the offset corresponding to the closest timestamp.
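
A sketch of the per-file lookup (illustrative, not Kafka's actual index classes): binary-search for the last entry whose timestamp is not greater than the target, and start scanning the log from that entry's offset:

object TimeIndexLookup {
  // Illustrative binary search over one file's entries (timestamp, offset),
  // which are monotonically increasing in timestamp within the file.
  def lookup(entries: IndexedSeq[(Long, Long)], targetTimestamp: Long, baseOffset: Long): Long = {
    var lo = 0
    var hi = entries.length - 1
    var found = -1
    // Find the last entry whose timestamp is <= targetTimestamp.
    while (lo <= hi) {
      val mid = (lo + hi) / 2
      if (entries(mid)._1 <= targetTimestamp) { found = mid; lo = mid + 1 }
      else hi = mid - 1
    }
    // The log scan starts from this offset; with no qualifying entry it starts
    // from the segment's base offset.
    if (found >= 0) entries(found)._2 else baseOffset
  }
}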
