一.简述
Apache Druid已有的扩展模块很多包括:HDFS存储使用的druid-hdfs-storage,Kafka数据接入使用的druid-kafka-indexing-service,将MySQL做为元数据库使用的mysql-metadata-storage,数据排重使用的druid-datasketches;但有时这些也无法满足我们实际应用场景下的个性化需求,那么必要的二次开发增加自定义的模块就成了必然。例如:数据解析器druid-thrift-extensions
二.实现
Ⅰ).创建工程
创建java工程,并在sources目录中添加文件:org.apache.druid.initialization.DruidModule;文件内容如下:
com.yun.druid.data.input.thrift.ThriftExtensionModule
Ⅱ).接口实现
a).ThriftExtensionsModule
ThriftExtensionsModule实现DruidModule接口,并重写getJacksonModules方法;需注册类ThriftInputRowParser.class和别名(yun-thrift)
@Override
public List<? extends Module> getJacksonModules() {
// TODO Auto-generated method stub
return Collections.singletonList(
new SimpleModule("ThriftInputRowParserModule").registerSubtypes(
new NamedType(ThriftInputRowParser.class, "yun-thrift")));
}
b).ThriftInputRowParser
ThriftInputRowParser类实现InputRowParser
接口,并重写 parseBatch方法
@JsonCreator
public ThriftInputRowParser(
@JsonProperty("parseSpec") ParseSpec parseSpec,
@JsonProperty("jarPath") String jarPath,
@JsonProperty("thriftClassName") String thriftClassName)
{
this.jarPath = jarPath;
this.thriftClassName = thriftClassName;
Preconditions.checkNotNull(thriftClassName, "thrift class name");
this.parseSpec = parseSpec;
this.demensions = parseSpec.getDimensionsSpec().getDimensionNames();
}
public List<InputRow> parseBatch(Object input){
final TBase tbase;
try {
if(input instanceof ByteBuffer) {
final byte[] bytes = ((ByteBuffer) input).array();
}
} catch (Exception e) {
// TODO: handle exception
}
return null;
}
Ⅲ).自定义逻辑
具体逻辑实现,根据需求重写parseBatch方法中实现
Ⅳ).打包
将该工程打成jar包,命名规则可以是:druid-namesparce-thrift-extension.jar
三.使用
Ⅰ).jar包位置
在./apache-druid-0.14.0-incubating/extensions目录,创建druid-namesparce-thrift-extension文件夹;然后将自定义模块工程jar包以及依赖的jar包放到该目录
Ⅱ).导入配置
在conf/druid/_common/common.runtime.properties的配置文件中添加如下配置,然后重启角色
druid.extensions.loadList=["druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "druid-kafka-indexing-service", "mysql-metadata-storage", "druid-hdfs-storage", "druid-<namesparce>-thrift-extension"]
Ⅲ).任务中使用
在提交任务的json文件的解析器部分,指定type为自定义的别名;其中,thriftJar、thriftClass和protocol可以不配置
"parser": {
"type": "yun-thrift",
"thriftJar":"druid-<namesparce>-thrift-extension.jar",
"thriftClass":"com.yun.druid.data.input.thrift.ThriftExtensionModule.class",
"protocol":"compact",
"parseSpec": {
"format": "json",
"timestampSpec": {
"column": "time",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"appName",
"nodeName"
],
"dimensionExclusions": []
}
}
Ⅳ).启动任务验证
根据自己的任务配置文件,参考如下启动命令,启动并验证自定义解析器
curl -XPOST -H'Content-Type: application/json' -d @quickstart/tutorial/kafka-streaming.json http://hostname:8081/druid/indexer/v1/supervisor