大数据-57 Kafka 高级特性 消息发送相关01-基本流程与原理剖析

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-57 Kafka 高级特性 消息发送相关01-基本流程与原理剖析

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(正在更新…)

章节内容

上节我们完成了如下的内容:


Java添加POM依赖

Java操作Kafka的API、SpringBoot

实现对Kafka消息发送和消息消费

基本流程

Producer创建时,会创建一个Sender线程并设置为守护进程

生产消息时,内部其实是异步流程,生产的消息先经过拦截器->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)

批次发送的条件是:缓冲区的大小达到batch.size或者linger.ms达到上限,哪个先到达就算哪个

批次发送后,发往指定的分区后,然后落盘到broker

如果生产者配置了retrires参数大于0并且失败原因允许重试,那么客户端内部会对该消息进行重试

落盘到Broker成功 返回生产元数据给生产者

元数据返回的两种方式:一种是通过阻塞直接返回,另一种是通过回调返回

Broker配置

这里是Broker的常见配置:

bootstrap.servers

生产者客户端与broker集群建立初始链接需要Broker的地址列表,由该初始连接发现Kafka集群中其他的所有Broker,该地址列表不需要写全部的Kafka集群地址,但也不要只写一个防止宕机不可用。


key.serializer

实现了 org.apache.kafka.common.serialization.Serializer 的key序列化类


value.serializer

实现了 org.apache.kafka.common.serialization.Serializer的value序列化类


acks

该项控制着已发消息的持久性。


acks=0,生产者不等待Broker的任何消息确认。

acks=1,Leader将记录写到它本地的地址,就相应客户端的消息,而不等待Follower的副本的确认。

acks=all,Leader等待所有有同步副本消息的确认,保证了只要有一个同步副本存在,消息就不会丢失。

acks=-1,等价于 acks=all

默认值为1

compression.type

生产者生成数据的压缩格式,默认是none(无压缩)。

可选:


none

gzip

snappy

lz4

默认是none


Broker配置补充

额外的配置还有下图的这些内容:

retry.backoff.ms

在向一个指定的主题分区重发消息的时候,重试之间的等待时间。

比如三次重试,每次重试之后等待时间长度,再接着重试。

long型 默认 100


retries

retries 重试次数


当消息发送出现错误的时候,系统会重新发送消息,跟客户端收到错误重新发送一样。

如果设置了重试,还要保证消息有序,则需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1

request.timeout.ms

客户端等待请求响应时长,如果服务器端响超时,则会请求重试,除非达到重试次数。

设置应该要大于:replica.lag.time.max.ms,以免服务器延迟时间内重发消息。


int型 默认 30000


interceptor.classes

在生产者接收到该消息,向Kafka集群传输之前,由序列化处理之前,可以通过拦截器对消息进行处理。


要求实现:org.apache.kafka.clients.producer.ProducerInterceptor 接口

Map[String, Object] configs 中通过 List集合配置多个拦截器

默认没有拦截器

acks

同上,不介绍了。


batch.size

当多个消息发送到同一个分区时候,生产者尝试将多个记录作为一个批处理,批处理提高了客户端和副武器的处理效率。该配置项以字节为单位控制默认批的大小。


所有批小于等于该值

发送给Broker的请求将包含多个批次,每个分区一个,并包含可发送的数据

如果该值设置的较小,会限制吞吐量(设置为0会完全关闭批处理);若很大则会浪费内存

client.id

生产者发送请求的时候传递给Broker的id字符串

用于在Broker的请求日志中追踪什么应用发送什么消息

一般该ID跟业务有关的字符串

compression.type

同上,不介绍了。


send.buffer.bytes

TCP发送数据的时候用的缓冲区的大小,若设置为0,则用操作系统默认的。


buffer.memory

生产者可以用来缓存等待发送到服务器的记录的总内存字节,如果记录的发送速度超过了将记录发送到服务器的速度,则生产者将阻塞 max.block.ms 的时间,此后将引发异常。

此设置应大致对应于生产者将使用的总内存,但并非生产者使用的所有内存都用于缓冲。


connections.max.idle.ms

当连接空闲时间达到这个值,就关闭连接。

long型 默认 540000


linger.ms

生产者发送请求传输间隔会对需要发送的消息进行累积,然后作为一个批次发送,一般情况是消息的发送速度比消息积累的速度要慢。

有时候客户端需要减少请求次数,即使在负载不大的情况下。该配置设置了一个延迟,生产者消息不会立即将消息送到Broker,而是等待这么一段时间以积累消息,然后将这段消息之类的消息作为一个批次发送,该设置是批处理的另一个上限,一旦此消息达到了batch.size指定的值,消息批会立即发送,如果积累的消息字节数达不到batch.size的值,可以设置该毫秒值,等待这么长时间之后,也会发送消息批。

默认值是0


max.block.ms

控制KafkaProducer.send()和KafkaProducer.partitionFor()阻塞时长,当缓存满了或元数据不可用的时候,这些方法阻塞。在用户提供的序列化器和分区器的阻塞时间不计入。

long型值,默认60000


max.request.size

