日志数据如何同步到MaxCompute-阿里云开发者社区

开发者社区> 刘-建伟> 正文

日志数据如何同步到MaxCompute

简介: 日常工作中,企业需要将通过ECS、容器、移动端、开源软件、网站服务、JS等接入的实时日志数据进行应用开发。包括对日志实时查询与分析、采集与消费、数据清洗与流计算、数据仓库对接等场景。本次分享主要介绍日志数据如何同步到MaxCompute。
+关注继续查看

摘要:日常工作中,企业需要将通过ECS、容器、移动端、开源软件、网站服务、JS等接入的实时日志数据进行应用开发。包括对日志实时查询与分析、采集与消费、数据清洗与流计算、数据仓库对接等场景。本次分享主要介绍日志数据如何同步到MaxCompute。具体讲解如何通过Tunnel,DataHub,日志服务SLS以及Kafka将日志数据投递到MaxCompute的参数介绍和详细同步过程等内容。

演讲嘉宾简介:刘建伟,阿里云智能技术支持工程师

本次直播视频精彩回顾,戳这里!https://yq.aliyun.com/live/1575
以下内容根据演讲视频以及PPT整理而成。
本次分享主要围绕以下四个方面:
一、实验目的
二、方案介绍
三、方案比较及场景应用
四、操作步骤
一、实验目的及方案介绍
1.实验目的
日常工作中,企业需要将通过ECS、容器、移动端、开源软件、网站服务、JS等接入的实时日志数据进行应用开发。包括日志实时查询与分析、采集与消费、数据清洗与流计算、数据仓库对接等场景。
9B27D287_9A68_49b9_B93D_4D4678AF8E4F
2.方案介绍
日志数据同步到MaxCompute的场景主要有四个方案。
方案一:使用Tunnel命令上传日志数据到MaxCompute。
方案二:通过DataHub投递数据到MaxCompute。DataHub DataConnector是把DataHub服务中的流式数据同步到其他云产品中的功能,目前支持将Topic中的数据实时/准实时同步到MaxCompute、OSS、ElasticSearch、RDS Mysql、ADS.、TableStore中。用户只需要向DataHub中写入一次数据,并在DataHub服务中配好同步功能,便可以在各个云产品中使用这份数据。
方案三:通过SLS实时采集与消费( LogHub )投递数据到MaxCompute。也可通过DataWorks的数据集成( Data Integration )功能投递至MaxCompute。
方案四:通过Kafka订阅实时数据投递至MaxCompute。
其中方案二(DataHub)和方案三( LogHub )差异化不强,均属于消息队列。一般来说DataHub用于进行公测或自研。
_1
三、方案比较及场景应用
1. Tunnel
Tunnel主要用于批量上传数据到离线表中,适用于离线计算的场景。对于特殊格式日志,一般建议将日志作为一个整体字段上传到MaxCompute表中,再进行拆分。
2. DataHub
DataHub用于实时上传数据的场景,主要用于流式计算场景。数据上传后会保存到实时表里,后续会在几分钟内通过定时任务的形式同步到离线表里,供离线计算使用。
3.日志服务(SLS)
LogHub:可适用于数据清洗(ETL)、流计算( Stream Compute)、监控与报警、机器学习与迭代计算等场景。其实时性强,写入即可消费。
Logtail (采集Agent ):实时采集传输,1秒内到达服务端( 99.9%)。写入即可查询分析。此外可支持海量数据,对数据量不设上限。种类丰富,支持行、列、TextFile等各种存储格式。而且配置灵活,支持用户自定义Partition等配置。
LogShipper(投递数仓):可支持稳定可靠的日志投递。将日志中枢数据投递至存储类服务进行存储。支持压缩、自定义Parition,以及行列等各种存储方式。可以进行数据仓库、数据分析、审计c推荐系统与用户画像场景的开发。支持通过控制台数据接入向导一站式配置正则模式采集日志与设置索引。
4.Kafka
Kafka是一款分布式发布与订阅的消息中间件,有高性能、高吞量的特点,每秒能处理上百万消息。Kafka适用于流式数据处理。可应用场景分别是大数据领域和数据集成。大数据领域主要应用于用户行为跟踪、日志收集等场景。结合数仓将消息导入MaxCompute、 OSS、RDS、Hadoop.、HBase等离线数据仓库。

