kafka(三)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: kafka(三)

回顾上一节的日志存储的机制内容:

每个sagment段文件的大小是固定的,因为kafka是一个高吞吐量的消息系统  在kafka不会出现消息堆积的现象,当其中一个segment占满以后,会重新生成同样大小的segment。消息堆积:消费者消费的进度赶不上生产者生产的消息的进度,最后导致生产者生产的消息越来越慢,最后处于阻塞的状态

一、文件存储机制

回顾上一节的日志存储的机制内容:

二、消息确认机制(确认offset)

由offset导致的有消息重复,消息丢失,消息乱序,消息确认也是由offset来确认的

自动提交:间隔多少秒钟自动提交消息。

在昨天的代码中,是可以配置自动提交属性的,自动提交的是offset。

enable.auto.commit与auto.commit.interval.ms(间隔多少时间自动提交)必须是成对出现的,否则不会自动提交的。

为什么消息会丢失?

因为kafka是通过offset来确认消费机制的。因为如果没有offset确认机制的话,这个消息会被重复的消费

一般为了提高消费者的效率,而快速的从kafka中去消费消息的话使用多线程对kafka中的消息进行消费。

弊端:

1、比如说我刚好消费了这条消息,但是没有去提交,将这个提交消息的进程给杀死,没有提交offset,这时会导致重复的去消费这个消息,这条消息并没有被标记已经被消费了。所以它可以再次的去消费。

2、比如消费完这条消息,也提交了,但是提交之后,并没有把这条消息处理完,又把线程给杀死了。默认以为把这条消息给消费了。这时消息就消失了,所以消息的消息和重复消费会经常的发生。不像activeMQ一条消息只能被消费一次就不能再被消费了。

手动提交

生产者的代码如下:

  1. package com.weizhaoyang;

  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerRecord;

  4. import java.util.Properties;

  5. public class TestTreadProducer implements  Runnable {
  6.    KafkaProducer<String, String> producer=null;
  7.    public TestTreadProducer(){
  8.        Properties props = new Properties();
  9.        //kafka服务器地址
  10.        props.put("bootstrap.servers", "192.168.124.241:9092,192.168.124.242:9092,192.168.124.243:9092");
  11.        //ack是判断请求是否为完整的条件(即判断是否成功发送)。all将会阻塞消息,这种设置性能最低,但是最可靠。
  12.        props.put("acks", "1");
  13.        //retries,如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性。
  14.        props.put("retries", 0);
  15.        //producer缓存每个分区未发送消息,缓存的大小是通过batch.size()配置设定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有一个缓冲区)
  16.        props.put("batch.size", 16384);
  17.        //默认缓冲区可立即发送,即便缓冲区空间没有满;但是,如果你想减少请求的数量,可以设置linger.ms大于0.这将指示生产者发送请求之前等待一段时间
  18.        //希望更多的消息补填到未满的批中。这类似于tcp的算法,例如上面的代码段,可能100条消息在一个请求发送,因为我们设置了linger时间为1ms,然后,如果我们
  19.        //没有填满缓冲区,这个设置将增加1ms的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使是linger.ms=0。
  20.        //不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。
  21.        props.put("linger.ms", 1);
  22.        //buffer.memory控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值
  23.        //通过max.block.ms设定,之后他将抛出一个TimeoutExecption。
  24.        props.put("buffer.memory", 33554432);
  25.        //key.serializer和value.serializer示例:将用户提供的key和value对象ProducerRecord转换成字节,你可以使用附带的ByteArraySerizlizaer或StringSerializer处理简单的byte和String类型.
  26.        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  27.        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  28.        //设置kafka的分区数量
  29.        props.put("kafka.partitions", 12);
  30.        producer= new KafkaProducer(props);
  31.    }
  32.    public  void  sendMsg(String key,String value){
  33.        producer.send(new ProducerRecord<String, String>("test-topic1", key, value));
  34.    }
  35.    @Override
  36.    public void run() {
  37.           while(true){
  38.               int mesageNo=1;
  39.               sendMsg("messageKey","messageValue");
  40.               ++mesageNo;
  41.               try {
  42.                   Thread.sleep(1000);
  43.               } catch (InterruptedException e) {
  44.                   e.printStackTrace();
  45.               }
  46.           }
  47.    }

  48.    public static void main(String[] args) {
  49.           new Thread(new TestTreadProducer()).start();
  50.    }
  51. }

