# 同步还是异步发送消息,默认“sync”表同步,"async"表异步。异步可以提高发送吞吐量, 也意味着消息将会在本地buffer中,并适时批量发送,但是也可能导致丢失未发送过去的消息 producer.type=sync # 在async模式下,当message被缓存的时间超过此值后,将会批量发送给broker,默认为5000ms # 此值和batch.num.messages协同工作. queue.buffering.max.ms = 5000 # 在async模式下,producer端允许buffer的最大消息量 # 无论如何,producer都无法尽快的将消息发送给broker,从而导致消息在producer端大量沉积 # 此时,如果消息的条数达到阀值,将会导致producer端阻塞或者消息被抛弃,默认为10000 queue.buffering.max.messages=20000 # 如果是异步,指定每次批量发送数据量,默认为200 batch.num.messages=500 # 当消息在producer端沉积的条数达到"queue.buffering.max.meesages"后 # 阻塞一定时间后,队列仍然没有enqueue(producer仍然没有发送出任何消息) # 此时producer可以继续阻塞或者将消息抛弃,此timeout值用于控制"阻塞"的时间 # -1: 无阻塞超时限制,消息不会被抛弃 # 0:立即清空队列,消息被抛弃 queue.enqueue.timeout.ms=-1 # 当producer接收到error ACK,或者没有接收到ACK时,允许消息重发的次数 # 因为broker并没有完整的机制来避免消息重复,所以当网络异常时(比如ACK丢失) # 有可能导致broker接收到重复的消息,默认值为3. message.send.max.retries=3 # producer刷新topic metada的时间间隔,producer需要知道partition leader的位置,以及当前topic的情况 # 因此producer需要一个机制来获取最新的metadata,当producer遇到特定错误时,将会立即刷新 # (比如topic失效,partition丢失,leader失效等),此外也可以通过此参数来配置额外的刷新机制,默认值600000 topic.metadata.refresh.interval.ms=60000
consumer消费者配置详细说明:
# zookeeper连接服务器地址 zookeeper.connect=zk01:2181,zk02:2181,zk03:2181 # zookeeper的session过期时间,默认5000ms,用于检测消费者是否挂掉 zookeeper.session.timeout.ms=5000 #当消费者挂掉,其他消费者要等该指定时间才能检查到并且触发重新负载均衡 zookeeper.connection.timeout.ms=10000 # 指定多久消费者更新offset到zookeeper中。注意offset更新时基于time而不是每次获得的消息。一旦在更新zookeeper发生异常并重启,将可能拿到已拿到过的消息 zookeeper.sync.time.ms=2000 #指定消费 group.id=itcast # 当consumer消费一定量的消息之后,将会自动向zookeeper提交offset信息 # 注意offset信息并不是每消费一次消息就向zk提交一次,而是现在本地保存(内存),并定期提交,默认为true auto.commit.enable=true # 自动更新时间。默认60 * 1000 auto.commit.interval.ms=1000 # 当前consumer的标识,可以设定,也可以有系统生成,主要用来跟踪消息消费情况,便于观察 conusmer.id=xxx # 消费者客户端编号,用于区分不同客户端,默认客户端程序自动产生 client.id=xxxx # 最大取多少块缓存到消费者(默认10) queued.max.message.chunks=50 # 当有新的consumer加入到group时,将会reblance,此后将会有partitions的消费端迁移到新 的consumer上,如果一个consumer获得了某个partition的消费权限,那么它将会向zk注册 "Partition Owner registry"节点信息,但是有可能此时旧的consumer尚没有释放此节点, 此值用于控制,注册节点的重试次数. rebalance.max.retries=5 # 获取消息的最大尺寸,broker不会像consumer输出大于此值的消息chunk 每次feth将得到多条消息,此值为总大小,提升此值,将会消耗更多的consumer端内存 fetch.min.bytes=6553600 # 当消息的尺寸不足时,server阻塞的时间,如果超时,消息将立即发送给consumer fetch.wait.max.ms=5000 socket.receive.buffer.bytes=655360 # 如果zookeeper没有offset值或offset值超出范围。那么就给个初始的offset。有smallest、largest、anything可选,分别表示给当前最小的offset、当前最大的offset、抛异常。默认largest auto.offset.reset=smallest # 指定序列化处理类 derializer.class=kafka.serializer.DefaultDecoder
八、CAP理论
1. 分布式系统当中的CAP理论
分布式系统(distributed system)正变得越来越重要,大型网站几乎都是分布式的。
分布式系统的最大难点,就是各个节点的状态如何同步。
为了解决各个节点之间的状态同步问题,在1998年,由加州大学的计算机科学家 Eric Brewer 提出分布式系统的三个指标,分别是:
- Consistency:一致性
- Availability:可用性
- Partition tolerance:分区容错性
Eric Brewer 说,这三个指标不可能同时做到。最多只能同时满足其中两个条件,这个结论就叫做 CAP 定理。
CAP理论是指:分布式系统中,一致性、可用性和分区容忍性最多只能同时满足两个。
一致性:Consistency
- 通过某个节点的写操作结果对后面通过其它节点的读操作可见
- 如果更新数据后,并发访问情况下后续读操作可立即感知该更新,称为强一致性
- 如果允许之后部分或者全部感知不到该更新,称为弱一致性
- 若在之后的一段时间(通常该时间不固定)后,一定可以感知到该更新,称为最终一致性
可用性:Availability
- 任何一个没有发生故障的节点必须在有限的时间内返回合理的结果
分区容错性:Partition tolerance
- 部分节点宕机或者无法与其它节点通信时,各分区间还可保持分布式系统的功能
一般而言,都要求保证分区容忍性。所以在CAP理论下,更多的是需要在可用性和一致性之间做权衡。
2. Partition tolerance
先看 Partition tolerance,中文叫做"分区容错"。
大多数分布式系统都分布在多个子网络。每个子网络就叫做一个区(partition)。分区容错的意思是,区间通信可能失败。比如,一台服务器放在中国,另一台服务器放在美国,这就是两个区,它们之间可能无法通信。
上图中,G1 和 G2 是两台跨区的服务器。G1 向 G2 发送一条消息,G2 可能无法收到。系统设计的时候,必须考虑到这种情况。
一般来说,分区容错无法避免,因此可以认为 CAP 的 P 总是存在的。即永远可能存在分区容错这个问题
3. Consistency
Consistency 中文叫做"一致性"。意思是,写操作之后的读操作,必须返回该值。举例来说,某条记录是 v0,用户向 G1 发起一个写操作,将其改为 v1。
接下来,用户的读操作就会得到 v1。这就叫一致性。
问题是,用户有可能向 G2 发起读操作,由于 G2 的值没有发生变化,因此返回的是 v0。G1 和 G2 读操作的结果不一致,这就不满足一致性了。
为了让 G2 也能变为 v1,就要在 G1 写操作的时候,让 G1 向 G2 发送一条消息,要求 G2 也改成 v1。
这样的话,用户向 G2 发起读操作,也能得到 v1。
4. Availability
Availability 中文叫做"可用性",意思是只要收到用户的请求,服务器就必须给出回应。
用户可以选择向 G1 或 G2 发起读操作。不管是哪台服务器,只要收到请求,就必须告诉用户,到底是 v0 还是 v1,否则就不满足可用性。
九、Kafka中的CAP机制
kafka是一个分布式的消息队列系统,既然是一个分布式的系统,那么就一定满足CAP定律,那么在kafka当中是如何遵循CAP定律的呢?kafka满足CAP定律当中的哪两个呢?
kafka满足的是CAP定律当中的CA,其中Partition tolerance通过的是一定的机制尽量的保证分区容错性。
其中C表示的是数据一致性。A表示数据可用性。
kafka首先将数据写入到不同的分区里面去,每个分区又可能有好多个副本,数据首先写入到leader分区里面去,读写的操作都是与leader分区进行通信,保证了数据的一致性原则,也就是满足了Consistency原则。然后kafka通过分区副本机制,来保证了kafka当中数据的可用性。但是也存在另外一个问题,就是副本分区当中的数据与leader当中的数据存在差别的问题如何解决,这个就是Partition tolerance的问题。
kafka为了解决Partition tolerance的问题,使用了ISR的同步策略,来尽最大可能减少Partition tolerance的问题。
每个leader会维护一个ISR(a set of in-sync replicas,基本同步)列表。
ISR列表主要的作用就是决定哪些副本分区是可用的,也就是说可以将leader分区里面的数据同步到副本分区里面去,决定一个副本分区是否可用的条件有两个:
- replica.lag.time.max.ms=10000 副本分区与主分区心跳时间延迟
- replica.lag.max.messages=4000 副本分区与主分区消息同步最大差
produce 请求被认为完成时的确认值:request.required.acks=0
。
- ack=0:producer不等待broker同步完成的确认,继续发送下一条(批)信息。
- ack=1(默认):producer要等待leader成功收到数据并得到确认,才发送下一条message。
- ack=-1:producer得到follwer确认,才发送下一条数据。
十、Kafka监控及运维
在开发工作中,消费在Kafka集群中消息,数据变化是我们关注的问题,当业务前提不复杂时,我们可以使用Kafka 命令提供带有Zookeeper客户端工具的工具,可以轻松完成我们的工作。随着业务的复杂性,增加Group和 Topic,那么我们使用Kafka提供命令工具,已经感到无能为力,那么Kafka监控系统目前尤为重要,我们需要观察 消费者应用的细节。
1. kafka-eagle概述
为了简化开发者和服务工程师维护Kafka集群的工作有一个监控管理工具,叫做 Kafka-eagle。这个管理工具可以很容易地发现分布在集群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建Topic。同时,这个管理工具也是一个非常好的可以快速浏览这个集群的工具,
2. 环境和安装
1. 环境要求
需要安装jdk,启动zk以及kafka的服务
2. 安装步骤
- 下载源码包
kafka-eagle官网:
http://download.kafka-eagle.org/
我们可以从官网上面直接下载最细的安装包即可kafka-eagle-bin-1.3.2.tar.gz这个版本即可
代码托管地址:
https://github.com/smartloli/kafka-eagle/releases
- 解压
这里我们选择将kafak-eagle安装在第三台。
直接将kafka-eagle安装包上传到node03服务器的/export/softwares路径下,然后进行解压
node03服务器执行一下命令进行解压。
- 准备数据库
kafka-eagle需要使用一个数据库来保存一些元数据信息,我们这里直接使用msyql数据库来保存即可,在node03服务器执行以下命令创建一个mysql数据库即可。
进入mysql客户端:
create database eagle;
- 修改kafak-eagle配置文件
执行以下命令修改kafak-eagle配置文件:
vim system-config.properties
修改为如下:
kafka.eagle.zk.cluster.alias=cluster1,cluster2 cluster1.zk.list=node01:2181,node02:2181,node03:2181 cluster2.zk.list=node01:2181,node02:2181,node03:2181 kafka.eagle.driver=com.mysql.jdbc.Driver kafka.eagle.url=jdbc:mysql://node03:3306/eagle kafka.eagle.username=root kafka.eagle.password=123456
- 配置环境变量
kafka-eagle必须配置环境变量,node03服务器执行以下命令来进行配置环境变量: vim /etc/profile
:
export KE_HOME=/opt//kafka-eagle-bin-1.3.2/kafka-eagle-web-1.3.2 export PATH=:$KE_HOME/bin:$PATH
修改立即生效,执行: source /etc/profile
- 启动kafka-eagle
执行以下界面启动kafka-eagle:
cd kafka-eagle-web-1.3.2/bin chmod u+x ke.sh ./ke.sh start
- 主界面
访问kafka-eagle
http://node03:8048/ke/account/signin?/ke/
用户名:admin
密码:123456
十一、Kafka大厂面试题
1. 为什么要使用 kafka?
- 缓冲和削峰:上游数据时有突发流量,下游可能扛不住,或者下游没有足够多的机器来保证冗余,kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务就可以按照自己的节奏进行慢慢处理。
- 解耦和扩展性:项目开始的时候,并不能确定具体需求。消息队列可以作为一个接口层,解耦重要的业务流程。只需要遵守约定,针对数据编程即可获取扩展能力。
- 冗余:可以采用一对多的方式,一个生产者发布消息,可以被多个订阅topic的服务消费到,供多个毫无关联的业务使用。
- 健壮性:消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。
- 异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
2. Kafka消费过的消息如何再消费?
kafka消费消息的offset是定义在zookeeper中的, 如果想重复消费kafka的消息,可以在redis中自己记录offset的checkpoint点(n个),当想重复消费消息时,通过读取redis中的checkpoint点进行zookeeper的offset重设,这样就可以达到重复消费消息的目的了
3. kafka的数据是放在磁盘上还是内存上,为什么速度会快?
kafka使用的是磁盘存储。
速度快是因为:
- 顺序写入:因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是耗时的。所以硬盘 “讨厌”随机I/O, 喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。
- Memory Mapped Files(内存映射文件):64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上。
- Kafka高效文件存储设计: Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。通过索引信息可以快速定位
message和确定response的 大 小。通过index元数据全部映射到memory(内存映射文件),
可以避免segment file的IO磁盘操作。通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。
注:
- Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中 小的offset命名。这样在查找指定offset的
Message的时候,用二分查找就可以定位到该Message在哪个段中。
- 为数据文件建 索引数据文件分段 使得可以在一个较小的数据文件中查找对应offset的Message 了,但是这依然需要顺序扫描才能找到对应offset的Message。
为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。
4. Kafka数据怎么保障不丢失?
分三个点说,一个是生产者端,一个消费者端,一个broker端。
- 生产者数据的不丢失
kafka的ack机制:在kafka发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的能够被收到,其中状态有0,1,-1。
如果是同步模式:
ack设置为0,风险很大,一般不建议设置为0。即使设置为1,也会随着leader宕机丢失数据。所以如果要严格保证生产端数据不丢失,可设置为-1。
如果是异步模式:
也会考虑ack的状态,除此之外,异步模式下的有个buffer,通过buffer来进行控制数据的发送,有两个值来进行控制,时间阈值与消息的数量阈值,如果buffer满了数据还没有发送出去,有个选项是配置是否立即清空buffer。可以设置为-1,永久阻塞,也就数据不再生产。异步模式下,即使设置为-1。也可能因为程序员的不科学操作,操作数据丢失,比如kill -9,但这是特别的例外情况。
注:
ack=0:producer不等待broker同步完成的确认,继续发送下一条(批)信息。
ack=1(默认):producer要等待leader成功收到数据并得到确认,才发送下一条message。
ack=-1:producer得到follwer确认,才发送下一条数据。
- 消费者数据的不丢失
通过offset commit 来保证数据的不丢失,kafka自己记录了每次消费的offset数值,下次继续消费的时候,会接着上次的offset进行消费。
而offset的信息在kafka0.8版本之前保存在zookeeper中,在0.8版本之后保存到topic中,即使消费者在运行过程中挂掉了,再次启动的时候会找到offset的值,找到之前消费消息的位置,接着消费,由于 offset 的信息写入的时候并不是每条消息消费完成后都写入的,所以这种情况有可能会造成重复消费,但是不会丢失消息。
唯一例外的情况是,我们在程序中给原本做不同功能的两个consumer组设置
KafkaSpoutConfig.bulider.setGroupid的时候设置成了一样的groupid,这种情况会导致这两个组共享同一份数据,就会产生组A消费partition1,partition2中的消息,组B消费partition3的消息,这样每个组消费的消息都会丢失,都是不完整的。 为了保证每个组都独享一份消息数据,groupid一定不要重复才行。
- kafka集群中的broker的数据不丢失
每个broker中的partition我们一般都会设置有replication(副本)的个数,生产者写入的时候首先根据分发策略(有partition按partition,有key按key,都没有轮询)写入到leader中,follower(副本)再跟leader同步数据,这样有了备份,也可以保证消息数据的不丢失。
5. 采集数据为什么选择kafka?
采集层 主要可以使用Flume, Kafka等技术。
Flume:Flume 是管道流方式,提供了很多的默认实现,让用户通过参数部署,及扩展API.
Kafka:Kafka是一个可持久化的分布式的消息队列。 Kafka 是一个非常通用的系统。你可以有许多生产者和很多的消费者共享多个主题Topics。
相比之下,Flume是一个专用工具被设计为旨在往HDFS,HBase发送数据。它对HDFS有特殊的优化,并且集成了Hadoop的安全特性。
所以,Cloudera 建议如果数据被多个系统消费的话,使用kafka;如果数据被设计给Hadoop使用,使用Flume。
6. kafka 重启是否会导致数据丢失?
- kafka是将数据写到磁盘的,一般数据不会丢失。
- 但是在重启kafka过程中,如果有消费者消费消息,那么kafka如果来不及提交offset,可能会造成数据的不准确(丢失或者重复消费)。
7. kafka 宕机了如何解决?
- 先考虑业务是否受到影响
kafka 宕机了,首先我们考虑的问题应该是所提供的服务是否因为宕机的机器而受到影响,如果服务提供没问题,如果实现做好了集群的容灾机制,那么这块就不用担心了。
- 节点排错与恢复
想要恢复集群的节点,主要的步骤就是通过日志分析来查看节点宕机的原因,从而解决,重新恢复节点。
8. 为什么Kafka不支持读写分离?
在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。
Kafka 并不支持主写从读,因为主写从读有 2 个很明显的缺点:
- 数据一致性问题:数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
- 延时问题:类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经历 网络→主节点内存→网络→从节点内存 这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历 网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘 这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。
而kafka的主写主读的优点就很多了:
- 可以简化代码的实现逻辑,减少出错的可能;
- 将负载粒度细化均摊,与主写从读相比,不仅负载效能更好,而且对用户可控;
- 没有延时的影响;
- 在副本稳定的情况下,不会出现数据不一致的情况。
9. kafka数据分区和消费者的关系?
每个分区只能由同一个消费组内的一个消费者(consumer)来消费,可以由不同的消费组的消费者来消费,同组的消费者则起到并发的效果。
10. kafka的数据offset读取流程
- 连接ZK集群,从ZK中拿到对应topic的partition信息和partition的Leader的相关信息
- 连接到对应Leader对应的broker
- consumer将⾃自⼰己保存的offset发送给Leader
- Leader根据offset等信息定位到segment(索引⽂文件和⽇日志⽂文件)
- 根据索引⽂文件中的内容,定位到⽇日志⽂文件中该偏移量量对应的开始位置读取相应⻓长度的数据并返回给consumer
11. kafka内部如何保证顺序,结合外部组件如何保证消费者的顺序?
kafka只能保证partition内是有序的,但是partition间的有序是没办法的。爱奇艺的搜索架构,是从业务上把需要有序的打到同⼀个partition。
12. Kafka消息数据积压,Kafka消费能力不足怎么处理?
- 如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)
- 如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。
13. Kafka单条日志传输大小
kafka对于消息体的大小默认为单条最大值是1M但是在我们应用场景中, 常常会出现一条消息大于1M,如果不对kafka进行配置。则会出现生产者无法将消息推送到kafka或消费者无法去消费kafka里面的数据, 这时我们就要对kafka进行以下配置:server.properties
replica.fetch.max.bytes: 1048576 broker可复制的消息的最大字节数, 默认为1M message.max.bytes: 1000012 kafka 会接收单个消息size的最大限制, 默认为1M左右
注意:message.max.bytes必须小于等于replica.fetch.max.bytes,否则就会导致replica之间数据同步失败。