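Background
InfluxDB subscriptions are used mainly by Kapacitor, but any endpoint that accepts UDP, HTTP, or HTTPS connections can subscribe to InfluxDB and receive a copy of the data written to it.
In InfluxDB, creating, dropping, and viewing subscriptions all require admin privileges. To meet customer needs and round out the InfluxDB® ecosystem, our latest engine version lets customers create an admin account through the console; see the documentation for details.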
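For reference, a subscription is created with the InfluxQL `CREATE SUBSCRIPTION` statement. The sketch below only assembles that statement as a string. The subscription name `sub0` is hypothetical, the database, retention policy, and destination are borrowed from the examples later in this article, and the helper does no identifier escaping.

```python
def create_subscription_stmt(name, db, rp, destinations, mode="ALL"):
    """Build an InfluxQL CREATE SUBSCRIPTION statement (no identifier escaping)."""
    if mode not in ("ALL", "ANY"):
        raise ValueError("mode must be ALL or ANY")
    if not destinations:
        raise ValueError("at least one destination is required")
    # Each destination is a single-quoted URL; several are comma-separated.
    dests = ", ".join("'{}'".format(d) for d in destinations)
    return 'CREATE SUBSCRIPTION "{}" ON "{}"."{}" DESTINATIONS {} {}'.format(
        name, db, rp, mode, dests
    )


if __name__ == "__main__":
    # "sub0" is a made-up name used only for illustration.
    print(create_subscription_stmt(
        "sub0", "test", "autogen", ["http://192.168.11.1:9092"], mode="ANY"
    ))
```

When Kapacitor is the subscriber it creates its subscriptions itself, as the `SHOW SUBSCRIPTIONS` output later in this article shows, so a statement like this should only be needed for a custom endpoint.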
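Introduction
Kapacitor is a data processing framework for alerting, ETL (Extract-Transform-Load), and anomaly detection. It is the last component of the TICK stack. Its main features include:
- Processes both streaming data and batch data
- Queries InfluxDB on a schedule, and receives data via the line protocol or any other method InfluxDB supports
- Runs the functions that InfluxQL supports
- Stores processed results back in InfluxDB
- Supports user-defined functions for anomaly detection
- Integrates with HipChat, OpsGenie, Alerta, Sensu, PagerDuty, Slack, and others
See the official Kapacitor documentation for details.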
Kapacitor & Pull
Kapacitor incorporates Prometheus's service discovery and scraping code, so it also supports a pull model. By having Kapacitor scrape data and write it to InfluxDB, InfluxDB gains pull-mode ingestion indirectly.
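Stream Processing & Batch Processing
In Kapacitor, TICKscript is used to implement data transformation, downsampling, alerting, and similar functions. Tasks fall into two kinds, stream and batch:
- Stream: a subscription is created in InfluxDB, and every data point written to InfluxDB is also written to Kapacitor.
- Batch: Kapacitor periodically reads data from InfluxDB (the data within a specified time range).
When to use stream processing:
- Every data point must be transformed in real time (batch processing can also do this, but with a delay)
- Very low latency is required, for example when an alert must fire immediately
- InfluxDB is already handling a heavy query load and that load needs to be reduced
- Stream tasks derive time from the timestamps of the data points, so there is no need to decide whether a point will fall outside a time window; in batch tasks, a point that arrives late can miss its window
When to use batch processing:
- You are running aggregations
- You do not need to evaluate every data point for alerting (because state changes are infrequent)
- You are downsampling data
- A small delay does not seriously affect your operations
- The InfluxDB instance has very high write throughput (Kapacitor cannot process data as fast as it can be written to InfluxDB)
- Note that stream processing has to buffer more data and therefore uses more memory.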
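The late-arrival point above can be made concrete with a small model. This is a simplified sketch, not Kapacitor's implementation: it assumes a batch task that runs at fixed times and queries the half-open range `[run_time - period, run_time)`, with all times in seconds. The function names are hypothetical.

```python
def batch_query_range(run_time, period):
    """Half-open time range [start, end) covered by a batch query run at run_time."""
    return run_time - period, run_time


def batch_sees(point_ts, arrival_ts, run_time, period):
    """True if a batch query executed at run_time would include the point.

    The point's timestamp must lie inside the queried range, and the point
    must already have been written (arrived) when the query runs.
    """
    start, end = batch_query_range(run_time, period)
    return start <= point_ts < end and arrival_ts <= run_time


if __name__ == "__main__":
    # A point stamped t=290 but written at t=310 misses the run at t=300 ...
    print(batch_sees(290, 310, run_time=300, period=300))
    # ... and lies outside the range [300, 600) of the next run at t=600.
    print(batch_sees(290, 310, run_time=600, period=300))
    # The same point written on time (t=295) is picked up by the run at t=300.
    print(batch_sees(290, 295, run_time=300, period=300))
```

A stream task avoids this particular gap because it groups points by the timestamps they carry rather than by when a query happens to run.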
Kapacitor & Continuous Queries
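Kapacitor batch tasks resemble InfluxDB continuous queries (CQs): both query the data in InfluxDB periodically. So when should you use Kapacitor, and when a CQ?
- Use Kapacitor when a large number of CQs puts a heavy query load on InfluxDB. Use stream tasks in this case, because batch tasks add to InfluxDB's query load just as CQs do
- Use Kapacitor for complex data transformations; it supports user-defined functions, not only InfluxQL functions
- Use Kapacitor when you need alerting or anomaly detection
- Use CQs when you only need to downsample a small amount of data and the query load is light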
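Installing Kapacitor
Go to the InfluxData website and choose the release that matches your operating system (download link). This article uses CentOS as the example.
- 1. Download the RPM package: wget https://dl.influxdata.com/kapacitor/releases/kapacitor-1.5.3.x86_64.rpm
- 2. Install it: sudo yum localinstall kapacitor-1.5.3.x86_64.rpm
- 3. Edit the configuration file /etc/kapacitor/kapacitor.conf, mainly the InfluxDB settings. InfluxDB can currently send data out only over the VPC network, so log in to the TSDB console and enable bidirectional VPC access. A sample configuration follows: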
[[influxdb]]
# Connect to an InfluxDB cluster
# Kapacitor can subscribe, query and write to this cluster.
# Using InfluxDB is not required and can be disabled.
enabled = true
default = true
name = "localhost"
urls = ["https://ts-*****.influxdata.rds.aliyuncs.com:3242"]
username = "admin"
password = "Admin1234"
timeout = 0
# Absolute path to pem encoded CA file.
# A CA can be provided without a key/cert pair
# ssl-ca = "/etc/kapacitor/ca.pem"
# Absolute paths to pem encoded key and cert files.
# ssl-cert = "/etc/kapacitor/cert.pem"
# ssl-key = "/etc/kapacitor/key.pem"
# Do not verify the TLS/SSL certificate.
# This is insecure.
insecure-skip-verify = false
# Maximum time to try and connect to InfluxDB during startup
startup-timeout = "5m"
# Turn off all subscriptions
disable-subscriptions = false
# Subscription mode is either "cluster" or "server"
subscription-mode = "cluster"
# Which protocol to use for subscriptions
# one of 'udp', 'http', or 'https'.
subscription-protocol = "http"
# Subscriptions resync time interval
# Useful if you want to subscribe to newly created databases
# without restarting Kapacitord
subscriptions-sync-interval = "1m0s"
# Override the global hostname option for this InfluxDB cluster.
# Useful if the InfluxDB cluster is in a separate network and
# needs special config to connect back to this Kapacitor instance.
# Defaults to `hostname` if empty.
kapacitor-hostname = "192.168.11.1"
# Override the global http port option for this InfluxDB cluster.
# Useful if the InfluxDB cluster is in a separate network and
# needs special config to connect back to this Kapacitor instance.
# Defaults to the port from `[http] bind-address` if 0.
http-port = 0
# Host part of a bind address for UDP listeners.
# For example if a UDP listener is using port 1234
# and `udp-bind = "hostname_or_ip"`,
# then the UDP port will be bound to `hostname_or_ip:1234`
# The default empty value will bind to all addresses.
udp-bind = ""
# Subscriptions use the UDP network protocol.
# The following options are for the created UDP listeners for each subscription.
# Number of packets to buffer when reading packets off the socket.
udp-buffer = 1000
# The size in bytes of the OS read buffer for the UDP socket.
# A value of 0 indicates use the OS default.
udp-read-buffer = 0
[influxdb.subscriptions]
# Set of databases and retention policies to subscribe to.
# If empty will subscribe to all, minus the list in
# influxdb.excluded-subscriptions
#
test = ["autogen"]
telegraf = ["autogen"]
# Format
# db_name = <list of retention policies>
# Example:
# my_database = [ "default", "longterm" ]
[influxdb.excluded-subscriptions]
# Set of databases and retention policies to exclude from the subscriptions.
# If influxdb.subscriptions is empty it will subscribe to all
# except databases listed here.
#
# Format
# db_name = <list of retention policies>
#
# Example:
# my_database = [ "default", "longterm" ]
- 4. Start the service: sudo service kapacitor start
- 5. Once Kapacitor is running, the subscriptions are visible on the InfluxDB server:
> SHOW SUBSCRIPTIONS
name: test
retention_policy name                                           mode destinations
---------------- ----                                           ---- ------------
autogen          kapacitor-9ffbc8c7-fb55-46ca-84f7-e8102a7f07fb ANY  [http://192.168.11.1:9092]
name: telegraf
retention_policy name                                           mode destinations
---------------- ----                                           ---- ------------
autogen          kapacitor-9ffbc8c7-fb55-46ca-84f7-e8102a7f07fb ANY  [http://192.168.11.1:9092]
> quit
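The same check can be scripted by sending `SHOW SUBSCRIPTIONS` to InfluxDB's HTTP `/query` endpoint and flattening the JSON reply. The sketch below covers only the parsing step and makes no network call. The sample response is an assumption: it is reconstructed from the CLI output above using the usual InfluxDB 1.x `results`/`series`/`columns`/`values` layout, not captured from a real instance.

```python
def subscriptions_from_response(body):
    """Flatten a /query response for SHOW SUBSCRIPTIONS into a list of dicts."""
    rows = []
    for result in body.get("results", []):
        for series in result.get("series", []):
            columns = series["columns"]
            for values in series["values"]:
                row = dict(zip(columns, values))
                # The series name is the database the subscription belongs to.
                row["database"] = series["name"]
                rows.append(row)
    return rows


# Assumed response shape, rebuilt from the CLI output shown above.
SAMPLE = {
    "results": [{
        "statement_id": 0,
        "series": [{
            "name": "test",
            "columns": ["retention_policy", "name", "mode", "destinations"],
            "values": [[
                "autogen",
                "kapacitor-9ffbc8c7-fb55-46ca-84f7-e8102a7f07fb",
                "ANY",
                ["http://192.168.11.1:9092"],
            ]],
        }],
    }]
}

if __name__ == "__main__":
    for sub in subscriptions_from_response(SAMPLE):
        print(sub["database"], sub["retention_policy"], sub["destinations"])
```

A script like this could, for example, verify after a Kapacitor restart that every database listed under `[influxdb.subscriptions]` has a subscription pointing at the expected `kapacitor-hostname`.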
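Kapacitor Task Examples
This article covers two stream-mode tasks: an alerting task and a stream-processing task.
- The alerting task is defined and deployed as follows: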
[root@izuf63izb8ng929n94h5b6z ~]# cat cpu.tick
dbrp "test"."autogen"
stream
    // Select just the cpu measurement from our example database.
    |from()
        .measurement('cpu')
    |alert()
        .crit(lambda: int("usage_idle") < 70)
        // Whenever we get an alert write it to a file.
        .log('/tmp/alerts.log')
* kapacitor define cpu_alter -tick cpu.tick
* kapacitor list tasks
* kapacitor enable cpu_alter
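To see which values trip the `crit` condition in cpu.tick, the lambda can be mirrored in a few lines of Python. This is only an illustration of the comparison, assuming that TICKscript's `int()` truncates a float toward zero the way Python's `int()` does. The function name and the "CRITICAL"/"OK" labels are chosen here for readability.

```python
def cpu_alert_level(usage_idle):
    """Mirror the condition lambda: int("usage_idle") < 70."""
    # int() drops the fractional part, so 70.9 becomes 70 and is not critical.
    return "CRITICAL" if int(usage_idle) < 70 else "OK"


if __name__ == "__main__":
    for value in (95.0, 70.9, 70.0, 69.9, 12.5):
        print(value, cpu_alert_level(value))
```

Under that assumption the threshold is effectively "idle below 70.0": a reading of 69.9 is critical, while 70.0 and 70.9 are not.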
- The stream-processing task is defined and deployed as follows (see the reference documentation):
[root@izuf63izb8ng929n94h5b6z ~]# cat telegraf.tick
dbrp "telegraf"."autogen"
stream
    |from()
        .database('telegraf')
        .measurement('cpu')
        .groupBy(*)
    |window()
        .period(5m)
        .every(5m)
        .align()
    |mean('usage_idle')
        .as('usage_idle')
    |influxDBOut()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('mean_cpu_idle')
        .precision('s')
* kapacitor define telgrafstream -tick telegraf.tick
* kapacitor list tasks
* kapacitor enable telgrafstream
* kapacitor list tasks
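What telegraf.tick computes can be approximated in plain Python: points are bucketed into aligned 5-minute windows by their own timestamps, and the mean of `usage_idle` is taken per window. This is a simplified sketch rather than Kapacitor's implementation. It assumes in-order `(unix_seconds, value)` tuples for a single series and ignores `groupBy(*)`, so there is no per-tag grouping.

```python
from collections import defaultdict


def windowed_mean(points, period_s=300):
    """Return {window_start: mean value} for aligned, non-overlapping windows.

    points: iterable of (unix_seconds, value) tuples.
    """
    sums = defaultdict(float)
    counts = defaultdict(int)
    for ts, value in points:
        # Align each window to a multiple of the period, like .align().
        start = ts - ts % period_s
        sums[start] += value
        counts[start] += 1
    return {start: sums[start] / counts[start] for start in sorted(sums)}


if __name__ == "__main__":
    sample = [(0, 80.0), (60, 70.0), (120, 60.0), (300, 90.0), (599, 100.0)]
    for start, mean in windowed_mean(sample).items():
        print(start, mean)
```

In the real task each window's mean is written back to the `mean_cpu_idle` measurement by `influxDBOut()`, which is how this stream task stands in for a downsampling CQ.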
- Result:
[root@izuf63izb8ng929n94h5b6z ~]# kapacitor list tasks
ID            Type   Status  Executing Databases and Retention Policies
cpu_alter     stream enabled true      ["test"."autogen"]
telgrafstream stream enabled true      ["telegraf"."autogen"]