四、操作步骤
1. 方案一:通过Tunnel命令上传日志数据到MaxCompute
环境准备及步骤:
(1) 开通MaxCompute服务,安装odpscmd客户端。
(2) 准备日志服务数据。
(3) 创建MaxCompute表,用于储存日志数据。
(4) 使用特征命令:tunnel u C:UsersDesktopweijing_loghub_demo.csv tunnel_log。
u即upload,随后是上传路径和用于存储日志数据的表名。如下图所示,执行后日志数据投递成功,说明已经通过Tunnel命令将日志数据上传到MaxCompute中。11111
(5) 查询表数据是否导入成功。
111111
查询后显示表中已有数据导入,说明日志数据成功导入MaxCompute表中。
注意事项:
(1) 使用Tunnel命令行工具上传数据当前不支持通配符或正则表达式命令。若用户想借助正则表达式上传数据,可使用方案三(LogHub支持正则表达式)。
(2) 对于特殊格式的日志数据,一般建议作为一个整体字段上传,到MaxCompute里,再进行拆分。
2. 方案二:通过DataHub投递日志数据到MaxCompute
环境准备及步骤:
(1) 登录阿里云DataHub控制台,创建Project。
(2) 进入Project管理页面 Project列表->Project查看,创建Topic。创建Topic有两种方式,一是直接创建,或者是导入MaxCompute表结构。
(3) 选择导入MaxCompute表结构。输入MaxCompute项目,选择项目名称。输入MaxCompute表,此MaxCompute表可以是已经创建的表,也可新建表名,会自动创建一个MaxCompute表。然后填写AccessidAccessKey信息。勾选自动创建DataConnector,勾选后会在创建Topic时自动创建一个DataConnector。填写Topic名称。Shard数量默认为1k KPS,用户可根据自己的数据流量进行设置。再设置生命周期。
111
注意:Schema对应MaxCompute表,该表字段类型、名称、顺序必须与DataHub Topic字段完全一致,如果三个条件中的任意一个不满足,则归档Connector无法创建。
(4) 创建好的DataConnector详细信息如下图所示。包括已同步时间、最新写入数据时间、MaxCompute Endpoint、运行状态、脏数据量、当前点位。当前点位从0开始,下图示例中为2,说明已导入三条数据。
111
直接创建Topic: 首先输入一个MaxCompute项目,再输入MaxCompute表,可为已知表或新建表。再输入相应ACCESSIDACCESSKEY信息。
(5) 如果已经创建Topic,只需要在详情页的右上角点击 + DataConnector。
分区范围包含SYSTEM_TIME、EVENT_TIME、USER_DEFINE三种模式,SystemTime模式会使用写入时间转化为字符串进行分区。EventTime模式会根据topic中固定的event_time字段时间进行分区(需要在创建Topic时增加一个TIMESTAMP类型名称为event_time的字段,并且写入数据时向这个字段写入其微秒时间)。UserDefine模式将会直接使用用户自定义的分区字段字符串分区。分区格式目前仅支持固定格式。
1111
(6) 回到DataHub控制台,点击Topic。
如下图所示,可以看到Shard通道,会记录数据写入时间、最新数据时间、数据量以及当前存储量。
111
点击DataConnector,可查看其详细信息。包括配置的目标服务、目标描述以及最新写入数据的时间。
111
(7) 日志数据抽样。如下图所示点击数据抽样可以查看写入DataHub中的日志数据。
111
点击查看DataConnector详细信息,可看到归档信息包括当前点位、脏数据、运行状态。如果运行状态为失败,需要检查原因,一般可能为Endpoint配置问题。DataHub投入数据到MaxCompute离线表中默认每60M commit一次,或五分钟进行一次强行写入,可以保证至少五分钟一次数据同步。
111
1111
(8) 测试日志数据是否投递成功。如下图所示,进行扫描发现有数据写入,说明通过DataHub投递数据至MaxCompute成功。11111
注意事项:
(1) 目前所有DataConnector均仅支持同一Region的云服务之间同步数据,不支持同步数据到跨Region的服务。
(2) DataConnector所配置的目标服务Endpoint需要填写相应的内网域名(经典网络),不支持使用公网域名同步。
(3) 数据同步目前仅支持at least once语义,在网络服务异常等小概率场景下可能会导致目的端的数据产生重复但不会丢失,需要做去重处理。
(4) topic默认可创建20个,如果需要创建更多,需提交工单申请。
3. 方案三:通过LogHub投递日志数据到MaxCompute
直接通过LogHub投递:
环境准备及步骤:
(1) 开通日志服务,登录日志服务控制台,创建新的Project或者单击已创建的Project名称。
(2) 创建新的Logstore或者单击已经创建好的Logstore名称。
(3) 单击对应的Logstore,查询分析导入到LogHub的日志数据。如下图演示所示几条数据即为LogHub的日志数据,需要将其同步到MaxCompute表中。
111
(4) 选择需要投递的日志库名称并依次展开节点,日志库名称->数据处理->导出->MaxCompute。单击开始投递。
(5) 单击开启投递以进入LogHub->数据投递页面。进行相关配置。
(6) 配置投递规则,在LogHub->数据投递页面配置字段关联等相关内容。如下图所示。
a) 自定义一个投递名称
b) MaxCompute表名称,项目名以及日志表明。输入自定义的新建的MaxCompute表名称或者选择已有的MaxCompute表。
c) 字段关联。左边是LogHub表的字段列,右边是MaxCompute表中的字段列。按序,左边填写与MaxCompute表数据列相映射的日志服务字段名称,右边填写或选择MaxCompute表的普通字段名称及字段类型。
d) 分区字段:__partition_time__
格式:将日志时间作为分区字段,通过日期来筛选数据是MaxCompute常见的过滤数据方法。__partition_time__是根据日志__time__值计算得到(不是日志写入服务端时间,也不是日志投递时间),结合分区时间格式,向下取整。
111
(7) 在投递管理页面,单击修改即可对之前的配置信息进行编辑。如果想新增列,可以在大数据计算服务MaxCompute修改投递的数据表列信息,单击修改后会加载最新的数据表信息。
(8) 投递任务管理。如下图所示,配置成功后点击确定,会提示成功配置了一个数据投递到MaxCompute。
111
启动投递功能后,日志服务后台会定期启动离线投递任务。用户可以在控制台上看到这些投递任务的状态和错误信息。当投递任务发生错误时,需要查看错误信息,问题解决后可以通过云控制台中日志投递任务管理或SDK来重试失败任务。
(9) 查看日志投递的运行状态。如下图所示日志开始投递后状态是运行中,运行行数为0,日志投递成功后运行行数为11,状态转变为成功。说明通过LogHub投递日志到MaxCompute成功。
1111
11111
(10) 日志投递MaxCompute后,检查数据完整性。
a) 通过控制台或API/SDK判断(推荐)。使用API、SDK或者控制台获取指定Project/Logstore投递任务列表。控制台会对该返回结果进行可视化展示。
b) 通过MaxCompute分区粗略估计,比如在MaxCompute中以半小时做一次分区,投递任务为每30分钟一次,当表中包含以下分区:

