SLS数据加工实现跨Logstore维表富化

简介: 跨Logstore维表富化指的是数据加工中通过资源函数(res_log_logstore_pull)从另一个Logstore中获取数据,并对获取到的数据进行富化,精准的过滤出预期数据,便于直观的得到预期信息。

阿里云日志服务介绍

日志服务(Log Service,简称SLS)是云原生观测与分析平台,为Log、Metric、Trace等数据提供大规模、低成本、实时的平台化服务。日志服务一站式提供数据采集、加工、查询与分析、可视化、告警、消费与投递等功能,全面提升您在研发、运维、运营、安全等场景的数字化能力。

数据加工服务是阿里云SLS推出的面向日志ETL处理的服务,主要解决数据加工过程中转换、过滤、分发、富化等场景。


什么是跨Logstore维表富化

跨Logstore维表富化指的是数据加工中通过资源函数(res_log_logstore_pull)从另一个Logstore中获取数据,并对获取到的数据进行富化,精准的过滤出预期数据,便于直观的得到预期信息。如某酒店将客人个人信息存储在ALogstore中,将客人入住信息存储在BLogstore中,现在酒店希望从BLogstore中获取部分字段数据,与ALogstore中的数据拼接。针对该需求,日志服务提供res_log_logstore_pull函数从ALogstore中获取数据,提供e_table_map函数或e_search_table_map函数实现数据富化,快速获取到客人信息。

下文将详细介绍如何通过资源函数(res_log_logstore_pull)从其他Logstore中获取数据,并进行数据富化。

函数简介

函数格式

res_log_logstore_pull(

   endpoint,

   ak_id,

   ak_secret,

   project,

   logstore,

   fields,

   from_time="begin",

   to_time=None,

   fetch_include_data=None,

   fetch_exclude_data=None,

   primary_keys=None,

   fetch_interval=2,

   delete_data=None,

   base_retry_back_off=1,

   max_retry_back_off=60,

   ttl=None,

   role_arn=None,

)

参数说明

名称

类型

是否必填

说明

endpoint

String

访问域名。更多信息,请参见服务入口。默认为HTTPS格式,也支持HTTP格式。特殊情况下,需使用非80、非443端口。

ak_id

String

阿里云账号的AccessKey ID。为了数据安全,建议在高级参数配置中配置。关于如何配置高级参数,请参见创建数据加工作业

ak_secret

String

阿里云账号的AccessKey Secret。为了数据安全,建议在高级参数配置中配置。关于如何配置高级参数,请参见创建数据加工作业

project

String

待拉取数据的Project名称。

logstore

String

待拉取数据的Logstore名称。

fields

String List

字符串列表或者字符串别名列表。日志中不包含某个字段时,该字段的值为空。例如需要将["user_id", "province", "city", "name", "age"]name改名为user_name时,可以配置为["user_id", "province", "city", ("name", "user_name"), ("nickname", "nick_name"), "age"]

from_time

String

首次开始拉取日志的服务器时间,默认值为begin,表示会从第一条数据开始拉取。支持如下时间格式:

  • Unix时间戳。
  • 时间字符串。
  • 特定字符串,例如beginend
  • 表达式:dt_类函数返回的时间。例如dt_totimestamp(dt_truncate(dt_today(tz="Asia/Shanghai"), day=op_neg(-1))),表示昨天拉取日志的开始时间,如果当前时间是2019-5-5 10:10:10 8:00,则上述表达式表示时间2019-5-4 10:10:10 8:00。

to_time

String

首次结束读取日志的服务器时间。默认值为None,表示当前的最后一条日志。支持如下时间格式:

  • Unix时间戳。
  • 时间字符串。
  • 特定字符串。例如beginend
  • 表达式:dt_类函数返回的时间。

不配置或者配置为None表示持续拉取最新的日志。

说明 如果填入的是一个未来时间,只会将该Logstore所有数据拉取完毕,并不会开启持续拉取任务。


fetch_include_data

String

配置字段白名单,满足fetch_include_data时保留数据,否则丢弃。

  • 不配置或配置为None时,表示关闭字段白名单功能。
  • 配置为具体的字段和字段值时,表示保留该字段和字段值所在的日志。

fetch_exclude_data

String

配置字段黑名单,满足fetch_exclude_data时丢弃数据,否则保留。

  • 不配置或配置为None时,表示关闭字段黑名单功能。
  • 配置为具体的字段和字段值时,表示丢弃该字段和字段值所在的日志。

说明 如果您同时设置了fetch_include_datafetch_exclude_data参数,则优先执行fetch_include_data参数,再执行fetch_exclude_data参数。

primary_keys

字符串列表

维护表格时的主键字段列表。如果fields参数中对主键字段进行修改,这里应使用修改后的字段名,将修改后的字段作为主键字段。

说明

  • primary_keys参数只支持单个字符串,且必须存在于fields参数配置的字段中。
  • 待拉取数据的目标Logstore中只能有一个Shard。

fetch_interval

Int