消费端的代码如下:

  1. package com.weizhaoyang;

  2. import kafka.utils.ShutdownableThread;
  3. import org.apache.kafka.clients.consumer.ConsumerConfig;
  4. import org.apache.kafka.clients.consumer.ConsumerRecord;
  5. import org.apache.kafka.clients.consumer.ConsumerRecords;
  6. import org.apache.kafka.clients.consumer.KafkaConsumer;

  7. import java.time.Duration;
  8. import java.util.Arrays;
  9. import java.util.List;
  10. import java.util.Properties;
  11. import java.util.concurrent.CopyOnWriteArrayList;

  12. public class TestKafkaLevelConsumer  extends ShutdownableThread {
  13.    public KafkaConsumer  consumer;
  14.    private String topic;
  15.    //定义一个线程安全的集合
  16.    private List<ConsumerRecord> msgList=new CopyOnWriteArrayList<ConsumerRecord>();
  17.    public TestKafkaLevelConsumer(String topic) {
  18.        super("TestKafkaLevelConsumer", false);
  19.        Properties  properties=new Properties();
  20.        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.241:9092,192.168.124.242:9092,192.168.124.243:9092");
  21.        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"DemoConsumper");
  22.        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
  23.         properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
  24.        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
  25.        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");
  26.        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
  27.        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer") ;
  28.        this.topic=topic;
  29.        consumer = new KafkaConsumer<>(properties);
  30.    }
  31.    //一个消费组只能有一个消费者消费到一个分区中的数据,不能多个消费者同时去消费分区中的数据
  32.    @Override
  33.    public void doWork() {
  34.        consumer.subscribe(Arrays.asList(topic));
  35.        while(true){
  36.            ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(1000));
  37.            for(ConsumerRecord<String,String> record:records){
  38.                System.out.println("partation:"+record.partition()+"offset="+record.offset()+"key="+record.key()+"value="+record.value());
  39.            }
  40.        }
  41.    }

  42.    public static void main(String[] args) {
  43.        TestKafkaLevelConsumer testKafkaLevelConsumer =new TestKafkaLevelConsumer("test-topic1");
  44.        testKafkaLevelConsumer.start();
  45.    }
  46. }

运行的结果如下:一秒钟把所有的消息提取出来,然后就提交

之后是每5条提交一次

这就是手动提交的方式

如果这时把生者和消费者关掉,然后重新启动消费者,再注释手动提交

下面的属性的值

如果设置了earliest,如果 没有提交offset,下次再消费就会有重复消费的情况

在项目中解决的办法:

如果用kafka的话,内部维护了offset,首先将这个offset,在消息发送的同时,offset已经被同步到数据库中,会在数据库中维护同样的一个offset。消费的时候的offset由数据库指派的,而不是它来标识。比如在分布式的情况下做消费,事务的一致性是无法保证的,比如现在做了一个插入,插入的时候做了一个修改,这时的offset可能就会发生变化。消费完成和提交offset是原子的,要不全部提交。为了保证原子,数据库就是强一致性的处理数据。这时用数据库维护offset。如果消费成功,就更新offset。如果消费不成功,就不更新。如果更新Offset的时候,出现了网路故障等等的问题,没问题,因为数据库自带了事务,事务会自动回滚。消息不会丢失,这也是设计消息不可丢的方案。

offset:可以理解为被消费消息的递增量,可以看作成游标。但是游标只有前进,但是offset有前进和后退。

三、Kafka消息可靠性(offset是可靠消息的保证)

1、kafka消息的问题

kafka就比较适合高吞吐量并且允许少量数据丢失的场景,如果非要保证"消息只读取一次",可以使用 JMS。

