Apache Druid自定义扩展模块-数据解析器

简介: 一.简述 Apache Druid已有的扩展模块很多包括:HDFS存储使用的druid-hdfs-storage,Kafka数据接入使用的druid-kafka-indexing-service,将MySQL做为元数据库使用的mysql-metadata-storage,数据排重使用的druid-datasketches;但有时这些也无法满足我们实际应用场景下的特殊需求,那么必要的二次开发增加自定义的模块就成了必然。

一.简述

Apache Druid已有的扩展模块很多包括:HDFS存储使用的druid-hdfs-storage,Kafka数据接入使用的druid-kafka-indexing-service,将MySQL做为元数据库使用的mysql-metadata-storage,数据排重使用的druid-datasketches;但有时这些也无法满足我们实际应用场景下的个性化需求,那么必要的二次开发增加自定义的模块就成了必然。例如:数据解析器druid-thrift-extensions

二.实现

Ⅰ).创建工程

创建java工程,并在sources目录中添加文件:org.apache.druid.initialization.DruidModule;文件内容如下:

com.yun.druid.data.input.thrift.ThriftExtensionModule

Ⅱ).接口实现

a).ThriftExtensionsModule

ThriftExtensionsModule实现DruidModule接口,并重写getJacksonModules方法;需注册类ThriftInputRowParser.class别名(yun-thrift)

@Override
    public List<? extends Module> getJacksonModules() {
        // TODO Auto-generated method stub
        return Collections.singletonList(
                new SimpleModule("ThriftInputRowParserModule").registerSubtypes(
                        new NamedType(ThriftInputRowParser.class, "yun-thrift")));
    }

b).ThriftInputRowParser

ThriftInputRowParser类实现InputRowParser

接口,并重写 parseBatch方法

@JsonCreator
    public ThriftInputRowParser(
            @JsonProperty("parseSpec") ParseSpec parseSpec,
            @JsonProperty("jarPath") String jarPath,
            @JsonProperty("thriftClassName") String thriftClassName)
    {
        this.jarPath = jarPath;
        this.thriftClassName = thriftClassName;
        
        Preconditions.checkNotNull(thriftClassName, "thrift class name");
        
        this.parseSpec = parseSpec;
        this.demensions = parseSpec.getDimensionsSpec().getDimensionNames();
    }
    
    public List<InputRow> parseBatch(Object input){
        final TBase tbase;
        try {
            
            if(input instanceof ByteBuffer) {
                final byte[] bytes = ((ByteBuffer) input).array();
            }
            
        } catch (Exception e) {
            // TODO: handle exception
        }
        return null;
    }

Ⅲ).自定义逻辑

具体逻辑实现,根据需求重写parseBatch方法中实现

Ⅳ).打包

将该工程打成jar包,命名规则可以是:druid-namesparce-thrift-extension.jar

三.使用

Ⅰ).jar包位置

./apache-druid-0.14.0-incubating/extensions目录,创建druid-namesparce-thrift-extension文件夹;然后将自定义模块工程jar包以及依赖的jar包放到该目录

Ⅱ).导入配置

conf/druid/_common/common.runtime.properties的配置文件中添加如下配置,然后重启角色

druid.extensions.loadList=["druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "druid-kafka-indexing-service", "mysql-metadata-storage", "druid-hdfs-storage", "druid-<namesparce>-thrift-extension"]

Ⅲ).任务中使用

在提交任务的json文件的解析器部分,指定type为自定义的别名;其中,thriftJar、thriftClass和protocol可以不配置

    "parser": {
            "type": "yun-thrift",
            "thriftJar":"druid-<namesparce>-thrift-extension.jar",
            "thriftClass":"com.yun.druid.data.input.thrift.ThriftExtensionModule.class",
            "protocol":"compact",
            "parseSpec": {
                "format": "json",
                "timestampSpec": {
                    "column": "time",
                    "format": "auto"
                },
                "dimensionsSpec": {
                    "dimensions": [
                        "appName",
                        "nodeName"
                    ],
                    "dimensionExclusions": []
                }
            }

Ⅳ).启动任务验证

根据自己的任务配置文件,参考如下启动命令,启动并验证自定义解析器

curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/kafka-streaming.json http://hostname:8081/druid/indexer/v1/supervisor
目录
相关文章
|
26天前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
60 3
|
1月前
|
存储 分布式计算 druid
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(一)
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(一)
38 1
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(一)
|
12天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
41 2
|
1月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
37 3
|
1月前
|
存储 分布式计算 druid
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
49 3
|
1月前
|
消息中间件 分布式计算 druid
大数据-154 Apache Druid 架构与原理详解 基础架构、架构演进
大数据-154 Apache Druid 架构与原理详解 基础架构、架构演进
27 2
|
1月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
29 2
|
1月前
|
消息中间件 分布式计算 druid
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(二)
大数据-152 Apache Druid 集群模式 配置启动【下篇】 超详细!(二)
39 2
|
1月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
49 1
|
28天前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
64 0

推荐镜像

更多
下一篇
无影云桌面