开启持续拉取任务时,每次拉取请求的时间间隔,默认值为2,单位:秒。该值必须大于或者等于1。

delete_data

String

对满足条件且配置了primary_keys的数据,在表格中进行删除操作。更多信息,请参见查询字符串语法

base_retry_back_off

Number

拉取数据失败后重新拉取的时间间隔,默认值为1,单位:秒。

max_retry_back_off

Int

拉取数据失败后,重试请求的最大时间间隔,默认值为60,单位:秒。建议使用默认值。

ttl

Int

开启持续拉取任务时,拉取日志产生时间开始ttl时间内的日志,单位为秒。默认值为None,表示拉取全部时间的日志。

role_arn

String

阿里云账号的角色。为了数据安全,建议在高级参数配置中配置。关于如何配置高级参数,请参见创建数据加工作业

返回结果

返回多列表格。

场景应用

某酒店将客人个人信息存储在名为user_logstore的Logstore中,将客人入住信息存储在名为check-in_logstore的Logstore中,现在酒店希望从check-in_logstore中获取部分字段数据,与user_logstore中的数据拼接。此处通过res_log_logstore_pull函数从check-in_logstore中获取数据,使用e_table_map函数或e_search_table_map函数实现数据富化。

原始数据

  • 用于存储个人信息的Logstore(user_logstore)

topic:xxx

city:xxx

cid:12345

name:maki


topic:xxx

city:xxx

cid:12346

name:vicky


topic:xxx

city:xxx

cid:12347

name:mary

  • 用于存储入住信息的Logstore(check-in_logstore)

time:1567038284

status:check in

cid:12345

name:maki

room_number:1111


time:1567038284

status:check in

cid:12346

name:vicky

room_number:2222


time:1567038500

status:check in

cid:12347

name:mary

room_number:3333


time:1567038500

status:leave

cid:12345

name:maki

room_number:1111

场景一  使用e_table_map筛选cid字段相同的客户信息

通过两个Logstore中相同的cid字段进行匹配,设置起始时间("08-31 19:00:02 8:00"),只有cid字段的值完全相同,才能匹配成功。匹配成功后,返回Logstore(check-in_logstore)中的room_number字段和字段值,与Logstore(check-in_logstore)中的数据拼接,生成新的数据。

加工规则

e_table_map(

   res_log_logstore_pull(

       "cn-chengdu.log.aliyuncs.com",

       "LT***6F",

       "9D***R7",

       "test-gy-cd",

       "check-in_logstore",

       fields=["cid", "room_number"],

       from_time="08-31 19:00:02 8:00",

   ),

   "cid",

   "room_number",

)

加工结果

topic:xxx

city:xxx

cid:12345

name:maki

room_number:1111


topic:xxx

city:xxx

cid:12346

name:vicky

room_number:2222


topic:xxx

city:xxx

cid:12347

name:mary

room_number:3333


场景二 使用e_search_table_map筛选出cid字段相同的客户信息

使用e_search_table_map函数对Logstore(check-in_logstore)和Logstore(user_logstore)做搜索匹配,搜索Logstore(check-in_logstore)中cid字段,返回该数据中的room_number字段和字段值,与Logstore(user_logstore)中的数据拼接,生成新的数据。

加工规则

e_search_table_map(

   res_log_logstore_pull(

       "cn-chengdu.log.aliyuncs.com",

       "LT***6F",

       "9D***R7",

       "test-gy-cd",

       "check-in_logstore",

       fields=["cid", "room_number"],

       from_time="begin",

   ),

   "cid",

   "room_number",

)

加工结果

topic:xxx

city:xxx

cid:12345

name:maki

room_number:1111


topic:xxx

city:xxx

cid:12346

name:vicky

room_number:2222


topic:xxx

city:xxx

cid:12347

name:mary

room_number:3333

场景三 获取到房号为某值的客户信息

通过fetch_include_data设置白名单,获取指定字段的数据。例如fetch_include_data="room_number:1111"表示在获取数据过程中,获取room_number值为1111的数据,与Logstore(user_logstore)中的数据拼接,生成新的数据。

加工规则

e_table_map(

   res_log_logstore_pull(

       "cn-chengdu.log.aliyuncs.com",

       "LT***6F",

       "9D***R7",

       "test-gy-cd",

       "check-in_logstore",

       fields=["cid", "name", "room_number", "status"],

       fetch_include_data="room_number:1111",

   ),

   "cid",

   "room_number",

)

加工结果

topic:xxx

city:xxx

cid:12347

name:mary


topic:xxx

city:xxx

cid:12346

name:vicky


topic:xxx

city:xxx

cid:12345

name:maki

room_number:1111

场景四 过滤掉房号为固定值的客户信息

通过fetch_exclude_data设置黑名单,过滤指定字段的数据。例如fetch_exclude_data="room_number:1111"表示在获取数据过程中,丢弃room_number值为1111的数据,与Logstore(user_logstore)中的数据拼接,生成新的数据。

加工规则

