kafka(三)

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: kafka(三)

回顾上一节的日志存储的机制内容:

每个sagment段文件的大小是固定的,因为kafka是一个高吞吐量的消息系统  在kafka不会出现消息堆积的现象,当其中一个segment占满以后,会重新生成同样大小的segment。消息堆积:消费者消费的进度赶不上生产者生产的消息的进度,最后导致生产者生产的消息越来越慢,最后处于阻塞的状态

一、文件存储机制

回顾上一节的日志存储的机制内容:

二、消息确认机制(确认offset)

由offset导致的有消息重复,消息丢失,消息乱序,消息确认也是由offset来确认的

自动提交:间隔多少秒钟自动提交消息。

在昨天的代码中,是可以配置自动提交属性的,自动提交的是offset。

enable.auto.commit与auto.commit.interval.ms(间隔多少时间自动提交)必须是成对出现的,否则不会自动提交的。

为什么消息会丢失?

因为kafka是通过offset来确认消费机制的。因为如果没有offset确认机制的话,这个消息会被重复的消费

一般为了提高消费者的效率,而快速的从kafka中去消费消息的话使用多线程对kafka中的消息进行消费。

弊端:

1、比如说我刚好消费了这条消息,但是没有去提交,将这个提交消息的进程给杀死,没有提交offset,这时会导致重复的去消费这个消息,这条消息并没有被标记已经被消费了。所以它可以再次的去消费。

2、比如消费完这条消息,也提交了,但是提交之后,并没有把这条消息处理完,又把线程给杀死了。默认以为把这条消息给消费了。这时消息就消失了,所以消息的消息和重复消费会经常的发生。不像activeMQ一条消息只能被消费一次就不能再被消费了。

手动提交

生产者的代码如下:

  1. package com.weizhaoyang;

  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerRecord;

  4. import java.util.Properties;

  5. public class TestTreadProducer implements  Runnable {
  6.    KafkaProducer<String, String> producer=null;
  7.    public TestTreadProducer(){
  8.        Properties props = new Properties();
  9.        //kafka服务器地址
  10.        props.put("bootstrap.servers", "192.168.124.241:9092,192.168.124.242:9092,192.168.124.243:9092");
  11.        //ack是判断请求是否为完整的条件(即判断是否成功发送)。all将会阻塞消息,这种设置性能最低,但是最可靠。
  12.        props.put("acks", "1");
  13.        //retries,如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性。
  14.        props.put("retries", 0);
  15.        //producer缓存每个分区未发送消息,缓存的大小是通过batch.size()配置设定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有一个缓冲区)
  16.        props.put("batch.size", 16384);
  17.        //默认缓冲区可立即发送,即便缓冲区空间没有满;但是,如果你想减少请求的数量,可以设置linger.ms大于0.这将指示生产者发送请求之前等待一段时间
  18.        //希望更多的消息补填到未满的批中。这类似于tcp的算法,例如上面的代码段,可能100条消息在一个请求发送,因为我们设置了linger时间为1ms,然后,如果我们
  19.        //没有填满缓冲区,这个设置将增加1ms的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使是linger.ms=0。
  20.        //不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。
  21.        props.put("linger.ms", 1);
  22.        //buffer.memory控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值
  23.        //通过max.block.ms设定,之后他将抛出一个TimeoutExecption。
  24.        props.put("buffer.memory", 33554432);
  25.        //key.serializer和value.serializer示例:将用户提供的key和value对象ProducerRecord转换成字节,你可以使用附带的ByteArraySerizlizaer或StringSerializer处理简单的byte和String类型.
  26.        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  27.        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  28.        //设置kafka的分区数量
  29.        props.put("kafka.partitions", 12);
  30.        producer= new KafkaProducer(props);
  31.    }
  32.    public  void  sendMsg(String key,String value){
  33.        producer.send(new ProducerRecord<String, String>("test-topic1", key, value));
  34.    }
  35.    @Override
  36.    public void run() {
  37.           while(true){
  38.               int mesageNo=1;
  39.               sendMsg("messageKey","messageValue");
  40.               ++mesageNo;
  41.               try {
  42.                   Thread.sleep(1000);
  43.               } catch (InterruptedException e) {
  44.                   e.printStackTrace();
  45.               }
  46.           }
  47.    }

  48.    public static void main(String[] args) {
  49.           new Thread(new TestTreadProducer()).start();
  50.    }
  51. }

