Kudu存储实战笔记

简介:   有人会问,为啥要用这个叫啥Kudu的,Kudu是啥?  就像官网所说,Kudu是一个针对Apache hadoop 平台而开发的列式存储管理器,在本菜鸟看来,它是一种介于hdfs与hbase的一种存储。

  有人会问,为啥要用这个叫啥Kudu的,Kudu是啥?

  就像官网所说,Kudu是一个针对Apache hadoop 平台而开发的列式存储管理器,在本菜鸟看来,它是一种介于hdfs与hbase的一种存储。它的优势在于:

  1、OLAP工作的快速处理,也就是针对于查询,很快,很牛逼。

  2、针对同时运行顺序和随机工作负载的情况性能很好。

  3、高可用,Table server和master使用Raft Consensus Algorithm节点来保证高可用,什么是Raft Consunsus Algorith?参考:https://www.cnblogs.com/mindwind/p/5231986.html),只要有一半以上的副本可用,该tablet便可用于读写。

  4、结构化数据模型(可以理解为带schema)。

 

   该图显示了一个具有三个 master 和多个 tablet server 的 Kudu 集群,每个服务器都支持多个 tablet。它说明了如何使用 Raft 共识来允许 master 和 tablet server 的 leader 和 f ollow。此外,tablet server 可以成为某些 tablet 的 leader,也可以是其他 tablet 的 follower。leader 以金色显示,而 follower 则显示为蓝色。

  下面是一些基本概念:

  Table(表)

  一张 talbe 是数据存储在 Kudu 的位置。表具有 schema 和全局有序的 primary key(主键)。table 被分成称为 tablets 的 segments。

  Tablet

  一个 tablet 是一张 table 连续的 segment,与其它数据存储引擎或关系型数据库中的 partition(分区)相似。给定的 tablet 冗余到多个 tablet 服务器上,并且在任何给定的时间点,其中一个副本被认为是 leader tablet。任何副本都可以对读取进行服务,并且写入时需要在为 tablet 服务的一组 tablet server之间达成一致性。

  Tablet Server

  一个 tablet server 存储 tablet 和为 tablet 向 client 提供服务。对于给定的 tablet,一个 tablet server 充当 leader,其他 tablet server 充当该 tablet 的 follower 副本。只有 leader服务写请求,然而 leader 或 followers 为每个服务提供读请求。leader 使用 Raft Consunsus Algorithm来进行选举 。一个 tablet server 可以服务多个 tablets ,并且一个 tablet 可以被多个 tablet servers 服务着。

 

  具体我还没有那么深入,写了些api调用玩了一把,下面慢慢讲述,Kudu的API比较恶心的哈。。

  kudu的sql语法与传统的sql语法比较相似,但也不尽相同,直接解析时,具体sql语法请参考官网,下面以类似hive metastore表结构的方式封装了下。以下列sql为例:

    create table combined_t6 (x int64, s string, s2 string, primary key (x, s))
    partition by hash (x) partitions 10, range (x)
    (
      partition 0 <= values <= 49, partition 50 <= values <= 100
    ) REPLICAS 1

    public Boolean create(Table table,String operator) {
        LOGGER.info("kudu Table properties:" + table.getKvInfos().toString());
        List<ColumnSchema> columns = new ArrayList(table.getTableColumnList().size());
       KuduTableGenerateUtil.generateKuduColumn(table.getTableColumnList(),columns); Schema schema
= new Schema(columns); KuduPartitionSchema kuduPartitionSchema = KuduTableGenerateUtil.parserPartition(table); CreateTableOptions tableOptions = KuduTableGenerateUtil.generateKuduTableOptions(table,schema,kuduPartitionSchema); try { getKuduClient(table).createTable(table.getTableName(), schema,tableOptions); } catch (KuduException e) { throw new MetadataInvalidObjectException(e, " create kudu storage table error!!"); } return true; }

  kudu的column属性中,包含有primarfyKey、encoding、compression algorithm、null table 、default value 、block size等属性,所以从上述代码中需要先将kuduColumn进行封装,构造ColumnSchema对象:

     new ColumnSchema.ColumnSchemaBuilder(tableColumn.getColumnName(), getKuduColumnType(tableColumn.getDataType()))
             .key(checkBoolKey(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_PRIMARY_KEY)))
             .nullable(checkBoolKey(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_SCHEMA_IS_NULLTABLE)))
             .defaultValue(defaultValue)
             .desiredBlockSize(getDesiredBlockSize(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_SCHEMA_DESIRED_BLOCKSIZE)))
             .encoding(getColumnEncoding(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_SCHEMA_ENCODING)))
             .compressionAlgorithm(getCompressionType(columnCondition.get(MetadataConfigKey.COLUMN_KUDU_SCHEMA_COMPRESSION_ALGORITHM)))
             .build();

  对于column的数据类型,有很多种,如下:

    private static Type getKuduColumnType(String dataType) {
        switch (dataType.toUpperCase()) {
            case "INT8":
                return Type.INT8;
            case "INT16":
                return Type.INT16;
            case "INT32":
                return Type.INT32;
            case "INT64":
                return Type.INT64;
            case "BINARY":
                return Type.BINARY;
            case "STRING":
                return Type.STRING;
            case "BOOL":
                return Type.BOOL;
            case "FLOAT":
                return Type.FLOAT;
            case "DOUBLE":
                return Type.DOUBLE;
            case "UNIXTIME_MICROS":
                return Type.UNIXTIME_MICROS;
            default:
                return Type.STRING;
        }
    }

  压缩方式包括:

    public static CompressionAlgorithm getCompressionType(String compressionType) {
        if (StringUtils.isNotBlank(compressionType)) {
            switch (compressionType.toUpperCase()) {
                case "UNKNOWN":
                    return CompressionAlgorithm.UNKNOWN;
                case "DEFAULT_COMPRESSION":
                    return CompressionAlgorithm.DEFAULT_COMPRESSION;
                case "NO_COMPRESSION":
                    return CompressionAlgorithm.NO_COMPRESSION;
                case "SNAPPY":
                    return CompressionAlgorithm.SNAPPY;
                case "LZ4":
                    return CompressionAlgorithm.LZ4;
                case "ZLIB":
                    return CompressionAlgorithm.UNKNOWN.ZLIB;
                default:
                    return null;
            }
        }
        return null;
    }

  随之我们要构造,Kudu Partition,Kudu Partition包含两种类型,一种是hashPartition,一种是rangePartition,其实从字面意思应该也能够想到,一种是用于对某个字段进行hash散列,一种是进行分区区间的设置,从而在查询时达到优化的效果,这里通过将sql解析后的转换的KuduPartitionSchema对象分别进行range与hash partition的组装,也就是将sql中 Partition表达式 partition 0 <= values <= 49, partition 50 <= values <= 100 封装:

    public static void generateHashPartition(CreateTableOptions tableOptions, List<HashPartitionSchema> hashPartitionSchemas) {
        if (null != hashPartitionSchemas && hashPartitionSchemas.size() != 0) {

        hashPartitionSchemas.forEach(hashPartitionSchema ->{
            tableOptions.addHashPartitions(hashPartitionSchema.getColumns(), hashPartitionSchema.getBucket());
        });     

      }

   }

public static void generateRangePartition(Schema schema, CreateTableOptions tableOptions, RangePartitionSchema rangePartitionSchema) {
        tableOptions.setRangePartitionColumns(rangePartitionSchema.getColumns());
        List<RangeSplit> ranges = rangePartitionSchema.getRanges();
        ranges.forEach(range -> {
            tableOptions.addRangePartition(
                    getPartialRow(
                            range.getLower(),
                            schema,
                            rangePartitionSchema.getColumns()),
                    getPartialRow(
                            range.getUpper(),
                            schema,
                            rangePartitionSchema.getColumns()),
                    getRangePartitionBound(
                            range.getLowerBoundType()),
                    getRangePartitionBound(
                            range.getUpperBoundType())
            );
       });
    }
    public static RangePartitionBound getRangePartitionBound(String boundType) {
        if (StringUtils.isNotBlank(boundType)) {
            switch (boundType) {
                case "EXCLUSIVE_BOUND":
                    return RangePartitionBound.EXCLUSIVE_BOUND;
                case "INCLUSIVE_BOUND":
                    return RangePartitionBound.INCLUSIVE_BOUND;
                default:
                    return null;
            }
        }
        return null;
    }

  最后构造,CreateTableOptions对象:

    public static CreateTableOptions generateKuduTableOptions(Table table, Schema schema, KuduPartitionSchema kuduPartitionSchema) {
        CreateTableOptions tableOptions = new CreateTableOptions();
        String numReplicas = table.getKvInfos().get(MetadataConfigKey.TABLE_KUDU_REPLICAS);
        if (StringUtils.isNotBlank(numReplicas)) {
            tableOptions.setNumReplicas(Integer.valueOf(numReplicas));
        }
        if (kuduPartitionSchema.getHashPartitionSchemaList() != null && kuduPartitionSchema.getHashPartitionSchemaList().size() != 0) {
            generateHashPartition(tableOptions, kuduPartitionSchema.getHashPartitionSchemaList());
        }
        if (kuduPartitionSchema.getRangePartitionSchema() != null) {
            generateRangePartition(schema, tableOptions, kuduPartitionSchema.getRangePartitionSchema());
        }
        return tableOptions;
    }

  没有hbase编程便捷。。不过对于kudu的连接而言,只需要配置kudu master的地址,便可创建连接。

    public KuduClient getKuduClient(Table table){
        if(null == kuduClient){
            try{
                String kuduMaster = table.getStorageClusterKvs().get(MetadataConfigKey.CLUSTER_KUDU_MASTER);
                kuduClient = new KuduClient.KuduClientBuilder(kuduMaster).build();
            }catch(Exception e){
                throw new MetadataRuntimeException(e, " create kuduClient error!!");
            }
        }
        return kuduClient;
    }

   活儿干不完啊~改天再深入完 哈哈~

