大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送

简介: 大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(正在更新…

章节内容

上节我们完成了如下内容:


Kafka 事务配置

Kafka 事务语义

消息定义

事务概览

事务组

事务协调器等等

04328e1dbca1c57374e2c47a73b69f1c_ff808677c0ae4a0ebd47043976331b9c.png

事务相关配置

BrokerConfigs

transactional.id.timeout.ms:在ms中,事务协调器在生产者TranscationalId提前过期之前等待的最长时间,并且没有从该生产者TransactionalId接收到任何任务状态更新,默认是604800000(7天),这允许每周一次的生产者作业维护它们的ID。

max.transaction.timeout.ms:事务允许的最大超时,如果客户端请求的事务时间超过此时间,broker将在InitPidRequest中返回InvalidTransactionTimeout错误,这可以防止客户端超时过大,从而导致用户无法从事务中包含的主题读取内容

默认值为900000(15分钟),这是消息事务需要发送的事件的保守上限

transaction.state.log.replication.factor:事务状态topic的副本数量,默认值3

transaction.state.log.num.partitions:事务状态主题的分区数,默认值50

transaction.state.log.min.isr:事务状态主题每个分区ISR的最小数量 默认是2

transaction.state.log.segement.bytes:事务状态主题的Segment大小,默认104857600字节

ProducerConfigs

enbale.idempotence:开启幂等

transaction.timeout.ms:事务超时时间,事务协调器在主动中止正在进行的事务之前等待生产者更新事务状态的最长时间,这个配置值将于InitPidRequest一起发送到事务协调器,如果该值大于max.transaction.timeout,在Broker中设置ms时,请求将失败,并出现InvalidTransactionTimeout错误。默认是60000,这使得交易不会阻塞下游消费超过一分钟,这在实时应用程序中通常是允许的。

transactional.id:用于事务性交付的TransactionalId,这支持跨多个生产者会话的可靠性语义,因为它允许客户端确保使用相同Transaction的事务在启动任何新事务之前已经完成。如果没有提供TransactionalId,则生产者仅限于幂等交付。

ConsumerConfigs

isolation.level:read_uncommitted以偏移顺序使用已提交和未提交的消息。read_commmitted仅以偏移量顺序使用非事务性消息或已提交事务性消息,为了维护偏移排序,这个设置意味着我们必须在使用者中缓冲消息,直到看到给定事务中的所有消息。

幂等性

基本流程

Kafka在引入幂等性之前,Producer向Broker发送消息,然后Broker将消息追加到消息流中后给Producer返回ACK信号值,实现流程如下:

生产中,会出现各种不确定的因素,比如在Producer在发送给Broker的时候出现网络异常。比如以下这种异常情况的出现:

上图这种情况,当Producer第一次发送消息给Broker时,Broker消息(x2,y2)追加到消息中,但是在返回ACK信号给Producer时失败了(比如网络异常)。此时,Producer端触发重试机制,将消息(x2,y2)重新发送给Broker,Broker接收到消息后,再次将该消息追加到消息流中,然后成功返回ACK信号给Producer。这样下来,消息流中就被重复追加两条相同的(x2,y2)的消息。


幂等性

保证咋消息重发的时候,消费者不会重复处理,即使在消费者收到重复消息的时候,重复处理,也要保证最终结果的一致性。

所谓幂等性,数学概念就是:f(f(x)) = f(x),f函数表示对消费的处理

比如,银行转账,如果失败,需要重试。不管重试多少次,都要保证结果一定一致的。


幂等性实现

添加唯一Id,类似于数据库的主键,用于标记一个消息:

Kafka为了实现幂等性,它在底层设计架构中引入了Producer和SequenceNumber


ProducerID:在每个新的Producer初始化时,会被分配一个唯一的ProducerID,这个ProducerID对客户端的使用者是不可见的。

SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个0开始单调递增的SequenceNumber值。

同样的,这是一种理想状态下的发送流程。实际情况下,会有很多不确定的因素,比如Broker在发送ACK信号给Producer时出现了网络异常,导致发送失败。异常情况如下图所示:

7be22704b5037c762c96fb7985c840da_2048e21da3f64b5cb15a0171fe293f4f.png 当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回ACK信号给Producer时,发生异常导致Producer接收ACK信号失败。对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引入幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,而之前Broker缓存之前发送过的相同的消息,那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的情况。

// 实例化⼀个Producer对象
Producer<String, String> producer = new KafkaProducer<>(props);

在 org.apache.kafka.clients.producer.iinternals.Sender 类中,在run()中有一个maybeWaitForPid()方法,用来生成一个ProducerID,实现代码如下:

private void maybeWaitForPid() {
    if (transactionState == null) {
        return;
    }

    while (!transactionState.hasPid()) {
        try {
            Node node = awaitLeastLoadedNodeReady(requestTimeout);
            if (node != null) {
                ClientResponse response = sendAndAwaitInitPidRequest(node);
                if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
                    InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
                    transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
                } else {
                    log.error("Received an unexpected response type for an InitPidRequest from {}. We will back off and try again.", node);
                }
            } else {
                log.debug("Could not find an available broker to send InitPidRequest to. We will back off and try again.");
            }
        } catch (Exception e) {
            log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
        }
        
        log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
        time.sleep(retryBackoffMs);
        metadata.requestUpdate();
    }
}