消费端的代码如下:

  1. package com.weizhaoyang;

  2. import kafka.utils.ShutdownableThread;
  3. import org.apache.kafka.clients.consumer.ConsumerConfig;
  4. import org.apache.kafka.clients.consumer.ConsumerRecord;
  5. import org.apache.kafka.clients.consumer.ConsumerRecords;
  6. import org.apache.kafka.clients.consumer.KafkaConsumer;

  7. import java.time.Duration;
  8. import java.util.Arrays;
  9. import java.util.List;
  10. import java.util.Properties;
  11. import java.util.concurrent.CopyOnWriteArrayList;

  12. public class TestKafkaLevelConsumer  extends ShutdownableThread {
  13.    public KafkaConsumer  consumer;
  14.    private String topic;
  15.    //定义一个线程安全的集合
  16.    private List<ConsumerRecord> msgList=new CopyOnWriteArrayList<ConsumerRecord>();
  17.    public TestKafkaLevelConsumer(String topic) {
  18.        super("TestKafkaLevelConsumer", false);
  19.        Properties  properties=new Properties();
  20.        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.124.241:9092,192.168.124.242:9092,192.168.124.243:9092");
  21.        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"DemoConsumper");
  22.        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
  23.         properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
  24.        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
  25.        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");
  26.        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
  27.        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer") ;
  28.        this.topic=topic;
  29.        consumer = new KafkaConsumer<>(properties);
  30.    }
  31.    //一个消费组只能有一个消费者消费到一个分区中的数据,不能多个消费者同时去消费分区中的数据
  32.    @Override
  33.    public void doWork() {
  34.        consumer.subscribe(Arrays.asList(topic));
  35.        while(true){
  36.            ConsumerRecords<String,String> records=consumer.poll(Duration.ofMillis(1000));
  37.            for(ConsumerRecord<String,String> record:records){
  38.                System.out.println("partation:"+record.partition()+"offset="+record.offset()+"key="+record.key()+"value="+record.value());
  39.            }
  40.        }
  41.    }

  42.    public static void main(String[] args) {
  43.        TestKafkaLevelConsumer testKafkaLevelConsumer =new TestKafkaLevelConsumer("test-topic1");
  44.        testKafkaLevelConsumer.start();
  45.    }
  46. }

运行的结果如下:一秒钟把所有的消息提取出来,然后就提交

之后是每5条提交一次

这就是手动提交的方式

如果这时把生者和消费者关掉,然后重新启动消费者,再注释手动提交

下面的属性的值

如果设置了earliest,如果 没有提交offset,下次再消费就会有重复消费的情况

在项目中解决的办法:

如果用kafka的话,内部维护了offset,首先将这个offset,在消息发送的同时,offset已经被同步到数据库中,会在数据库中维护同样的一个offset。消费的时候的offset由数据库指派的,而不是它来标识。比如在分布式的情况下做消费,事务的一致性是无法保证的,比如现在做了一个插入,插入的时候做了一个修改,这时的offset可能就会发生变化。消费完成和提交offset是原子的,要不全部提交。为了保证原子,数据库就是强一致性的处理数据。这时用数据库维护offset。如果消费成功,就更新offset。如果消费不成功,就不更新。如果更新Offset的时候,出现了网路故障等等的问题,没问题,因为数据库自带了事务,事务会自动回滚。消息不会丢失,这也是设计消息不可丢的方案。

offset:可以理解为被消费消息的递增量,可以看作成游标。但是游标只有前进,但是offset有前进和后退。

三、Kafka消息可靠性(offset是可靠消息的保证)

1、kafka消息的问题

kafka就比较适合高吞吐量并且允许少量数据丢失的场景,如果非要保证"消息只读取一次",可以使用 JMS。

kafka Producer消息发送有两种方式(配置参数 producer.type)

