6.Kafka数据存储流程和原理概述
6.1.Partition
- topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列
- 是以文件夹的形式存储在具体Broker本机上
6.2.LEO(LogEndOffset)
- 表示每个partition的log最后一条Message的位置
6.3.HW(HighWatermark)
- 表示partition各个replicas数据键同步且一致的offset位置,即表示allreplicas已经commit的位置
- HW之前的数据才是commit后的,对消费者才可见
- ISR集合里面最小leo
6.4.offset
- 每个partition都由一些列有序的、不可变的消息组成,这些消息被连续的追加到partition中
- partition中的每个消息都有一个连续的序号叫做offset,用于partition唯一标识一条信息
- 可以认为offset是partition中Message的id
6.5.Segment
- 每个partition又由多个segment file组成
- segment file 由2部分组成,分别为index file 和 data file(log file)
- 两个文件一一对应,后缀“.index”和".log"分别标识索引文件和数据文件
- 命名规则:partition的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset+1
6.6.Kafka高效文件存储设计特点
- kafka把topic中一个partition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完的文件,减少磁盘占用。
- 通过索引信息可以快速定位Message
- producer生产数据,要写入到log文件中,写的过程中一直追加到文件末尾,为顺序写,官网数据表明,同样的磁盘,顺序写能到600M/S,而随机写只是100K/S
7.SpringBoot2.x项目整合Kafka
7.1.引入kafka-clients依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.0</version> </dependency>
7.2.配置客户端
AdminClientConfig.BOOTSTARP_SERVERS_CONFIG,"ip:端口" AdminClient.create(配置信息);
/** * 设置admin客户端 * @return */ public static AdminClient initAdminClient(){ Properties properties = new Properties(); properties.setProperties(AdminClientConfig.BOOTSTARP_SERVERS_CONFIG,"112.74.55.160:9092"); AdminClient adminClient = AdminClient.create(properties); return adminClient; }
7.3.创建topic
NewTopic newTopic = new NewTopic(topic名称,分区数量,副本数量); adminClient.createTopics(Arrays.asList(newTopic)); //返回一个CreateTopicsResult createTopicsResult.all().get(); //异常处理
@Test public void createTopic(){ AdminClient adminClient = initAdminClient(); //2个分区,1个副本 NewTopic newTopic = new NewTopic(TOPIC_NAME,2,(short)1); CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic)); try { //future等待创建,成功不会有任何报错,失败或者超时会报错 createTopicsResult.all().get(); } catch (Exception e) { e.printStackTrace(); } System.out.println("ڠୌෛጱtopic"); }
7.4.删除topic
adminClient.deleteTopics(topic名称的list集合); //返回一个DeleteTopicsResult deleteTopicsResult.all().get();
@Test public void delTopicTest(){ AdminClient adminClient = initAdminClient(); DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("xdclass-test-topic")); try { deleteTopicsResult.all().get(); } catch (Exception e) { e.printStackTrace(); } }
7.5.列举topic-list
adminClient.listTopics(); adminClient.listTopics(options); //返回一个ListTopicsResult Set<String> topics = listTopics.names().get(); //得到一个set集合遍历 //是否查看内部topic,可以不用 ListTopicsOptions options = new ListTopicsOptions(); options.listInternal(true);
@Test public void listTopic(){ AdminClient adminClient = initAdminClient(); //是否查看内部topic,可以不用 ListTopicsOptions options = new ListTopicsOptions(); options.listInternal(true); ListTopicsResult listTopics = adminClient.listTopics(options); Set<String> topics = listTopics.names().get(); for (String topic : topics) { System.err.println(topic); } }
7.6.增加分区数量
NewPartitions.increateTo(5); infoMap.put(TOPIC_NAME, newPartitions); adminClient.createPartitions(infoMap);
@Test public void incrPartitionsTest() throws Exception{ Map<String, NewPartitions> infoMap = new HashMap<>(); NewPartitions newPartitions = NewPartitions.increaseTo(5); AdminClient adminClient = initAdminClient(); infoMap.put(TOPIC_NAME, newPartitions); CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(infoMap); createPartitionsResult.all().get(); }
7.7.查看topic详情
adminClient.describeTopics(Arrays.asList(TOPIC_NAME))
@Test public void getTopicInfo() throws Exception { AdminClient adminClient = initAdminClient(); DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME)); Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get(); Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet(); entries.stream().forEach((entry)-> System.out.println("name :"+entry.getKey()+" , desc:"+ entry.getValue())); }
8.producer发送到Broker分区策略
8.1.生产者发送消息到broker的策略
- 如果指定Partition ID,则PR(ProducerRecord)被发送到指定的Partition。
- 如果未指定Partition ID,但是指定了Key,PR就会按照Key的哈希取模发送到对应的Partition
- 如果未指定Partition ID,也未指定Key,PR会按照默认round-robin轮询模式发送到每个Partition
- 如果即制定了Partition ID,也指定了Key,PR会被发送到指定的Partition
注意:Partition有多个副本,但只有一个replicationLeader复制该Partition和生产者消费者交互,消费者默认的消费Partition是range模式。
8.2.生产者到broker发送流程
Kafka的客户端发送数据到服务器,会经过内存缓冲区(默认是16KB),通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲区里,然后把很多消息收集到Batch里面,然后在通过Sender线程发送到Broker上面,这样才尽可能的提高性能。
8.3.生产者常见配置
#Kafka地址,即broker的ip地址 bootstrap.servers #当producer向leader发送数据时,可以通过request.required.acks来设置数据可靠性的级别,分别是0,1,all acks #请求失败,生产者会自动重试,默认是0次,如果启动重试,则会有消息重复消费的可能性 retries #每个分区未发送消息的总字节大小,超过的化就会提交到服务端broker,默认是16kb batch.size #默认是0,指消息立即发送,即便是batch.size缓冲区还没满,到达linger.ms设置的秒数,也会提交消息到服务器 linger.ms #buffer.memory用来约束KafkaProducer能够使用的缓冲区大小,默认是32MB #注意:buffer.memory不能设置的太小,否则一旦写满,就会阻塞用户线程,不能在向kafka里写消息了 #buffer.momery一定要比batch.size设置的大,否则会报申请内存不足的错误。 buffer.memory #key的序列化器,将用户提供的 key和value对象ProducerRecord 进行序列化处理,key.serializer必须被设置,即使消息中没有指定key,序列化器必须是一个实现org.apache.kafka.common.serialization.Serializer接口的类,将key序列化成字节数组。 key.serializer value.serializer
9.producer API讲解
9.1.封装配置属性
public static Properties getProperties(){ Properties props = new Properties(); //kafka服务器地址 props.put("bootstrap.servers", "112.74.55.160:9092"); //props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "112.74.55.160:9092"); //当producer向leader发下哦那个数据时,可以通过request.required.acks参数来设置数据可靠性级别,0,1,all props.put("acks", "all"); //props.put(ProducerConfig.ACKS_CONFIG, "all"); //表示请求失败时,生产者会尝试自动重连,0表示不重连(默认),如果开启重连的话,可能会导致消息重复消费 props.put("retries", 0); //props.put(ProducerConfig.RETRIES_CONFIG, 0); //设置batch缓冲区的大小,表示当到达缓冲区的大小时,sender线程将拿取batch中的消息,默认是16kb props.put("batch.size", 16384); //表示缓冲区消息停留的时间,默认是0,立即发送,配置之后即使batch中的数据没有达到设定的值,到达时间后也会发送消息 props.put("linger.ms", 1); //用来约束kafka Producer能够使用的内存缓冲区的大小,默认是32MB props.put("buffer.memory", 33554432); //序列化机制 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); return props; }
9.2.生产者投递消息同步发送
send()方法是异步的,添加消息到缓冲区等待发送,并立即返回 生产者将单个消息批量在一起发送提高效率,即batch.size和linger.ms结合 实现同步发送:一条消息发送之后,会阻塞当前线程,直至返回ack 发送消息后返回一个Future对象,调用get即可 消息发送主要是两个线程:Main主线程,Sender线程 main线程发送消息到RecordAccumulator即返回 sender线程从RecordAccumulator拉取信息发送到broker batch.size和linger.ms两个参数可以影响 sender 线程发送次数
@Test public void testSend(){ Properites props = getProperties(); Producer<String,String> producer = new KafkaProducer<>(props); for(int i = 0;i < 3;i++){ Future<RecordMetadata> future = producer.send(TOPIC_NAME,"xdclass-key"+i,"xdclass-value"+i); try{ RecordMetadata recordMetadata = future.get(); //不关心结果的话,可以不用这部 System.out.println("发送状态:"+recordMetadata.toString()); }catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } //记得要关必 producer.close(); }
9.3.异步发送配置回调函数
发送消息配置回调函数即可,该回调方法会在Producer收到ack时被调用,为异步调用 回调函数有两个参数RecordMetadata和Exception,如果Exception是null,则发送消息成功,否则失败
@Test public void testSendCallback(){ Properties properties = getProperties(); Producer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0; i < 3; i++) { producer.send(new ProducerRecord<>(TOPIC_NAME, "xdclass-key" + i, "xdclass-value" + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if(e == null){ System.out.println("发送状态:"+recordMetadata.toString()); }else{ e.printStackTrace(); } } }); } producer.close(); }
9.4.producer发送消息到指定分区
发送到指定topic的第五个分区
@Test public void testSendCallbackAndPartition(){ Properties properties = getProperties(); Producer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0; i < 3; i++) { //在ProducerRecord中配置,第二个参数 producer.send(new ProducerRecord<>(TOPIC_NAME, 4,"xdclass-key" + i, "xdclass-value" + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if(e == null){ System.out.println("发送状态:"+recordMetadata.toString()); }else{ e.printStackTrace(); } } }); } producer.close(); }
10.ProducerRecord介绍
10.1.ProducerRecord(简称PR)
发送给Kafka Broker的key/value键值对,封装基础数据信息
--Topic(topic名称) --Partition ID(可选,分区的ID) --Key(可选,指定的key) --value(发送的消息value)
key默认是null,大多数应用程序会用到key
- 如果key为空,kafka会使用默认的partitioner,使用RoundRobin算法将消息均衡的分布在各个partition上
- 如果key不为空,kafka使用自己实现的hash方法对key进行散列,决定消息改写到Topic的那个partition,拥有相同的key的消息会被写道同一个partition上,实现顺序消息
11.生产者自定义partition分区规则
11.1.源码解读默认分区器
org.apache.kafka.clients.producer.internals.DefaultPartitioner
11.2.自定义Partitioner类实现Partitioner接口
public class MyPartitioner implements Partitioner { //在partition方法里实现自定义的配置规则 @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if("xdclass".equals(key)) { return 0; } //使用hash值取模,确定分区(默认的也是这个方式) return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } }
11.3.配置自定义的Partitioner生效
在配置的对象里加上配置props.put(“partitioner.class”,“com.lixiang.config.MyPartitioner”); //自定义的配置路径
props.put("partitioner.class", "com.lixiang.config.MyPartitioner");