【消息队列】一文搞定大数据消息队列Kafka2

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 【消息队列】一文搞定大数据消息队列Kafka

6.Kafka数据存储流程和原理概述

6.1.Partition

  • topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列
  • 是以文件夹的形式存储在具体Broker本机上




6b57a36f2e87460d94542b100266ef78.jpg

6.2.LEO(LogEndOffset)

  • 表示每个partition的log最后一条Message的位置

6.3.HW(HighWatermark)

  • 表示partition各个replicas数据键同步且一致的offset位置,即表示allreplicas已经commit的位置
  • HW之前的数据才是commit后的,对消费者才可见
  • ISR集合里面最小leo

adda21e9cde948e8a71253ded8c23852.jpg

6.4.offset

  • 每个partition都由一些列有序的、不可变的消息组成,这些消息被连续的追加到partition中
  • partition中的每个消息都有一个连续的序号叫做offset,用于partition唯一标识一条信息
  • 可以认为offset是partition中Message的id

6.5.Segment

  • 每个partition又由多个segment file组成
  • segment file 由2部分组成,分别为index file 和 data file(log file)
  • 两个文件一一对应,后缀“.index”和".log"分别标识索引文件和数据文件
  • 命名规则:partition的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset+1

bd99c1f8947c4a5294141a17c71eca04.jpg



7be6fd86cabd416f907aaff2963b6a2e.jpg

6.6.Kafka高效文件存储设计特点

  • kafka把topic中一个partition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完的文件,减少磁盘占用。
  • 通过索引信息可以快速定位Message
  • producer生产数据,要写入到log文件中,写的过程中一直追加到文件末尾,为顺序写,官网数据表明,同样的磁盘,顺序写能到600M/S,而随机写只是100K/S

7.SpringBoot2.x项目整合Kafka

7.1.引入kafka-clients依赖

<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.4.0</version>
</dependency>

7.2.配置客户端

AdminClientConfig.BOOTSTARP_SERVERS_CONFIG,"ip:端口"
AdminClient.create(配置信息);
/**
 * 设置admin客户端
 * @return
 */
public static AdminClient initAdminClient(){
    Properties properties = new Properties();
    properties.setProperties(AdminClientConfig.BOOTSTARP_SERVERS_CONFIG,"112.74.55.160:9092");
    AdminClient adminClient = AdminClient.create(properties);
    return adminClient;
}

7.3.创建topic

NewTopic newTopic = new NewTopic(topic名称,分区数量,副本数量);
adminClient.createTopics(Arrays.asList(newTopic)); //返回一个CreateTopicsResult
createTopicsResult.all().get(); //异常处理
@Test
public void createTopic(){
    AdminClient adminClient = initAdminClient();
    //2个分区,1个副本
    NewTopic newTopic = new NewTopic(TOPIC_NAME,2,(short)1);
    CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic));
    try {
        //future等待创建,成功不会有任何报错,失败或者超时会报错
      createTopicsResult.all().get();
     } catch (Exception e) {
     e.printStackTrace();
     }
  System.out.println("ڠୌෛጱtopic");
} 

7.4.删除topic

adminClient.deleteTopics(topic名称的list集合);  //返回一个DeleteTopicsResult
deleteTopicsResult.all().get();
@Test
public void delTopicTest(){
    AdminClient adminClient = initAdminClient();
    DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("xdclass-test-topic"));
    try {
        deleteTopicsResult.all().get();
     } catch (Exception e) {
      e.printStackTrace();
     }
}

7.5.列举topic-list

adminClient.listTopics();
adminClient.listTopics(options); //返回一个ListTopicsResult
Set<String> topics = listTopics.names().get(); //得到一个set集合遍历
//是否查看内部topic,可以不用
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);
@Test
public void listTopic(){
    AdminClient adminClient = initAdminClient();
    //是否查看内部topic,可以不用
    ListTopicsOptions options = new ListTopicsOptions();
    options.listInternal(true);
    ListTopicsResult listTopics = adminClient.listTopics(options);
    Set<String> topics = listTopics.names().get();
    for (String topic : topics) {
    System.err.println(topic);
  }
}

7.6.增加分区数量

