4.5 批量索引
4.5.1 与Hadoop集群交互
您在创建E-MapReduce Druid集群时如果勾选了HDFS和YARN(自带Hadoop集群),那么系统将会自动为您配置好与HDFS和YARN的交互,您无需做额外操作。下面的介绍是配置独立E-MapReduce Druid集群与独立Hadoop集群之间交互,这里假设E-MapReduce Druid集群cluster id 为1234,Hadoop集群cluster id为5678。另外请严格按照指导进行操作,如果操作不当,集群可能就不会按照预期工作。
对于与非安全独立Hadoop集群交互,请按照如下操作进行:
确保集群间能够通信(两个集群在一个安全组下,或两个集群在不同安全组,但两个安全组之间配置了访问规则)。
在E-MapReduce Druid集群的每个节点的指定路径下,放置一份Hadoop集群中/etc/ecm/hadoop-conf路径下的core-site.xml、hdfs-site.xml、yarn-site.xml、 mapred-site.xml文件。 这些文件在E-MapReduce Druid集群节点上放置的路径与E-MapReduce集群的版本有关,详情说明如下:
EMR-3.23.0及以上版本:/etc/ecm/druid-conf/druid/cluster/_common
EMR-3.23.0以下版本:/etc/ecm/druid-conf/druid/_common
说明 如果创建集群时选了自带Hadoop,则在上述目录下会有几个软链接指向自带Hadoop的配置,请先移除这些软链接。
将Hadoop集群的hosts写入到E-MapReduce Druid集群的hosts列表中,注意Hadoop集群的hostname应采用长名形式,如emr-header-1.cluster-xxxxxxxx,且最好将Hadoop的hosts放在本集群hosts之后,例如:
... 10.157.*.* emr-as.cn-hangzhou.aliyuncs.com 10.157.*.* eas.cn-hangzhou.emr.aliyuncs.com 192.168.*.* emr-worker-1.cluster-1234 emr-worker-1 emr-header-2.cluster-1234 emr-header-2 iZbp1h9g7boqo9x23qb**** 192.168.*.* emr-worker-2.cluster-1234 emr-worker-2 emr-header-3.cluster-1234 emr-header-3 iZbp1eaa5819tkjx55y**** 192.168.*.* emr-header-1.cluster-1234 emr-header-1 iZbp1e3zwuvnmakmsje**** --以下为hadoop集群的hosts信息 192.168.*.* emr-worker-1.cluster-5678 emr-header-2.cluster-5678 iZbp195rj7zvx8qar4f**** 192.168.*.* emr-worker-2.cluster-5678 emr-header-3.cluster-5678 iZbp15vy2rsxoegki4q**** 192.168.*.* emr-header-1.cluster-5678 iZbp10tx4egw3wfnh5o****
于安全Hadoop集群,请按如下操作进行:
确保集群间能够通信(两个集群在一个安全组下,或两个集群在不同安全组,但两个安全组之间配置了访问规则)。
在E-MapReduce Druid集群的每个节点的指定路径下,放置一份Hadoop集群/etc/ecm/hadoop-conf路径下的core-site.xml、hdfs-site.xml、yarn-site.xml、 mapred-site.xml文件,并修改core-site.xml中hadoop.security.authentication.use.has 为 false。
其中,core-site.xml、hdfs-site.xml、yarn-site.xml、 mapred-site.xml文件在E-MapReduce Druid集群节点上放置的路径与E-MapReduce集群的版本有关,详情说明如下:
EMR-3.23.0 及以上版本:/etc/ecm/druid-conf/druid/cluster/_common
EMR-3.23.0 以下版本:/etc/ecm/druid-conf/druid/_common
其中,hadoop.security.authentication.use.has是一个客户端配置,目的是让用户能够使用AccessKey进行认证。如果使用Kerberos认证方式,则需要disable该配置。
将Hadoop集群的hosts写入到E-MapReduce Druid集群每个节点的hosts列表中,注意Hadoop集群的hostname应采用长名形式,如emr-header-1.cluster-xxxxxxxx,且最好将Hadoop的hosts放在本集群hosts之后。
设置两个集群间的Kerberos跨域互信,详情请参见跨域互信。
在Hadoop集群的所有节点下都创建一个本地druid账户(useradd -m -g hadoop druid),或者设置 druid.auth.authenticator.kerberos.authToLocal(具体预发规则请参见Druid-Kerberos)创建Kerberos账户到本地账户的映射规则。推荐第一种做法,操作简便不易出错。
说明 默认在安全Hadoop集群中,所有Hadoop命令必须运行在一个本地的账户中,该本地账户需要与principal的name部分同名。YARN也支持将一个principal映射至本地一个账户,即上文第二种做法。
重启Druid服务。
4.5.2 使用Hadoop对批量数据创建索引
E-MapReduce Druid自带了一个名为wikiticker的例子,位于${DRUID_HOME}/quickstart/tutorial下面(${DRUID_HOME}默认为/usr/lib/druid-current)。wikiticker文件(wikiticker-2015-09-12-sampled.json.gz)的每一行是一条记录,每条记录是个json对象。其格式如下所示:
{ "time": "2015-09-12T00:46:58.771Z", "channel": "#en.wikipedia", "cityName": null, "comment": "added project", "countryIsoCode": null, "countryName": null, "isAnonymous": false, "isMinor": false, "isNew": false, "isRobot": false, "isUnpatrolled": false, "metroCode": null, "namespace": "Talk", "page": "Talk:Oswald Tilghman", "regionIsoCode": null, "regionName": null, "user": "GELongstreet", "delta": 36, "added": 36, "deleted": 0 }
使用Hadoop对批量数据创建索引,请按照如下步骤进行操作:
1.将该压缩文件解压,并放置于HDFS的一个目录下(如 hdfs://emr-header-1.cluster-5678:9000/druid)。 在Hadoop集群上执行如下命令。
### 如果是在独立Hadoop集群上进行操作,做好两个集群互信之后需要拷贝一个 druid.keytab到Hadoop集群再kinit。 kinit -kt /etc/ecm/druid-conf/druid.keytab druid ### hdfs dfs -mkdir hdfs://emr-header-1.cluster-5678:9000/druid hdfs dfs -put ${DRUID_HOME}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json hdfs://emr-header-1.cluster-5678:9000/druid
对于安全集群执行 HDFS 命令前先修改/etc/ecm/hadoop-conf/core-site.xml中hadoop.security.authentication.use.has为false。
请确保已经在Hadoop集群每个节点上创建名为druid的Linux账户。
2.准备一个数据索引任务文件${DRUID_HOME}/quickstart/tutorial/wikiticker-index.json,如下所示:
{ "type" : "index_hadoop", "spec" : { "ioConfig" : { "type" : "hadoop", "inputSpec" : { "type" : "static", "paths" : "hdfs://emr-header-1.cluster-5678:9000/druid/wikiticker-2015-09-12-sampled.json" } }, "dataSchema" : { "dataSource" : "wikiticker", "granularitySpec" : { "type" : "uniform", "segmentGranularity" : "day", "queryGranularity" : "none", "intervals" : ["2015-09-12/2015-09-13"] }, "parser" : { "type" : "hadoopyString", "parseSpec" : { "format" : "json", "dimensionsSpec" : { "dimensions" : [ "channel", "cityName", "comment", "countryIsoCode", "countryName", "isAnonymous", "isMinor", "isNew", "isRobot", "isUnpatrolled", "metroCode", "namespace", "page", "regionIsoCode", "regionName", "user" ] }, "timestampSpec" : { "format" : "auto", "column" : "time" } } }, "metricsSpec" : [ { "name" : "count", "type" : "count" }, { "name" : "added", "type" : "longSum", "fieldName" : "added" }, { "name" : "deleted", "type" : "longSum", "fieldName" : "deleted" }, { "name" : "delta", "type" : "longSum", "fieldName" : "delta" }, { "name" : "user_unique", "type" : "hyperUnique", "fieldName" : "user" } ] }, "tuningConfig" : { "type" : "hadoop", "partitionsSpec" : { "type" : "hashed", "targetPartitionSize" : 5000000 }, "jobProperties" : { "mapreduce.job.classloader": "true" } } }, "hadoopDependencyCoordinates": ["org.apache.hadoop:hadoop-client:2.8.5"] }
说明
spec.ioConfig.type设置为hadoop。
spec.ioConfig.inputSpec.paths为输入文件路径。
tuningConfig.type为hadoop。
tuningConfig.jobProperties设置了mapreduce job的classloader。
hadoopDependencyCoordinates 制定了hadoop client的版本。
3.在E-MapReduce Druid集群上运行批量索引命令
cd ${DRUID_HOME} curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type:application/json' -d @quickstart/tutorial/wikiticker-index.json http://emr-header-1.cluster-1234:18090/druid/indexer/v1/task
其中 -negotiate、-u、-b、-c等选项是针对安全E-MapReduce Druid集群的。Overlord的端口默认为18090。
4.查看作业运行情况
在浏览器访问http://emr-header-1.cluster-1234:18090/console.html查看作业运行情况
5.根据Druid语法查询数据
Druid有自己的查询语法。请准备一个描述您如何查询json格式的查询文件,如下所示为对wikiticker数据的一个top N查询(${DRUID_HOME}/quickstart/tutorial/wikiticker-top-pages.json):
{ "queryType" : "topN", "dataSource" : "wikiticker", "intervals" : ["2015-09-12/2015-09-13"], "granularity" : "all", "dimension" : "page", "metric" : "edits", "threshold" : 25, "aggregations" : [ { "type" : "longSum", "name" : "edits", "fieldName" : "count" } ] }
在命令行界面运行下面的命令即可看到查询结果。
cd ${DRUID_HOME} curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type:application/json' -d @quickstart/tutorial/wikiticker-top-pages.json 'http://emr-header-1.cluster-1234:18082/druid/v2/?pretty'
4.6 实时索引
对于数据从Kafka集群实时到E-MapReduce Druid集群进行索引,我们推荐使用Kafka Indexing Service扩展,提供了高可靠保证,支持exactly-once语义。关于Druid Kafka Indexing Service 实时消费Kafka数据具体步骤,请参见Kafka Indexing Service。
如果您的数据实时打到了阿里云日志服务(SLS),并想用E-MapReduce Druid实时索引这部分数据,我们提供了SLS Indexing Service扩展。使用SLS Indexing Service避免了您额外建立并维护Kafka集群的开销。SLS Indexing Service的作用与Kafka Indexing Service相同,也提供高可靠保证和 Exactly-Once语义。在这里,您完全可以把SLS当成一个Kafka来使用。详情请参见 SLS-Indexing-Service。
Kafka Indexing Service和SLS Indexing Service是类似的,都使用拉的方式从数据源拉取数据到E-MapReduce Druid集群,并提供高可靠保证和 exactly-once语义。
4.7 索引失败问题分析思路
当发现索引失败时,一般遵循如下排错思路:
对于批量索引
1.如果curl直接返回错误,或者不返回,检查一下输入文件格式,或者curl加上 -v 参数,观察REST API的返回情况。
2.在Overlord页面观察作业执行情况,如果失败,查看页面上的logs。
3.在很多情况下并没有生成logs,如果是Hadoop作业,打开YARN页面查看是否有索引作业生成,并查看作业执行log。
4.如果上述情况都没有定位到错误,需要登录到E-MapReduce Druid集群,查看Overlord的执行日志(位于/mnt/disk1/log/druid/overlord-emr-header-1.cluster-xxxx.log),如果是HA集群,查看您提交作业的那个Overlord。
5.如果作业已经被提交到Middlemanager,但是从Middlemanager返回了失败,则需要从Overlord中查 看作业提交到了那个worker,并登录到相应的worker,查看Middlemanager的日志(位于/mnt/disk1/log/druid/middleManager-emr-header-1.cluster-xxxx.log)。
对于Kafka Indexing Service和SLS Indexing Service
1.首先查看Overlord的Web页面:http://emr-header-1:18090, 查看Supervisor的运行状态,检查payload是否合理。
2.查看失败task的log。
3.如果不能从task log定位出失败原因,则需要从Overlord log排查问题。