Kafka消息分区&producer拦截器&无消息丢失(八)

简介: Kafka消息分区&producer拦截器&无消息丢失(八)

上篇文章说了,acks,1代表什么都不管,即使配置了回调也不会起作用,0代表不会等待replic副本里的不会持久化,只要broker leader持久化成功则返回给producer。-1代表all,则表示全部持久化成功才返回成功给producer,Retries,batch.size:kafka,linger.ms,buffer.memory,compression.type等参数。

producer参数---Kafka从入门到精通(七)


一、消息分区机制


producer发送过程有个很重要的步骤,就是确定发送的消息在哪个topic分区中。Producer提供了分区策略和对应的分区器(partitioner)供用户使用。新版本的会把相同key的消息发送到partition上,如果没有指定key,则会通过轮询分配均匀在topic所在分区,而对于旧版本的无法分配均匀。

自定义分区机制:

对于有key的消息,java版本的producer会通过自己的算法计算key的哈希值,然后在总分区取模分配到目标分区。但有的时候用户想实现自己的分区策略,而这又是默认partitioner无法实现的,那么此刻就可以用producer提供的自定义分区策略。

/**
 * @author keying
 */
public class AuditPartitioner implements Partitioner {
    private Random random;
    @Override
    public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String key = (String) keyObj;
        List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
        int auditPartition = partitionInfoList.size() - 1;
        return key == null || key.isEmpty() ||
                !key.contains("audit") ? random.nextInt(partitionInfoList.size() - 1) : auditPartition;
        //return 0;
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {
        random = new Random();
    }
}

若自定义分区机制,则需要做两件事:

1、先定义一个类,实现org.apache.kafka.clients.producer.Partitioner接口,主要重写partition方法。

2、在构造kafkaProducer的时候propertites设置partitioner参数。

Partition方法里主要接受参数有topic,key和value,还有集群元数据信息,一起来确定目标分区,而close方法则是用于关闭partitioner的,主要是为了关闭那些创建partitioner时初始化的系统资源等。

举个例子如何实现自定义的partitioner呢,假设我们有个类似审计功能,审计功能发送kafka的时候可以给他分配字符串“audit”,我们想让这类消息发到topic最后一个分区上,便于后续统一处理,而对于相同topic下的其他消息则采用随机发送的策略发送到其他分区上。

所以,用户可以根据key来指定一些策略,还可以根据value信息做一些定制化分区策略。


二、消息序列化


网络中发送数据都是以字节的方式,kafka也不例外,它可以是字符串,一个整数,一个数组或者其他任意对象类型。序列化器(serializer)负责在producer发送将消息转换成字节数组,而与之相反,解序列化器(deserializer)则用于将consumer接受到的字节数组转换成相应的对象。

Kafka1.0.0默认提供十几种序列化器,常见的serializer用的是StringSerializer,然后其他的还有LongSerializer,IntegerSerializer等。如果是复杂的类型,比如Avro则需要自定义序列化。


三、Producer拦截器


Producer拦截器相当于一个新的功能,他可以在producer发送消息之后以及回调之前有机会对消息做些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor

按序作用于同一条消息从而形成一个拦截器,intercetpor的实现接口是producerInterceptor,其定义方法如下:

onSend(producerRecord):该方法封装进kafkaProducer.send方法中,即他运行在用户主线程中。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法对消息做任何处理,但最好不要修改消息的所属topic和分区,否则影响分区计算。

onAcknowledgement(recordMetadata,Exception):该消息会在被应答之前或者消息发送失败时候调用,并且通常在producer回调触发之前调用。OnAcknoewledgement运行在producer的I/O线程中,因此不要在该方法放入很重的逻辑,否则会拖慢producer的消息发送效率。

Close:关闭interceptor,主要做一些资源清理工作。


如前所述,interceptor可能运行在多个线程中,因此具体实现时候需要用户自行确认保护线程安全。若指定多个interceptor,则producer将按照指定顺序调用他们,同时把每个interceptor中捕获的异常记录到错误日志中而不是向上传递。