NewPartitions.increateTo(5);
infoMap.put(TOPIC_NAME, newPartitions);
adminClient.createPartitions(infoMap);
@Test
 public void incrPartitionsTest() throws Exception{
  Map<String, NewPartitions> infoMap = new HashMap<>();
  NewPartitions newPartitions = NewPartitions.increaseTo(5);
  AdminClient adminClient = initAdminClient();
  infoMap.put(TOPIC_NAME, newPartitions);
  CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(infoMap);
  createPartitionsResult.all().get();
 }

7.7.查看topic详情

adminClient.describeTopics(Arrays.asList(TOPIC_NAME))
@Test
public void getTopicInfo() throws Exception {
  AdminClient adminClient = initAdminClient();
  DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));
  Map<String, TopicDescription> stringTopicDescriptionMap =
describeTopicsResult.all().get();
  Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
  entries.stream().forEach((entry)-> System.out.println("name :"+entry.getKey()+" , desc:"+ entry.getValue()));
 }

8.producer发送到Broker分区策略

8.1.生产者发送消息到broker的策略

  • 如果指定Partition ID,则PR(ProducerRecord)被发送到指定的Partition。
  • 如果未指定Partition ID,但是指定了Key,PR就会按照Key的哈希取模发送到对应的Partition
  • 如果未指定Partition ID,也未指定Key,PR会按照默认round-robin轮询模式发送到每个Partition
  • 如果即制定了Partition ID,也指定了Key,PR会被发送到指定的Partition

注意:Partition有多个副本,但只有一个replicationLeader复制该Partition和生产者消费者交互,消费者默认的消费Partition是range模式。

8.2.生产者到broker发送流程

Kafka的客户端发送数据到服务器,会经过内存缓冲区(默认是16KB),通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲区里,然后把很多消息收集到Batch里面,然后在通过Sender线程发送到Broker上面,这样才尽可能的提高性能。



4c16afba511743ca93d48cfcfaee9f9c.jpg

8.3.生产者常见配置

#Kafka地址,即broker的ip地址
bootstrap.servers
#当producer向leader发送数据时,可以通过request.required.acks来设置数据可靠性的级别,分别是0,1,all
acks
#请求失败,生产者会自动重试,默认是0次,如果启动重试,则会有消息重复消费的可能性
retries
#每个分区未发送消息的总字节大小,超过的化就会提交到服务端broker,默认是16kb
batch.size
#默认是0,指消息立即发送,即便是batch.size缓冲区还没满,到达linger.ms设置的秒数,也会提交消息到服务器
linger.ms
#buffer.memory用来约束KafkaProducer能够使用的缓冲区大小,默认是32MB
#注意:buffer.memory不能设置的太小,否则一旦写满,就会阻塞用户线程,不能在向kafka里写消息了
#buffer.momery一定要比batch.size设置的大,否则会报申请内存不足的错误。
buffer.memory
#key的序列化器,将用户提供的 key和value对象ProducerRecord 进行序列化处理,key.serializer必须被设置,即使消息中没有指定key,序列化器必须是一个实现org.apache.kafka.common.serialization.Serializer接口的类,将key序列化成字节数组。
key.serializer
value.serializer

9.producer API讲解

9.1.封装配置属性

public static Properties getProperties(){
        Properties props = new Properties();
      //kafka服务器地址
        props.put("bootstrap.servers", "112.74.55.160:9092");
        //props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "112.74.55.160:9092");
      //当producer向leader发下哦那个数据时,可以通过request.required.acks参数来设置数据可靠性级别,0,1,all
        props.put("acks", "all");
        //props.put(ProducerConfig.ACKS_CONFIG, "all");
      //表示请求失败时,生产者会尝试自动重连,0表示不重连(默认),如果开启重连的话,可能会导致消息重复消费
        props.put("retries", 0);
        //props.put(ProducerConfig.RETRIES_CONFIG, 0);
      //设置batch缓冲区的大小,表示当到达缓冲区的大小时,sender线程将拿取batch中的消息,默认是16kb
        props.put("batch.size", 16384);
      //表示缓冲区消息停留的时间,默认是0,立即发送,配置之后即使batch中的数据没有达到设定的值,到达时间后也会发送消息
        props.put("linger.ms", 1);
      //用来约束kafka Producer能够使用的内存缓冲区的大小,默认是32MB
        props.put("buffer.memory", 33554432);
    //序列化机制
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

9.2.生产者投递消息同步发送

send()方法是异步的,添加消息到缓冲区等待发送,并立即返回
生产者将单个消息批量在一起发送提高效率,即batch.size和linger.ms结合
实现同步发送:一条消息发送之后,会阻塞当前线程,直至返回ack
发送消息后返回一个Future对象,调用get即可
消息发送主要是两个线程:Main主线程,Sender线程
main线程发送消息到RecordAccumulator即返回
sender线程从RecordAccumulator拉取信息发送到broker
batch.size和linger.ms两个参数可以影响 sender 线程发送次数
@Test
public void testSend(){
    Properites props = getProperties();
    Producer<String,String> producer = new KafkaProducer<>(props);
    for(int i = 0;i < 3;i++){
        Future<RecordMetadata> future = producer.send(TOPIC_NAME,"xdclass-key"+i,"xdclass-value"+i);
        try{
            RecordMetadata recordMetadata = future.get(); //不关心结果的话,可以不用这部
            System.out.println("发送状态:"+recordMetadata.toString());
        }catch (InterruptedException e) {
                e.printStackTrace();
        } catch (ExecutionException e) {
                e.printStackTrace();
        }
    }
    //记得要关必
    producer.close();
}

f56a0829692d45c99f133afe53d528a4.jpg

9.3.异步发送配置回调函数

发送消息配置回调函数即可,该回调方法会在Producer收到ack时被调用,为异步调用
回调函数有两个参数RecordMetadata和Exception,如果Exception是null,则发送消息成功,否则失败
@Test
    public void testSendCallback(){
        Properties properties = getProperties();
        Producer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 3; i++) {
            producer.send(new ProducerRecord<>(TOPIC_NAME, "xdclass-key" + i, "xdclass-value" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if(e == null){
                        System.out.println("发送状态:"+recordMetadata.toString());
                    }else{
                        e.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }

9.4.producer发送消息到指定分区

发送到指定topic的第五个分区

@Test
    public void testSendCallbackAndPartition(){
        Properties properties = getProperties();
        Producer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 3; i++) {
      //在ProducerRecord中配置,第二个参数
            producer.send(new ProducerRecord<>(TOPIC_NAME, 4,"xdclass-key" + i, "xdclass-value" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if(e == null){
                        System.out.println("发送状态:"+recordMetadata.toString());
                    }else{
                        e.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }

10.ProducerRecord介绍

10.1.ProducerRecord(简称PR)

发送给Kafka Broker的key/value键值对,封装基础数据信息

--Topic(topic名称)
--Partition ID(可选,分区的ID)
--Key(可选,指定的key)
--value(发送的消息value)

key默认是null,大多数应用程序会用到key

  • 如果key为空,kafka会使用默认的partitioner,使用RoundRobin算法将消息均衡的分布在各个partition上
  • 如果key不为空,kafka使用自己实现的hash方法对key进行散列,决定消息改写到Topic的那个partition,拥有相同的key的消息会被写道同一个partition上,实现顺序消息

11.生产者自定义partition分区规则

11.1.源码解读默认分区器

org.apache.kafka.clients.producer.internals.DefaultPartitioner


efdbe4b2549142c9826f668bf75aa21b.jpg

11.2.自定义Partitioner类实现Partitioner接口

public class MyPartitioner implements Partitioner {
    //在partition方法里实现自定义的配置规则
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if("xdclass".equals(key)) {
            return 0;
        }
        //使用hash值取模,确定分区(默认的也是这个方式)
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

11.3.配置自定义的Partitioner生效

在配置的对象里加上配置props.put(“partitioner.class”,“com.lixiang.config.MyPartitioner”); //自定义的配置路径

props.put("partitioner.class", "com.lixiang.config.MyPartitioner");


相关文章
|
1月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
135 0
|
21天前
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
43 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
1月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
40 3
|
1月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
32 2
|
1月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
54 1
|
25天前
|
消息中间件 中间件 Kafka
解锁Kafka等消息队列中间件的测试之道
在这个数字化时代,分布式系统和消息队列中间件(如Kafka、RabbitMQ)已成为日常工作的核心组件。本次公开课由前字节跳动资深专家KK老师主讲,深入解析消息队列的基本原理、架构及测试要点,涵盖功能、性能、可靠性、安全性和兼容性测试,并探讨其主要应用场景,如应用解耦、异步处理和限流削峰。课程最后设有互动答疑环节,助你全面掌握消息队列的测试方法。
|
1月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
131 0
|
1月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
41 0
|
4月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。