Kafka 分区备份实战

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,182元/月
简介: 1.概述   在 Kafka 集群中,我们可以对每个 Topic 进行一个或是多个分区,并为该 Topic 指定备份数。这部分元数据信息都是存放在 Zookeeper 上,我们可以使用 zkCli 客户端,通过 ls 和 get 命令来查看元数据信息。

1.概述

  在 Kafka 集群中,我们可以对每个 Topic 进行一个或是多个分区,并为该 Topic 指定备份数。这部分元数据信息都是存放在 Zookeeper 上,我们可以使用 zkCli 客户端,通过 ls 和 get 命令来查看元数据信息。通过 log.dirs 属性控制消息存放路径,每个分区对应一个文件夹,文件夹命名方式为:TopicName-PartitionIndex,该文件夹下存放这该分区的所有消息和索引文件,如下图所示:

2.内容

  Kafka 集群在生产消息入库的时候,通过 Key 来进行分区存储,按照相应的算法,生产分区规则,让所生产的消息按照该规则分布到不同的分区中,以达到水平扩展和负载均衡。而我们在消费这些消息的时候,可以使用多线程来消费该 Topic 下的所有分区中的消息。

  分区规则的制定,通过实现 kafka.producer.Partitioner 接口,该接口我们可以进行重写,按照自己的方式去实现分区规则。如下,我们按照 Key 的 Hash 值,然后取模得到分区索引,代码如下所示:

package cn.hadoop.hdfs.kafka.partition;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * @Date Nov 3, 2016
 *
 * @Author dengjie
 *
 * @Note 先 Hash 再取模,得到分区索引
 */
public class CustomerPartitioner implements Partitioner {

    public CustomerPartitioner(VerifiableProperties props) {
    }

    public int partition(Object key, int numPartitions) {
        int partition = 0;
        String k = (String) key;
        partition = Math.abs(k.hashCode()) % numPartitions;
        return partition;
    }

}

  在创建 Topic 的时候,若按照上述规则创建分区,分区数最后为 Brokers 的整数倍,这样才能发挥其负载均衡的作用,比如:当前我们集群节点由 3 个 Broker 组成,如下图所示:

2.1 创建分区

  我们在创建分区的时候,可以通过 Kafka 提供的客户端命令进行创建,如下,我们创建一个6分区,3备份的一个 Topic,命令如下所示:

./kafka-topics.sh --create --zookeeper k1:2181,k2:2181,k3:2181 --replication-factor 3 --partitions 6 --topic ke_test

  这里需要注意的是,指定备份数的时候,备份数要小于等于 Brokers 数。否则创建失败。在创建分区的时候,假设,我们只创建 2 个分区,而我们上述图中, Brokers 有 3 个,会造成有一个 Broker 上没有该 Topic 的分区,以致分布不均。

2.2 分区入库

  一般,我们在入库消息的时候,都有使用 Kafka 的 API,如下,我们使用生产 API ,按照上述的 Hash 取模规则,进行分区入库,代码如下所示:

package cn.hadoop.hdfs.kafka.partition;

import java.util.List;
import java.util.Properties;

import cn.hadoop.hdfs.kafka.partition.data.FileRead;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * @Date Nov 3, 2016
 *
 * @Author dengjie
 *
 * @Note 按照先 Hash 再取模的规则,进行分区入库
 */
public class PartitionerProducer {
    public static void main(String[] args) {
        producerData();
    }

    private static void producerData() {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "k1:9092,k2:9092,k3:9092");
        props.put("partitioner.class", "cn.hadoop.hdfs.kafka.partition.CustomerPartitioner");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        String topic = "ke_test";
        List<String> list = FileRead.readData();
        for (int i = 0; i < list.size(); i++) {
            String k = "key" + i;
            String v = new String(list.get(i));
            producer.send(new KeyedMessage<String, String>(topic, k, v));
            if (i == (list.size() - 1)) {
                return;
            }
        }
        producer.close();
    }
}

  这里,我们分析发现,生产者在生产消息入库时,会按照 CustomerPartitioner 的规则,进行分区入库,在入库时,将 Key 先做 Hash,然后分区数取模(这里分区数是 6).我们计算可以得到一下信息:

hashCode("key0") % 6 = 1
hashCode("key1") % 6 = 2
hashCode("key2") % 6 = 3
hashCode("key3") % 6 = 4
hashCode("key4") % 6 = 5
hashCode("key5") % 6 = 0
// ... 以此循环

  按照该表述规则进行分区入库。

2.3 分区入库验证

  接下里,我们通过 Kafka 的消费者 API 来验证,在消费时,消费 Topic 各分区的详情,代码如下所示:

