【Druid】(八)Apache Druid 核心插件 Kafka Indexing Service & SLS Indexing Service 2

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 【Druid】(八)Apache Druid 核心插件 Kafka Indexing Service & SLS Indexing Service 2

文章目录


一、前言

二、与 Kafka 集群交互

三、使用 Apache Druid Kafka Indexing Service 实时消费 Kafka 数据

四、关于 SLS Indexing Service

4.1 背景介绍

4.2 准备工作

4.3 使用 SLS Indexing Service


一、前言


Kafka Indexing Service 是 Apache Druid 推出的使用 Apache Druid 的 Indexing Service 服务实时消费 Kafka 数据的插件。


Kafka Indexing Service 可以在 Overlord 上配置 Supervisor(这里的监管者具体是指 KafkaSupervisor,负责监控单个 DataSource 下的 KafkaIndexTask。在其构造的时候,可以接受 KafkaSupervisorSpec 以知晓 Kafka 的 Topic 相关的配置信息,以及摄入的规则,用于生成 KafkaIndexTask 索引任务),并负责管理 Kafka 索引任务的创建和生命周期。这些 KIS 任务使用 Kafka 自身的分区和偏移机制来读取事件,因此能够提供 exactly-once 摄取的保证(旧版本下,Tranquility 采用的是 push 的方式,则完全无法实现不丢不重的特性)。KIS 任务还能够从 Kafka 读取非近期事件,并且不受其他摄取机制强加的窗口期限的影响。另外,Supervisor 会监控索引任务的状态,以便管理故障,并保证了可伸缩性和易复制的特性。更多差异点,详见下面的对比表:



在 0.16.0 版本中,Apache Druid 彻底删除了 Realtime Node 相关的插件,包括了 druid-kafka-eight、druid-kafka-eight-simpleConsumer、druid-rabbitmq 和 druid-rocketmq


虽然新引入的 KIS 有诸多好处,但是世上并不存在“银弹”。因为 KIS 采用了 pull 的方式摄入数据,必然会存在拉取的频率一说。该频率由 offsetFetchPeriod 参数控制,默认 30s 会拉取一次,而最快只能 5s 拉取一次。那为什么不能设置更小的值呢?因为如果过于频繁地向 Kafka 发起请求,可能影响到 Kafka 的稳定性。


补充:上文我们也讲到该插件会在 Overlord 中启动一个 supervisor,supervisor 启动之后会在 Middlemanager 中启动一些 indexing task,这些 task 会连接到 Kafka 集群消费 topic 数据,并完成索引创建。


1.task 创建和运行的过程



2.task停止的过程



二、与 Kafka 集群交互


E-MapReduce Druid 集群与 Kafka 集群交互的配置方式与 Hadoop 集群类似,均需要设置连通性、hosts 等。


对于非安全 Kafka 集群,请按照以下步骤操作:


1.确保集群间能够通信(两个集群在一个安全组下,或两个集群在不同安全组,但两个安全组之间配置了访问规则)。


2.将 Kafka 集群的 hosts 写入到 E-MapReduce Druid 集群每一个节点的 hosts 列表中。


注意 Kafka 集群的 hostname 应采用长名形式,例如 emr-header-1.cluster-xxxxxxxx。


对于安全 Kafka 集群,您需要执行下列操作(前两步与非安全 Kafka 集群相同):


确保集群间能够通信(两个集群在一个安全组下,或两个集群在不同安全组,但两个安全组之间配置了访问规则)。

将 Kafka 集群的 hosts 写入到 E-MapReduce Druid 集群每一个节点的 hosts 列表中。

注意 Kafka 集群的 hostname 应采用长名形式,如 emr-header-1.cluster-xxxxxxxx。


设置两个集群间的 Kerberos 跨域互信(详情请参见 跨域互信 ),推荐做双向互信。

准备一个客户端安全配置文件,文件内容格式如下。


KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="/etc/ecm/druid-conf/druid.keytab"
      principal="druid@EMR.1234.COM";
  };

文件准备好后,将该配置文件同步到 E-MapReduce Druid 集群的所有节点上,放置于某一个目录下面(例如/tmp/kafka/kafka_client_jaas.conf)。


在 E-MapReduce Druid 配置页面的 overlord.jvm 中新增如下选项。

Djava.security.auth.login.config=/tmp/kafka/kafka_client_jaas.conf

在 E-MapReduce Druid 配置页面的 middleManager.runtime 中配置druid.indexer.runner.javaOpts=-Djava.security.auth.login.confi=/tmp/kafka/kafka_client_jaas.conf和其他 JVM 启动参数。


重启 Druid 服务。


三、使用 Apache Druid Kafka Indexing Service 实时消费 Kafka 数据


在 Kafka 集群(或 Gateway)上执行以下命令创建一个名称为 metrics 的 topic。

-- 如果开启了 Kafka 高安全:

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf"
 --
 kafka-topics.sh --create --zookeeper emr-header-1:2181,emr-header-2,emr-header-3/kafka-1.0.0 --partitions 1 --replication-factor 1 --topic metrics

实际创建 topic 时,您需要根据您的环境配置来替换上述命令中的各个参数。其中,--zookeeper 参数中 /kafka-1.0.0 是一个路径,该路径的获取方法是:登录阿里云 E-MapReduce 控制台> 进入 Kafka 集群的 Kafka 服务配置页面 > 查看 zookeeper.connect 配置项的值。如果您的 Kafka 集群是自建集群,则您需要根据集群的实际配置来替换 --zookeeper 参数。


