开发者学堂课程【SaaS 模式云数据仓库实战:日志数据如何同步 MaxCompute】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/761/detail/13347
日志数据如何同步到 MaxCompute
内容介绍:
一、目录
二、实验目的
三、方案介绍
四、方案比较及场景应用
五、操作步骤
一、目录
主要从以下四个方面进行讲解
1.实验目的
2.方案介绍
3.方案比较及场景应用
4.操作步骤(重点)
二、实验目的
日常工作中,企业需要将通过ECS、容器、移动端、开源软件、网站服务、JS等接入的实时日志数据进行日志实时查询与分析、采集与消费、数据清洗与流计算(ETL/Stream Processing )、数据仓库对接(Data Warehouse )等场景的使用。
通过本次实验,可以学习了解日志数据如何同步到MaxCompute。
三、方案介绍
方案一︰使用Tunnel命令上传日志数据到MaxCompute。
方案二:通过DataHub投递数据到MaxCompute。DataHub DataConnector是把DataHub服务中的流式数据同步到其他云产品中的功能,目前支持将Topic中的数据实时/准实时同步到MaxCompute、OSS、ElasticSearch.RDS Mysql、ADS、TableStore中。用户只需要向DataHub中写入一次数据,并在DataHub服务中配置好同步功能,便可以在各个云产品中使用这份数据。
方案三∶通过SLS实时采集与消费( LogHub )投递数据到MaxCompute。
通过DataWorks的数据集成(Data Integration )功能投递至MaxCompute。
方案四∶通过Kafka订阅实时数据投递至MaxCompute。
这里面的方案二Datahub和方案四kafka差异性很小,都是消息队列,Data HUB建议进行公测。
四、方案比较及场景应用
1.Tunnel
Tunnel用于批量上传数据到离线表里,适用于离线计算的场景。对于特殊格式日志,一般建议作为一个字段整体上传,到MaxComopute里在进行拆分。
2.DataHub
Datahub 用于实时上传数据的场景,主要用于流式计算的场景。数据上传后会保存到实时表里,后续会在几分钟内通过定时任务的形式同步到离线表里,供离线计算使用。
3.日志服务(SLS)
(实时采集与消费)LogHub用途︰数据清洗(ETL)、流计算( Stream Compute )、监控与报警、机器学习与迭代计算等。
投递数仓(LogShipper ) :稳定可靠的日志投递。将日志中枢数据投递至存储类服务进行存储。支持压缩、自定义Partition、以及行列等各种存储方式。
数据仓库+数据分析、审计、推荐系统与用户画像。
支持通过控制台数据接入向导一站式配置正则模式采集日志与设置索引。
(实时采集与消费) LogHub :实时性强,写入即可消费。Logtail (采集Agent )实时采集传输,秒内到服务端(99.9%情况)。写入即可查询分析。
海量数据:对数据量不设上限。
种类丰富:支持行、列、TextFile等各种存储格式。
配置灵活:支持用户自定义Partition等配置。
4.Kafka
Kafka是一款分布式发布与订阅的消息中间件,有高性能、高吞量的特点,每秒能处理上百万的消息。Kafka适用于流式数据处理。
大数据领域:主要应用于用户行为跟踪、日志收集等场景。
数据集成: 将消息导入MaxCompute、OSS、 RDS、 Hadoop、 HBase等离线数据仓库。
五、操作步骤
1.方案一:通过 Tunnel 上传日志数据到 MaxCompute
环境准备及步骤:
(1)开通MaxCompute服务,安装odpscmd客户端。
(2)准备日志服务数据。
(3)创建MaxCompute表,用来储存日志数据。
(4)使用tunnel命令:
tunnel u C:\Users \Desktop\weijing._loghub_demo.csv tunnel_log。
u指的是上传,后面加一个文件路径,紧接着就是MaxCompute表名(用来存储日志数据),执行以后投递日志数据是成功的,说明已经通过Tunnel上传日志数据到MaxCompute
(5)进行验证:查询表数据是否导入成功。查询以后发现表中有数据导进来说日志数据成功导入MaxCompute表中
(6)注意事项
使用Tunnel命令行工具上传数据当前不支持通配符或正则表达式命令。如果想要使用正则表达式上传的话可以借助于方案三LogHub
对于特殊格式的日志数据,一般建议作为一个字段上传,到MaxComopute里在进行拆分。
2.方案二:通过 DataHub 投递日志数据到 MaxCompute
环境准备及步骤:
(1)登录阿里云DataHub控制台,创建Project。
(2)进入Project列表->Project查看 ,创建Topic。创建Topic有两种方式,一种是直接创建,一种是导入Maxcompute表结构。这里先介绍一下导入Maxcompute表结构,首先输入Maxcompute项目,选择项目名称,第二是输入Maxcompute的表,可以是已经创建的表,也可以是新建的表名,新建表名的话会自动在maxcompute里面创建一个表,填写AK信息。
(3)选择导入MaxCompute表结构、并勾选自动创建DataConnector。选择之后在创建topic的时候自动创建DataConnector,填写topic的名称。
注意:准备对应的MaxCompute表,该表字段类型、名称、顺序必须与DataHub Topic字段完全一致,如果三个条件中的任意一个不满足,则归档Connector无法创建。
Sant的数量,默认一个sant对应1000个KPS,根据自己的数据流量进行相应的设置,再设置一个生命周期。
(4)创建好的DataConnector详细信息如下图所示。包括同步时间、最新写入数据的时间,maxcomputeaddpot以及运行状态、当前的数据量、当前的点位(从0开始,如下图里面显示的是2说明有三条数据导入进来了)
(5)直接创建topic:输入Maxcompute项目和Maxcompute table,可以是已知表也可以是新建的表,然后输入相关的AK信息。
如果已经创建Topic ,只需要在详情页的右上角点击+ DataConnector.
分区范围: SYSTEM_TIME、EVENT_TIME、USER_DEFINE三种模式, SystemTime模式会使用写入时间转化为字符串进行分区, EventTime模式会根据topic中固定的event_time字段时间进行分区(需要在创建Topic时增加一个TIMESTAMP类型名称为event_time的字段,并且写入数据时向这个字段写入其微秒时间) ,UserDefine模式将会直接使用用户自定义的分区字段字符串分区。分区格式定义,现阶段仅支持固定的格式。
(6)创建好以后回到DataHub控制台,点击Topic ,可以看到sheet通道,它会记录数据写入的时间、最新数据的时间、数据量、当前存储量。点击数据抽样以后可以看到数据写入DataHub里面的数据。点击DataConnector可以查看DataConnector详细信息。包括配置的目标服务、目标的描述以及最新写入数据的时间。
(7)日志数据抽样
第一个图是点击数据抽样时可以看到同步到DateHub中的日志数据。然后查看dataConnector详细信息可以看到归档信息,包括当前点位、章数据以及运行状态。如果运行状态如果失败的话,需要检查一下是什么原因。一般是ADDPOT配置的有问题,DataHub投递日志数据到MaxCompute离线表中的时候默认64M就会connect一次,或者是五分钟进行一次强行写入,可以保障的是至少五分钟会有一次数据同步进去。
(8)测试日志数据是否投递成功。进行全表扫描,可以查询表里面有数据显进来说明,通过data HUB投递数据到maxcompute是成功的。通过这个方案的话,需要注意以下四点
注意事项:
目前所有DataConnector均仅支持同一Region的云服务之间同步数据 ,不支持同步数据到跨Region的服务。
DataConnector所配置的目标服务Endpoint需要填写相应的内网域名(经典网络),不支持使用公网域名同步。
数据同步目前仅支持at least once语义(最少保障一次同步),在网络服务异常等小概率场景下可能会导致目的端的数据产生重复但不会丢失,需要做去重处理。
topic默认20个,如果需要创建更多,需提交工单申请。