package cn.hadoop.hdfs.kafka.partition;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * @Date Nov 3, 2016
 *
 * @Author dengjie
 *
 * @Note 通过 Kafka 的消费者 API 验证分区入库的消息
 */
public class PartitionerConsumer {
    public static void main(String[] args) {
        String topic = "ke_test";
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> mam = it.next();
            System.out.println("consume: Partition [" + mam.partition() + "] Message: [" + new String(mam.message())
                    + "] ..");
        }

    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("group.id", "group1");
        props.put("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181");
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        return new ConsumerConfig(props);
    }
}

  这里笔者只是验证消费数据,若在实际生产线上,需将上述单线程消费改造成多线程消费,来提升处理消息的能力。

2.4 验证结果

  这里,我们线运行生产者,让其生产消息,并分区入库;然后,在启动消费者,消费消息验证其结果,如下图所示:

3.总结

  需要注意的是,分区数建议为 Brokers 的整数倍,让其达到均匀分布;备份数必须小于等于 Brokers。以及,多线程消费的控制,其线程数建议和分区数相等。

4.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

联系方式:
邮箱:smartloli.org@gmail.com
Twitter: https://twitter.com/smartloli
QQ群(Hadoop - 交流社区1): 424769183
温馨提示:请大家加群的时候写上加群理由(姓名+公司/学校),方便管理员审核,谢谢!

热爱生活,享受编程,与君共勉!


作者:哥不是小萝莉 [关于我][犒赏

出处:http://www.cnblogs.com/smartloli/

转载请注明出处,谢谢合作!

目录
相关文章
|
安全 机器人 API
简单几步,钉钉机器人秒变通义千问对话机器人
通过阿里云计算巢AppFlow平台,无需编码,只需简单几步,即可将钉钉机器人转化为通义千问对话机器人。首先在灵积模型服务平台获取API Key,然后在AppFlow中配置连接器,授权并保存Webhook Url。在钉钉中创建自定义机器人,选择Outgoing功能,填写签名和Webhook地址。最后,@机器人即可开始对话。此外,还提供了通过钉钉开放平台创建机器人的步骤。AppFlow简化了集成过程,加速了企业自动化服务流程。
907 0
|
敏捷开发 人工智能 测试技术
从 0 开始构建知识图谱的 5 个启动建议
Gartner 在《2023 年人工智能技术成熟度曲线》报告中,建议企业可以考虑采取以下行动来开启知识图谱:
解决办法:gpg : 从公钥服务器接收失败:公钥服务器错误
解决办法:gpg : 从公钥服务器接收失败:公钥服务器错误
8558 0
|
存储 缓存 NoSQL
Redis常见面试题(二):redis分布式锁、redisson、主从一致性、Redlock红锁;Redis集群、主从复制,哨兵模式,分片集群;Redis为什么这么快,I/O多路复用模型
redis分布式锁、redisson、可重入、主从一致性、WatchDog、Redlock红锁、zookeeper;Redis集群、主从复制,全量同步、增量同步;哨兵,分片集群,Redis为什么这么快,I/O多路复用模型——用户空间和内核空间、阻塞IO、非阻塞IO、IO多路复用,Redis网络模型
Redis常见面试题(二):redis分布式锁、redisson、主从一致性、Redlock红锁;Redis集群、主从复制,哨兵模式,分片集群;Redis为什么这么快,I/O多路复用模型
|
监控 负载均衡 网络协议
OSPF在小型网络中的应用:简化配置与高效管理
OSPF在小型网络中的应用:简化配置与高效管理
487 1
|
存储 运维 网络协议
运维的基本概念:服务器和网络基础知识
运维的基本概念:服务器和网络基础知识
1009 0
运维的基本概念:服务器和网络基础知识
|
Docker 容器
docker中自定义网络
【10月更文挑战第5天】
226 3
|
监控 数据处理 开发者
利用Lua代码简化局域网管理软件开发
使用Lua脚本语言可以提升局域网管理软件的开发效率和代码可维护性。示例包括:使用LuaSocket扫描局域网设备;通过动态加载和应用配置文件展示配置管理;利用实时监控功能,当网络流量超过阈值时触发警报;以及通过HTTP POST自动提交监控数据到服务器。Lua的简洁语法和强大功能简化了网络管理和自动化任务。
296 3
|
Java 关系型数据库 MySQL
【Java Spring开源项目】新蜂(NeeBee)商城项目运行、分析、总结
【Java Spring开源项目】新蜂(NeeBee)商城项目运行、分析、总结
822 4
|
运维 Linux 网络安全
利用群晖NAS+shell脚本实现运维命令执行结果文件自动上传
利用群晖NAS+shell脚本实现运维命令执行结果文件自动上传
983 0