EMR Druid 探索(二)
EMR Druid
上文介绍了 Druid 的特点、使用场景以及性能。EMR 在 3.11.0 引入了 Druid,并专门推出了一种新的集群类型:Druid 集群。在具体使用时,Druid 集群可以与 Hadoop 集群结合,以 HDFS 集群作为 deep storage 的存储,以 YARN 作为批量索引的计算引擎。Druid 集群亦可以与 Kafka 集群结合,将 Kafka 作为实时数据的来源。用户亦可用 Druid 摄取来自于 Spark Streaming 或者 Storm 的实时数据。
EMR 在 Druid 基础上也做了一些很有用的扩展,比如,
- 支持 deep storage 到 OSS(一个很有用的功能,不必担心 HDFS 集群变动带来的数据危险),以及将 OSS 文件作为批量索引的数据来源
- 支持 metadata 到 RDS,跟上一条结合在一起,Druid 集群升级就会变得非常方便
- 在用户侧 EMR Druid 集成了 Superset 这一 BI 工具,可以方便地查询 Druid 数据,而不用写复杂的 DSL
- 方便地扩容、缩容(针对 task 节点)
- 丰富的指标监控以及告警规则、坏节点迁移,提供了高可靠的运维保障
- 支持高安全、HA 等
示例
下面的流程展示了如何创建一个 Druid 集群,并进行批量索引。
首先,选择 Druid 集群类型。Druid 集群有几个必选组件,此外 HDFS、YARN 作为可选组件。在这里我们建议将这两个组件选上,一方面,Druid deep storage 到 OSS 的功能是借助于 HDFS 实现的,如果选择了 HDFS 用户不用做任何配置,如果没有选择的话,用户需要自己添加 HDFS 的配置文件,另一方面,选择这两个组件可以方便的在 Druid 集群内部进行批量索引测试,如果不需要,可以选择关闭他们。
下一步是选择节点类型和机器配置。Druid 本身不是一个纯内存计算系统,但是仍然会使用大量的内存。Druid 对内存的使用有两方面,一是计算过程中的数据,二是索引文件的 mapping。如果您的数据量很大,建议您选择大内存机型。8g 内存机型仅推荐作为测试机型。对于 core 节点,如果您的数据量很大,建议选择 d1 机型,这种机器本身配有多个本地磁盘,容量大,速度快,Druid 可以缓存更多 segments 到本地盘,而且 segments 加载速度会更快。
选择好机器,点击下一步,填写集群名称,
再下一步、确认,等待一段时间,集群就创建好了。进入集群管理页面查看一下,
下面我们将展示一个批量索引,并将索引的结果存储到 OSS 上。为此,首先在 OSS 上(用同一个账户)创建一个 oss://bucket/path,比如 “oss://emr-druid-cn-hangzhou/segments/”。之后,在 Druid 的配置页面,找到 druid.storage.storageDirectory,修改之。注意由于 OSS 访问是借助了 hdfs 的底层,因此 druid.storage.type 仍然为 “hdfs”,如图。
依次点“保存”、“部署配置文件到主机”、“重启所有组件”。
组件重启成功之后,ssh 到 emr-header-1 节点,进入 druid 的安装目录 “/usr/lib/druid-current”,其下带有一份 quickstart 的演示示例。进入该目录,首先解压 “wikiticker-2015-09-12-sampled.json.gz”:
gunzip wikiticker-2015-09-12-sampled.json.gz
之后将该文件上传至 HDFS 的一个路径下面,这里我们选择 “/” 目录:
hdfs dfs -put wikiticker-2015-09-12-sampled.json /
也可以将该文件放置于某一 OSS 路径下面,EMR Druid 也是支持的。编辑 quickstart 下面的 wikiticker-index.json,其内容如下:
{
"type" : "index_hadoop",
"spec" : {
"ioConfig" : {
"type" : "hadoop",
"inputSpec" : {
"type" : "static",
"paths" : "/wikiticker-2015-09-16-sampled.json"
}
},
"dataSchema" : {
"dataSource" : "wikiticker",
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "day",
"queryGranularity" : "none",
"intervals" : ["2015-09-12/2015-09-13"]
},
"parser" : {
"type" : "hadoopyString",
"parseSpec" : {
"format" : "json",
"dimensionsSpec" : {
"dimensions" : [
"channel",
"cityName",
"comment",
"countryIsoCode",
"countryName",
"isAnonymous",
"isMinor",
"isNew",
"isRobot",
"isUnpatrolled",
"metroCode",
"namespace",
"page",
"regionIsoCode",
"regionName",
"user"
]
},
"timestampSpec" : {
"format" : "auto",
"column" : "time"
}
}
},
"metricsSpec" : [
{
"name" : "count",
"type" : "count"
},
{
"name" : "added",
"type" : "longSum",
"fieldName" : "added"
},
{
"name" : "deleted",
"type" : "longSum",
"fieldName" : "deleted"
},
{
"name" : "delta",
"type" : "longSum",
"fieldName" : "delta"
},
{
"name" : "user_unique",
"type" : "hyperUnique",
"fieldName" : "user"
}
]
},
"tuningConfig" : {
"type" : "hadoop",
"partitionsSpec" : {
"type" : "hashed",
"targetPartitionSize" : 5000000
},
"jobProperties" : {
"mapreduce.job.classloader": "true"
}
}
},
"hadoopDependencyCoordinates": ["org.apache.hadoop:hadoop-client:2.7.2"]
}
注意索引文件路径为 HDFS(或 OSS)相应数据文件的路径,tuningConfig 里添加 "mapreduce.job.classloader": "true" 的 jobProperties,另外最后一行指定 Hadoop 版本为 2.7.2(EMR Hadoop 版本)。
运行如下命令提交索引作业
curl -XPOST -H 'Content-Type:application/json' -d @quickstart/wikiticker-index.json http://emr-header-1:18090/druid/indexer/v1/task
这时作业就被提交到了 Druid 集群,确切地说是 Overlord 节点。Overlord 节点会远程通知 Middlemanager 节点启动索引 task。该 task 依据索引类型做出处理,由于这里索引类型为 “index_hadoop”,该 task 会生成相应的 job 并提交到 YARN 执行。相应的执行情况可以通过页面查看:
首先打通到 emr-header-1 的 ssh 隧道
ssh -N -D 8157 root@<ip_of_emr-header-1>
然后启动设置了 proxy 的浏览器,proxy 指向 8157 端口,以 chrome 为例
open -n /Applications/Google\ Chrome.app --args --proxy-server="socks5://localhost:8157" --host-resolver-rules="MAP * 0.0.0.0 , EXCLUDE localhost" --user-data-dir=/tmp/
然后在地址栏输入“http://emr-header-1:18090/console.html
,就能够看到相应的作业了,
同时打开 http://emr-header-1:8088/cluster/apps
能够看到相应的 Hadoop 作业:
等待 Overlord 页面上显示索引成功,就能在 OSS 看到生成了相应的索引文件
运行一下查询命令(查询语句在 quickstart/wikiticker-top-pages.json)
curl -XPOST -H 'Content-Type:application/json' -d @quickstart/wikiticker-top-pages.json 'http://emr-header-1:18082/druid/v2/?pretty'
返回结果
[ {
"timestamp" : "2015-09-12T00:46:58.771Z",
"result" : [ {
"edits" : 33,
"page" : "Wikipedia:Vandalismusmeldung"
}, {
"edits" : 28,
"page" : "User:Cyde/List of candidates for speedy deletion/Subpage"
}, {
"edits" : 27,
"page" : "Jeremy Corbyn"
}, {
"edits" : 21,
"page" : "Wikipedia:Administrators' noticeboard/Incidents"
}, {
"edits" : 20,
"page" : "Flavia Pennetta"
}, {
"edits" : 18,
"page" : "Total Drama Presents: The Ridonculous Race"
},...
]
说明
- 上述示例中我们用到了 Druid 安装包自带的示例。事实上由于 Druid 接口均为 rest 接口,你可以在任何一台能够连接到 Druid 集群的机器上运行命令;
- Druid 的 rest api 使用起来有诸多不便,幸运的是 Druid 有对 SQL 的支持,虽然还不支持全部标准 SQL,但也足以应付多数场景了。EMR Druid 集成了 Superset 工具,可以方便的使用该工具对 Druid 数据进行查询和管理,具体使用方式见这里;
- 实际应用中,Druid 更多的使用流的方式索引数据,如果要求数据不丢失,且只被消费一次,通常是流+批的形式。流的方式先摄入数据,保存一份批的数据晚上再处理一遍,确保数据不丢。如果数据要求不是很高,批可以不用。具体流的使用方式参见 EMR 文档、Druid 官方文档或者 Tranquility 官方文档;
- 掌握必要的排错技巧是必要的,druid 索引排错指南见这里。