kafka Producer消息发送有两种方式(配置参数 producer.type)

producer.type=sync(默认值): 后台线程中消息发送是同步方式,对应的类为

kafka.producer.SyncProducer;producer.type=async: 后台线程中消息发送是异步方式,对应

的类为 kafka.producer.AyncProducer;优点是可批量发送消息(消息个数达到

batch.num.messages=200 或时间达到 “ 时发送)、吞吐量佳,缺点是发送不及时可能导致失;

Kafka Producer 消息发送有三种确认方式(配置参数 acks):

acks=0: producer 不等待 Leader 确认,只管发出即可;最可能丢失消息,适用于高吞吐可丢失的业务;

acks=1(默认值): producer 等待 Leader 写入本地日志后就确认;之后 Leader 向 Followers 同

步时,如果 Leader 宕机会导致消息没同步而丢失,producer 却依旧认为成功;

acks=all/-1: producer 等待 Leader 写入本地日志、而且 Leader 向 Followers 同步完成后才会

确认;最可靠,但是性能最低。

Kafka Consumer 有两个接口:

Low-level API: 消费者自己维护 offset 等值,可以完全控制;

High-level API: 封装了对 parition 和 offset 的管理,使用简单;可能遇到 Consumer 取出消息并更新了 offset,但未处理消息即宕机,从而相当于消息丢失;extend  线程

Kafka 支持 3 种消息传递语义:

最多一次 -消息可能会丢失,但永远不会重新发送。consumer.poll();consumer.commitOffset(); processMsg(messages);

至少一次 -消息永远不会丢失,但可能会重新传递。consumer.poll(); processMsg(messages); consumer.commitOffset();

恰恰一次 - 这就是人们真正想要的,每条信息只传递一次。以事务来保证。

但是第三种没有写出来方法,因为kafka没有事务的定义。如果有事务的话,offset就好保障

了,就有了下面三个问题。

2 消息重复

根本原因:已经消费了数据,但是 offset 没提交。

外在原因:(1)消费数据后、提交 offset 前,线程被杀,或者线程自动退出系统了。

(2)设置 offset 为自动提交,consumer.close() 之前 consumer.unsubscribe();退订了主题,

导致offset没有提交。

(3)consumer 取了一批数据,尚未处理完毕时,达到了 session.timeout.ms,导致没有接收心

跳而挂掉,自动提交offset失败,下次会重复消费本批消息;

解决办法:

(1) 唯一 ID(offset,给消息的消费做一个标识) 保存在外部介质中,每次消费时根据它判断是否已处理;不是由kafka维护的。

(2) 如果在统计消费信息的时候,丢失几条关系不大,则无需理会;看业务场景。

(3) 如果消费者来不及处理,可以这样优化:增加分区以提高并行能力;增加消费者线程;关闭自动提交 enable.auto.commit=false,用手动提交。

第三种方案不可行,因为在增加分区的时候意味着消息会放到多个分区当中,搜索的路径会越来越长。根据索引去查。比如在做电商的项目的时候,如果消息重复 ,一个订单会有多个线程去处理订单信息。这个时候就完蛋了。所以订单不用kafka。订单用mq就可以了。日志信息用kafka比较合适-比如用户的行为分析。

3 消息丢失

根本原因:已经提交了 offset,但数据在内存中的异步线程尚未处理,线程就被杀掉。或者自

动退出,这时消息就真正的丢失了。

消息丢失解决方案:

同步模式下,确认机制设置为-1(不可为1),即让消息写入Leader和Follower之后再确认消息发送成功;

异步模式下,设置为不限制阻塞超时时间(不可为acks=0),当缓冲区满时不清空缓冲池,而是让生产者一直处于阻塞状态;

4 消息乱序 (如何保证kafka中消息按照顺序消费)

传统的队列,在并行处理时,由于网络故障或速度差异,尽管服务器传递是有序的,但消费者接收的顺序可能不一致;Kafka 在主题内部有分区,并行处理时,每个分区仅由消费者组中的一个消费者使用,确保了消费者是该分区的唯一读者,并按顺序使用这些数据。

