阿里云InfluxDB® 版对接Kapacitor指南

简介: Kapacitor是一个数据处理框架,用于告警、ETL(Extract-Transform-Load)和检测异常。Kapacitor是TICK生态中的最后一环,阿里云InfluxDB® 版对接Kapacitor指南新鲜出炉

背景

InfluxDB subscription(订阅)主要用于Kapacitor,但是任何可以接受UDP、HTTP或者HTTPS连接的终端,都可以订阅InfluxDB并获得写入InfluxDB的数据。
在InfluxDB中,创建、删除和查看subscription都需要admin权限,为了满足客户的需求,完善InfluxDB®的生态,我们最新的引擎版本已经支持客户通过控制台创建admin权限账号,详情见文档

介绍

Kapacitor是一个数据处理框架,用于告警、ETL(Extract-Transform-Load)和检测异常。Kapacitor是TICK生态中的最后一环。Kapacitor的主要特性包括:

  • 既可以处理流数据(streaming data),也可以处理批量数据(batch data)
  • 定期从InfluxDB查询数据,并且通过行协议或其它InfluxDB支持的方式获得数据
  • 执行InfluxQL支持的函数
  • 将处理结果存回InfluxDB
  • 支持添加用户自定义的函数检测异常
  • 与HipChat、OpsGenie、Alerta、Sensu、PagerDuty和Slack等集成

具体的介绍可查看Kapacitor官方文档

Kapacitor & Pull

Kapacitor将Prometheus关于service discover和scraping的代码集成到Kapacitor,所以,Kapacitor也可以支持pull模式。通过Kapacitor拉取数据写入InfluxDB,InfluxDB也可以间接地实现pull模式。

流处理 & 批处理

在Kapacitor中,用户可以使用TICKscript实现数据转换、降精度和告警等功能,特别地,可分为流处理和批处理:

  • 流处理:在InfluxDB中创建subscription,每个写入InfluxDB的数据点,同时也写入Kapacitor。
  • 批处理:Kapacitor周期性地从InfluxDB读取数据(在指定时间区间范围内的数据)。

什么时候需要流处理:

  • 需要实时地对每个数据点进行转换(这种操作也可以使用批处理,但是会有延迟)
  • 需要非常低的延迟,需要马上触发警报的场景
  • 当InfluxDB处理大量查询负载时,需要减轻InfluxDB的查询压力
  • 流处理根据数据点的时间戳了解时间,不需要判断一个数据点是否会在时间窗口外,然而在批处理中,数据点可能会因为延迟而落在相关时间窗口外

什么时候需要批处理:

  • 执行聚合操作
  • 不需要对每个数据点都进行判断是否需要告警(因为不经常发生状态改变)
  • 数据降精度
  • 稍微的延迟不会严重影响您的操作
  • 对于有超高吞吐量的InfluxDB实例(因为Kapacitor处理数据的速度无法像数据写入InfluxDB那样快)
  • 需要注意的是,流处理需要缓存更多的数据,占用较多的内存。

Kapacitor & Continuous Queries

Kapacitor的批处理跟InfluxDB的连续查询(Continuous Queries)有点类似,都是周期性地查询InfluxDB中的数据。那么,什么时候该用Kapacitor,什么时候该用CQ呢?

  • 大量CQ导致InfluxDB查询负载大时用Kapacitor,需要用流处理,因为批处理同样会增加InfluxDB的查询压力
  • 需要执行复杂的数据转换时用Kapacitor,Kapacitor支持用户自定义的函数,不仅仅是InfluxQL函数
  • 需要告警、异常检查等功能时用Kapacitor
  • 只需将少量数据进行降精度时用CQ,当查询压力不大时,用CQ即可

Kapacitor安装

登录Influxdata官网根据操作系统类型选择所需版本,下载链接,本文以centos为例

  • 1、 下载rpm包 wget https://dl.influxdata.com/kapacitor/releases/kapacitor-1.5.3.x86_64.rpm
  • 2、安装sudo yum localinstall kapacitor-1.5.3.x86_64.rpm
  • 3、修改配置文件 /etc/kapacitor/kapacitor.conf ,主要是influxdb相关配置,现在Influxdb只能通过VPC网络外发数据,所以需要登录TSDB控制台,开通VPC网络双向访问,demo如下
