阿里云InfluxDB® 版对接Kapacitor指南

简介: Kapacitor是一个数据处理框架,用于告警、ETL(Extract-Transform-Load)和检测异常。Kapacitor是TICK生态中的最后一环,阿里云InfluxDB® 版对接Kapacitor指南新鲜出炉

背景

InfluxDB subscription(订阅)主要用于Kapacitor,但是任何可以接受UDP、HTTP或者HTTPS连接的终端,都可以订阅InfluxDB并获得写入InfluxDB的数据。
在InfluxDB中,创建、删除和查看subscription都需要admin权限,为了满足客户的需求,完善InfluxDB®的生态,我们最新的引擎版本已经支持客户通过控制台创建admin权限账号,详情见文档

介绍

Kapacitor是一个数据处理框架,用于告警、ETL(Extract-Transform-Load)和检测异常。Kapacitor是TICK生态中的最后一环。Kapacitor的主要特性包括:

  • 既可以处理流数据(streaming data),也可以处理批量数据(batch data)
  • 定期从InfluxDB查询数据,并且通过行协议或其它InfluxDB支持的方式获得数据
  • 执行InfluxQL支持的函数
  • 将处理结果存回InfluxDB
  • 支持添加用户自定义的函数检测异常
  • 与HipChat、OpsGenie、Alerta、Sensu、PagerDuty和Slack等集成

具体的介绍可查看Kapacitor官方文档

Kapacitor & Pull

Kapacitor将Prometheus关于service discover和scraping的代码集成到Kapacitor,所以,Kapacitor也可以支持pull模式。通过Kapacitor拉取数据写入InfluxDB,InfluxDB也可以间接地实现pull模式。

流处理 & 批处理

在Kapacitor中,用户可以使用TICKscript实现数据转换、降精度和告警等功能,特别地,可分为流处理和批处理:

  • 流处理:在InfluxDB中创建subscription,每个写入InfluxDB的数据点,同时也写入Kapacitor。
  • 批处理:Kapacitor周期性地从InfluxDB读取数据(在指定时间区间范围内的数据)。

什么时候需要流处理:

  • 需要实时地对每个数据点进行转换(这种操作也可以使用批处理,但是会有延迟)
  • 需要非常低的延迟,需要马上触发警报的场景
  • 当InfluxDB处理大量查询负载时,需要减轻InfluxDB的查询压力
  • 流处理根据数据点的时间戳了解时间,不需要判断一个数据点是否会在时间窗口外,然而在批处理中,数据点可能会因为延迟而落在相关时间窗口外

什么时候需要批处理:

  • 执行聚合操作
  • 不需要对每个数据点都进行判断是否需要告警(因为不经常发生状态改变)
  • 数据降精度
  • 稍微的延迟不会严重影响您的操作
  • 对于有超高吞吐量的InfluxDB实例(因为Kapacitor处理数据的速度无法像数据写入InfluxDB那样快)
  • 需要注意的是,流处理需要缓存更多的数据,占用较多的内存。

Kapacitor & Continuous Queries

Kapacitor的批处理跟InfluxDB的连续查询(Continuous Queries)有点类似,都是周期性地查询InfluxDB中的数据。那么,什么时候该用Kapacitor,什么时候该用CQ呢?

  • 大量CQ导致InfluxDB查询负载大时用Kapacitor,需要用流处理,因为批处理同样会增加InfluxDB的查询压力
  • 需要执行复杂的数据转换时用Kapacitor,Kapacitor支持用户自定义的函数,不仅仅是InfluxQL函数
  • 需要告警、异常检查等功能时用Kapacitor
  • 只需将少量数据进行降精度时用CQ,当查询压力不大时,用CQ即可

Kapacitor安装

登录Influxdata官网根据操作系统类型选择所需版本,下载链接,本文以centos为例

  • 1、 下载rpm包 wget https://dl.influxdata.com/kapacitor/releases/kapacitor-1.5.3.x86_64.rpm
  • 2、安装sudo yum localinstall kapacitor-1.5.3.x86_64.rpm
  • 3、修改配置文件 /etc/kapacitor/kapacitor.conf ,主要是influxdb相关配置,现在Influxdb只能通过VPC网络外发数据,所以需要登录TSDB控制台,开通VPC网络双向访问,demo如下