目录
相关文章
|
6月前
|
人工智能 自然语言处理 供应链
AI直播销售奇迹:00后DeepSeek的3.3亿真相探究
近日,“00后主播借DeepSeek技术直播卖出3.3亿”的新闻引发关注。此次成功不仅得益于主播个人魅力,更离不开“交个朋友”直播间团队对AI技术的深度融合。通过DeepSeek大模型,AI在内容生成、流程优化等方面大幅提升效率,实现了直播话术自动生成、多场景适配及全球化支持。团队还利用AI进行选品、合规审核和数据分析,优化直播策略。多位主播精细分工,结合强大的背景资源,确保高效带货。AI与真人主播互补,提升转化率。尽管存在版权、就业等争议,此次销售奇迹展示了AI技术商业化的潜力,并为行业提供了宝贵案例。
235 0
目标如何设定:7 分钟重新认识 SMART 原则。
你有过很多目标,但都没达成。于是你找到了一种解决方案——SMART 目标管理原则。它是五个单词首字母的缩写——Specific、Measurable、Achievable、Relevant 和 Time-bound——你的目标必须是具体的、可衡量的、可达到的、和其他目标相关的、有时间限制的。
790 0
|
机器学习/深度学习 人工智能 自然语言处理
【机器学习】人力资源管理的新篇章:AI驱动的高效与智能化
【机器学习】人力资源管理的新篇章:AI驱动的高效与智能化
|
SQL 存储 数据处理
Flink SQL 问题之提交程序运行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
498 3
|
SQL 消息中间件 Kubernetes
flink问题之on kubernetes 构建失败如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
208 1
|
边缘计算 人工智能 运维
浪潮信息分论坛回顾来啦!共商开源开放、赋能智算之道 | 2023 龙蜥操作系统大会
在龙蜥操作系统大会上,浪潮信息首次展示以云峦 KeyarchOS 为基础底座的系统软件全栈能力。
浪潮信息分论坛回顾来啦!共商开源开放、赋能智算之道 | 2023 龙蜥操作系统大会
|
机器学习/深度学习 人工智能 算法
人工智能处理方言和口音多样性
人工智能处理方言和口音多样性
1690 2
|
Java Maven 容器
spring.factories文件作用详解
spring.factories文件作用详解
436 0
|
机器学习/深度学习 运维 监控
无人值守时代,运维如何保障发布质量?
阿里巴巴千亿交易背后,如何尽量避免发布故障?在面对实际运维过程中遇到的问题该如何解决?阿里巴巴运维技术专家少荃,给我们带来了解决方案和思路。
5291 0
|
机器学习/深度学习 SQL 存储
大语言模型技术原理
大语言模型作为一个被验证可行的方向,其“大”体现在训练数据集广,模型参数和层数大,计算量大,其价值体现在通用性上,并且有更好的泛化能力。相较于传统特定领域训练出来的语言模型,有更广泛的应用场景。这篇文章参考Google和OpenAI相关论文及部分作者的补充,结合我的理解尝试用大家普遍看得明白的语言,对其技术发展和主要实现进行解析。
1034 0
大语言模型技术原理

热门文章

最新文章