[[influxdb]]
  # Connect to an InfluxDB cluster
  # Kapacitor can subscribe, query and write to this cluster.
  # Using InfluxDB is not required and can be disabled.
  enabled = true
  default = true
  name = "localhost"
  urls = ["https://ts-*****.influxdata.rds.aliyuncs.com:3242"]
  username = "admin"
  password = "Admin1234"
  timeout = 0
  # Absolute path to pem encoded CA file.
  # A CA can be provided without a key/cert pair
  #   ssl-ca = "/etc/kapacitor/ca.pem"
  # Absolutes paths to pem encoded key and cert files.
  #   ssl-cert = "/etc/kapacitor/cert.pem"
  #   ssl-key = "/etc/kapacitor/key.pem"
  # Do not verify the TLS/SSL certificate.
  # This is insecure.
  insecure-skip-verify = false
  # Maximum time to try and connect to InfluxDB during startup
  startup-timeout = "5m"
  # Turn off all subscriptions
  disable-subscriptions = false
  # Subscription mode is either "cluster" or "server"
  subscription-mode = "cluster"
  # Which protocol to use for subscriptions
  # one of 'udp', 'http', or 'https'.
  subscription-protocol = "http"
  # Subscriptions resync time interval
  # Useful if you want to subscribe to new created databases
  # without restart Kapacitord
  subscriptions-sync-interval = "1m0s"
  # Override the global hostname option for this InfluxDB cluster.
  # Useful if the InfluxDB cluster is in a separate network and
  # needs special config to connect back to this Kapacitor instance.
  # Defaults to `hostname` if empty.
  kapacitor-hostname = "192.168.11.1"
  # Override the global http port option for this InfluxDB cluster.
  # Useful if the InfluxDB cluster is in a separate network and
  # needs special config to connect back to this Kapacitor instance.
  # Defaults to the port from `[http] bind-address` if 0.
  http-port = 0
  # Host part of a bind address for UDP listeners.
  # For example if a UDP listener is using port 1234
  # and `udp-bind = "hostname_or_ip"`,
  # then the UDP port will be bound to `hostname_or_ip:1234`
  # The default empty value will bind to all addresses.
  udp-bind = ""
  # Subscriptions use the UDP network protocl.
  # The following options of for the created UDP listeners for each subscription.
  # Number of packets to buffer when reading packets off the socket.
  udp-buffer = 1000
  # The size in bytes of the OS read buffer for the UDP socket.
  # A value of 0 indicates use the OS default.
  udp-read-buffer = 0
  [influxdb.subscriptions]
    # Set of databases and retention policies to subscribe to.
    # If empty will subscribe to all, minus the list in
    # influxdb.excluded-subscriptions
    #
    test =["autogen"]
    telegraf=["autogen"]
    # Format
    # db_name = <list of retention policies>
    # Example:
    # my_database = [ "default", "longterm" ]
  [influxdb.excluded-subscriptions]
    # Set of databases and retention policies to exclude from the subscriptions.
    # If influxdb.subscriptions is empty it will subscribe to all
    # except databases listed here.
    #
    # Format
    # db_name = <list of retention policies>
    #
    # Example:
    # my_database = [ "default", "longterm" ]
  • 4、启动服务 sudo service kapacitor start
  • 5、启动后Influxdb 服务端就可以看到订阅信息