[[influxdb]]
  # Connect to an InfluxDB cluster
  # Kapacitor can subscribe, query and write to this cluster.
  # Using InfluxDB is not required and can be disabled.
  enabled = true
  default = true
  name = "localhost"
  urls = ["https://ts-*****.influxdata.rds.aliyuncs.com:3242"]
  username = "admin"
  password = "Admin1234"
  timeout = 0
  # Absolute path to pem encoded CA file.
  # A CA can be provided without a key/cert pair
  #   ssl-ca = "/etc/kapacitor/ca.pem"
  # Absolutes paths to pem encoded key and cert files.
  #   ssl-cert = "/etc/kapacitor/cert.pem"
  #   ssl-key = "/etc/kapacitor/key.pem"
  # Do not verify the TLS/SSL certificate.
  # This is insecure.
  insecure-skip-verify = false
  # Maximum time to try and connect to InfluxDB during startup
  startup-timeout = "5m"
  # Turn off all subscriptions
  disable-subscriptions = false
  # Subscription mode is either "cluster" or "server"
  subscription-mode = "cluster"
  # Which protocol to use for subscriptions
  # one of 'udp', 'http', or 'https'.
  subscription-protocol = "http"
  # Subscriptions resync time interval
  # Useful if you want to subscribe to new created databases
  # without restart Kapacitord
  subscriptions-sync-interval = "1m0s"
  # Override the global hostname option for this InfluxDB cluster.
  # Useful if the InfluxDB cluster is in a separate network and
  # needs special config to connect back to this Kapacitor instance.
  # Defaults to `hostname` if empty.
  kapacitor-hostname = "192.168.11.1"
  # Override the global http port option for this InfluxDB cluster.
  # Useful if the InfluxDB cluster is in a separate network and
  # needs special config to connect back to this Kapacitor instance.
  # Defaults to the port from `[http] bind-address` if 0.
  http-port = 0
  # Host part of a bind address for UDP listeners.
  # For example if a UDP listener is using port 1234
  # and `udp-bind = "hostname_or_ip"`,
  # then the UDP port will be bound to `hostname_or_ip:1234`
  # The default empty value will bind to all addresses.
  udp-bind = ""
  # Subscriptions use the UDP network protocl.
  # The following options of for the created UDP listeners for each subscription.
  # Number of packets to buffer when reading packets off the socket.
  udp-buffer = 1000
  # The size in bytes of the OS read buffer for the UDP socket.
  # A value of 0 indicates use the OS default.
  udp-read-buffer = 0
  [influxdb.subscriptions]
    # Set of databases and retention policies to subscribe to.
    # If empty will subscribe to all, minus the list in
    # influxdb.excluded-subscriptions
    #
    test =["autogen"]
    telegraf=["autogen"]
    # Format
    # db_name = <list of retention policies>
    # Example:
    # my_database = [ "default", "longterm" ]
  [influxdb.excluded-subscriptions]
    # Set of databases and retention policies to exclude from the subscriptions.
    # If influxdb.subscriptions is empty it will subscribe to all
    # except databases listed here.
    #
    # Format
    # db_name = <list of retention policies>
    #
    # Example:
    # my_database = [ "default", "longterm" ]
  • 4、启动服务 sudo service kapacitor start
  • 5、启动后Influxdb 服务端就可以看到订阅信息
