12.Consumer消费者机制和分区策略
12.1.消费者拉取数据机制
消费者为什么是从broker中pull数据,而不是broker主动push给消费者呢
- 消费者采用pull方式拉取,从broker的partition获取数据。
- pull模式则可以根据consumer的消费能力进行自行调节拉取消息的多少,不同的消费者性能不一样。
- 如果broker没有消息,consumer可以配置timeout时间,阻塞一段时间之后在返回。
如果是broker主动push的话,优点是可以快速处理,因为消费者pull模式,可能会造成生产者已经把消息投递到broker,但是消费者没有及时的取pull,但是push容易造成消费者处理不过来,消息堆积和延迟。
12.2.消费者从那个分区进行消费
一个topic有多个partition,一个消费者组里面又多个消费者,那是怎么分配的?
一个主题topic可以有多个消费者,因为里面有多个partition分区(leader分区)
一个partition Leader可以由一个消费者组里的一个消费者消费
12.3.消费者消费的分区策略
顶层接口
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
round-robin(RoundRobinAssignor非默认策略)轮询策略
- 按照消费者组进行轮询分配,同个消费者组监听不同主题也是一样,是把所有的partition和所有consumer都列出来,所以消费者组里面订阅的主题是一样才行,主题不一样则会出现分配不均匀的问题。
- 例如7个分区,同组内2个消费者,你一个我一个分配
c-1:topic-0、topic-2、topic-4、topic-6 c-2:topic-1、topic-3、topic-5
弊端
如果同一消费者组内,所有订阅的消息是不同的,在执行分区分配的时候不是轮询分配,可能会导致分区配置不均匀
- range(RangeAssignor默认策略)范围策略
- 按照主题进行分配,如果不平均分配,则第一个消费者会分配比较多的分区,一个消费者监听不同主题也不影响
- 例如7个分区,同组内2个消费者,全部主题平均分配
c-1:topic-0、topic-1、topic-2、topic-3 c-2:topic-4、topic-5、topic-6
弊端
只针对一个topic而言,c-1多消费一个分区影响不大,但是如果N多个Topic,那么针对每个Topic,消费者c-1每次都要多分一个分区,那这会造成负载过大,影响性能
12.4.什么是Rebalance操作
- kafka怎么均匀的分配某个topic下的所有partition到各个消费者,从而使得消息的消费速度达到最快,就是平衡(balance),前面讲了Range范围分区和RoundRobin轮询策略,也支持自定义分区策略。
- 而rebalance(重平衡)其实就是重新进行partition的分配,从而使得partition的分配重新达到平衡状态
12.5.Rebalance触发的机制
- 当消费者组内消费者数量发生变化时(增加或者减少),就会产生重新分配partition
- 分区数量发生变化时(即topic分区的数量发生变化时)
12.6.容灾消费机制
当消费者在消费过程中突然宕机了,重新恢复服务时从那里消费的?
消费者会记录offset,故障恢复后会从这里继续消费,之前版本会存在zookeeper或者本地里,新版默认存在kafka内置的topic中,名称是_consumer_offsets。
该topic默认有50个Partition,每个Partition有三个副本,分区数量可以由offset.topic.num.partition进行配置
- 通过groupId的哈希值和该参数取模的方式来确定某个消费者组已消费的offset保存到_consumer_offsets主题的那个分区中
- 由消费者组名+主题+分区,确定唯一的offset的key,从而获取对应的key
- 三元组:group.id+topic+分区号,而value就是offset的值
13.Consumer配置和Kafka调试日志配置
13.1.配置日志级别
#yum配置文件修改 logging: config: classpath:logback.log
<configuration> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <!-- 格式化输出: %d表示日期, %thread表示线程名, %-5level: 级别从左显示5个字符宽度 %msg:日志消息, %n是换行符 --> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n</pattern> </encoder> </appender> <!--log日志的级别--> <root level="info"> <appender-ref ref="STDOUT" /> </root> </configuration>
13.2.消费者配置
#消费者组id,分组内的消费者只能消费该消息一次,不同的组可以重复此消息 group.id #为true的时候自动提交偏移量 enable.auto.commit #自动提交offset周期 auto.commit.interval.ms #重置消费便宜量策略,消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(消费者长时间读取一个无效的偏移量记录)该怎么处理 #默认是latest,如果需要从头消费,则需要消费者组名变更才可以 auto.offset.reset #序列化器,反序列化机制,生产者序列化,消费者反序列化 key.deserializer
13.3.Kafka消费者Consumer消息配置实战
配置
public static Properties getProperties() { Properties props = new Properties(); //kafka服务器broker地址 props.put("bootstrap.servers", "8.140.116.67:9092"); //消费者组id,同组消费者只能一个消费者消费,不同组可以重复消费消息 props.put("group.id", "xdclass-g1"); //默认是latest,从最新的消息开始消费,earliest是从头开始消费,但是还要改消费者组 props.put("auto.offset.reset","earliest"); //是否自动提交偏移量,一般不自动 props.put("enable.auto.commit", "false"); //自动提交offset延迟时间 props.put("auto.commit.interval.ms", "1000"); //反序列化机制 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return props; }
消费者订阅
订阅主题api:subscribe(topic主题集合) 拉取消息api:poll(Duration.ofMillis(500))
@Test public void simpleConsumerTest(){ Properties props = getProperties(); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //订阅topic主题 consumer.subscribe(Arrays.asList(KafkaProducerTest.TOPIC_NAME)); while (true) { //拉取时间控制,阻塞超时时间,当拉取topic里没有消息时,消费者阻塞500毫秒 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500)); for (ConsumerRecord<String, String> record : records) { System.err.printf("topic = %s, offset = %d, key = %s, value = %s%n",record.topic(), record.offset(), record.key(), record.value()); } } }
13.4.Consumer手工提交offset配置和从头消费配置
配置从头消费partition消息
auto.offset.reset配置策略即可
默认是latest,需要改为earliest且消费者组名变更,即可实现从头消费
//默认是latest,如果需要从头消费partition消息,需要改为earliest,且变更消费者组名,才生效 props.put("auto.offset.reset","earliest");
配置手动提交
- 自动提交的问题
- 没法控制消息是否正常被消费
- 适合非常严谨的场景,比如日志收集发送
- 手工提交offset配置
- 同步commitSync阻塞当前线程(自动失败重试)
- 异步commitAsync不会阻塞当前线程(没有失败重试,回调callback函数获取提交信息,记录日志等)
@Test public void simpleConsumerTest(){ Properties properties = getProperties(); KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties); //订阅主题 kafkaConsumer.subscribe(Arrays.asList(KafkaProducerTest.TOPIC_NAME)); while(true){ ConsumerRecords<String,String> poll = kafkaConsumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : poll) { System.out.println("topic:"+record.topic()+"--"+"key:"+record.key()+"--"+"offset:"+record.offset()+"--"+"value:"+record.value()); } //同步阻塞提交 //kafkaConsumer.commitSync(); if(!poll.isEmpty()){ //异步提交offset kafkaConsumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if(exception == null){ System.out.println("手工提交offset成功"); }else{ System.out.println("手工提交offset失败"); exception.printStackTrace(); } } }); } } }
14.kafka数据存储流程
14.1.kafka数据存储流程
kafka采用了分片和索引机制,将每个partition分为多个segment,每个segment对应2个文件log和index
index文件中并没有为每一条message简历索引,采用了稀疏存储的方式 每隔一定字节的数据建立一条索引,避免了索引文件占用过多的空间和资源,从而可以将索引文件保留到内存中 缺点是没有建立索引的数据在查询过程中需要小范围的顺序扫描操作
14.2.自定义的索引分片
在server.properties中进行配置
#默认是1G,segment文件达到1G(index和log文件的总和),进行分片 log.segment.bytes=1073741824
14.3.文件分段的规则
#分段一 00000000000000000000.index 00000000000000000000.log #分段二 数字1234指的是当前文件的最小偏移量offset,即上个文件的最后一个消息offset+1 00000000000000001234.index 00000000000000001234.log #分段三 00000000000000088888.index 00000000000000088888.log
15.分布式系统CAP定理
CAP定理:指的是在一个分布式系统中,Consistency(一致性)、Availability(可用性)、Partition tolerance(分区容错性),三者不可同事获取
- 一致性(C):所有节点都可以访问待最新的数据,锁定其他节点,不一致之前不可读
- 可用性(A):每个请求都是可以得到响应的,不管请求是成功还是失败
- 分区容错性(P):除了全部整体网络故障,其他故障都不能导致系统不可用,节点间通信可能失败,无法避免
- CAP理论就是说分布式存储系统中,最多只能实现上面两个点。而由于当前的网络硬件肯定会出现延迟丢包等问题,所以分区容忍性是我们必须要接受的。所以我么那只能在一致性和可用性之间进行权衡。
CA:如果不要求P(不允许分区),则C(强一致性)和A(可用性)是可以保证的。但放弃P的同时也就意味着放弃了系统的扩展性,也就是分布式节点受限,每办法部署子节点,违背分布式系统设计的初衷。 CP:如果不要求A(可用性),每个请求都需要在服务器之间保持强一致,而P(分区)会导致同步时间无限延长(也就是等待数据同步完成才可以正常访问),一旦发生网络故障或者消息丢是的情况,就要牺牲用户的体验,等待所有数据全部一致了在让去用户去访问 AP:要高可用并且允许分区,则需要放弃一致性。一旦分区发生,节点之间可能会失去联系,为了高可用,每个节点只能用本地数据提供服务,而这样会导致全局数据的不一致性。
结论:
分布式系统中P肯定要满足,所以只能CA中二选一
CP:适合支付、交易类、要求强一致性,宁可业务不可用,也不能出现脏数据
AP:信息流架构,不要求数据强一致性,更想要服务的可用
16.Kafka数据可靠性投递
16.1.生产者发送消息确认机制
保证producer发送到指定的topic,topic的每个partition收到producer发送的数据后,需要像producer发送一个ack确认收到,然后才会进入到下一轮的发送数据,否则就会重新发送数据。
16.2.副本数据同步机制
- 当producer向partition写数据时,根据ack机制,默认ack=1,只会向leader中写入数据。
- 然后leader中的数据会复制到其他的replication中,follower会周期性的从leader中pull数据
- 对于数据的读写操作都在leader replication中,follower之作为副本使用,假如follower在向leader pull数据的时候,还没同步完成,leader突然宕机这回怎末办?
16.3.副本数据同步策略
当producer向parition发送数据时,可以根据request.required.acks参数来时数据的可靠性级别。
ack=0
- producer发送一次消息到partition就不再发送了,不管是否发送成功,
- 发出去的消息可能还在内存中,还没有写道磁盘中,Leader宕机了,producer也认为发送成功了,此时消息就达不到一致性。
ack=1(默认)
只要producer发送到partition中的数据写入到leader的磁盘中就认为发送成功了,返回给producer一个ack标识,不会管follower是否同步完成。
问题:如果leader刚刚接收到消息,还没来的及取同步follower宕机了,这回就会造成数据的丢失
ack=all(-1)
- producer发送数据到partition leader ,只有当leader的所有follower完全同步完成的时候才会返回ack标识,通知消息推送成功。
- 注意:当leader在同步follower的时候,某个follower因为网络的原因一直同步不上,这会kafka里有一个概念叫做isr集合,partition 副本的集合,leader也在里面,假如某个follower一致同步不上,isr就会把他给剔除集合,isr时一个可变的分区集合。
问题1:producer向partition发送数据,部分isr副本同步,leader此时挂掉,那么kafka会从follower中重新选择leader,假如某个follower已经同步完成数据了,当它被选上leader的时候,producer会重现向他发送数据,这会就会造成数据的重复发送。
问题2:假如partition 只有一个副本,那即使是all也会造成数据丢失,接受完消息后宕机,所以ack=all必须跟isr里面至少有两个副本的情况下使用。
在设置requsest.required.acks=-1的同时,也要min.insync.replicas这个参数设定isr中的最小副本为>=2,默认为1,如果isr中副本的数量少于min.insync.replicas的数量时,客户端会报异常。
17.Kafka数据可靠性保障原理之ISR机制
什么是ISR(in-sync replica set)
leader会维持一个与其保持同步的replica集合,该集合就是ISR,每一个leader partition都有一个ISR,keader本身也在ISR集合中,leader动态维护,保证kafka消息不丢失,ISR中至少有一个存活,并且commit成功的。
Partition leader保持同步的Partition Follower集合,当ISR中的Partition Follower完成数据的同步之后,就会给leader发送ack。
如果partition follower长时间未响应(replica.lag.time.max.ms),则partition follower就会被剔除ISR集合
partition leader 发生故障后,就会从ISR中重新选出partition leader
OSR(out-of-sync-replca set)
- 与leader副本分区,同步滞后过多,被剔除ISR集合的副本
AR(Assign Replicas)
- 分区中所有的集合称为AR,即ISR+OSR
18.Kafka的HighWatermark的作用
ACK保障了【生产者】的投递可靠性
partition的多副本保证了【消息存储】的可靠性
hw的作用:保证消费数据的一致性和副本数据的一致性
Follower故障
Folllower发生故障后会被临时踢出ISR集合中,等该Follower恢复后,follower会读取本地磁盘上次记录的HW,并将该log文件高于HW的部分去掉,从HW开始向leader同步,等该follower的LEO大于等于该partition的hw,即follower追上leader后,重新假如ISR集合
Leader故障
Leader发生故障后,会从ISR中选出一个新的leader,为了保证多个副本之间的数据一致性,其余的follower会先将各自的log文件搞与hw的部分劫掉(新的leader不会劫掉),然后在跟新的leader进行同步
19.kafka高可用和高性能
19.1.Kafka-zookeeper集群搭建
(1)解压zookeeper压缩包,重命名为zk1,zk2,zk3
tar -xvf apache-zookeeper-3.7.0-bin.tar.gz mv apache-zookeeper-3.7.0 zk1 cp -r zk1 zk2 cp -r zk1 zk3
(2)修改每个zookeeper的配置文件
端口:
- 2181
- 2182
- 2183
#客户端端口 clientPort=2181 #数据存储路径,以节点为命名区分 dataDir=/tmp/zookeeper/2181 #修改AdminServer端口 admin.serverPort=8888
(3)dataDir中创建myid文件,内容分别为1,2,3作为zk的唯一标识
cd /tmp/zookeeper/2181 echo 1 >myid cd /tmp/zookeeper/2182 echo 2 >myid cd /tmp/zookeeper/2183 echo 3 >myid
(4)配置集群
#各个配置文件zoo.cfg加入集群配置 #server.服务器id=服务器IP地址:服务器直接通信的端口:服务器之间选举投票的端口 server.1=127.0.0.1:2881:3881 server.2=127.0.0.1:2882:3882 server.3=127.0.0.1:2883:3883
(5)启动zk的各个节点
#启动zk1节点 cd /usr/local/software/zk1/bin ./zkServer.sh start #启动zk2节点 cd /usr/local/software/zk2/bin ./zkServer.sh start #启动zk3节点 cd /usr/local/software/zk3/bin ./zkServer.sh start
(6)查看各个几点的状态
./zkServer.sh status
(7)模拟leader副本宕机,follower重新选取leader,宕机后的leader并入ISR集群变为follower
(8)配置kafka集群
端口:
- 9092
- 9093
- 9094
配置
#内网中使用,内网部署kafka集群只须用到listeners,内网需要作为区分时,才需要用到advertised.listeners listeners=PLAINTEXT://172.18.123.229:9092 advertised.listeners=PLAINTEXT://112.74.55.160:9092 #每个节点编号1、2、3 broker.id=1 #端口 port=9092 #配置3个 log.dirs=/tmp/kafka-logs/kafka1 #zk地址 zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
(9)启动三个kafka实例
./kafka-server-start.sh -daemon ../config/server.properties & #守护线程 ./kafka-server-start.sh ../config/server.properties &
./kafka-topics.sh --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 3 --partitions 6 --topic xdclass-cluster-topic
19.2.Kafka中的日志数据清理
kafka将数据持久化到硬盘上,为了控制磁盘容量,需要对过去的消息进行清理,kafka内置有个定时任务检测删除日志,默认是5分钟。
log.retention.check.interval.ms=30000
根据segment单位进行定期清理
启用cleaner
- log.cleaner.enable=true
- log.cleaner.threads=2(清理线程数配置)
支持配置策略对数据定期清理
日志删除
- log.cleanup.policy=delete
#清理超过指定时间的消息,默认是168小时,7天 #还有log.retention.ms,log.metention.minutes,log.retention.hours 优先级高到低 log.retention.hours=168 #超过指定大小后,删除旧的消息,下面是1G的字节数,-1就是没有限制 log.retention.bytes=1073741824 #还有基于日志起始位移(log start offset)
基于【时间删除】日志
每个日志文件都维护一个最大时间戳字段,每次日志写入新的消息时,都会更新该字段 一个日志段,segment写满了被切分之后,时间戳就不会在发生改变,这时kafka可以根据当前的时间去和各个日志文件的时间戳去进行比较来判断是否过期了
基于【大小超过阈值删除】日志
超过阈值的部分不许要大于一个日志字段的大小,这回kafka会把最早的一个日志文件删除 假设热值大小是500MB,当前分区共有四个日志文件,大小分别为500MB,500MB,500MB,10MB 假设阈值设置1500,那么kafka会计算当前文件的总值,500*3+10=1510MB,1510-1500=10 <500,所以kafka不会删除日志文件 假设阈值设置1000,1510-1000 = 510 >500,所以kafka会删除最早的一个日志文件
log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除
日志压缩
- log.cleanup.policy=compact启用压缩策略
- 按照key进行整理,有相同的key不同的value,值保留最后一个
19.3.Kafka的高性能原理分析-ZeroCopy
零拷贝ZeroCopy(SendFile)
案例:将一个File读取并发送出去(Linux有两个上下文,内核态、用户态)
传统linux上
- file文件经过了四次拷贝,调用read,将文件拷贝到了kernel内核态,CPU控制kernel态的数据copy到用户态,调用write时,user态下的内容会copy到内核态的socket的buffer中
- 最后将内核态的socket buffer的数据copy到网卡设备中传送
- 缺点:增加了上下文切换,浪费了两次无效拷贝
ZeroCopy
- 请求kernel直接把disk的data传输给socket,而不是通过应用程序传输,ZeroCopy大大提高了应用程序的性能,较少不必要的内核缓冲区很用户缓冲区间的拷贝,从而减少cpu的开销和减少了kernel和user模式的 上下文切换,达到性能的提升
- 对应零拷贝技术有mmap及sendfile
- mmap:小文件传输快
- sendfile:大文件传输必mmap快
kafka高性能
- 存储模型,topic多个分区,每个分区多个segment段
- index索引文件查找,利用分段和稀疏索引
- 磁盘顺序写入
- 异步操作少阻塞sender和main线程,批量操作
- 页缓存Page cache,没有利用jvm内存,因为容易GC影响性能
- 零拷贝ZeroCopy
20.SpringBoot项目整合Spring-kafka
20.1.Springboot项目整合spring-kafka依赖发送消息
- 添加pom文件
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
- 配置文件修改增加生产者信息
spring: kafka: bootstrap-servers: 192.168.111.31:9092,192.168.111.31:9093,192.168.111.31:9094 producer: # 消息重发的次数。 retries: 0 #一个批次可以使用的内存大小 batch-size: 16384 # 设置生产者内存缓冲区的大小。 buffer-memory: 33554432 # 键的序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer acks: all
- 发送消息
private static final String TOPIC_NAME = "user.register.topic"; @Autowired private KafkaTemplate<String, Object> kafkaTemplate; /** * 发送消息 * @param phone */ @GetMapping("/api/user/{phone}") public void sendMessage1(@PathVariable("phone") String phone) { kafkaTemplate.send(TOPIC_NAME, phone).addCallback(success -> { // 消息发送到的topic String topic = success.getRecordMetadata().topic(); // 消息发送到的分区 int partition = success.getRecordMetadata().partition(); // 消息在分区内的offset long offset = success.getRecordMetadata().offset(); System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset); }, failure -> { System.out.println("发送消息失败:" + failure.getMessage()); }); }
ework.kafka
spring-kafka
- 配置文件修改增加生产者信息 ```bash spring: kafka: bootstrap-servers: 192.168.111.31:9092,192.168.111.31:9093,192.168.111.31:9094 producer: # 消息重发的次数。 retries: 0 #一个批次可以使用的内存大小 batch-size: 16384 # 设置生产者内存缓冲区的大小。 buffer-memory: 33554432 # 键的序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer acks: all
- 发送消息
private static final String TOPIC_NAME = "user.register.topic"; @Autowired private KafkaTemplate<String, Object> kafkaTemplate; /** * 发送消息 * @param phone */ @GetMapping("/api/user/{phone}") public void sendMessage1(@PathVariable("phone") String phone) { kafkaTemplate.send(TOPIC_NAME, phone).addCallback(success -> { // 消息发送到的topic String topic = success.getRecordMetadata().topic(); // 消息发送到的分区 int partition = success.getRecordMetadata().partition(); // 消息在分区内的offset long offset = success.getRecordMetadata().offset(); System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset); }, failure -> { System.out.println("发送消息失败:" + failure.getMessage()); }); }