背景
阿里云日志服务提供可托管、可扩展、高可用的数据加工服务。数据加工服务可用于数据的规整、富化、流转、脱敏和过滤。本文为读者带来了数据加工动态解析与分发的最佳实践。
场景
现有多个不同的APP,所有APP的程序运行日志都输入到同一个中心Logstore中。每个APP的日志都是以分隔符分隔的文本日志,但是日志字段schema各不相同。日志样例如下:
APP_1的日志样例 content: schema_app1|113.17.4.39|www.zsc.mock.com|PUT|1082|404|https|28.3|Mozilla/5.0 APP_2的日志样例 content: schema_app2|183.93.165.82|db-01|MySQL|5.5|0|cn-shanghai|1072|user-2 APP_3的日志样例 content: schema_app3|root|container4|image3|www.jd.mock.com|221.176.106.202|200|01/Apr/2021:06:27:56
上述APP的日志格式为:"schema_id|字段值1|字段值2|字段值3..."
- schema_id为该日志的字段schema的ID
- "字段值X"是日志的各个字段值,每个字段值的字段名由schema_id对应schema定义。
所有schema的定义存储在OSS的一个文件中,并与schema_id一一映射。Schema定义文件的内容格式如下:
{ "schema_app1": { "fields": ["client_ip", "host", "http_method", "resquest_length", "status_code", "request_time", "user_agent"], "logstore": "logstore_app1" }, "schema_app2": { "fields": ["client_ip", "db_name", "db_type", "db_version", "fail", "region", "check_rows", "user_name"], "logstore": "logstore_app2" }, "schema_app3": { "fields": ["user", "container_name", "image_name", "referer", "container_ip", "status_code", "datetime"], "logstore": "logstore_app3" }, }
其中schema_app1等是schema_id。每个schema的定义包含两个字段,fields和logstore,fields定义了该schema对应的字段名列表,logstore定义了该schema的日志要分发的目标logstore名。
需求
- 对中心Logstore中不同Schema的日志进行动态解析(Schema在动态变化),将分隔符分隔的各个字段值映射到对应的字段名上,形成结构化的日志。
- 不同Schema的日志分发到不同的Logstore中。
例子:
- 中心Logstore的原始日志
content: schema_app1|113.17.4.39|www.zsc.mock.com|PUT|1082|404|https|28.3|Mozilla/5.0 content: schema_app2|183.93.165.82|db-01|MySQL|5.5|0|cn-shanghai|1072|user-2
- 加工后的日志
输出到logstore_app1: {"client_ip": "113.17.4.39", "host": "www.zsc.mock.com", "http_method": "PUT", "resquest_length": 1082, "status_code": 404, "request_time": 28.3, "user_aent": "Mozilla/5.0"} 输出到logstore_app2: {"client_ip": "183.93.165.82", "db_name": "db-01", "db_type": "MySQL", "db_version": "5.5", "fail": 0, "region": "cn-shanghai", "check_rows": 1072, "user_name": "user-2"}
数据加工语法
数据加工的创建流程参考创建数据加工任务
# 1.原始日志切分出schema_id和日志内容raw_content e_set("split_array", str_partition(v("content"), "|")) e_set("schema_id", lst_get(v("split_array"), 0)) e_set("raw_content", lst_get(v("split_array"), 2)) # 2.根据schema_id从OSS读取对应的schema内容 e_set( "schema", dct_get( res_oss_file( endpoint="http://oss-cn-hangzhou.aliyuncs.com", ak_id=res_local("AK_ID"), ak_key=res_local("AK_KEY"), bucket="ali-licheng-demo", file="schema_lib/schema.json", change_detect_interval=20, ), v("schema_id"), ), ) # 3.从schema中读取字段名列表fields和分发的目标Logstore e_set("fields", dct_get(v("schema"), "fields")) e_set("target_logstore", dct_get(v("schema"), "logstore")) # 丢弃多余字段 e_keep_fields("raw_content", "fields", "target_logstore", F_TIME, F_META) # 4.解析分隔符日志,并映射到fields中的字段上 e_psv("raw_content", json_parse(v("fields"))) # 丢弃多余字段 e_drop_fields("fields", "raw_content") # 5.根据schema中定义的logstore名,动态分发 e_output(project="licheng-simulator-test", logstore=v("target_logstore"))
上述加工语法的加工总体流程如下:
1.将原始日志切分出schema_id和日志内容raw_content,即:
原始日志: content: schema_app1|113.17.4.39|www.zsc.mock.com|PUT|1082|404|https|28.3|Mozilla/5.0 切分为: schema_id: schema_app1 raw_content: 113.17.4.39|www.zsc.mock.com|PUT|1082|404|https|28.3|Mozilla/5.0
2.根据schema_id从OSS读取对应的schema内容
- res_oss_file函数用于读取OSS中的文件,具体用法参考res_oss_file使用说明。
3. 从schema中读取字段名列表fields和分发的目标Logstore
- 每个schema中都定义了该schema日志的字段名列表以及分发的目标Logstore名
4.解析分隔符日志,并映射到fields中的字段上
- 使用e_psv函数解析竖线分隔符日志(|),并映射到schema中定义的字段名列表上。
- 参考e_csv, e_psv, e_tsv用法指南
5.根据schema中定义的分发logstore名,实现日志的动态分发。
- e_output用法参考e_output,e_coutput用法指南
加工后的结果示例
后续维护
后续维护过程中,如果APP日志的Schema发生变化,或者有新的APP日志进来,只需在OSS中的schema库文件中修改和增加对应APP的schema定义即可,无需对加工任务做任何修改。