掌握Kafka事务,看这篇就够了

简介: 先赞后看,南哥助你Java进阶一大半Kafka事务实际上引入了原子多分区写入的概念,播客画了以下流程图,展示了事务在分区级别如何工作。我是南哥,一个Java学习与进阶的领路人,相信对你通关面试、拿下Offer进入心心念念的公司有所帮助。

先赞后看,南哥助你Java进阶一大半

Kafka事务实际上引入了原子多分区写入的概念,Federico Valeri播客画了以下流程图,展示了事务在分区级别如何工作。

在这里插入图片描述

我是南哥,一个Java学习与进阶的领路人,相信对你通关面试、拿下Offer进入心心念念的公司有所帮助。

⭐⭐⭐本文收录在《Java学习/进阶/面试指南》:https://github..JavaSouth

1. Kafka事务

1.1 Kafka事务是什么

面试官:Kafka事务你说说看?

Kafka的事务主要应用在以流式处理的应用程序中,流式处理?听起来都觉得很迷糊不知道是什么东西。

Kafka事务支持的流式处理过程一般是这样,A程序从一个A主题消费A消息,对A消息进行处理后,再把结果写入到B主题,后续B程序会对B主题的消息进行消费。也就是消费 - 处理 - 生产的过程。

这样的一个过程涉及了两个消息的消费、一个消息的生产,如何保证这整个过程的事务性,让这整个过程要么成功、要么都不成功,这就是Kafka事务要做的事情。

南哥画下流程图,帮助大家理解理解。

在这里插入图片描述

1.2 重复消费问题

面试官:你说的这个过程,不使用事务有什么问题?

流式处理程序的消费 - 处理 - 生产过程,如果没有事务的保证,可能会出现多种消息重复消费的问题,这就会产生各种奇奇怪怪的问题了。

特别是在金融、支付行业,整个支付过程涉及了多个流程,例如用户下单 -> 库存校验 -> 订单处理-> 实际扣费 -> 清算结算,这些业务场景采用的便是流式处理程序。涉及资金的业务场景,事务的保障就更重要了!!

我说说两个消息重复消费的场景。

还是举例上文的场景:A程序从一个A主题消费A消息,对A消息进行处理后,再把结果写入到B主题,后续B程序会对B主题的消息进行消费。

(1)程序崩溃造成的重复消费

如果A程序对A消息进行处理后,把结果写入到B主题。但在偏移量提交的时候崩溃了,此时Kafka会认为A消息还没有被消费,而A程序崩溃了Kafka会把该分区分配给新的消费者。

问题就来了,新的消费者会重新消费A消息,等于B主题被写入了两条相同的消息,A消息被消费了两次。

(2)僵尸程序造成的重复消费

如果一个消费者程序认为自己没有死亡,但因为停止向Kafka发送心跳一段时间后,Kafka认为它已经死亡了,这种程序叫做僵尸程序。

A程序从Kafka读取A消息后,它暂时挂起了,失去和Kafka的连接也不能提交偏移量。此时Kafka认为其死亡了,会把A消费分配给新的消费者消费。

但后续A程序恢复后,会继续把A消息写入B主题,仍然造成了A消费被消费了两次。

可能很多人会说,这个流程有重复消费的问题,那处理重复消费的问题不就可以了,不必引入Kafka事务这么复杂。但在金融、支付这么严谨、重要的业务场景,我们要的是整个流程哪怕有一丁点出错,整个处理流程全都要进行回滚。

1.3 Kafka事务不能处理的问题

面试官:Kafka事务有不能处理的问题吗?

当然在整个Kafka事务的过程中,会有某些操作是不能回滚的,Kafka事务并不支持处理,我们来看看。

(1)Kafka事务过程加入外部逻辑

例如A程序消费消息A的过程中,发送了一个通知邮件,那整个外部操作是不可逆的,不在事务的处理范围内。

(2)读取Kafka消息后写入数据库

这其实也可以当成一个外部处理逻辑,数据库的事务并不在Kafka事务的处理范围内。

1.4 SpringBoot使用Kafka事务

面试官:接触过SpringBoot发送Kafka事务消息吗?