> SHOW SUBSCRIPTIONS
name: test
retention_policy name                                           mode destinations
---------------- ----                                           ---- ------------
autogen          kapacitor-9ffbc8c7-fb55-46ca-84f7-e8102a7f07fb ANY  [http://192.168.11.1:9092]
name: telegraf
retention_policy name                                           mode destinations
---------------- ----                                           ---- ------------
autogen          kapacitor-9ffbc8c7-fb55-46ca-84f7-e8102a7f07fb ANY  [http://192.168.11.1:9092]
> quit
[root@r10g04440.sqa.zmf /bin]

Kapacitor task例子

本文主要介绍stream方式的报警和流处理任务

  • 报警任务配置及发布如下
[root@izuf63izb8ng929n94h5b6z ~]# cat cpu.tick 
dbrp "test"."autogen"
stream
    // Select just the cpu measurement from our example database.
    |from()
        .measurement('cpu')
    |alert()
        .crit(lambda: int("usage_idle") <  70)
        // Whenever we get an alert write it to a file.
        .log('/tmp/alerts.log')
* kapacitor define cpu_alter -tick cpu.tick
* kapacitor list task
* kapacitor enable cpu_alter 
[root@izuf63izb8ng929n94h5b6z ~]# cat telegraf.tick 
dbrp "telegraf"."autogen"
stream
    |from()
        .database('telegraf')
        .measurement('cpu')
        .groupBy(*)
    |window()
        .period(5m)
        .every(5m)
        .align()
    |mean('usage_idle')
        .as('usage_idle')
    |influxDBOut()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('mean_cpu_idle')
        .precision('s')
* kapacitor define telgrafstream -tick telegraf.tick
* kapacitor list tasks
* kapacitor enable telgrafstream
* kapacitor list tasks
  • 运行效果
[root@izuf63izb8ng929n94h5b6z ~]# kapacitor list tasks
ID            Type      Status    Executing Databases and Retention Policies
cpu_alter     stream    enabled   true      ["test"."autogen"]
telgrafstream stream    enabled   true      ["telegraf"."autogen"]
目录
相关文章
|
Python Windows
Windows定时任务 每隔一段时间(最小到秒级)执行一次指定的Python脚本
Windows定时任务 每隔一段时间(最小到秒级)执行一次指定的Python脚本
Windows定时任务 每隔一段时间(最小到秒级)执行一次指定的Python脚本
|
开发框架 前端开发 JavaScript
Python 有哪些Web框架?比如Flask、Django等知识梳理
Python 有哪些Web框架?比如Flask、Django等知识梳理
882 1
|
1月前
|
存储 监控 调度
Apache DolphinScheduler 数据库模式深度解析:从表结构到调度逻辑
Apache DolphinScheduler 作为开源分布式工作流调度平台,其数据库模式是核心支撑。本文从表结构、模块设计到企业实践,解析如何通过七大表组与分布式架构,实现跨集群调度、高可用与插件扩展,助力3000+企业高效管理数据任务,推动云原生时代下的智能调度演进。(238字)
|
11月前
|
运维 负载均衡 算法
如何选择适合的硬件负载均衡设备?
如何选择适合的硬件负载均衡设备?
494 138
|
设计模式 API 数据处理
TICK 中Kapacitor功能和使用说明
TICK 中Kapacitor功能和使用说明
436 2
|
9月前
|
人工智能 定位技术 开发者
魔搭上线最大MCP中文社区,支付宝、MiniMax等MCP独家首发
4月15日,中国第一AI开源社区魔搭(ModelScope)推出全新MCP广场,上架千余款热门的MCP服务,包括支付宝、MiniMax等全新MCP服务在魔搭独家首发。魔搭社区为AI开发者提供丰富的MCP服务及调试工具,并支持第三方平台集成和调用,通过开源开放的方式加速Agent及AI应用的创新和落地。
681 3
|
10月前
|
SQL 消息中间件 Serverless
​Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
​Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
332 4
|
监控 JavaScript
Vuex学习二:Vuex的重点属性学习,state、mutations、getters、actions、module。
这篇文章是关于Vuex状态管理库的深入学习,涵盖了其核心概念如state、getters、mutations、actions和modules,并通过实例代码展示了它们的使用和重要性。
334 1
|
SQL 安全 数据库
已成功与服务器建立连接 但是在登录过程中发生错误。 provider 共享内存提供程序 error 0 管道的另一端上无任何进程。
用户 'sa' 登录失败。该用户与可信 SQL Server 连接无关联。  说明: 执行当前 Web 请求期间,出现未处理的异常。
4680 0
|
开发框架 前端开发 JavaScript
基于SqlSugar的数据库访问处理的封装,支持.net FrameWork和.net core的项目调用
基于SqlSugar的数据库访问处理的封装,支持.net FrameWork和.net core的项目调用

热门文章

最新文章