但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序,除非只提供一个分区。

Kafka的分区分配策略

现在创建好了10个分区1个副本

这时在241这台机器上有三个topic。这里没有副本的概念。10个分区分布在不同的broker上面,因为副本的数量为1,说明只有由一个分区对应一个副本,这个时候就不会出现多个副本的问题。


这就是在Kafka上轮询的去存放分区

partition.assignmentStrategy 指定分区策略

Range 范围分区(默认的)

假如有10个分区,3个消费者,把分区按照序号排列0,1,2,3,4,5,6,7,8,9;消费者为C1,C2,C3,那么用分区数除以消费者数来决定每个Consumer消费几个Partition,除不尽的前面几个消费者将会多消费一个 最后分配结果如下

C1:0,1,2,3

C2:4,5,6

C3:7,8,9

生产者的消息代码如下:

  1. package com.weizhaoyang.partition;

  2. import org.apache.kafka.clients.producer.*;

  3. import java.util.Properties;

  4. public class ProducerTest {
  5.    public static void main(String[] args) throws InterruptedException {
  6.        Properties props = new Properties();
  7.        //kafka服务器地址
  8.        props.put("bootstrap.servers", "192.168.124.241:9092,192.168.124.242:9092,192.168.124.243:9092");
  9.        //ack是判断请求是否为完整的条件(即判断是否成功发送)。all将会阻塞消息,这种设置性能最低,但是最可靠。
  10.        props.put("acks", "1");
  11.        //retries,如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性。
  12.        props.put("retries", 0);
  13.        //producer缓存每个分区未发送消息,缓存的大小是通过batch.size()配置设定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有一个缓冲区)
  14.        props.put("batch.size", 16384);
  15.        //默认缓冲区可立即发送,即便缓冲区空间没有满;但是,如果你想减少请求的数量,可以设置linger.ms大于0.这将指示生产者发送请求之前等待一段时间
  16.        //希望更多的消息补填到未满的批中。这类似于tcp的算法,例如上面的代码段,可能100条消息在一个请求发送,因为我们设置了linger时间为1ms,然后,如果我们
  17.        //没有填满缓冲区,这个设置将增加1ms的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使是linger.ms=0。
  18.        //不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。
  19.        props.put("linger.ms", 1);
  20.        //buffer.memory控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值
  21.        //通过max.block.ms设定,之后他将抛出一个TimeoutExecption。
  22.        props.put("buffer.memory", 33554432);
  23.        //key.serializer和value.serializer示例:将用户提供的key和value对象ProducerRecord转换成字节,你可以使用附带的ByteArraySerizlizaer或StringSerializer处理简单的byte和String类型.
  24.        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  25.        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  26.        //设置kafka的分区数量
  27.        props.put("kafka.partitions", 12);

  28.        KafkaProducer<String, String> producer = new KafkaProducer(props);
  29.        for (int i = 0; i < 10; i++){
  30.            System.out.println("key-->key"+i+"  value-->vvv"+i);
  31.            producer.send(new ProducerRecord<String, String>("test-topic1", "key" + i, "vvv" + i), new Callback() {
  32.                @Override
  33.                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  34.                    System.out.println("分配到了分区:"+recordMetadata.partition());
  35.                }
  36.            });
  37.            Thread.sleep(1000);
  38.        }
  39.        producer.close();
  40.    }
  41. }

运行的结果如下:有的key被分到同一个分区的原因是,key的hash值在模上分区的总数是相同的,所以落到同一个分区上。

如果在上面的代码上,key如果为null的话

这时不会对key进行hash,这时会按照均等的分配到partition中去,这就是会增加搜随的量。

然后分别启动三个消费者,然后让生产者生产10条消息:

这时consumer1消费到了6,5,4分区


consumer2消费到了3,0,2,1


consumer3消费到了9,8,7