producer.type=sync(默认值): 后台线程中消息发送是同步方式,对应的类为

kafka.producer.SyncProducer;producer.type=async: 后台线程中消息发送是异步方式,对应

的类为 kafka.producer.AyncProducer;优点是可批量发送消息(消息个数达到

batch.num.messages=200 或时间达到 “ 时发送)、吞吐量佳,缺点是发送不及时可能导致失;

Kafka Producer 消息发送有三种确认方式(配置参数 acks):

acks=0: producer 不等待 Leader 确认,只管发出即可;最可能丢失消息,适用于高吞吐可丢失的业务;

acks=1(默认值): producer 等待 Leader 写入本地日志后就确认;之后 Leader 向 Followers 同

步时,如果 Leader 宕机会导致消息没同步而丢失,producer 却依旧认为成功;

acks=all/-1: producer 等待 Leader 写入本地日志、而且 Leader 向 Followers 同步完成后才会

确认;最可靠,但是性能最低。

Kafka Consumer 有两个接口:

Low-level API: 消费者自己维护 offset 等值,可以完全控制;

High-level API: 封装了对 parition 和 offset 的管理,使用简单;可能遇到 Consumer 取出消息并更新了 offset,但未处理消息即宕机,从而相当于消息丢失;extend  线程

Kafka 支持 3 种消息传递语义:

最多一次 -消息可能会丢失,但永远不会重新发送。consumer.poll();consumer.commitOffset(); processMsg(messages);

至少一次 -消息永远不会丢失,但可能会重新传递。consumer.poll(); processMsg(messages); consumer.commitOffset();

恰恰一次 - 这就是人们真正想要的,每条信息只传递一次。以事务来保证。

但是第三种没有写出来方法,因为kafka没有事务的定义。如果有事务的话,offset就好保障

了,就有了下面三个问题。

2 消息重复

根本原因:已经消费了数据,但是 offset 没提交。

外在原因:(1)消费数据后、提交 offset 前,线程被杀,或者线程自动退出系统了。

(2)设置 offset 为自动提交,consumer.close() 之前 consumer.unsubscribe();退订了主题,

导致offset没有提交。

(3)consumer 取了一批数据,尚未处理完毕时,达到了 session.timeout.ms,导致没有接收心

跳而挂掉,自动提交offset失败,下次会重复消费本批消息;

解决办法:

(1) 唯一 ID(offset,给消息的消费做一个标识) 保存在外部介质中,每次消费时根据它判断是否已处理;不是由kafka维护的。

(2) 如果在统计消费信息的时候,丢失几条关系不大,则无需理会;看业务场景。

(3) 如果消费者来不及处理,可以这样优化:增加分区以提高并行能力;增加消费者线程;关闭自动提交 enable.auto.commit=false,用手动提交。

第三种方案不可行,因为在增加分区的时候意味着消息会放到多个分区当中,搜索的路径会越来越长。根据索引去查。比如在做电商的项目的时候,如果消息重复 ,一个订单会有多个线程去处理订单信息。这个时候就完蛋了。所以订单不用kafka。订单用mq就可以了。日志信息用kafka比较合适-比如用户的行为分析。

3 消息丢失

根本原因:已经提交了 offset,但数据在内存中的异步线程尚未处理,线程就被杀掉。或者自

动退出,这时消息就真正的丢失了。

消息丢失解决方案:

同步模式下,确认机制设置为-1(不可为1),即让消息写入Leader和Follower之后再确认消息发送成功;

异步模式下,设置为不限制阻塞超时时间(不可为acks=0),当缓冲区满时不清空缓冲池,而是让生产者一直处于阻塞状态;

4 消息乱序 (如何保证kafka中消息按照顺序消费)

传统的队列,在并行处理时,由于网络故障或速度差异,尽管服务器传递是有序的,但消费者接收的顺序可能不一致;Kafka 在主题内部有分区,并行处理时,每个分区仅由消费者组中的一个消费者使用,确保了消费者是该分区的唯一读者,并按顺序使用这些数据。

