日志服务数据加工:原理篇

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
对象存储 OSS,内容安全 1000次 1年
简介: 本篇介绍日志服务数据加工的原理, 包括调度原理, 规则引擎的逻辑处理的基本和高级原理

概述

日志服务加工服务的一个作业使用协同消费组, 对源日志库进行流式消费, 对每一条日志传给加工规则处理后再输出.

调度原理

image

调度机制

对每一个加工作业, 加工服务的调度器会启动一个或多个运行实例, 每个运行实例扮演一个消费者的角色去消费1个或者多个源logstore的shard, 调度器会根据运行实例的内存与CPU消耗情况决定或减少并行运行实例数, 最多启动与源logstore的shard数量一样的运行实例.

运行实例

对分配的每个shard读取用户配置的起点的数据, 在内存中将源日志传递给加载的加工规则引擎, 处理后, 再输出给配置的目标Logstore. 加工规则引擎也会根据规则从外部加载资源进行富化等操作. 运行实例会利用消费组机制保存每个shard消费到的位置, 确保意外停止后再启动时可以继续从断点处继续消费.

作业停止

当用户配置作业时间范围没有配置后终点时, 运行实例默认不会退出.
当作业因为某些原因被停止(例如用户临时停止), 再次启动时, 对每个shard会默认从上次保存消费的点继续消费.
当用户配置作业时间范围有终止点时, 运行实例处理到配置的终点时间所接收的日志后会自动退出.

规则引擎原理: 基本操作

加工规则使用ETL语言编写, 可以理解为一个加工步骤的集合. 其中每一个步骤其实是一个Python的函数调用. 规则引擎加载规则后按步骤顺序执行.

例如这里4个以e_开头的函数的调用定义了4个主要步骤.

e_set("log_type", "access_log")
e_drop_fields("__action")
e_if(e_search("ret: pass"), e_set("result", "pass"))
e_if(e_search("ret: unknown"), DROP)

对应的逻辑如图:
image

基本逻辑

正常情况下: 规则中定义的每个事件函数会顺序执行, 每一个函数会对每个事件处理和修改, 返回一个修改的事件.
例如e_set("log_type", "access_log")会对每个事件添加一个字段"log_type"值为"access_log", 下一个函数接收到的事件就是最新的.

条件判断

某些步骤可以设定条件, 也就是不满足条件的事件会跳过本次操作. 相当于一个if的逻辑.
例如e_if(e_search("ret: pass"), e_set("result", "pass")), 会首先检查字段ret是否包含pass, 不满足不会做任何操作. 如果满足, 则会设置字段result值为pass.

停止处理

某些步骤可能返回0个事件, 表示删除事件, 例如e_if(e_search("ret: unknown"), DROP), 对每个字段ret的值是unknown的事件会丢弃. 这条事件被丢弃后, 后续的操作将不再进行. 自动重新开始下一条事件.

规则引擎原理: 输出, 复制与分裂

规则引擎也支持复制输出与分裂事件, 例如这里4个以e_开头的函数的调用定义了4个主要步骤.

e_coutput("archive_logstore") )
e_split("log_type")
e_if(e_search("log_type: alert"), e_output("alert_logstore") )
e_set("result", "pass")

假设现在处理一条源日志如下:

log_type: access,alert
content: admin login to database.

对应的逻辑如图:
image

输出事件

默认输出事件可以视为一种特殊的停止处理, 例如第3步中对log_type为alert的事件, 调用e_output("alert_logstore"), 提前输出事件到目标, 并删除事件, 其后续的操作也不会再进行.

复制输出事件

函数e_coutput会复制一份当前的事件输出, 并继续处理后续. 例如第1步中, 会将所有经过的日志输出到archive_logstore目标中.

分裂并行

第2步中, e_split("log_type")表示, 根据字段log_type的值, 例如是"access,alert", 分裂成2条事件, 2条事件完全一样, 除了字段log_type的值分别为access和alert.
分裂后的每条事件都会分别继续进行后续的步骤.

进一步参考

欢迎扫码加入官方钉钉群获得实时更新与阿里云工程师的及时直接的支持:
image

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
5天前
|
存储 数据采集 JavaScript
深入理解数仓开发(一)数据技术篇之日志采集
深入理解数仓开发(一)数据技术篇之日志采集
|
1月前
|
人工智能 数据可视化 开发工具
Git log 进阶用法(含格式化、以及数据过滤)
Git log 进阶用法(含格式化、以及数据过滤)
|
1月前
|
监控 NoSQL MongoDB
mongoDB查看数据的插入日志
【5月更文挑战第9天】mongoDB查看数据的插入日志
317 4
|
1月前
|
监控 NoSQL MongoDB
mongoDB查看数据的插入日志
【5月更文挑战第2天】mongoDB查看数据的插入日志
323 0
|
1月前
|
存储 监控 数据可视化
无需重新学习,使用 Kibana 查询/可视化 SLS 数据
本文演示了使用 Kibana 连接 SLS ES 兼容接口进行查询和分析的方法。
66617 11
|
22天前
|
监控 NoSQL MongoDB
mongoDB查看数据的插入日志
【5月更文挑战第22天】mongoDB查看数据的插入日志
28 3
|
24天前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用合集之从Oracle数据库同步数据时,checkpoint恢复后无法捕获到任务暂停期间的变更日志,如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
25天前
|
SQL 关系型数据库 数据库
实时计算 Flink版产品使用合集之同步PostgreSQL数据时,WAL 日志无限增长,是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
25天前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版产品使用合集之是否支持从库归档日志捕获数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
关系型数据库 MySQL 数据管理
MySQL通过 bin-log 恢复从备份点到灾难点之间数据
MySQL通过 bin-log 恢复从备份点到灾难点之间数据
219 0

相关产品

  • 日志服务