文章目录
一、前言
二、与 Kafka 集群交互
三、使用 Apache Druid Kafka Indexing Service 实时消费 Kafka 数据
四、关于 SLS Indexing Service
4.1 背景介绍
4.2 准备工作
4.3 使用 SLS Indexing Service
一、前言
Kafka Indexing Service 是 Apache Druid 推出的使用 Apache Druid 的 Indexing Service 服务实时消费 Kafka 数据的插件。
Kafka Indexing Service 可以在 Overlord 上配置 Supervisor(这里的监管者具体是指 KafkaSupervisor,负责监控单个 DataSource 下的 KafkaIndexTask。在其构造的时候,可以接受 KafkaSupervisorSpec 以知晓 Kafka 的 Topic 相关的配置信息,以及摄入的规则,用于生成 KafkaIndexTask 索引任务),并负责管理 Kafka 索引任务的创建和生命周期。这些 KIS 任务使用 Kafka 自身的分区和偏移机制来读取事件,因此能够提供 exactly-once 摄取的保证(旧版本下,Tranquility 采用的是 push 的方式,则完全无法实现不丢不重的特性)。KIS 任务还能够从 Kafka 读取非近期事件,并且不受其他摄取机制强加的窗口期限的影响。另外,Supervisor 会监控索引任务的状态,以便管理故障,并保证了可伸缩性和易复制的特性。更多差异点,详见下面的对比表:
在 0.16.0 版本中,Apache Druid 彻底删除了 Realtime Node 相关的插件,包括了 druid-kafka-eight、druid-kafka-eight-simpleConsumer、druid-rabbitmq 和 druid-rocketmq
虽然新引入的 KIS 有诸多好处,但是世上并不存在“银弹”。因为 KIS 采用了 pull 的方式摄入数据,必然会存在拉取的频率一说。该频率由 offsetFetchPeriod 参数控制,默认 30s 会拉取一次,而最快只能 5s 拉取一次。那为什么不能设置更小的值呢?因为如果过于频繁地向 Kafka 发起请求,可能影响到 Kafka 的稳定性。
补充:上文我们也讲到该插件会在 Overlord 中启动一个 supervisor,supervisor 启动之后会在 Middlemanager 中启动一些 indexing task,这些 task 会连接到 Kafka 集群消费 topic 数据,并完成索引创建。
1.task 创建和运行的过程
2.task停止的过程
二、与 Kafka 集群交互
E-MapReduce Druid 集群与 Kafka 集群交互的配置方式与 Hadoop 集群类似,均需要设置连通性、hosts 等。
对于非安全 Kafka 集群,请按照以下步骤操作:
1.确保集群间能够通信(两个集群在一个安全组下,或两个集群在不同安全组,但两个安全组之间配置了访问规则)。
2.将 Kafka 集群的 hosts 写入到 E-MapReduce Druid 集群每一个节点的 hosts 列表中。
注意 Kafka 集群的 hostname 应采用长名形式,例如 emr-header-1.cluster-xxxxxxxx。
对于安全 Kafka 集群,您需要执行下列操作(前两步与非安全 Kafka 集群相同):
确保集群间能够通信(两个集群在一个安全组下,或两个集群在不同安全组,但两个安全组之间配置了访问规则)。
将 Kafka 集群的 hosts 写入到 E-MapReduce Druid 集群每一个节点的 hosts 列表中。
注意 Kafka 集群的 hostname 应采用长名形式,如 emr-header-1.cluster-xxxxxxxx。
设置两个集群间的 Kerberos 跨域互信(详情请参见 跨域互信 ),推荐做双向互信。
准备一个客户端安全配置文件,文件内容格式如下。
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/etc/ecm/druid-conf/druid.keytab" principal="druid@EMR.1234.COM"; };
文件准备好后,将该配置文件同步到 E-MapReduce Druid 集群的所有节点上,放置于某一个目录下面(例如/tmp/kafka/kafka_client_jaas.conf)。
在 E-MapReduce Druid 配置页面的 overlord.jvm 中新增如下选项。
Djava.security.auth.login.config=/tmp/kafka/kafka_client_jaas.conf
在 E-MapReduce Druid 配置页面的 middleManager.runtime 中配置druid.indexer.runner.javaOpts=-Djava.security.auth.login.confi=/tmp/kafka/kafka_client_jaas.conf和其他 JVM 启动参数。
重启 Druid 服务。