定义数据源的数据格式描述文件(名称命名为 metrics-kafka.json),并放置在当前目录下(或放置在其他您指定的目录上)。

{
     "type": "kafka",
     "dataSchema": {
         "dataSource": "metrics-kafka",
         "parser": {
             "type": "string",
             "parseSpec": {
                 "timestampSpec": {
                     "column": "time",
                     "format": "auto"
                 },
                 "dimensionsSpec": {
                     "dimensions": ["url", "user"]
                 },
                 "format": "json"
             }
         },
         "granularitySpec": {
             "type": "uniform",
             "segmentGranularity": "hour",
             "queryGranularity": "none"
         },
         "metricsSpec": [{
                 "type": "count",
                 "name": "views"
             },
             {
                 "name": "latencyMs",
                 "type": "doubleSum",
                 "fieldName": "latencyMs"
             }
         ]
     },
     "ioConfig": {
         "topic": "metrics",
         "consumerProperties": {
             "bootstrap.servers": "emr-worker-1.cluster-xxxxxxxx:9092(您 Kafka 集群的 bootstrap.servers)",
             "group.id": "kafka-indexing-service",
             "security.protocol": "SASL_PLAINTEXT",
             "sasl.mechanism": "GSSAPI"
         },
         "taskCount": 1,
         "replicas": 1,
         "taskDuration": "PT1H"
     },
     "tuningConfig": {
         "type": "kafka",
         "maxRowsInMemory": "100000"
     }
 }

说明 ioConfig.consumerProperties.security.protocol 和 ioConfig.consumerProperties.sasl.mechanism 为安全相关选项(非安全 Kafka 集群不需要)。


执行下述命令添加 Kafka supervisor。

curl --negotiate -u:druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type: application/json' -d @metrics-kafka.json http://emr-header-1.cluster-1234:18090/druid/indexer/v1/supervisor

其中 --negotiate、-u、-b、-c 等是针对安全 E-MapReduce Druid 集群的选项。


在 Kafka 集群上开启一个 console producer。


-- 如果开启了 Kafka 高安全:
 export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/ecm/kafka-conf/kafka_client_jaas.conf"
 echo -e "security.protocol=SASL_PLAINTEXT\nsasl.mechanism=GSSAPI" > /tmp/Kafka/producer.conf
 --
 Kafka-console-producer.sh --producer.config /tmp/kafka/producer.conf --broker-list emr-worker-1:9092,emr-worker-2:9092,emr-worker-3:9092 --topic metrics
 >

其中 --producer.config /tmp/Kafka/producer.conf 是针对安全 Kafka 集群的选项。


在 kafka_console_producer 的命令提示符下输入一些数据。

{"time": "2018-03-06T09:57:58Z", "url": "/foo/bar", "user": "alice", "latencyMs": 32}
{"time": "2018-03-06T09:57:59Z", "url": "/", "user": "bob", "latencyMs": 11}
{"time": "2018-03-06T09:58:00Z", "url": "/foo/bar", "user": "bob", "latencyMs": 45}

其中时间戳可用如下 python 命令生成:


python -c 'import datetime; print(datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"))'

准备一个查询文件,命名为 metrics-search.json。

{
     "queryType" : "search",
     "dataSource" : "metrics-kafka",
     "intervals" : ["2018-03-02T00:00:00.000/2018-03-08T00:00:00.000"],
     "granularity" : "all",
     "searchDimensions": [
         "url",
         "user"
     ],
     "query": {
         "type": "insensitive_contains",
         "value": "bob"
     }
 }

在 E-MapReduce Druid 集群 Master 上执行查询。

curl --negotiate -u:Druid -b ~/cookies -c ~/cookies -XPOST -H 'Content-Type:

application/json' -d @metrics-search.json http://emr-header-1.cluster-1234:18082/druid/v2/?pretty

其中--negotiate、-u、-b、-c 等是针对安全 E-MapReduce Druid 集群的选项。


正常返回结果示例:


[ {
   "timestamp" : "2018-03-06T09:00:00.000Z",
   "result" : [ {
     "dimension" : "user",
     "value" : "bob",
     "count" : 2
   } ]
 } ]


目录
相关文章
|
2月前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
128 7
|
2月前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
100 5
|
2月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
95 4
|
2月前
|
消息中间件 监控 大数据
优化Apache Kafka性能:最佳实践与调优策略
【10月更文挑战第24天】作为一名已经对Apache Kafka有所了解并有实际使用经验的开发者,我深知在大数据处理和实时数据流传输中,Kafka的重要性不言而喻。然而,在面对日益增长的数据量和业务需求时,如何保证系统的高性能和稳定性成为了摆在我们面前的一个挑战。本文将从我的个人视角出发,分享一些关于如何通过合理的配置和调优来提高Kafka性能的经验和建议。
92 4
|
2月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
69 5
|
2月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
52 1
|
2月前
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
78 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
2月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
89 2
|
2月前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
2月前
|
消息中间件 监控 Kafka
Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面
随着大数据技术的发展,Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件的修改、启动命令、API 示例代码等,帮助你快速上手并有效管理 Kafka 集群。
54 0

热门文章

最新文章

推荐镜像

更多