异常处理
Exception处理
InvalidProducerEpoch
这是一种Fatal Error,它说明当前Producer是一个过期的实例,有Transaction ID相同但epoch更新的Producer实例被创建并使用。此时Producer会停止并抛出Exception。
InvalidPidMapping
Transaction Coordinator没有与该Transaction ID对应的PID。此时Producer会通过包含有Transaction ID的InitPidRequest请求创建一个新的PID。
NotCorrdinatorForGTransactionalId
该Transaction Coordinator不负责该当前事务。Producer会通过FindCoordinatorRequest请求重新寻找对应的Transaction Coordinator。
InvalidTxnRequest
违反了事务协议。正确的Client实现不应该出现这种Exception。如果该异常发生了,用户需要检查自己的客户端实现是否有问题。
CoordinatorNotAvailable
Transaction Coordinator仍在初始化中。Producer只需要重试即可。
DuplicateSequenceNumber
发送的消息的序号低于Broker预期。该异常说明该消息已经被成功处理过,Producer可以直接忽略该异常并处理下一条消息
InvalidSequenceNumber
这是一个Fatal Error,它说明发送的消息中的序号大于Broker预期。此时有两种可能
- 数据乱序。比如前面的消息发送失败后重试期间,新的消息被接收。正常情况下不应该出现该问题,因为当幂等发送启用时,max.inflight.requests.per.connection被强制设置为1,而acks被强制设置为all。故前面消息重试期间,后续消息不会被发送,也即不会发生乱序。并且只有ISR中所有Replica都ACK,Producer才会认为消息已经被发送,也即不存在Broker端数据丢失问题。
- 服务器由于日志被Truncate而造成数据丢失。此时应该停止Producer并将此Fatal Error报告给用户。
InvalidTransactionTimeout
InitPidRequest调用出现的Fatal Error。它表明Producer传入的timeout时间不在可接受范围内,应该停止Producer并报告给用户。
处理Transaction Coordinator失败
写PREPARE_COMMIT/PREPARE_ABORT前失败
Producer通过FindCoordinatorRequest找到新的Transaction Coordinator,并通过EndTxnRequest请求发起COMMIT或ABORT流程,新的Transaction Coordinator继续处理EndTxnRequest请求——写PREPARE_COMMIT或PREPARE_ABORT,写Transaction Marker,写COMPLETE_COMMIT或COMPLETE_ABORT。
写完PREPARE_COMMIT/PREPARE_ABORT后失败
此时旧的Transaction Coordinator可能已经成功写入部分Transaction Marker。新的Transaction Coordinator会重复这些操作,所以部分Partition中可能会存在重复的COMMIT或ABORT,但只要该Producer在此期间没有发起新的事务,这些重复的Transaction Marker就不是问题。
写完COMPLETE_COMMIT/ABORT后失败
旧的Transaction Coordinator可能已经写完了COMPLETE_COMMIT或COMPLETE_ABORT但在返回EndTxnRequest之前失败。该场景下,新的Transaction Coordinator会直接给Producer返回成功。
事务过期机制
事务超时
transaction.timeout.ms
终止过期事务
当Producer失败时,Transaction Coordinator必须能够主动的让某些进行中的事务过期。否则没有Producer的参与,Transaction Coordinator无法判断这些事务应该如何处理,这会造成:
- 如果这种进行中事务太多,会造成Transaction Coordinator需要维护大量的事务状态,大量占用内存
- Transaction Log内也会存在大量数据,造成新的Transaction Coordinator启动缓慢
- READ_COMMITTED的Consumer需要缓存大量的消息,造成不必要的内存浪费甚至是OOM
- 如果多个Transaction ID不同的Producer交叉写同一个Partition,当一个Producer的事务状态不更新时,READ_COMMITTED的Consumer为了保证顺序消费而被阻塞
为了避免上述问题,Transaction Coordinator会周期性遍历内存中的事务状态Map,并执行如下操作
- 如果状态是BEGIN并且其最后更新时间与当前时间差大于transaction.remove.expired.transaction.cleanup.interval.ms(默认值为1小时),则主动将其终止:1)未避免原Producer临时恢复与当前终止流程冲突,增加该Producer对应的PID的epoch,并确保将该更新的信息写入Transaction Log;2)以更新后的epoch回滚事务,从而使得该事务相关的所有Broker都更新其缓存的该PID的epoch从而拒绝旧Producer的写操作
- 如果状态是PREPARE_COMMIT,完成后续的COMMIT流程————向各<Topic, Partition>写入Transaction Marker,在Transaction Log内写入COMPLETE_COMMIT
- 如果状态是PREPARE_ABORT,完成后续ABORT流程
终止Transaction ID
某Transaction ID的Producer可能很长时间不再发送数据,Transaction Coordinator没必要再保存该Transaction ID与PID等的映射,否则可能会造成大量的资源浪费。因此需要有一个机制探测不再活跃的Transaction ID并将其信息删除。
Transaction Coordinator会周期性遍历内存中的Transaction ID与PID映射,如果某Transaction ID没有对应的正在进行中的事务并且它对应的最后一个事务的结束时间与当前时间差大于transactional.id.expiration.ms(默认值是7天),则将其从内存中删除并在Transaction Log中将其对应的日志的值设置为null从而使得Log Compact可将其记录删除。
与其它系统事务机制对比
PostgreSQL MVCC
Kafka的事务机制与《MVCC PostgreSQL实现事务和多版本并发控制的精华》一文中介绍的PostgreSQL通过MVCC实现事务的机制非常类似,对于事务的回滚,并不需要删除已写入的数据,都是将写入数据的事务标记为Rollback/Abort从而在读数据时过滤该数据。
两阶段提交
Kafka的事务机制与《分布式事务(一)两阶段提交及JTA》一文中所介绍的两阶段提交机制看似相似,都分PREPARE阶段和最终COMMIT阶段,但又有很大不同。
- Kafka事务机制中,PREPARE时即要指明是PREPARE_COMMIT还是PREPARE_ABORT,并且只须在Transaction Log中标记即可,无须其它组件参与。而两阶段提交的PREPARE需要发送给所有的分布式事务参与方,并且事务参与方需要尽可能准备好,并根据准备情况返回Prepared或Non-Prepared状态给事务管理器。
- Kafka事务中,一但发起PREPARE_COMMIT或PREPARE_ABORT,则确定该事务最终的结果应该是被COMMIT或ABORT。而分布式事务中,PREPARE后由各事务参与方返回状态,只有所有参与方均返回Prepared状态才会真正执行COMMIT,否则执行ROLLBACK
- Kafka事务机制中,某几个Partition在COMMIT或ABORT过程中变为不可用,只影响该Partition不影响其它Partition。两阶段提交中,若唯一收到COMMIT命令参与者Crash,其它事务参与方无法判断事务状态从而使得整个事务阻塞
- Kafka事务机制引入事务超时机制,有效避免了挂起的事务影响其它事务的问题
- Kafka事务机制中存在多个Transaction Coordinator实例,而分布式事务中只有一个事务管理器
Zookeeper
Zookeeper的原子广播协议与两阶段提交以及Kafka事务机制有相似之处,但又有各自的特点
- Kafka事务可COMMIT也可ABORT。而Zookeeper原子广播协议只有COMMIT没有ABORT。当然,Zookeeper不COMMIT某消息也即等效于ABORT该消息的更新。
- Kafka存在多个Transaction Coordinator实例,扩展性较好。而Zookeeper写操作只能在Leader节点进行,所以其写性能远低于读性能。
- Kafka事务是COMMIT还是ABORT完全取决于Producer即客户端。而Zookeeper原子广播协议中某条消息是否被COMMIT取决于是否有一大半FOLLOWER ACK该消息。