单个请求的最大字节数,该设置会限制单个请求总消息批的个数,以免单个请求发送太多的数据,服务器有自己的限制批大小的设置,与该配置可能不一样

int 型 默认 1048576


partitioner.class

实现了接口 org.apache.kafka.clients.producer.Partitioner 的分区器实现类。默认值:org.apache.kafka.clients.producer.internals.DefaultPartitioner


receive.buffer.bytes

TCP接收缓存(SO_RECVBUF),设置为01,则使用操作系统默认的值。

int型 默认32768


security.protocol

跟 Broker 通信的协议:PLAINTEXT、SSL、SASL_PLAINTEXT、ASAL_SSL

String型 默认 PLAINTEXT


max.in.flight.requests.per.connection

单个连接上未确认请求的最大数量,达到这个数量,客户端阻塞。

如果该值大于1,则存在失败的请求,在重试的时候消息顺序不能保证。

int型 默认5


reconnect.backoff.max.ms

对于每个连续的连接失败,每台主机退避将成倍增加,直到达到此最大值。


reconnect.backoff.ms

尝试重连指定主机的基础等待时间,避免该主机的密集重连。


相关文章
|
26天前
|
弹性计算 人工智能 架构师
阿里云携手Altair共拓云上工业仿真新机遇
2024年9月12日,「2024 Altair 技术大会杭州站」成功召开,阿里云弹性计算产品运营与生态负责人何川,与Altair中国技术总监赵阳在会上联合发布了最新的“云上CAE一体机”。
阿里云携手Altair共拓云上工业仿真新机遇
|
2天前
|
人工智能 Rust Java
10月更文挑战赛火热启动,坚持热爱坚持创作!
开发者社区10月更文挑战,寻找热爱技术内容创作的你,欢迎来创作!
308 14
|
18天前
|
存储 关系型数据库 分布式数据库
GraphRAG:基于PolarDB+通义千问+LangChain的知识图谱+大模型最佳实践
本文介绍了如何使用PolarDB、通义千问和LangChain搭建GraphRAG系统,结合知识图谱和向量检索提升问答质量。通过实例展示了单独使用向量检索和图检索的局限性,并通过图+向量联合搜索增强了问答准确性。PolarDB支持AGE图引擎和pgvector插件,实现图数据和向量数据的统一存储与检索,提升了RAG系统的性能和效果。
|
5天前
|
JSON 自然语言处理 数据管理
阿里云百炼产品月刊【2024年9月】
阿里云百炼产品月刊【2024年9月】,涵盖本月产品和功能发布、活动,应用实践等内容,帮助您快速了解阿里云百炼产品的最新动态。
阿里云百炼产品月刊【2024年9月】
|
20天前
|
人工智能 IDE 程序员
期盼已久!通义灵码 AI 程序员开启邀测,全流程开发仅用几分钟
在云栖大会上,阿里云云原生应用平台负责人丁宇宣布,「通义灵码」完成全面升级,并正式发布 AI 程序员。
|
22天前
|
机器学习/深度学习 算法 大数据
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
2024“华为杯”数学建模竞赛,对ABCDEF每个题进行详细的分析,涵盖风电场功率优化、WLAN网络吞吐量、磁性元件损耗建模、地理环境问题、高速公路应急车道启用和X射线脉冲星建模等多领域问题,解析了问题类型、专业和技能的需要。
2584 22
【BetterBench博士】2024 “华为杯”第二十一届中国研究生数学建模竞赛 选题分析
|
4天前
|
存储 人工智能 搜索推荐
数据治理,是时候打破刻板印象了
瓴羊智能数据建设与治理产品Datapin全面升级,可演进扩展的数据架构体系为企业数据治理预留发展空间,推出敏捷版用以解决企业数据量不大但需构建数据的场景问题,基于大模型打造的DataAgent更是为企业用好数据资产提供了便利。
177 2
|
2天前
|
编译器 C#
C#多态概述:通过继承实现的不同对象调用相同的方法,表现出不同的行为
C#多态概述:通过继承实现的不同对象调用相同的方法,表现出不同的行为
102 65
|
6天前
|
Linux 虚拟化 开发者
一键将CentOs的yum源更换为国内阿里yum源
一键将CentOs的yum源更换为国内阿里yum源
283 2
|
22天前
|
机器学习/深度学习 算法 数据可视化
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码
2024年中国研究生数学建模竞赛C题聚焦磁性元件磁芯损耗建模。题目背景介绍了电能变换技术的发展与应用,强调磁性元件在功率变换器中的重要性。磁芯损耗受多种因素影响,现有模型难以精确预测。题目要求通过数据分析建立高精度磁芯损耗模型。具体任务包括励磁波形分类、修正斯坦麦茨方程、分析影响因素、构建预测模型及优化设计条件。涉及数据预处理、特征提取、机器学习及优化算法等技术。适合电气、材料、计算机等多个专业学生参与。
1580 16
【BetterBench博士】2024年中国研究生数学建模竞赛 C题:数据驱动下磁性元件的磁芯损耗建模 问题分析、数学模型、python 代码