2019_10_25_10_00
2019_10_25_10_30

当发现分区2019_10_25_11_00出现时,说明11:00之前分区数据已经完整。该方法不依赖API,判断方式简单但结果并不精确,仅用作粗略估计。
(11) 查询MaxCompute表中数据。
111
注意事项:
(1) 数加控制台创建、修改投递配置必须由主账号完成,不支持子账号操作。
(2) 不同Logstore的数据不可导入到同一个MaxCompute表中,否则会造成分区冲突、丢失数据等后果。
(3) MaxCompute表至少包含一个数据列、一个分区列。
(4) MaxCompute单表有分区数目6万的限制,分区数超出后无法再写入数据,所以日志服务导入MaxCompute表至多支持3个分区列。需谨慎选择自定义字段作为分区列,保证其值可枚举。
(5) 日志服务数据的一个字段最多允许映射到一个MaxCompute表的列(数据列或分区列),不支持字段冗余,同一个字段名第二次使用时其投递的值为null,如果null出现在分区列会导致数据无法被投递。
(6) 投递MaxCompute是批量任务,需谨慎设置分区列及其类型:保证一个同步任务内处理的数据分区数小于512个;用作分区列的字段值不能为空或包括‘/’等MaxCompute保留字段。
(7) 不支持海外Region的MaxCompute投递,海外Region的MaxCompute可使用DataWorks进行数据同步。
DataWorks:
环境准备及步骤:
(1) 登录阿里云LogHub控制台,创建Project。
(2) 登录DataWorks控制台,单击对应项目进入数据集成。
(3) 配置LogHub数据源。进入同步资源管理->数据源页面,单击右上角的新增数据源。
(4) 选择数据源类型为LogHub,填写新增LogHub数据源对话框中的配置,如下图所示。选择环境,填写数据源名称、LogHub Endpoint、在LogHub中创建的Project以及ACCESSIDACCESSKEY信息。填写完成后单击测试连通性。测试连通性通过说明数据源可以正确使用。单击确定。
111
(5) 配置同步任务。可选择向导模式,通过简单便捷的可视化页面完成任务配置;或者选择脚本模式,深度自定义配置同步任务。
新建业务流程->数据集成->新建数据集成节点->数据同步进入数据同步任务配置页面。
使用向导模式需要填写日志开始时间和结束时间。日志开始时间表示数据消费的开始时间位点,为yyyyMMddHHmmss格式的时间字符串(比如20191025103000),左闭右开。日志结束时间表示数据消费的结束时间位点,格式与日志开始时间相同。
批量条数:一次读取的数据条数,默认为256。
111
a) 向导模式配置同步任务。
配置数据源及数据去向。数据源为LogHub,填写Logstore,配置日志开始时间和日志结束时间。数据去向可选择MaxCompute的一个数据源,选择需要导入的表,填写数据信息,进行字段映射。保存数据同步的配置。提交并运行。查询MaxCompute表中数据,确保日志服务数据已经成功同步到MaxCompute。
b) 脚本模式配置同步任务。
导入模板,选择数据源和目标数据源。编辑脚本。需要填写reader和writer的脚本配置。
如下图红色框框选内容所示,reader端需要填写beginDataTime(日志开始时间)、datasourse(数据源,填写已配置好的数据源名称)、encoding(编码格式)、endDataTime(日志结束时间)、logstore(日志库)等。下图绿色框框选内容所示为writer端即(MaxCompute端)相应配置信息。需要填写列名,需要和LogHub配置的列名对应。填写datasourse、partition、table、truncate。
111
保存脚本数据节点,脚本模式配置成功后单击提交并运行。运行后数据即会从LogHub投递到MaxCompute表中。111
如上图演示截图所示,投递成功。查询MaxCompute表中数据,确保日志服务数据已经成功同步到MaxCompute。如下图所示。1111
4. 方案四:通过Kafka投递日志数据到MaxCompute
环境准备及步骤:
(1) 搭建Kafka集群。
(2) 在控制台创建Topic和Consumer Group。
(3) Flume读取日志文件数据写入到Kafka。首先为flume构建agent:进入flume下的配文件夹中编写构建agent的配置文件。再通过此命令启动flume的agent:bin/flume-ng agent -c conf -f 配置文件夹名/配置文件名-n a1 -Dflume.root.logger=INFO,console。然后启动Kafka的消费者进行数据写入bin/Kafka-console-consumer.sh:--zookeeper 主机名:2181 --topic Kafka_odps 。这样就开启了日志采集,文件会写入到Kafka中。
(4) 数据写入到Kafka,通过数据集成DataWorks同步数据到MaxCompute。
新建业务流程->新建数据同步节点->转换为脚本->配置脚本。
(5) 配置脚本信息,运行脚本进行数据节点的同步。如下图红色框框选内容为reader端配置,绿色框框选内容为writer端配置。注意reader端需要配置server,为Kafka broke Server,格式为:ip 端口号。框选的列为Kafka的属性列、数据列。下图演示中所填写内容为属性列,再填写相应Topic。需要填写begin offset,即数据消费开始节点。MaxCompute writer端,一一对应Kafka左边的列,需要填写table表名、datasourse等。脚本配置完成后点击提交并运行。运行成功后日志数据就会投递到MaxCompute表中。
111
(6) 查询MaxCompute表中数据,确保日志服务数据成功同步到MaxCompute。如下图演示所示。111
注意事项:
(1) 数据同步的脚本编写,Reader、Writer的配置。如果脚本配置有误,数据投递将会失败,运行失败。可以结合官方文档进行配置。
(2) Flume采集日志数据中配置文件的配置。配置文件中的Topic必须与Kafka创建的Topic保持一致。