总结:由于是多个分区,不同的消费者,消费的是不同分区里的消息,因为一个消费组中的每一个消费者只能消费一个分区中的消息,一次不能消费多个分区中的消息。因为它们是一个组。每一个消费者的GROUP_ID_CONFIG的值是一样的,所以就归为一组了。因为这是以组的方式来订阅,而不是以一个消费者的方式来订阅。

如果是一个消费组中有两个消费者,另一个消费组中有一个消费者的话,这时是其中一个消费组中的两个消费者各消费5条消息,另一个消费者消费10条消息。如果没有配置groupId的话线程直接被终止。所以说在kafka里面以组为单位来进行消费。

如果有11个分区将会是:

C1:0,1,2,3

C2:4,5,6,7

C3:8,9,10

假如我们有两个主题T1,T2,分别有10个分区,最后的分配结果将会是这样:

C1:T1(0,1,2,3) T2(0,1,2,3)

C2:T1(4,5,6)      T2(4,5,6)

C3:T1(7,8,9)      T2(7,8,9)

在这种情况下,C1多消费了两个分区

RoundRobin 轮询分区

把所有的partition和consumer列出来,然后轮询consumer和partition,尽可能的让把partition均匀的分配给consumer

假如有3个Topic T0(三个分区P0-0,P0-1,P0-2),T1(两个分区P1-0,P1-1),T2(四个分区P2-0,P2-1,P2-2,P2-3)

有三个消费者:C0(订阅了T0,T1),C1(订阅了T1,T2),C2(订阅了T0,T2)!

那么分区过程如下图所示

在kafka里面有故障转移策略,当consumer减少以后会进行Rebalance,这时会重新计算负载的算法。

什么时候触发分区分配策略:

1.同一个Consumer Group内新增或减少Consumer

2.Topic分区发生变化:也会触发分配策略

Rebalance的执行

kafka提供了一个角色Coordinator来执行。当Consumer Group的第一个Consumer启动的时候,他会向kafka集群中的任意一台broker发送GroupCoordinatorRequest请求,broker会返回一个负载最小的broker设置为coordinator,之后该group的所有成员都会和coordinator进行协调通信

整个Rebalance分为两个过程 jionGroup和sysncJion

joinGroup过程

在这一步中,所有的成员都会向coordinator发送JionGroup请求,请求内容包括group_id,member_id.protocol_metadata等,coordinator会从中选出一个consumer作为leader,并且把组成员信息和订阅消息,leader信息,rebanlance的版本信息发送给consumer

Synchronizing Group State阶段

组成员向coordinator发送SysnGroupRequet请求,但是只有leader会发送分区分配的方案(分区分配的方案其实是在消费者确定的),当coordinator收到leader发送的分区分配方案后,会通过SysnGroupResponse把方案同步到各个consumer中

相关文章
|
7月前
|
消息中间件 存储 分布式计算
|
3月前
|
消息中间件 存储 缓存
kafka(一)
kafka(一)
|
3月前
|
消息中间件 存储 算法
kafka(二)
kafka(二)
|
4月前
|
消息中间件 Kafka
kafka里的acks是什么
【8月更文挑战第3天】kafka里的acks是什么
226 0
|
7月前
|
消息中间件 分布式计算 Java
|
7月前
|
消息中间件 Java Kafka
Kafka
Kafka
53 1
|
7月前
|
消息中间件 存储 分布式计算
kafka 详细介绍
kafka 详细介绍
|
7月前
|
消息中间件 存储 Java
玩转Kafka—初步使用
玩转Kafka—初步使用
55 0
|
消息中间件 缓存 算法
Kafka为什么这么快?
Kafka 是一个基于发布-订阅模式的消息系统,它可以在多个生产者和消费者之间传递大量的数据。Kafka 的一个显著特点是它的高吞吐率,即每秒可以处理百万级别的消息。那么 Kafka 是如何实现这样高得性能呢?本文将从七个方面来分析 Kafka 的速度优势。
82 1
|
消息中间件 分布式计算 Java
浅谈kafka 一
浅谈kafka 一