但是它也仅仅是保证Topic的一个分区顺序处理,不能保证跨分区的消息先后处理顺序,除非只提供一个分区。

Kafka的分区分配策略

现在创建好了10个分区1个副本

这时在241这台机器上有三个topic。这里没有副本的概念。10个分区分布在不同的broker上面,因为副本的数量为1,说明只有由一个分区对应一个副本,这个时候就不会出现多个副本的问题。


这就是在Kafka上轮询的去存放分区

partition.assignmentStrategy 指定分区策略

Range 范围分区(默认的)

假如有10个分区,3个消费者,把分区按照序号排列0,1,2,3,4,5,6,7,8,9;消费者为C1,C2,C3,那么用分区数除以消费者数来决定每个Consumer消费几个Partition,除不尽的前面几个消费者将会多消费一个 最后分配结果如下

C1:0,1,2,3

C2:4,5,6

C3:7,8,9

生产者的消息代码如下:

  1. package com.weizhaoyang.partition;

  2. import org.apache.kafka.clients.producer.*;

  3. import java.util.Properties;

  4. public class ProducerTest {
  5.    public static void main(String[] args) throws InterruptedException {
  6.        Properties props = new Properties();
  7.        //kafka服务器地址
  8.        props.put("bootstrap.servers", "192.168.124.241:9092,192.168.124.242:9092,192.168.124.243:9092");
  9.        //ack是判断请求是否为完整的条件(即判断是否成功发送)。all将会阻塞消息,这种设置性能最低,但是最可靠。
  10.        props.put("acks", "1");
  11.        //retries,如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性。
  12.        props.put("retries", 0);
  13.        //producer缓存每个分区未发送消息,缓存的大小是通过batch.size()配置设定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有一个缓冲区)
  14.        props.put("batch.size", 16384);
  15.        //默认缓冲区可立即发送,即便缓冲区空间没有满;但是,如果你想减少请求的数量,可以设置linger.ms大于0.这将指示生产者发送请求之前等待一段时间
  16.        //希望更多的消息补填到未满的批中。这类似于tcp的算法,例如上面的代码段,可能100条消息在一个请求发送,因为我们设置了linger时间为1ms,然后,如果我们
  17.        //没有填满缓冲区,这个设置将增加1ms的延迟请求以等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,即使是linger.ms=0。
  18.        //不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。
  19.        props.put("linger.ms", 1);
  20.        //buffer.memory控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值
  21.        //通过max.block.ms设定,之后他将抛出一个TimeoutExecption。
  22.        props.put("buffer.memory", 33554432);
  23.        //key.serializer和value.serializer示例:将用户提供的key和value对象ProducerRecord转换成字节,你可以使用附带的ByteArraySerizlizaer或StringSerializer处理简单的byte和String类型.
  24.        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  25.        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  26.        //设置kafka的分区数量
  27.        props.put("kafka.partitions", 12);

  28.        KafkaProducer<String, String> producer = new KafkaProducer(props);
  29.        for (int i = 0; i < 10; i++){
  30.            System.out.println("key-->key"+i+"  value-->vvv"+i);
  31.            producer.send(new ProducerRecord<String, String>("test-topic1", "key" + i, "vvv" + i), new Callback() {
  32.                @Override
  33.                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
  34.                    System.out.println("分配到了分区:"+recordMetadata.partition());
  35.                }
  36.            });
  37.            Thread.sleep(1000);
  38.        }
  39.        producer.close();
  40.    }
  41. }

运行的结果如下:有的key被分到同一个分区的原因是,key的hash值在模上分区的总数是相同的,所以落到同一个分区上。

如果在上面的代码上,key如果为null的话

这时不会对key进行hash,这时会按照均等的分配到partition中去,这就是会增加搜随的量。

然后分别启动三个消费者,然后让生产者生产10条消息:

这时consumer1消费到了6,5,4分区


consumer2消费到了3,0,2,1


consumer3消费到了9,8,7

总结:由于是多个分区,不同的消费者,消费的是不同分区里的消息,因为一个消费组中的每一个消费者只能消费一个分区中的消息,一次不能消费多个分区中的消息。因为它们是一个组。每一个消费者的GROUP_ID_CONFIG的值是一样的,所以就归为一组了。因为这是以组的方式来订阅,而不是以一个消费者的方式来订阅。