欢迎加入“MaxCompute开发者社区2群”,点击链接申请加入或扫描二维码
https://h5.dingtalk.com/invite-page/index.html?bizSource=____source____&corpId=dingb682fb31ec15e09f35c2f4657eb6378f&inviterUid=E3F28CD2308408A8&encodeDeptId=0054DC2B53AFE745
111

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
利用SVN的post-commit钩子实现多项目自动同步
svn的post-commit钩子脚本在每次提交(commit)之后运行,我们可以在这个脚本里实现一些比较实用的功能,比如发送邮件提醒、自动备份版本库,自动同步代码到web服务器等。
776 0
51.com开放平台日志
【51开放平台日志 2008年12月05日】第三方小应用可以直接使用Flash访问51用户照片文件 出自51.com developers wiki 跳转到: 导航, 搜索 因为Flash的跨域读取文件限制策略,导致之前第三方小应用无法实现Flash跨域读取51用户照片。
481 0
MaxCompute同步数据的网络配置
MaxCompute可以通过数据集成加载不同数据源(例如:MySQL数据库等)数据,同样也可以通过数据集成把MaxCompute的数据导出到各种业务数据库。数据集成功能已经集成到DataWorks作为数据同步任务进行配置、运行。您可直接在DataWorks上配置MaxCompute数据源,再配置读取MaxCompute表或者写入MaxCompute表任务,数据的导入和导出整个过程只需在一个平台上进行操作。
797 0
基于Tablestore 实现大规模订单系统海量订单/日志数据分类存储的实践
前言:从最早的互联网高速发展、到移动互联网的爆发式增长,再到今天的产业互联网、物联网的快速崛起,各种各样新应用、新系统产生了众多订单类型的需求,比如电商购物订单、银行流水、运营商话费账单、外卖订单、设备信息等,产生的数据种类和数据量越来越多;其中订单系统就是一个非常广泛、通用的系统。而随着数据规模的快速增长、大数据技术的发展、运营水平的不断提高,包括数据消费的能力要求越来越高,这对支撑订单系统的数据库设计、存储系统也提出了更多的要求。在新的需求下,传统的经典架构面临着诸多挑战,需要进一步思考架构优化,以更好支撑业务发展;
187 0
使用OpenApi弹性释放和设置云服务器ECS释放
云服务器ECS的一个重要特性就是按需创建资源。您可以在业务高峰期按需弹性的自定义规则进行资源创建,在完成业务计算的时候释放资源。本篇将提供几个Tips帮助您更加容易和自动化的完成云服务器的释放和弹性设置。
12069 0
手把手教你完成MaxCompute JDBC自定义日志配置
本文讲述了如何在MaxCompute JDBC 2.0 beta版本中完成日志功能的相关配置,并进一步对logback的配置文件进行了示例及说明。
5111 0
利用MaxCompute内建函数及UDTF转换json格式日志数据
本文介绍了如何使用MaxCompute UDF对JSON格式的日志进行信息提取和转换。
8204 0
改善代码设计 —— 组织好你的数据(Composing Data)
  系列博客       1. 改善代码设计 —— 优化函数的构成(Composing Methods)       2. 改善代码设计 —— 优化物件之间的特性(Moving Features Between Objects)       3.
700 0
+关注
刘-建伟
MaxCompute技术支持工程师、大数据开发工程师
11
文章
0
问答
来源圈子
更多
MaxCompute(原ODPS)是一项面向分析的大数据计算服务,它以Serverless架构提供快速、全托管的在线数据仓库服务,消除传统数据平台在资源扩展性和弹性方面的限制,最小化用户运维投入,使您经济并高效的分析处理海量数据。
+ 订阅
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载