在SpringBoot项目我们可以轻松使用Kafka事务,通过以下Kafka事务的支持,我们就可以保证消息的发送和偏移量的提交具有事务性,从而避免上述的重复消费问题。

(1)先引入spring-kafka依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>xxx</version>
    </dependency>
</dependencies>

(2)配置Kafka事务管理器和生产者工厂

import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {
   
   

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
   
   
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        configProps.put(org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-");

        DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(configProps);
        factory.setTransactionIdPrefix("tran-");
        return factory;
    }

    @Bean
    public KafkaTransactionManager<String, Object> transactionManager(ProducerFactory<String, Object> producerFactory) {
   
   
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
   
   
        return new KafkaTemplate<>(producerFactory);
    }
}

(3)使用KafkaTemplate发送事务性消息

import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@EnableKafka
@Service
public class KafkaConsumerService {
   
   

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Transactional
    @KafkaListener(topics = "A")
    public void processMessage(String message) {
   
   
        // 处理从主题A接收到的消息
        String processedMessage = "Processed " + message;
        // 将处理后的消息发送到主题B
        kafkaTemplate.send("B", processedMessage);
        // 提交事务,确保消息发送和偏移量提交一起完成
    }
}

戳这,《JavaSouth》作为一份涵盖Java程序员所需掌握核心知识、面试重点的《Java学习进阶指南》。

在这里插入图片描述

我是南哥,南就南在Get到你的有趣评论➕点赞➕关注。

创作不易,不妨点赞、收藏、关注支持一下,各位的支持就是我创作的最大动力❤️

相关文章
|
6月前
|
消息中间件 分布式计算 Kafka
亿万级别Kafka演进之路:可靠性+事务+消息中间件+源码+日志
Kafka起初是由LinkedIn公司采用Scala语言开发的-一个多分区、多副本且基于ZooKeeper协调的分布式消息系统,现已被捐献给Apache基金会。目前Kafka已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
|
21天前
|
消息中间件 存储 分布式计算
大数据-72 Kafka 高级特性 稳定性-事务 (概念多枯燥) 定义、概览、组、协调器、流程、中止、失败
大数据-72 Kafka 高级特性 稳定性-事务 (概念多枯燥) 定义、概览、组、协调器、流程、中止、失败
29 4
|
21天前
|
消息中间件 分布式计算 Java
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
23 2
|
6月前
|
消息中间件 Kafka API
Kafka Exactly Once 语义实现原理:幂等性与事务消息
Apache Kafka的Exactly-Once语义确保了消息处理的准确性和一致性。通过幂等性和事务消息,Kafka实现了要么全处理要么全不处理的原子性。文章详细解析了Kafka事务的工作流程,包括生产者的幂等性(通过序列号保证),以及事务消息的提交和回滚过程。Kafka事务提供了ACID保证,但存在性能限制,如额外的RPC请求和单生产者只能执行一个事务。此外,事务适用于同集群内的操作,跨集群时原子性无法保证。了解这些原理有助于开发者更好地利用Kafka事务构建可靠的数据处理系统。
173 3
 Kafka Exactly Once 语义实现原理:幂等性与事务消息
|
6月前
|
消息中间件 Kafka
【Kafka系列】Kafka事务一般在什么场景下使用呢
面试官:听说你精通Kafka,那我就考考你吧面试官:不用慌尽管说,错了也没关系😊。。。❤️。
【Kafka系列】Kafka事务一般在什么场景下使用呢
|
消息中间件 存储 Kafka
一文读懂 kafka 的事务机制 2
一文读懂 kafka 的事务机制
|
消息中间件 存储 大数据
一文读懂 kafka 的事务机制 1
一文读懂 kafka 的事务机制
|
消息中间件 关系型数据库 MySQL
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
|
消息中间件 Kubernetes Java
Apache Kafka-事务消息的支持与实现(本地事务)
Apache Kafka-事务消息的支持与实现(本地事务)
632 0
|
消息中间件 存储 Java
「事件驱动架构」Apache Kafka中的事务
「事件驱动架构」Apache Kafka中的事务