/**
 * @author keying
 * @date 2022-08-07 17:24:21
 */
public class OneInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord record) {
        return new ProducerRecord(record.topic(), record.partition(), 
                record.timestamp(), System.currentTimeMillis() + "," + record.value().toString());
    }
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
/**
 * @author keying
 * @date 2022-08-07 17:27:40
 */
public class TwoInterceptor implements ProducerInterceptor<String, String> {
    private int errorCounter = 0;
    private int successCounter = 0;
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return null;
    }
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println("成功:"+successCounter);
        System.out.println("失败:"+errorCounter);
    }
}

上面例子是实现一个简单的双inteceptor组成的拦截器,第一个拦截器会在消息发送前将时间戳加入到value,第二个拦截器则会统计成功和失败的次数。


四、无消息丢失配置


Producer采用的是异步发送消息机制,kafkaProducer.send方法仅仅把消息放入缓冲区,由一个专属的I/O线程负责提取缓冲区的消息并封装到batch中,然后发送出去。显然,整个过程存在数据丢失的窗口,若I/O线程在发送之前崩溃,则数据会丢失。

另一个问题则是消息会乱序,比如客户端依次发送两条消息到不同的分区:

Producer.send(records1);和producer.send(records2);

若此刻某些原因,网络出现瞬时抖动,导致records1发送失败,同时kafka又配置了重试机制,max.in.flight.requests.per.connection大于1(默认是5),这样会造成消息乱序,而实际场景很多情况需要包装按顺序消费。

所以这两个问题,kafka该如何规避呢?首先消息丢失很容易想到kafka的同步发送,但这样性能会很差,并不在实际场景中推荐使用。如何配置保证消息不会丢失呢?

Block.on.buffer.full = true
Acks=all 或者 -1
Retries=Integer.MAX_VALUE
Max.in.flight.request.per.connection=1

使用回调机制的send发送消息

CallBack逻辑中显式立即关闭producer,使用close(0)

Unclean.leader.election.enable=false
Replication.factor=3
Min.insync.replicas = 2
Replication.factor>min.insync.replicas
Enable.auto.commit=false


Producer端配置:

Block.on.buffer.full = true,实际上这个参数在kafka0.9.0版本已经被标记为deprecated的,并且使用max.block.ms替代,但还是推荐用户显示的设置它为true,使得内存缓冲区被填满时producer处于阻塞状态,并且停止接受新消息而不是抛出异常。否则producer生产速度过快会耗尽缓冲区,新版本0.10.0.0不用管这个参数,直接设置max.block.ms参数。


Acks = all很好理解,就是所有leader broker和副本replict里的follower都收到消息,才回复producer消息成功发送。


Retries=Integer.MAX_VALUE:这里设置无限大有点极端,想表达的是无线重试,但放心这里不会重试那些无法恢复的错误,只会重试那些可恢复的异常,所以可以放心的设置比较大的值,保证消息不会丢失。


max.in.flight.request.per.connection=1:设置为1防止消息在topic下乱序,这个设置的效果限制了producer在单个broker上连续发送的未响应请求数量。因此如果设置成1,则producer在某个broker发送响应之前将无法再给broker发送producer请求。


使用带回调的send,普通的send官方解释是fire and forget,只管把消息发出去,不管后续,如果发送失败,不会收到任何通知,这里肯定要带回调的send发送。


CallbackBack逻辑中显式处理立刻关闭producer:在calllback失败处逻辑立即使用kafkaProcuer.close(0),这样做的目的就是为了防止消息乱序问题。若不使用close关闭,默认情况下producer会被允许将未完成的消息发送出去,这样可能造成消息乱序。


Broker端配置:

Unclean.leader.election.eable = false:关闭unclean leader选举,即不允许非isr中的副本被选举成leader,从而避免broker端因为日志水位截断造成数据丢失。

Replication.factor>=3 :设置成3主要参考业界的三备份原则,强调多个副本才好。

