回顾上一节的日志存储的机制内容:
每个sagment段文件的大小是固定的,因为kafka是一个高吞吐量的消息系统 在kafka不会出现消息堆积的现象,当其中一个segment占满以后,会重新生成同样大小的segment。消息堆积:消费者消费的进度赶不上生产者生产的消息的进度,最后导致生产者生产的消息越来越慢,最后处于阻塞的状态
一、文件存储机制
回顾上一节的日志存储的机制内容:
二、消息确认机制(确认offset)
由offset导致的有消息重复,消息丢失,消息乱序,消息确认也是由offset来确认的
自动提交:间隔多少秒钟自动提交消息。
在昨天的代码中,是可以配置自动提交属性的,自动提交的是offset。
enable.auto.commit与auto.commit.interval.ms(间隔多少时间自动提交)必须是成对出现的,否则不会自动提交的。
为什么消息会丢失?
因为kafka是通过offset来确认消费机制的。因为如果没有offset确认机制的话,这个消息会被重复的消费
一般为了提高消费者的效率,而快速的从kafka中去消费消息的话使用多线程对kafka中的消息进行消费。
弊端:
1、比如说我刚好消费了这条消息,但是没有去提交,将这个提交消息的进程给杀死,没有提交offset,这时会导致重复的去消费这个消息,这条消息并没有被标记已经被消费了。所以它可以再次的去消费。
2、比如消费完这条消息,也提交了,但是提交之后,并没有把这条消息处理完,又把线程给杀死了。默认以为把这条消息给消费了。这时消息就消失了,所以消息的消息和重复消费会经常的发生。不像activeMQ一条消息只能被消费一次就不能再被消费了。
手动提交
生产者的代码如下:
package com.weizhaoyang;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class TestTreadProducer implements Runnable {
KafkaProducer<String, String> producer=null;
public TestTreadProducer(){
Properties props = new Properties();
//kafka服务器地址
props.put("bootstrap.servers", "192.168.124.241:9092,192.168.124.242:9092,192.168.124.243:9092");
//ack是判断请求是否为完整的条件(即判断是否成功发送)。all将会阻塞消息,这种设置性能最低,但是最可靠。
props.put("acks", "1");
//retries,如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性。
props.put("retries", 0);
//producer缓存每个分区未发送消息,缓存的大小是通过batch.size()配置设定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有一个缓冲区)
props.put("batch.size", 16384);
//默认缓冲区可立即发送,即便缓冲区空间没有满;但是,如果你想减少请求的数量,可以设置linger.ms大于0.这将指示生产者发送请求之前等待一段时间
//希望更多的消息补填到未满的批中。这类似于tcp的算法,例如上面的代码段,可能100条消息在一个请求发送,因为我们设置了linger时间为1ms,然后,如果我们
//没有填满缓冲区,这个设置将增加1ms的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使是linger.ms=0。
//不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。
props.put("linger.ms", 1);
//buffer.memory控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值
//通过max.block.ms设定,之后他将抛出一个TimeoutExecption。
props.put("buffer.memory", 33554432);
//key.serializer和value.serializer示例:将用户提供的key和value对象ProducerRecord转换成字节,你可以使用附带的ByteArraySerizlizaer或StringSerializer处理简单的byte和String类型.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//设置kafka的分区数量
props.put("kafka.partitions", 12);
producer= new KafkaProducer(props);
}
public void sendMsg(String key,String value){
producer.send(new ProducerRecord<String, String>("test-topic1", key, value));
}
@Override
public void run() {
while(true){
int mesageNo=1;
sendMsg("messageKey","messageValue");
++mesageNo;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
new Thread(new TestTreadProducer()).start();
}
}
消费端的代码如下:
package com.weizhaoyang;
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
public class TestKafkaLevelConsumer extends ShutdownableThread {
public KafkaConsumer consumer;
private String topic;
//定义一个线程安全的集合
private List<ConsumerRecord> msgList=new CopyOnWriteArrayList<ConsumerRecord>();
public TestKafkaLevelConsumer(String topic) {
super("TestKafkaLevelConsumer", false);
Properties properties=new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.241:9092,192.168.124.242:9092,192.168.124.243:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"DemoConsumper");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer") ;
this.topic=topic;
consumer = new KafkaConsumer<>(properties);
}
//一个消费组只能有一个消费者消费到一个分区中的数据,不能多个消费者同时去消费分区中的数据
@Override
public void doWork() {
consumer.subscribe(Arrays.asList(topic));
while(true){
ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(1000));
for(ConsumerRecord<String,String> record:records){
System.out.println("partation:"+record.partition()+"offset="+record.offset()+"key="+record.key()+"value="+record.value());
}
}
}
public static void main(String[] args) {
TestKafkaLevelConsumer testKafkaLevelConsumer =new TestKafkaLevelConsumer("test-topic1");
testKafkaLevelConsumer.start();
}
}
运行的结果如下:一秒钟把所有的消息提取出来,然后就提交
之后是每5条提交一次
这就是手动提交的方式。
如果这时把生者和消费者关掉,然后重新启动消费者,再注释手动提交
下面的属性的值
如果设置了earliest,如果 没有提交offset,下次再消费就会有重复消费的情况。
在项目中解决的办法:
如果用kafka的话,内部维护了offset,首先将这个offset,在消息发送的同时,offset已经被同步到数据库中,会在数据库中维护同样的一个offset。消费的时候的offset由数据库指派的,而不是它来标识。比如在分布式的情况下做消费,事务的一致性是无法保证的,比如现在做了一个插入,插入的时候做了一个修改,这时的offset可能就会发生变化。消费完成和提交offset是原子的,要不全部提交。为了保证原子,数据库就是强一致性的处理数据。这时用数据库维护offset。如果消费成功,就更新offset。如果消费不成功,就不更新。如果更新Offset的时候,出现了网路故障等等的问题,没问题,因为数据库自带了事务,事务会自动回滚。消息不会丢失,这也是设计消息不可丢的方案。
offset:可以理解为被消费消息的递增量,可以看作成游标。但是游标只有前进,但是offset有前进和后退。
三、Kafka消息可靠性(offset是可靠消息的保证)
1、kafka消息的问题
kafka就比较适合高吞吐量并且允许少量数据丢失的场景,如果非要保证"消息只读取一次",可以使用 JMS。
kafka Producer消息发送有两种方式(配置参数 producer.type)
producer.type=sync(默认值): 后台线程中消息发送是同步方式,对应的类为
kafka.producer.SyncProducer;producer.type=async: 后台线程中消息发送是异步方式,对应
的类为 kafka.producer.AyncProducer;优点是可批量发送消息(消息个数达到
batch.num.messages=200 或时间达到 “ 时发送)、吞吐量佳,缺点是发送不及时可能导致失;
Kafka Producer 消息发送有三种确认方式(配置参数 acks):
acks=0: producer 不等待 Leader 确认,只管发出即可;最可能丢失消息,适用于高吞吐可丢失的业务;
acks=1(默认值): producer 等待 Leader 写入本地日志后就确认;之后 Leader 向 Followers 同
步时,如果 Leader 宕机会导致消息没同步而丢失,producer 却依旧认为成功;
acks=all/-1: producer 等待 Leader 写入本地日志、而且 Leader 向 Followers 同步完成后才会
确认;最可靠,但是性能最低。
Kafka Consumer 有两个接口:
Low-level API: 消费者自己维护 offset 等值,可以完全控制;
High-level API: 封装了对 parition 和 offset 的管理,使用简单;可能遇到 Consumer 取出消息并更新了 offset,但未处理消息即宕机,从而相当于消息丢失;extend 线程
Kafka 支持 3 种消息传递语义:
最多一次 -消息可能会丢失,但永远不会重新发送。consumer.poll();consumer.commitOffset(); processMsg(messages);
至少一次 -消息永远不会丢失,但可能会重新传递。consumer.poll(); processMsg(messages); consumer.commitOffset();
恰恰一次 - 这就是人们真正想要的,每条信息只传递一次。以事务来保证。
但是第三种没有写出来方法,因为kafka没有事务的定义。如果有事务的话,offset就好保障
了,就有了下面三个问题。
2 消息重复
根本原因:已经消费了数据,但是 offset 没提交。
外在原因:(1)消费数据后、提交 offset 前,线程被杀,或者线程自动退出系统了。
(2)设置 offset 为自动提交,consumer.close() 之前 consumer.unsubscribe();退订了主题,
导致offset没有提交。
(3)consumer 取了一批数据,尚未处理完毕时,达到了 session.timeout.ms,导致没有接收心
跳而挂掉,自动提交offset失败,下次会重复消费本批消息;
解决办法:
(1) 唯一 ID(offset,给消息的消费做一个标识) 保存在外部介质中,每次消费时根据它判断是否已处理;不是由kafka维护的。
(2) 如果在统计消费信息的时候,丢失几条关系不大,则无需理会;看业务场景。
(3) 如果消费者来不及处理,可以这样优化:增加分区以提高并行能力;增加消费者线程;关闭自动提交 enable.auto.commit=false,用手动提交。
第三种方案不可行,因为在增加分区的时候意味着消息会放到多个分区当中,搜索的路径会越来越长。根据索引去查。比如在做电商的项目的时候,如果消息重复 ,一个订单会有多个线程去处理订单信息。这个时候就完蛋了。所以订单不用kafka。订单用mq就可以了。日志信息用kafka比较合适-比如用户的行为分析。
3 消息丢失
根本原因:已经提交了 offset,但数据在内存中的异步线程尚未处理,线程就被杀掉。或者自
动退出,这时消息就真正的丢失了。
消息丢失解决方案:
同步模式下,确认机制设置为-1(不可为1),即让消息写入Leader和Follower之后再确认消息发送成功;
异步模式下,设置为不限制阻塞超时时间(不可为acks=0),当缓冲区满时不清空缓冲池,而是让生产者一直处于阻塞状态;
4 消息乱序 (如何保证kafka中消息按照顺序消费)
传统的队列,在并行处理时,由于网络故障或速度差异,尽管服务器传递是有序的,但消费者接收的顺序可能不一致;Kafka 在主题内部有分区,并行处理时,每个分区仅由消费者组中的一个消费者使用,确保了消费者是该分区的唯一读者,并按顺序使用这些数据。
但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序,除非只提供一个分区。
Kafka的分区分配策略
现在创建好了10个分区1个副本
这时在241这台机器上有三个topic。这里没有副本的概念。10个分区分布在不同的broker上面,因为副本的数量为1,说明只有由一个分区对应一个副本,这个时候就不会出现多个副本的问题。
这就是在Kafka上轮询的去存放分区
partition.assignmentStrategy 指定分区策略
Range 范围分区(默认的)
假如有10个分区,3个消费者,把分区按照序号排列0,1,2,3,4,5,6,7,8,9;消费者为C1,C2,C3,那么用分区数除以消费者数来决定每个Consumer消费几个Partition,除不尽的前面几个消费者将会多消费一个 最后分配结果如下
C1:0,1,2,3
C2:4,5,6
C3:7,8,9
生产者的消息代码如下:
package com.weizhaoyang.partition;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
//kafka服务器地址
props.put("bootstrap.servers", "192.168.124.241:9092,192.168.124.242:9092,192.168.124.243:9092");
//ack是判断请求是否为完整的条件(即判断是否成功发送)。all将会阻塞消息,这种设置性能最低,但是最可靠。
props.put("acks", "1");
//retries,如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性。
props.put("retries", 0);
//producer缓存每个分区未发送消息,缓存的大小是通过batch.size()配置设定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有一个缓冲区)
props.put("batch.size", 16384);
//默认缓冲区可立即发送,即便缓冲区空间没有满;但是,如果你想减少请求的数量,可以设置linger.ms大于0.这将指示生产者发送请求之前等待一段时间
//希望更多的消息补填到未满的批中。这类似于tcp的算法,例如上面的代码段,可能100条消息在一个请求发送,因为我们设置了linger时间为1ms,然后,如果我们
//没有填满缓冲区,这个设置将增加1ms的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使是linger.ms=0。
//不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。
props.put("linger.ms", 1);
//buffer.memory控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值
//通过max.block.ms设定,之后他将抛出一个TimeoutExecption。
props.put("buffer.memory", 33554432);
//key.serializer和value.serializer示例:将用户提供的key和value对象ProducerRecord转换成字节,你可以使用附带的ByteArraySerizlizaer或StringSerializer处理简单的byte和String类型.
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//设置kafka的分区数量
props.put("kafka.partitions", 12);
KafkaProducer<String, String> producer = new KafkaProducer(props);
for (int i = 0; i < 10; i++){
System.out.println("key-->key"+i+" value-->vvv"+i);
producer.send(new ProducerRecord<String, String>("test-topic1", "key" + i, "vvv" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("分配到了分区:"+recordMetadata.partition());
}
});
Thread.sleep(1000);
}
producer.close();
}
}
运行的结果如下:有的key被分到同一个分区的原因是,key的hash值在模上分区的总数是相同的,所以落到同一个分区上。
如果在上面的代码上,key如果为null的话
这时不会对key进行hash,这时会按照均等的分配到partition中去,这就是会增加搜随的量。
然后分别启动三个消费者,然后让生产者生产10条消息:
这时consumer1消费到了6,5,4分区
consumer2消费到了3,0,2,1
consumer3消费到了9,8,7
总结:由于是多个分区,不同的消费者,消费的是不同分区里的消息,因为一个消费组中的每一个消费者只能消费一个分区中的消息,一次不能消费多个分区中的消息。因为它们是一个组。每一个消费者的GROUP_ID_CONFIG的值是一样的,所以就归为一组了。因为这是以组的方式来订阅,而不是以一个消费者的方式来订阅。
如果是一个消费组中有两个消费者,另一个消费组中有一个消费者的话,这时是其中一个消费组中的两个消费者各消费5条消息,另一个消费者消费10条消息。如果没有配置groupId的话线程直接被终止。所以说在kafka里面以组为单位来进行消费。
如果有11个分区将会是:
C1:0,1,2,3
C2:4,5,6,7
C3:8,9,10
假如我们有两个主题T1,T2,分别有10个分区,最后的分配结果将会是这样:
C1:T1(0,1,2,3) T2(0,1,2,3)
C2:T1(4,5,6) T2(4,5,6)
C3:T1(7,8,9) T2(7,8,9)
在这种情况下,C1多消费了两个分区
RoundRobin 轮询分区
把所有的partition和consumer列出来,然后轮询consumer和partition,尽可能的让把partition均匀的分配给consumer
假如有3个Topic T0(三个分区P0-0,P0-1,P0-2),T1(两个分区P1-0,P1-1),T2(四个分区P2-0,P2-1,P2-2,P2-3)
有三个消费者:C0(订阅了T0,T1),C1(订阅了T1,T2),C2(订阅了T0,T2)!
那么分区过程如下图所示
在kafka里面有故障转移策略,当consumer减少以后会进行Rebalance,这时会重新计算负载的算法。
什么时候触发分区分配策略:
1.同一个Consumer Group内新增或减少Consumer
2.Topic分区发生变化:也会触发分配策略
Rebalance的执行
kafka提供了一个角色Coordinator来执行。当Consumer Group的第一个Consumer启动的时候,他会向kafka集群中的任意一台broker发送GroupCoordinatorRequest请求,broker会返回一个负载最小的broker设置为coordinator,之后该group的所有成员都会和coordinator进行协调通信
整个Rebalance分为两个过程 jionGroup和sysncJion
joinGroup过程
在这一步中,所有的成员都会向coordinator发送JionGroup请求,请求内容包括group_id,member_id.protocol_metadata等,coordinator会从中选出一个consumer作为leader,并且把组成员信息和订阅消息,leader信息,rebanlance的版本信息发送给consumer
Synchronizing Group State阶段
组成员向coordinator发送SysnGroupRequet请求,但是只有leader会发送分区分配的方案(分区分配的方案其实是在消费者确定的),当coordinator收到leader发送的分区分配方案后,会通过SysnGroupResponse把方案同步到各个consumer中