概述
日志服务加工服务的一个作业使用协同消费组, 对源日志库进行流式消费, 对每一条日志传给加工规则处理后再输出.
调度原理
调度机制
对每一个加工作业, 加工服务的调度器会启动一个或多个运行实例, 每个运行实例扮演一个消费者的角色去消费1个或者多个源logstore的shard, 调度器会根据运行实例的内存与CPU消耗情况决定或减少并行运行实例数, 最多启动与源logstore的shard数量一样的运行实例.
运行实例
对分配的每个shard读取用户配置的起点的数据, 在内存中将源日志传递给加载的加工规则引擎, 处理后, 再输出给配置的目标Logstore. 加工规则引擎也会根据规则从外部加载资源进行富化等操作. 运行实例会利用消费组机制保存每个shard消费到的位置, 确保意外停止后再启动时可以继续从断点处继续消费.
作业停止
当用户配置作业时间范围没有配置后终点时, 运行实例默认不会退出.
当作业因为某些原因被停止(例如用户临时停止), 再次启动时, 对每个shard会默认从上次保存消费的点继续消费.
当用户配置作业时间范围有终止点时, 运行实例处理到配置的终点时间所接收的日志后会自动退出.
规则引擎原理: 基本操作
加工规则使用ETL语言编写, 可以理解为一个加工步骤的集合. 其中每一个步骤其实是一个Python的函数调用. 规则引擎加载规则后按步骤顺序执行.
例如这里4个以e_开头的函数的调用定义了4个主要步骤.
e_set("log_type", "access_log")
e_drop_fields("__action")
e_if(e_search("ret: pass"), e_set("result", "pass"))
e_if(e_search("ret: unknown"), DROP)
对应的逻辑如图:
基本逻辑
正常情况下: 规则中定义的每个事件函数会顺序执行, 每一个函数会对每个事件处理和修改, 返回一个修改的事件.
例如e_set("log_type", "access_log")会对每个事件添加一个字段"log_type"值为"access_log", 下一个函数接收到的事件就是最新的.
条件判断
某些步骤可以设定条件, 也就是不满足条件的事件会跳过本次操作. 相当于一个if的逻辑.
例如e_if(e_search("ret: pass"), e_set("result", "pass")), 会首先检查字段ret是否包含pass, 不满足不会做任何操作. 如果满足, 则会设置字段result值为pass.
停止处理
某些步骤可能返回0个事件, 表示删除事件, 例如e_if(e_search("ret: unknown"), DROP), 对每个字段ret的值是unknown的事件会丢弃. 这条事件被丢弃后, 后续的操作将不再进行. 自动重新开始下一条事件.
规则引擎原理: 输出, 复制与分裂
规则引擎也支持复制输出与分裂事件, 例如这里4个以e_开头的函数的调用定义了4个主要步骤.
e_coutput("archive_logstore") )
e_split("log_type")
e_if(e_search("log_type: alert"), e_output("alert_logstore") )
e_set("result", "pass")
假设现在处理一条源日志如下:
log_type: access,alert
content: admin login to database.
对应的逻辑如图:
输出事件
默认输出事件可以视为一种特殊的停止处理, 例如第3步中对log_type为alert的事件, 调用e_output("alert_logstore")
, 提前输出事件到目标, 并删除事件, 其后续的操作也不会再进行.
复制输出事件
函数e_coutput会复制一份当前的事件输出, 并继续处理后续. 例如第1步中, 会将所有经过的日志输出到archive_logstore目标中.
分裂并行
第2步中, e_split("log_type")表示, 根据字段log_type的值, 例如是"access,alert", 分裂成2条事件, 2条事件完全一样, 除了字段log_type的值分别为access和alert.
分裂后的每条事件都会分别继续进行后续的步骤.
进一步参考
欢迎扫码加入官方钉钉群获得实时更新与阿里云工程师的及时直接的支持: