文章目录
一、前言
二、与 Kafka 集群交互
三、使用 Apache Druid Kafka Indexing Service 实时消费 Kafka 数据
四、关于 SLS Indexing Service
4.1 背景介绍
4.2 准备工作
4.3 使用 SLS Indexing Service
一、前言
Kafka Indexing Service 是 Apache Druid 推出的使用 Apache Druid 的 Indexing Service 服务实时消费 Kafka 数据的插件。
Kafka Indexing Service 可以在 Overlord 上配置 Supervisor(这里的监管者具体是指 KafkaSupervisor,负责监控单个 DataSource 下的 KafkaIndexTask。在其构造的时候,可以接受 KafkaSupervisorSpec 以知晓 Kafka 的 Topic 相关的配置信息,以及摄入的规则,用于生成 KafkaIndexTask 索引任务),并负责管理 Kafka 索引任务的创建和生命周期。这些 KIS 任务使用 Kafka 自身的分区和偏移机制来读取事件,因此能够提供 exactly-once 摄取的保证(旧版本下,Tranquility 采用的是 push 的方式,则完全无法实现不丢不重的特性)。KIS 任务还能够从 Kafka 读取非近期事件,并且不受其他摄取机制强加的窗口期限的影响。另外,Supervisor 会监控索引任务的状态,以便管理故障,并保证了可伸缩性和易复制的特性。更多差异点,详见下面的对比表:
在 0.16.0 版本中,Apache Druid 彻底删除了 Realtime Node 相关的插件,包括了 druid-kafka-eight、druid-kafka-eight-simpleConsumer、druid-rabbitmq 和 druid-rocketmq
虽然新引入的 KIS 有诸多好处,但是世上并不存在“银弹”。因为 KIS 采用了 pull 的方式摄入数据,必然会存在拉取的频率一说。该频率由 offsetFetchPeriod 参数控制,默认 30s 会拉取一次,而最快只能 5s 拉取一次。那为什么不能设置更小的值呢?因为如果过于频繁地向 Kafka 发起请求,可能影响到 Kafka 的稳定性。
补充:上文我们也讲到该插件会在 Overlord 中启动一个 supervisor,supervisor 启动之后会在 Middlemanager 中启动一些 indexing task,这些 task 会连接到 Kafka 集群消费 topic 数据,并完成索引创建。
1.task 创建和运行的过程
2.task停止的过程
二、与 Kafka 集群交互
E-MapReduce Druid 集群与 Kafka 集群交互的配置方式与 Hadoop 集群类似,均需要设置连通性、hosts 等。
对于非安全 Kafka 集群,请按照以下步骤操作:
1.确保集群间能够通信(两个集群在一个安全组下,或两个集群在不同安全组,但两个安全组之间配置了访问规则)。
2.将 Kafka 集群的 hosts 写入到 E-MapReduce Druid 集群每一个节点的 hosts 列表中。
注意 Kafka 集群的 hostname 应采用长名形式,例如 emr-header-1.cluster-xxxxxxxx。
对于安全 Kafka 集群,您需要执行下列操作(前两步与非安全 Kafka 集群相同):
确保集群间能够通信(两个集群在一个安全组下,或两个集群在不同安全组,但两个安全组之间配置了访问规则)。
将 Kafka 集群的 hosts 写入到 E-MapReduce Druid 集群每一个节点的 hosts 列表中。
注意 Kafka 集群的 hostname 应采用长名形式,如 emr-header-1.cluster-xxxxxxxx。
设置两个集群间的 Kerberos 跨域互信(详情请参见 跨域互信 ),推荐做双向互信。
准备一个客户端安全配置文件,文件内容格式如下。
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/ecm/druid-conf/druid.keytab" principal="druid@EMR.1234.COM"; };
文件准备好后,将该配置文件同步到 E-MapReduce Druid 集群的所有节点上,放置于某一个目录下面(例如/tmp/kafka/kafka_client_jaas.conf)。
在 E-MapReduce Druid 配置页面的 overlord.jvm 中新增如下选项。
Djava.security.auth.login.config=/tmp/kafka/kafka_client_jaas.conf
在 E-MapReduce Druid 配置页面的 middleManager.runtime 中配置druid.indexer.runner.javaOpts=-Djava.security.auth.login.confi=/tmp/kafka/kafka_client_jaas.conf和其他 JVM 启动参数。
重启 Druid 服务。
三、使用 Apache Druid Kafka Indexing Service 实时消费 Kafka 数据
在 Kafka 集群(或 Gateway)上执行以下命令创建一个名称为 metrics 的 topic。
-- 如果开启了 Kafka 高安全:
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" -- kafka-topics.sh --create --zookeeper emr-header-1:2181,emr-header-2,emr-header-3/kafka-1.0.0 --partitions 1 --replication-factor 1 --topic metrics
实际创建 topic 时,您需要根据您的环境配置来替换上述命令中的各个参数。其中,--zookeeper 参数中 /kafka-1.0.0 是一个路径,该路径的获取方法是:登录阿里云 E-MapReduce 控制台> 进入 Kafka 集群的 Kafka 服务配置页面 > 查看 zookeeper.connect 配置项的值。如果您的 Kafka 集群是自建集群,则您需要根据集群的实际配置来替换 --zookeeper 参数。
定义数据源的数据格式描述文件(名称命名为 metrics-kafka.json),并放置在当前目录下(或放置在其他您指定的目录上)。
{ "type": "kafka", "dataSchema": { "dataSource": "metrics-kafka", "parser": { "type": "string", "parseSpec": { "timestampSpec": { "column": "time", "format": "auto" }, "dimensionsSpec": { "dimensions": ["url", "user"] }, "format": "json" } }, "granularitySpec": { "type": "uniform", "segmentGranularity": "hour", "queryGranularity": "none" }, "metricsSpec": [{ "type": "count", "name": "views" }, { "name": "latencyMs", "type": "doubleSum", "fieldName": "latencyMs" } ] }, "ioConfig": { "topic": "metrics", "consumerProperties": { "bootstrap.servers": "emr-worker-1.cluster-xxxxxxxx:9092(您 Kafka 集群的 bootstrap.servers)", "group.id": "kafka-indexing-service", "security.protocol": "SASL_PLAINTEXT", "sasl.mechanism": "GSSAPI" }, "taskCount": 1, "replicas": 1, "taskDuration": "PT1H" }, "tuningConfig": { "type": "kafka", "maxRowsInMemory": "100000" } }
说明 ioConfig.consumerProperties.security.protocol 和 ioConfig.consumerProperties.sasl.mechanism 为安全相关选项(非安全 Kafka 集群不需要)。
执行下述命令添加 Kafka supervisor。
curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-kafka.json http://emr-header-1.cluster-1234:18090/druid/indexer/v1/supervisor
其中 --negotiate、-u、-b、-c 等是针对安全 E-MapReduce Druid 集群的选项。
在 Kafka 集群上开启一个 console producer。
-- 如果开启了 Kafka 高安全: export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf" echo -e "security.protocol=SASL_PLAINTEXT\nsasl.mechanism=GSSAPI" > /tmp/Kafka/producer.conf -- Kafka-console-producer.sh --producer.config /tmp/kafka/producer.conf --broker-list emr-worker-1:9092,emr-worker-2:9092,emr-worker-3:9092 --topic metrics >
其中 --producer.config /tmp/Kafka/producer.conf 是针对安全 Kafka 集群的选项。
在 kafka_console_producer 的命令提示符下输入一些数据。
{"time": "2018-03-06T09:57:58Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32} {"time": "2018-03-06T09:57:59Z", "url": "/", "user": "bob", "latencyMs": 11} {"time": "2018-03-06T09:58:00Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}
其中时间戳可用如下 python 命令生成:
python -c 'import datetime; print(datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"))'
准备一个查询文件,命名为 metrics-search.json。
{ "queryType" : "search", "dataSource" : "metrics-kafka", "intervals" : ["2018-03-02T00:00:00.000/2018-03-08T00:00:00.000"], "granularity" : "all", "searchDimensions": [ "url", "user" ], "query": { "type": "insensitive_contains", "value": "bob" } }
在 E-MapReduce Druid 集群 Master 上执行查询。
curl --negotiate -u:Druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type:
application/json' -d @metrics-search.json http://emr-header-1.cluster-1234:18082/druid/v2/?pretty
其中--negotiate、-u、-b、-c 等是针对安全 E-MapReduce Druid 集群的选项。
正常返回结果示例:
[ { "timestamp" : "2018-03-06T09:00:00.000Z", "result" : [ { "dimension" : "user", "value" : "bob", "count" : 2 } ] } ]