开发者学堂课程【SaaS 模式云数据仓库实战:日志数据如何同步 MaxCompute】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/761/detail/13347
日志数据如何同步到 MaxCompute
3.方案三:通过 LogHub 投递日志数据到 MaxCompute
这里有两种方式,一种是直接通过logHUB投递到maxcompute表中,另外一种是借助数据集成 Dataworks 通过写脚本的方式去投递数据。先介绍第一种方式,通过 LogHUB 投递数据。
环境准备及步骤:
(1)首先开通日志服务,登录日志服务控制台,创建新的Project或者单击已经创建好的Project名称。
(2)创建新的 Logstore 或者单击已经创建好的 Logstore 名称。
(3)单击对应的 Logstore ,查询分析导入到LogHub的日志数据。下面的几条数据就是导入到logHUB中,需要把这几条数据同步到maxcompute表中去。
(4)选择需要投递的日志库名称并依次展开节点,日志库名称->数据处理->导出->MaxCompute。单击开始投递。
(5)单击开启投递以进入LogHub->数据投递页面。
(6)配置投递规则,在LogHub->数据投递页面配置字段关联等相关内容。
a)自定义一个投递名称
b)项目名选择MaxCompute表名称,请输入自定义的新建的MaxCompute表名称或者选择已有的MaxCompute表。日志库表名可以是已知表也可以是新建一个表,新建表的话,系统会去自动创建一个表。
c)按序,左边填写与MaxCompute表数据列相映射的日志服务字段名称,右边填写或选择MaxCompute表的普通字段名称及字段类型。对于字段关联,左边是logHUB日志当中的字段列,右边是max compute表的字段列,需要注意的是,maxcompute表的字段列必须要映射日志服务的字段名称,最后一列是maxcompute表的字段类型,在下面就是分区字段的填写即Partitiontime。
d)_partition time_格式 :将日志时间作为分区字段,通过日期来筛选数据是MaxCompute常见的过滤数据方法。
_partition_timne_是根据日志_time_值计算得到(不是日志写入服务端时间,也不是日志投递时间) ,结合分区时间格式,向下取整。
(7)在投递管理页面,单击修改即可针对之前的配置信息进行编辑。其中如果想新增列,可以在大数据计算服务MaxCompute修改投递的数据表列信息,则单击修改后会加载最新的数据表信息。
(8)投递任务管理。
配置成功以后,点击确定,会提醒成功配置了数据投递到max compute,在启动投递功能后,日志服务后台会定期启动离线投递任务。用户可以在控制台上看到这些投递任务的状态和错误信息。
当投递任务发生错误时,请查看错误信息,问题解决后可以通过云控制台中日志投递任务管理或SDK来重试失败任务。
(9)查看日志投递的运行状态
当日志开始投递的时候状态是运行中,数据行数显示为零,过一段时间,当数据同步成功以后,可以看到,数据行数变成了11,状态由运行中变成了成功,说明通过logHUB投递日志数据的maxcompute是成功的。
(10)日志投递MaxCompute后,检查数据完整性。
a)通过控制台或API/SDK判断(推荐),有以下两个方法
使用API、SDK或者控制台获取指定Project/Logstore投递任务列表。控制台会对该返回结果进行可视化展示。
b)通过MaxCompute分区粗略估计
比如在MaxCompute中以半小时做一次分区,投递任务为每30分钟一次,当表中包含以下分区:
2019_10_25_10_00
2019_10_25_10_30
当发现分区2019_10_25_11_00出现时,说明11:00之前分区数据已经完整。
缺陷:该方法不依赖API ,判断方式简单但结果并不精确,仅用作粗略估计。如果想要检查的话,建议使用第一种方式。通过控制台或者是API、SDK方式进行判断。
(11)查询MaxCompute表中数据,确认是否导出成功。
(12)注意事项
数加控制台创建、修改投递配置必须由主账号完成,不支持子账号操作。
不同Logstore的数据请勿导入到同一个MaxCompute表中,否则会造成分区冲突、丢失数据等后果。
MaxCompute表至少包含一个数据列、一个分区列。
MaxCompute单表有分区数目6万的限制,分区数超出后无法再写入数据,所以日志服务导入MaxCompute表至多支持3个分区列。请谨慎选择自定义字段作为分区列,保证其值是可枚举的。
日志服务数据的一个字段最多允许映射到一个MaxCompute表的列(数据列或分区列) ,不支持字段冗余,同一个字段名第二次使用时其投递的值为null ,如果null出现在分区列会导致数据无法被投递。
投递MaxCompute是批量任务,请谨慎设置分区列及其类型:保证一个同步任务内处理的数据分区数小于512个 ;用作分区列的字段值不能为空或包括/等MaxCompute保留字段。
不支持海外Region的MaxCompute投递,海外Region的MaxCompute请使用DataWorks进行数据同步。
4.方案三:通过LogHub投递日志数据到MaxCompute DataWorks:
环境准备及步骤:
(1)登录阿里云LogHub控制台,创建Project。
(2)登录DataWorks控制台,单击对应项目进入数据集成。
(3)进入同步资源管理->数据源页面,单击右上角的新增数据源。需要配置一个LohHub数据源。
(4)选择数据源类型为LogHub ,填写新增LogHub数据源对话框中的配置。选择环境、填写一下数据源的名称、同时需要填写LogHubaddpot以及在LOGHUB中创建的Project以及AK信息。
单击测试连通性。测试连通性通过后,说明数据源可以正常使用,单击确定。
(5)配置同步任务
可选择向导模式,通过简单便捷的可视化页面完成任务配置;或者选择脚本模式,深度自定义配置您的同步任务。
新建业务流程->数据集成->新建数据集成节点->数据同步进入数据同步任务配置页面。
使用向导模式需要填写日志开始时间和日至结束时间。
日志开始时间:数据消费的开始时间位点,为yyyyMMddHHmmss格式的时间字符串(比如20191025103000) ,左闭右开。精确到微秒。
日志结束时间:数据消费的结束时间位点,为yyyyMMddHHmmss格式的时间字符串(比如20191025113000) ,左闭右开。
批量条数: 一次读取的数据条数,默认为256。
a)向导模式配置同步任务。
配置数据源及数据去向。数据源就是loghub,填写logstore,配置日志开始时间和日至结束时间。数据去向就是maxcompute的数据源,选择要导入的表,填写分区信息,进行字段映射。
保存数据同步的配置。提交并运行。
查询MaxCompute表中数据,确保日志服务数据已经成功同步到MaxCompute。
b)脚本模式配置同步任务
主要是reader和writer端的脚本配置。需要注意的是,这里有一个begindatetime也就是日志开始的时间和adddatetime日志结束的时间,datesource数据源,填写配置好的数据源的名称,编码格式。必须要进行填写的是logstore日志库
导入模板,选择数据源和目标数据源。
编辑脚本。
绿色框内是writer端,也就是maxcompute端的配置信息,需要填写链名,与loghub配置的名字是进行对应的。填写datesource、oartition、table。
保存脚本数据节点,提交并运行。数据就从Loghub投递到Maxcompute表中。如下图可以看到是投递成功的,总共投递了69条数据。
查询MaxCompute表中数据,确保日志服务数据已经成功同步到MaxCompute。
5.方案四:通过Kafka投递日志数据到MaxCompute
环境准备及步骤:
(1)搭建Kafka集群。
(2)在控制台创建Topic和Consumer Group。
(3)Flume读取日志文件数据写入到Kafka。
为flume构建agent :先进去flume下的配文件夹里面编写构建agent的配置文件。
启动flume的agent
bin/flume- -ng agent -C conf -f配置文件夹名/配置文件名-n a1 -Dflume.root.logger=INFO,console
启动kafka的消费者bin/kafka- -console- -consumer.sh-- zookeeper主机名:2181--topic kafka_ odps
这样就开启了日志采集,文件会写入到kafka中。
(4)通过数据集成DataWorks同步数据到MaxCompute。
新建业务流程->新建数据同步节点->转换为脚本->配置脚本。
(5)配置脚本信息,运行脚本进行数据节点的同步。
红色框是从Kafka的reader端的一个配置,绿色框是一个writer端配置。
红色框这边需要配置一个server,它是kafka server,格式为IP+端口号,框起来的column部分是kafka的属性列,再填写相应的topic,还需要填写beginoffset也就是数据开始消费节点。
绿色框maxcompute的writer端,这些列一一对应kafka的列,填写table表名,填写datesource。脚本配置完成以后,点击提交,进行运行。运行成功后日志数据就会同步到maxcompute表中。
(6)查询MaxCompute表中数据,确保日志服务数据已经成功同步到MaxCompute。
注意事项:
(1)数据同步的脚本编写。reader、writer的配置。不懂可以结合官方文档进行配置。
(2)Flume采集日志数据中配置文件的配置,里面的topic必须与kafka里面创建的topic是一致的。