e_table_map(

   res_log_logstore_pull(

       "cn-chengdu.log.aliyuncs.com",

       "LT***6F",

       "9D***R7",

       "test-gy-cd",

       "check-in_logstore",

       fields=["cid", "name", "room_number", "status"],

       fetch_exclude_data="room_number:1111",

   ),

   "cid",

   "room_number",

)

加工结果

topic:xxx

city:xxx

cid:12347

name:mary

room_number:3333


topic:xxx

city:xxx

cid:12346

name:vicky

room_number:2222


topic:xxx

city:xxx

cid:12345

name:maki


场景五 获取已经离开的客户信息

通过配置primary_keys数据和设置delete_data,过滤不需要进行加工的数据。例如在名为check-in_logstore的Logstore中,获取已经离开的客户的信息,如果获取到的数据中包含status:check in表示客人未离开,则开启主键维护功能不加工该数据,即通过delete_data="status:check in"来过滤不需要进行加工的数据

加工规则

e_table_map(

   res_log_logstore_pull(

       "cn-chengdu.log.aliyuncs.com",

       "LT***6F",

       "9D***R7",

       "test-gy-cd",

       "check-in_logstore",

       ["cid", "name", "room_number", "status", "time"],

       primary_keys="cid",

       delete_data="status:check in",

   ),

   "cid",

   ["room_number", "status"],

)

加工结果

cid:12347

city:xxx

name:mary

topic:xxx


cid:12346

city:xxx

name:vicky

topic:xxx


cid:12345

city:xxx

name:maki

topic:xxx

room_number:1111

status:leave

场景六 密钥未知时,使用slr服务角色授权,获取cid字段相同的客户信息

使用slr服务角色授权,通过两个Logstore中相同的cid字段进行匹配,只有cid字段的值完全相同,才能匹配成功。匹配成功后,返回Logstore(check-in_logstore)中的room_number字段和字段值,与Logstore(check-in_logstore)中的数据拼接,生成新的数据。

加工规则

e_table_map(

   res_log_logstore_pull(

       "cn-chengdu.log.aliyuncs.com",

       "",

       "",

       "test-gy-cd",

       "check-in_logstore",

       fields=["cid", "room_number"],

       from_time="08-31 19:00:02 8:00",

       role_arn="acs:ram::***:role/aliyunserviceroleforslsaudit",

   ),

   "cid",

   "room_number",

)

加工结果

cid:12347

city:xxx

name:mary

room_number:3333

topic:xxx


cid:12346

city:xxx

name:vicky

room_number:2222

topic:xxx


cid:12345

city:xxx

name:maki

room_number:1111

topic:xxx


场景七 密钥未知时,使用自定义角色授权,获取cid字段相同的客户信息

使用自定义(gy-new-role)角色授权,通过两个Logstore中相同的cid字段进行匹配,只有cid字段的值完全相同,才能匹配成功。匹配成功后,返回Logstore(check-in_logstore)中的room_number字段和字段值,与Logstore(check-in_logstore)中的数据拼接,生成新的数据。

加工规则

e_table_map(

   res_log_logstore_pull(

       "cn-chengdu.log.aliyuncs.com",

       "",

       "",

       "test-gy-cd",

       "check-in_logstore",

       fields=["cid", "room_number"],

       from_time="08-31 19:00:02 8:00",

       role_arn="acs:ram::***:role/gy-new-role",

   ),

   "cid",

   "room_number",

)

加工结果

cid:12347

city:xxx

name:mary

room_number:3333

topic:xxx


cid:12346

city:xxx

name:vicky

room_number:2222

topic:xxx


cid:12345

city:xxx

name:maki

room_number:1111

topic:xxx


相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
目录
相关文章
|
13天前
|
存储 数据采集 JavaScript
深入理解数仓开发(一)数据技术篇之日志采集
深入理解数仓开发(一)数据技术篇之日志采集
|
1月前
|
监控 NoSQL MongoDB
mongoDB查看数据的插入日志
【5月更文挑战第9天】mongoDB查看数据的插入日志
321 4
|
1月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用合集之从Oracle数据库同步数据时,checkpoint恢复后无法捕获到任务暂停期间的变更日志,如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
30天前
|
监控 NoSQL MongoDB
mongoDB查看数据的插入日志
【5月更文挑战第22天】mongoDB查看数据的插入日志
30 3
|
1月前
|
SQL 关系型数据库 数据库
实时计算 Flink版产品使用合集之同步PostgreSQL数据时,WAL 日志无限增长,是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版产品使用合集之是否支持从库归档日志捕获数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
关系型数据库 MySQL 数据管理
MySQL通过 bin-log 恢复从备份点到灾难点之间数据
MySQL通过 bin-log 恢复从备份点到灾难点之间数据
222 0
|
20天前
|
SQL 监控 关系型数据库
|
15天前
|
SQL 数据采集 DataWorks
DataWorks产品使用合集之pyodps的线程限制是什么意思
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
15天前
|
DataWorks 数据可视化 安全
DataWorks产品使用合集之SLS日志中新增了存在iotId这个字段,同步的时候怎么手动增加
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。