如果是一个消费组中有两个消费者,另一个消费组中有一个消费者的话,这时是其中一个消费组中的两个消费者各消费5条消息,另一个消费者消费10条消息。如果没有配置groupId的话线程直接被终止。所以说在kafka里面以组为单位来进行消费。

如果有11个分区将会是:

C1:0,1,2,3

C2:4,5,6,7

C3:8,9,10

假如我们有两个主题T1,T2,分别有10个分区,最后的分配结果将会是这样:

C1:T1(0,1,2,3) T2(0,1,2,3)

C2:T1(4,5,6)      T2(4,5,6)

C3:T1(7,8,9)      T2(7,8,9)

在这种情况下,C1多消费了两个分区

RoundRobin 轮询分区

把所有的partition和consumer列出来,然后轮询consumer和partition,尽可能的让把partition均匀的分配给consumer

假如有3个Topic T0(三个分区P0-0,P0-1,P0-2),T1(两个分区P1-0,P1-1),T2(四个分区P2-0,P2-1,P2-2,P2-3)

有三个消费者:C0(订阅了T0,T1),C1(订阅了T1,T2),C2(订阅了T0,T2)!

那么分区过程如下图所示

在kafka里面有故障转移策略,当consumer减少以后会进行Rebalance,这时会重新计算负载的算法。

什么时候触发分区分配策略:

1.同一个Consumer Group内新增或减少Consumer

2.Topic分区发生变化:也会触发分配策略

Rebalance的执行

kafka提供了一个角色Coordinator来执行。当Consumer Group的第一个Consumer启动的时候,他会向kafka集群中的任意一台broker发送GroupCoordinatorRequest请求,broker会返回一个负载最小的broker设置为coordinator,之后该group的所有成员都会和coordinator进行协调通信

整个Rebalance分为两个过程 jionGroup和sysncJion

joinGroup过程

在这一步中,所有的成员都会向coordinator发送JionGroup请求,请求内容包括group_id,member_id.protocol_metadata等,coordinator会从中选出一个consumer作为leader,并且把组成员信息和订阅消息,leader信息,rebanlance的版本信息发送给consumer

Synchronizing Group State阶段

组成员向coordinator发送SysnGroupRequet请求,但是只有leader会发送分区分配的方案(分区分配的方案其实是在消费者确定的),当coordinator收到leader发送的分区分配方案后,会通过SysnGroupResponse把方案同步到各个consumer中

相关文章
|
2天前
|
消息中间件 存储 缓存
kafka(一)
kafka(一)
|
4月前
|
消息中间件 存储 分布式计算
|
4月前
|
消息中间件 分布式计算 Java
|
4月前
|
消息中间件 Java Kafka
Kafka
Kafka
40 1
|
4月前
|
消息中间件 存储 Java
玩转Kafka—初步使用
玩转Kafka—初步使用
45 0
|
10月前
|
消息中间件 缓存 算法
Kafka为什么这么快?
Kafka 是一个基于发布-订阅模式的消息系统,它可以在多个生产者和消费者之间传递大量的数据。Kafka 的一个显著特点是它的高吞吐率,即每秒可以处理百万级别的消息。那么 Kafka 是如何实现这样高得性能呢?本文将从七个方面来分析 Kafka 的速度优势。
60 1
|
10月前
|
消息中间件 开发框架 Java
113 Kafka介绍
113 Kafka介绍
66 0
|
消息中间件 缓存 Java
Kafka介绍
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。 Kafka是一种高吞吐量的分布式发布订阅消息系统,作为消息中间件来说都起到了系统间解耦、异步、削峰等作用,同时又提供了Kafka streaming插件包在应用端实现实时在线流处理,它可以收集并处理用户在网站中的所有动作流数据以及物联网设备的采样信息
154 0
|
消息中间件 分布式计算 Java
浅谈kafka 一
浅谈kafka 一
|
消息中间件 存储 缓存
kafka
kafka
350 0