如何将日志服务的数据秒级同步到表格存储

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
对象存储 OSS,内容安全 1000次 1年
简介: 最近在容器服务的官方镜像中,新增了loghub-shipper的镜像,使用该镜像,可以订阅日志服务中的日志库,以秒级的延时将日志数据从日志服务中读出并转换成结构化数据存储在表格存储中,以满足实时在线服务的精确查询需求。

最近在容器服务的官方镜像中,新增了loghub-shipper的镜像,使用该镜像,可以订阅日志服务中的日志库,以秒级的延时将日志数据从日志服务中读出并转换成结构化数据存储在表格存储中,以满足实时在线服务的精确查询需求。

什么是日志服务?

日志服务(Log Service,Log)是针对日志场景的一站式解决方案,解决海量日志数据采集/订阅、转储与查询功能,比如在海量游戏日志收集与分析场景上的应用。

什么是表格存储?

表格存储(TableStore)提供海量NoSQL数据的存储与实时访问服务,能够支持单表PB级数据量、百万TPS,现在针对不同的场景有两种实例规格可供使用:__高性能实例__,高并发低延时的特点适用于金融风控互联网社交等应用,而__容量型实例__以更具性价比的存储和访问成本,更适用于日志、物联网IoT等场景,比如基于表格存储的高性能监控数据存储计算方案

为什么需要将数据从日志服务结构化到表格存储中?

在日志服务中,日志数据以json形式进行存储,并以日志组为写入与读取的基本单位,这种情况下无法根据特定条件(比如某个App近12个小时的日志数据)对日志做快速的查询和分析,只能拿到一段时间内所有设备、应用的日志信息。

假设日志数据格式如下:

{"__time__":1453809242,"__topic__":"","__source__":"10.170.148.237","ip":"10.200.98.220","time":"26/Jan/2016:19:54:02 +0800","url":"POST /PutData?Category=YunOsAccountOpLog&AccessKeyId=U0UjpekFQOVJW45A&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=pD12XYLmGxKQ%2Bmkd6x7hAgQ7b1c%3D HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java"}

···

将其写入到表格存储中主键为”ip”、”time”的数据表中,其格式如下:

ip time source status user-agent url
10.200.98.220 26/Jan/2016:19:54:02 +0800 10.170.148.237 200 aliyun-sdk-java POST /PutData…

在表格存储中,很容易对某个ip根据时间对历史数据进行精确的检索。

应用场景

日志数据结构化到表格存储之后,可以充分利用表格存储的高并发、低延时、按量计费的特点提供精确查询的在线服务:

  1. 电商、信息类网站的最近浏览记录展示。
  2. 推荐引擎的数据源,基于用户历史行为习惯进行分析。
  3. 实时推荐计算,根据用户最近浏览实时调整推荐策略。
  4. 对某个时间点的日志信息的精确查询,比如某台出故障的机器最近24小时的运行情况。
  5. 应用程序API基于时间段的延时统计、分析。

开启数据同步

准备工作

在表格存储上建好两张表:数据表和状态表。

数据表用来存储从日志服务中同步过来的日志数据。教程里我们假设这张表有三个主键列,分别是

rename,类型是 STRING。
trans-pkey,类型是 INTEGER。
keep,类型是 STRING。

状态表用来存储每个日志服务Project以及各个shard上日志数据的同步进度,该表的主键需要按照如下方式设置。 您的服务于多个 project 和 log store 的传送服务可以复用同一张状态表。 我们建议您将这张表的数据生命周期设置为一天或二天,这样可以降低使用成本。 状态表的主键有四列,分别为:

project_logstore,类型为 STRING。
shard,类型为 INTEGER。
target_table,类型为 STRING。
timestamp,类型为 INTEGER。

容器服务控制台创建集群和初始化

创建集群

TIPS:当已经有ECS服务器时,可以将已购买的云服务器添加到指定集群,添加已有云服务器步骤

您需要对默认设置根据实际情况做一些改动。

基本设置

  • 低于尽可能和日志服务与表格存储选在同一个区域,这样可以使用私网地址避免公网流量以及公网带来的不确定性。
  • 传送服务并不是一个HTTP服务,不需要暴露任何端口。所以也无需负载均衡。
  • 本教程选择“新增节点”以方便演示。 生产上我们建议您选择“不创建节点”,随后按照添加已有云服务器步骤添加现有的云服务器。
  • 传送服务并不需要高配置的单机。一般情况下,“1核1GB”也足够用了。
    传送服务可以完全动态的水平扩容缩容。 这里可以选择任意多台ECS。

集群初始化需要一些时间,请您稍待。您可以点击左侧“集群”进入集群列表界面查看集群的状态。
集群初始化

创建应用

创建应用

进行基本的应用设置

应用基本设置

按照下图的参数填入一些应用环境参数信息,点击页面最后的“创建并部署”来部署传送服务。

选择镜像

  • 选择传送服务的镜像。点击“选择镜像”之后会弹出如上图的选择窗口。搜索"loghub-shipper"来快速找到传送服务的镜像,选中该镜像(左上角蓝底白色的勾),然后“确定”。

为镜像添加配置信息:
镜像设置

  • 部署服务需要一些时间。您可以通过服务列表来查看。就绪的服务如下图。
    部署服务

到目前为止,传送服务已经搭建好啦,让我们想日志服务中写入一条日志试试看效果吧。

写入一条示例日志数据

LogItem log = new LogItem();
log.PushBack("original", "12345");
log.PushBack("keep", "hoho");
ArrayList logs = new ArrayList<LogItem>();
logs.add(log);
loghub.PutLogs("lhshipper-test", "test-store", "smile", logs, "");