事务操作

基本介绍

在Kafka事务中,一个原子性操作,根据操作类型可以分为3中情况,情况如下:


只要Producer生产消息,这种场景需要事务的介入

消费消息和生产消息并存,比如Consumer&Producer模式,这种场景是一般Kafka项目中比较常见的模式,需要事务介入。

只有Consumer消费消息,这种操作在实际项目中意义不大,和手动CommitOffets的结果一样,而且这种场景不是事务的引入的目的。

// 初始化事务,需要注意确保transation.id属性被分配
void initTransactions();
// 开启事务
void beginTransaction() throws ProducerFencedException;
// 为Consumer提供的在事务内Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
 String consumerGroupId) throws ProducerFencedException;
// 提交事务
void commitTransaction() throws ProducerFencedException;
// 放弃事务,类似于回滚事务的操作
void abortTransaction() throws ProducerFencedException;

案例1:单Producer 保证仅一次发送

编写代码

package icu.wzk.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;


public class MyTransactionalProducer {

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "h121.wzk.icu:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 提供客户端ID
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer");
        // 事务ID
        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my_tx_id");
        // 要求ISR确认
        configs.put(ProducerConfig.ACKS_CONFIG, "all");

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
        // 初始化事务
        producer.initTransactions();
        // 开启事务
        producer.beginTransaction();

        try {
            // 发送消息
            producer.send(new ProducerRecord<>("tp_tx_01", "tx_msg_02"));
            // 可以在这里设置一些异常来测试 比如:
            // int n = 1 / 0;
        } catch (Exception e) {
            // 中止事务
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }

}

测试运行

运行之后,控制台输出结果如下:

案例2:消费-转换-生产 事务保证仅一次发送

编写代码

package icu.wzk.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;


public class MyTransactional {

    public static void main(String[] args) {
        KafkaProducer<String, String> producer = getProducer();
        KafkaConsumer<String, String> consumer = getConsumer();
        // 事务的初始化
        producer.initTransactions();
        // 订阅主题
        consumer.subscribe(Collections.singleton("tp_tx_01"));

        final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

        // 开启事务
        producer.beginTransaction();
        try {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
                producer.send(new ProducerRecord<>("tp_tx_out_01", record.key(), record.value()));
                offsets.put(
                        new TopicPartition(record.topic(), record.partition()),
                        // 偏移量表示下一条要消费的消息
                        new OffsetAndMetadata(record.offset() + 1)
                );
            }
            // 将该消息的偏移量提交作为事务的一部分,随事务提交和回滚(不提交消费偏移量)
            producer.sendOffsetsToTransaction(offsets, "consumer_grp_02");
            // 提交事务
            producer.commitTransaction();
        } catch (Exception e) {
            e.printStackTrace();
            // 中止事务
            producer.abortTransaction();
        } finally {
            producer.close();
            consumer.close();
        }
    }

    public static KafkaProducer<String, String> getProducer() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "h121.wzk.icu:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 设置ClientID
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer_01");
        // 设置事务ID
        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id_02");
        // 要求ISR确认
        configs.put(ProducerConfig.ACKS_CONFIG, "all");
        // 启用幂等性
        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        return producer;
    }

    public static KafkaConsumer<String, String> getConsumer() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "h121.wzk.icu:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,  StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 设置消费组ID
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_grp_02");
        // 不启⽤消费者偏移量的⾃动确认,也不要⼿动确认
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_client_02");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        return consumer;
    }

}