Min.insync.replias>1:用于控制某条消息至少被写入ISR中多个副本才算成功,大于1代表提升持久性,只有在acks设置成-1或者all的时候才生效。

确保 replication.factors>min.insync.replicas :若两者相等,则只要有一个副本挂掉,则分区无法正常使用,虽然持久性很高,但可用性被降低,建议 replication.factory = min.insync.replicas + 1。

相关文章
|
2月前
|
消息中间件 负载均衡 Kafka
Kafka分区分配策略大揭秘:RoundRobin、Range、Sticky,你真的了解它们吗?
【8月更文挑战第24天】Kafka是一款突出高吞吐量、可扩展性和数据持久性的分布式流处理平台。其核心特性之一是分区分配策略,对于实现系统的负载均衡和高可用性至关重要。Kafka支持三种主要的分区分配策略:RoundRobin(轮询)、Range(范围)和Sticky(粘性)。RoundRobin策略通过轮询方式均衡分配分区;Range策略根据主题分区数和消费者数量分配;而Sticky策略则在保持原有分配的基础上动态调整,以确保各消费者负载均衡。理解这些策略有助于优化Kafka性能并满足不同业务场景需求。
152 59
|
9天前
|
消息中间件 监控 负载均衡
在Kafka中,进行主题的分区和复制
在Kafka中,进行主题的分区和复制
|
2月前
|
消息中间件 监控 算法
Kafka Producer 的性能优化技巧
【8月更文第29天】Apache Kafka 是一个分布式流处理平台,它以其高吞吐量、低延迟和可扩展性而闻名。对于 Kafka Producer 来说,正确的配置和编程实践可以显著提高其性能。本文将探讨一些关键的优化策略,并提供相应的代码示例。
68 1
|
2月前
|
消息中间件 大数据 Kafka
Kafka消息封装揭秘:从Producer到Consumer,一文掌握高效传输的秘诀!
【8月更文挑战第24天】在分布式消息队列领域,Apache Kafka因其实现的高吞吐量、良好的可扩展性和数据持久性备受开发者青睐。Kafka中的消息以Record形式存在,包括固定的头部与可变长度的消息体。生产者(Producer)将消息封装为`ProducerRecord`对象后发送;消费者(Consumer)则从Broker拉取并解析为`ConsumerRecord`。消息格式简化示意如下:消息头 + 键长度 + 键 + 值长度 + 值。键和值均为字节数组,需使用特定的序列化/反序列化器。理解Kafka的消息封装机制对于实现高效、可靠的数据传输至关重要。
51 4
|
2月前
|
消息中间件 监控 Java
【Kafka节点存活大揭秘】如何让Kafka集群时刻保持“心跳”?探索Broker、Producer和Consumer的生死关头!
【8月更文挑战第24天】在分布式系统如Apache Kafka中,确保节点的健康运行至关重要。Kafka通过Broker、Producer及Consumer间的交互实现这一目标。文章介绍Kafka如何监测节点活性,包括心跳机制、会话超时与故障转移策略。示例Java代码展示了Producer如何通过定期发送心跳维持与Broker的连接。合理配置这些机制能有效保障Kafka集群的稳定与高效运行。
38 2
|
2月前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
59 8
|
2月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
59 3
|
2月前
|
开发者 图形学 前端开发
绝招放送:彻底解锁Unity UI系统奥秘,五大步骤教你如何缔造令人惊叹的沉浸式游戏体验,从Canvas到动画,一步一个脚印走向大师级UI设计
【8月更文挑战第31天】随着游戏开发技术的进步,UI成为提升游戏体验的关键。本文探讨如何利用Unity的UI系统创建美观且功能丰富的界面,包括Canvas、UI元素及Event System的使用,并通过具体示例代码展示按钮点击事件及淡入淡出动画的实现过程,助力开发者打造沉浸式的游戏体验。
44 0
|
2月前
|
消息中间件 Java Kafka
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
【Azure 事件中心】开启 Apache Flink 制造者 Producer 示例代码中的日志输出 (连接 Azure Event Hub Kafka 终结点)
下一篇
无影云桌面