这是一条有两个用户字段的日志,其中original字段的值为 "12345",keep字段的值为 "hoho"。除此以外还有三个日志服务添加的字段,其中 topic 为 "smile",而 __source__ 和 __time__ 字段随环境变化而变化。

查看数据表的信息

使用工具马上可以看到表格存储中数据表已经有了一条数据(转成json格式方便描述)

[{"rename": "12345", "trans-pkey": 12345, "keep": "hoho"},
{"__topic__": "smile", "original": "12345"}]

其中,rename、trans-pkey、keep为主键列,__topic__与original为属性列。

根据环境变量中的配置:

  • 在 transform 中定义 "rename": "original",日志数据中original 的值为"12345",故表格存储中该行的rename的值也为 "12345"
  • 在 transform 中定义 "trans-pkey": "(->int 10 original)",传送服务将original的值以十进制的方式转成整数写入表格存储中的trans-pkey列
  • keep没有任何转换规则,在表格存储中,属性类keep的值与日志数据中的保持一致
  • 日志数据中的 __source__ 和 __time__ 字段由于在exclusive_columns 中配置了["__source__", "__time__" ],这两个字段的数据不会写入数据表中
  • topic和original两个字段以属性列的方式写入数据表

查看状态表

同时,我们也从状态表中看到了如下数据(转成json格式方便描述):

[{"project_logstore": "lhshipper-test|test-store", "shard": 0, "target_table": "loghub_target", "timestamp": 1469100705202},
{"skip_count": 0, "shipper_id": "fb0d62cacc94-loghub-shipper-loghub-shipper-1", "cu_count": 1, "row_count": 1, "__time__": 1469100670}]

其涵义为传送服务的某个worker("shipper_id": "fb0d62cacc94-loghub-shipper-loghub-shipper-1"), 在2016-07-21T11:31:45.202000Z添加了这条状态("timestamp": 1469100705202)

在之前的五分钟里,该worker从名为 lhshipper-test 的日志 project中的 test-store 这个 log store ( "project_logstore": "lhshipper-test|test-store" ) 0号 shard ( "shard": 0 )消费了一条日志, 其中跳过日志0条( "skip_count": 0 ), 写入数据表的日志1条( "row_count": 1 ),消耗了1个CU( "cu_count": 1 )。

好啦,概耗时15分钟就把日志服务的数据秒级同步到表格存储的传送服务已经搭建好啦,日志类数据就可以使用日志服务的Logtail方便的采集到云上,才通过传送服务方便的将数据同步到表格存储中,支持了不同的业务需要,简直是神器啊~

日志服务到表格存储通路的搭建和使用更详细的信息可以参考官网文档

相关实践学习
一小时快速掌握 SQL 语法
本实验带您学习SQL的基础语法,快速入门SQL。
7天玩转云服务器
云服务器ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,可降低 IT 成本,提升运维效率。本课程手把手带你了解ECS、掌握基本操作、动手实操快照管理、镜像管理等。了解产品详情:&nbsp;https://www.aliyun.com/product/ecs
相关文章
|
4月前
|
SQL 关系型数据库 MySQL
我使用flinkcdc的sql形式进行全量同步,4张表,有两张表数据没进去,看日志,id怎么是null呢?
我使用flinkcdc的sql形式进行全量同步,4张表,有两张表数据没进去,看日志,id怎么是null呢?
117 40
|
6月前
|
存储 索引
表格存储根据多元索引查询条件直接更新数据
表格存储是否可以根据多元索引查询条件直接更新数据?
62 3
|
7月前
|
关系型数据库 物联网 PostgreSQL
沉浸式学习PostgreSQL|PolarDB 11: 物联网(IoT)、监控系统、应用日志、用户行为记录等场景 - 时序数据高吞吐存取分析
物联网场景, 通常有大量的传感器(例如水质监控、气象监测、新能源汽车上的大量传感器)不断探测最新数据并上报到数据库. 监控系统, 通常也会有采集程序不断的读取被监控指标(例如CPU、网络数据包转发、磁盘的IOPS和BW占用情况、内存的使用率等等), 同时将监控数据上报到数据库. 应用日志、用户行为日志, 也就有同样的特征, 不断产生并上报到数据库. 以上数据具有时序特征, 对数据库的关键能力要求如下: 数据高速写入 高速按时间区间读取和分析, 目的是发现异常, 分析规律. 尽量节省存储空间
608 1
|
1天前
|
人工智能 数据可视化 开发工具
Git log 进阶用法(含格式化、以及数据过滤)
Git log 进阶用法(含格式化、以及数据过滤)
|
3天前
|
机器学习/深度学习 前端开发 数据挖掘
工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断(下)
工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断
74 11
|
8天前
工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断2
工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断
15 0
|
9天前
|
机器学习/深度学习 前端开发 数据挖掘
R语言计量经济学:工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断
R语言计量经济学:工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断
39 0
|
1月前
|
分布式计算 DataWorks API
DataWorks常见问题之按指定条件物理删除OTS中的数据失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
3月前
|
DataWorks NoSQL 关系型数据库
可以使用dataworks从tablestore同步数据到mysql吗?
可以使用dataworks从tablestore同步数据到mysql吗?
32 1
|
4月前
|
SQL 关系型数据库 MySQL
⑩⑥ 【MySQL】详解 触发器TRIGGER,协助 确保数据的完整性,日志记录,数据校验等操作。
⑩⑥ 【MySQL】详解 触发器TRIGGER,协助 确保数据的完整性,日志记录,数据校验等操作。
39 0

相关产品

  • 日志服务