> SHOW SUBSCRIPTIONS
name: test
retention_policy name                                           mode destinations
---------------- ----                                           ---- ------------
autogen          kapacitor-9ffbc8c7-fb55-46ca-84f7-e8102a7f07fb ANY  [http://192.168.11.1:9092]
name: telegraf
retention_policy name                                           mode destinations
---------------- ----                                           ---- ------------
autogen          kapacitor-9ffbc8c7-fb55-46ca-84f7-e8102a7f07fb ANY  [http://192.168.11.1:9092]
> quit
[root@r10g04440.sqa.zmf /bin]

Kapacitor task例子

本文主要介绍stream方式的报警和流处理任务

  • 报警任务配置及发布如下
[root@izuf63izb8ng929n94h5b6z ~]# cat cpu.tick 
dbrp "test"."autogen"
stream
    // Select just the cpu measurement from our example database.
    |from()
        .measurement('cpu')
    |alert()
        .crit(lambda: int("usage_idle") <  70)
        // Whenever we get an alert write it to a file.
        .log('/tmp/alerts.log')
* kapacitor define cpu_alter -tick cpu.tick
* kapacitor list task
* kapacitor enable cpu_alter 
[root@izuf63izb8ng929n94h5b6z ~]# cat telegraf.tick 
dbrp "telegraf"."autogen"
stream
    |from()
        .database('telegraf')
        .measurement('cpu')
        .groupBy(*)
    |window()
        .period(5m)
        .every(5m)
        .align()
    |mean('usage_idle')
        .as('usage_idle')
    |influxDBOut()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('mean_cpu_idle')
        .precision('s')
* kapacitor define telgrafstream -tick telegraf.tick
* kapacitor list tasks
* kapacitor enable telgrafstream
* kapacitor list tasks
  • 运行效果
[root@izuf63izb8ng929n94h5b6z ~]# kapacitor list tasks
ID            Type      Status    Executing Databases and Retention Policies
cpu_alter     stream    enabled   true      ["test"."autogen"]
telgrafstream stream    enabled   true      ["telegraf"."autogen"]
目录
相关文章
|
分布式计算 数据可视化 时序数据库
使用阿里云InfluxDB®和Spark Streaming实时处理时序数据
本文重点介绍怎样利用阿里云InfluxDB®和spark structured streaming来实时计算、存储和可视化数据。下面将介绍如何购买和初始化阿里云InfluxDB®,扩展spark foreach writer,以及设计阿里云InfluxDB®数据库时需要注意的事项。
6937 0
|
监控 时序数据库
Telegraf+Influxdb+Chronograf+Kapacitor主机性能监控告警
一.简述 通过TICK(Telegraf+Influxdb+Chronograf+Kapacitor)进行主机性能监控告警,职责描述如下: Telegraf的职能是数据采集,用于主机性能数据,包括主机CPU、内存、IO、进程状态、服务状态等 Influxdb的职能是时序数据库,用于存储Teleg.
4974 0
|
5月前
|
设计模式 API 数据处理
TICK 中Kapacitor功能和使用说明
TICK 中Kapacitor功能和使用说明
82 2
|
存储 传感器 机器学习/深度学习
MRS IoTDB时序数据库的架构设计与实现(上)
MRS IoTDB是近年来最新推出的时序数据库产品,其领先的设计理念在时序数据库领域展现出越来越强大的竞争力,得到了越来越多的用户认可。为了大家更好地了解MRS IoTDB,本文将会系统地为大家介绍MRS IoTDB的来龙去脉和功能特性,重点为大家介绍MRS IoTDB时序数据库的架构设计与实现,这次先为大家介绍MRS IoTDB的整体架构设计,后续系列文章会为大家逐步展开细节介绍。
447 0
MRS IoTDB时序数据库的架构设计与实现(上)
|
存储 算法 数据管理
MRS IoTDB时序数据库的架构设计与实现(下)
MRS IoTDB集群是完全对等的分布式架构,既基于Raft协议避免了单点故障问题,又通过Multi-Raft协议避免了单一Raft共识组带来的单点性能问题,同时对分布式协议的底层通讯、并发控制和高可用机制做了进一步优化。
280 0
MRS IoTDB时序数据库的架构设计与实现(下)
|
存储 数据采集 监控
CentOS7下部署Open-Falcon小米开源监控系统
CentOS7下部署Open-Falcon小米开源监控系统
490 0
CentOS7下部署Open-Falcon小米开源监控系统
|
存储 分布式计算 关系型数据库
基于MinIO/Deleta Lake/Dremio和Superset或Metabase搭建简单的数据湖
基于MinIO/Deleta Lake/Dremio和Superset或Metabase搭建简单的数据湖
1638 0
基于MinIO/Deleta Lake/Dremio和Superset或Metabase搭建简单的数据湖
|
SQL 存储 数据采集
SpringBoot整合TICK(Telegraf+InfluxDB+Chronograf +Kapacitor)监控系列之一:InfluxDB
TICK各个模块说明如下所示: T(Telegraf):服务监控数据采集,包括服务器CPU、内存、IO、进程状态、服务状态等等; I(InfluxDB):时序型数据库,存储Telegraf采集的监控数据,每条数据都会有time序列; C(Chronograf):时间序列数据可视化展示; K(Kapacitor):可以按照预先编写好的规则,实时地订阅influxDB数据或者批量查询数据,并进行告警。
SpringBoot整合TICK(Telegraf+InfluxDB+Chronograf +Kapacitor)监控系列之一:InfluxDB
|
SQL 存储 资源调度
CDP Impala的准入控制架构
Apache Impala 是 Cloudera 支持的大规模并行内存 SQL 引擎,专为分析和针对存储在 Apache Hive、Apache HBase 和 Apache Kudu 表中的数据的即席查询而设计。支持强大的查询和高并发性 Impala 可以使用大量的集群资源。在多租户环境中,这可能会无意中影响相邻的服务,例如 YARN、HBase 甚至 HDFS。Impala 准入控制通过将查询引导到离散资源池中以实现工作负载隔离、集群利用率和优先级排序,从而在 Impala 内实现细粒度的资源分配。
CDP Impala的准入控制架构
|
存储 弹性计算 算法
阿里云InfluxDB®高可用设计
阿里云InfluxDB®是一版免运维,稳定可靠,可弹性伸缩的在线时序数据库服务,目前围绕InfluxDB的TIG(Telegraf/InfluxDB/Grafana)生态和高可用服务版本已经商业化,可以在阿里云官网直接购买。
4235 0
阿里云InfluxDB®高可用设计