测试运行

(由于我测试的云服务器的Kafka掉线了,我又启动了一次,重新执行一次案例1。)

下面是案例2直接的结果如下图:

目录
相关文章
|
5月前
|
Java API 数据处理
Java新特性:使用Stream API重构你的数据处理
Java新特性:使用Stream API重构你的数据处理
Java API 开发者
172 0
|
6月前
|
消息中间件 Java Kafka
Java 事件驱动架构设计实战与 Kafka 生态系统组件实操全流程指南
本指南详解Java事件驱动架构与Kafka生态实操,涵盖环境搭建、事件模型定义、生产者与消费者实现、事件测试及高级特性,助你快速构建高可扩展分布式系统。
337 7
|
7月前
|
并行计算 Java API
Java List 集合结合 Java 17 新特性与现代开发实践的深度解析及实战指南 Java List 集合
本文深入解析Java 17中List集合的现代用法,结合函数式编程、Stream API、密封类、模式匹配等新特性,通过实操案例讲解数据处理、并行计算、响应式编程等场景下的高级应用,帮助开发者提升集合操作效率与代码质量。
342 1
|
7月前
|
安全 Java 微服务
Java 最新技术和框架实操:涵盖 JDK 21 新特性与 Spring Security 6.x 安全框架搭建
本文系统整理了Java最新技术与主流框架实操内容,涵盖Java 17+新特性(如模式匹配、文本块、记录类)、Spring Boot 3微服务开发、响应式编程(WebFlux)、容器化部署(Docker+K8s)、测试与CI/CD实践,附完整代码示例和学习资源推荐,助你构建现代Java全栈开发能力。
831 1
|
7月前
|
缓存 安全 Java
Java 并发新特性实战教程之核心特性详解与项目实战
本教程深入解析Java 8至Java 19并发编程新特性,涵盖CompletableFuture异步编程、StampedLock读写锁、Flow API响应式流、VarHandle内存访问及结构化并发等核心技术。结合电商订单处理、缓存系统、实时数据流、高性能计数器与用户资料聚合等实战案例,帮助开发者高效构建高并发、低延迟、易维护的Java应用。适合中高级Java开发者提升并发编程能力。
283 0
|
7月前
|
安全 Java API
Java 17 及以上版本核心特性在现代开发实践中的深度应用与高效实践方法 Java 开发实践
本项目以“学生成绩管理系统”为例,深入实践Java 17+核心特性与现代开发技术。采用Spring Boot 3.1、WebFlux、R2DBC等构建响应式应用,结合Record类、模式匹配、Stream优化等新特性提升代码质量。涵盖容器化部署(Docker)、自动化测试、性能优化及安全加固,全面展示Java最新技术在实际项目中的应用,助力开发者掌握现代化Java开发方法。
335 1
|
7月前
|
IDE Java API
Java 17 新特性与微服务开发的实操指南
本内容涵盖Java 11至Java 17最新特性实战,包括var关键字、字符串增强、模块化系统、Stream API、异步编程、密封类等,并提供图书管理系统实战项目,帮助开发者掌握现代Java开发技巧与工具。
357 1
|
7月前
|
Java 数据库连接 API
Java 8 + 特性及 Spring Boot 与 Hibernate 等最新技术的实操内容详解
本内容涵盖Java 8+核心语法、Spring Boot与Hibernate实操,按考试考点分类整理,含技术详解与代码示例,助力掌握最新Java技术与应用。
230 2