日志数据如何同步到 MaxCompute | 学习笔记

本文涉及的产品
大数据开发治理平台 DataWorks,不限时长
简介: 快速学习日志数据如何同步到 MaxCompute,介绍了日志数据如何同步到 MaxCompute 系统机制, 以及在实际应用过程中如何使用。

开发者学堂课程【SaaS 模式云数据仓库实战日志数据如何同步到 MaxCompute】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/332/detail/3719


日志数据如何同步到 MaxCompute

 

内容介绍:

一、实验目的

二、方案介绍

三、方案比较及场景应用

四、操作步骤


一、实验目的

日常工作中企业需要将通过 ECS、容器、移动端、开源软件、网站服务、JS 等接入的实时日志数据进行日志实时查询与分析、采集与消费、数据清洗与流计算(ETL/Stream Processing)、数据仓库对接(Data Warehouse)等场景的使用。


二、方案介绍

image.png

方案一使用 Tunnel 命令上传日志数据到 MaxCompute。

方案二通过 DataHub 投递数据到 MaxCompute。DataHub DataConnector 是把 DataHub 服务中的流式数据同步到其他云产品中的功能目前支持将 Topic 中的数据实时/准实时同步到 MaxCompute、OSS、ElasticSearch、RDS Mysql、ADS、TableStore 中。用户只需要向 DataHub 中写入一次数据,并在 DataHub 服务中配置好同步功能便可以在各个云产品中使用这份数据。

方案三通过 SLS 实时采集与消费(LogHub)投递数据到 MaxCompute。

通过 DataWorks 的数据集成(Data Integration)功能投递至 MaxCompute。

方案四通过 Kafka 订阅实时数据投递至 MaxCompute。

方案二和方案三的差异很小,都是消息队列,对DataHub 建议进行公测或者自研。


三、方案比较及场景应用

Tunnel

Tunnel 用于批量上传数据到离线表里适用于离线计算的场景。对于特殊格式日志,我们一般建议作为一个字段上传,到 MaxComopute 进行拆分。

DataHub

Datahub 用于实时上传数据的场景,主要用于流式计算的场景,数据上传后会保存到实时表里,后续会在几分钟内通过定时住务的形式同步到离线表里,供离线计算使用。

日志服务(SLS)

(实时采集与消费)LogHub 用途数据清洗(ETL)、流计算(Stream Compute)、监控与报警、机器学习与迭代计算等。

投递数仓(LogShipper)稳定可靠的日志投递。将日志中枢数据投递至存储类服务进行存储。

支持压缩、自定义 Partition、以及行列等各种存储方式。

数据仓库+数据分析、审计、推荐系统与用户画像

支持通过控制台数据接入向导一站式配置正则模式采集日志与设置索引。

(实时采集与消费)LogHub实时性强,写入即可消费。Logtail(采集 Agent)实时采集传输,1秒内到服务端(99.9%情况)。写入即可查询分析。

海量数据对数据量不设上限。

种类丰富支持行、列、TextFile 等各种存储格式。

配置灵活支持用户自定义 Partition 等配置。

Kafka

Kafka 是一款分布式发布与订阅的消息中间件有高性能、高吞量的特点,每秒能处理上百万的消息。Kafka 适用于流式数据处理。

大数据领域主要应用于用户行为跟踪、日志收集等场景。

数据集成将消息导入 MaxCompute、OSS、RDS、Hadoop、HBase 等离线数据仓库。


四、操作步骤

1、方案一通过 Tunnel 上传日志数据到 MaxCompute

环境准备及步骤

(1)开通 MaxCompute 服务安装 odpscmd 客户端。

日志数据,通过 Tunnel 上传到 MaxCompute

(2)准备日志服务数据。

(3)创建 MaxCompute 用来储存日志数据。

(4)使用命令

tunnel u C:\Users\Desktop\weijing_loghub_demo.csv tunnel_log。

下图表明执行后,我们投递的日志数据是成功的,说明已经通过 Tunnel 上传日志数据到 MaxCompute

image.png

(5)查询表数据是否导入成功。

查询后发现表中有数据导入,说明日志数据已经成功导入 MaxCompute 表中。

image.png

注意事项

(1)使用 Tunnel 命令行工具上传数据当前不支持通配符或正则表达式命令,如果想使用正则表达式上传,可以借助方案三。

(2)对于特殊格式的日志数据我们一般建议作为一个字段上传 MaxComopute 进行拆分。

2、方案二通过 DataHub 投递日志数据到 MaxCompute 

环境准备及步骤

(1)登录阿里云 DataHub 控制台创建 Project。

image.png

(2)进入 Project 列表->Project 查看创建 Topic,有两种方式,一种是直接创建,一种是导入 MaxComopute 表结构,这里先介绍导入 MaxComopute 表结构。首先输入 MaxComopute 项目,选择项目名称,第二步输入 MaxComopute 表,这个表可以是已经创建的表,也可以是新建的表,新建表名会在 MaxComopute 中自动创建一个表,然后填写 AK 信息,选择自动创建 DataConnector,它会自动创建一个 DataConnector,然后填写 Topic 的名称。

image.png

(3)选择导入 MaxCompute 表结构、并勾选自动创建 DataConnector。

注意准备对应的 MaxCompute 该表字段类型、名称、顺序必须与 DataHub Topic 字段完全一致如果三个条件中的任意一个不满足则归档 Connector 无法创建。Shard 数量中,默认一个 Shard 对应1000个 KPS,可以根据数据流量进行相应的设置,然后设置生命周期。

(4)创建好的 DataConnector 详细信息如下图所示。

包括同步时间、最新写入数据的时间、MaxCompute Endpoint 以及运行状态、章数据量、当前点位,当前点位是从0开始的,下图显示是2,说明导入了3条数据。现在介绍直接创建 Topic,首先需要输入 MaxCompute 的一个项目,然后同样输入 MaxCompute table,可以是已知表,也可以是新建表,然后输入相应的 AK 消息。

image.png

(5)如果已经创建 Topic只需要在详情页的右上角点击

+DataConnector。

分区范围SYSTEM_TIME、EVENT_TIME、USER_DE FINE 三种模式SystemTime 模式会使用写入时间转化为字符串进行分区EventTime 模式会根据 topic 中固定的 event_time 字段时间进行分区(需要在创建 Topic 时增加一个 TIMESTAMP 类型名称为 event_time 的字段并且写入数据时向这个字段写入其微秒时间)UserDefine 模式将会直接使用用户自定义的分区字段字符串分区。分区格式定义现阶段仅支持固定的格式。

(6)创建完成后,回到 DataHub 控制台点击 Topic点击 DataConnector 可以查看 DataConnector 的详细信息,包括配置的目标服务、目标描述以及最新写入数据的时间

image.png

(7)日志数据抽样

下图是点击数据抽样后,可以看到同步到 DataHub 中的一个日志数据。

image.png

查看 DataConnector 的详细信息,可以看到归档信息,包括当前点位、章数据以及运行状态,如果运行状态有失败,需要检查其原因,DataHub 投入数据到 MaxCompute 离线表中时,默认是64兆就会commit一次,合作5分钟进行一次强行写入,可以保证的是至少5分钟会有一次数据同步。

image.png

(8)测试日志数据是否投递成功。

进行全段扫描,可以查询下表有数据,说明通过 DataHub 投递日志数据到 MaxCompute 是成功的。

image.png

注意事项

(1)目前所有 DataConnector 均仅支持同Region 的云服务之间同步数据,不支持同步数据到跨 Region 的服务。

(2)DataConnector 所配置的目标服务 Endpoint 需要填写相应的内网域名(经典网络),不支持使用公网域名同步。

(3)数据同步目前仅支持 at least once 语义,在网络服务异常等小概率场景下可能会导致目的端的数据产生重复但不会丢失,需要做去重处理。

(4)topic 默认20个,如果需要创建更多,需提交工单申请。

3、方案三通过 LogHub 投递日志数据到 MaxCompute

有两种方式,一种是直接通过 LogHub 投递到 MaxCompute 表中,另外一种是借助数据集成 DataWorks 通过写脚本的方式投递数据,先介绍第一种方式,直接通过 LogHub 投递到 MaxCompute 表中的方式。

环境准备及步骤

(1)开通日志服务登录日志服务控制台创建 Project 或者单击已经创建好的 Project 名称。

image.png

(2)创建新的 Logstore 或者单击已经创建好的 Logstore 名称。

(3)单击对应的 Logstore,查询分析导入到 LogHub 的日志数据。

下面几条数据导入到 LogHub 中,需要这几条数据同步到 MaxCompute 表中。

(4)选择需要投递的日志库名称并依次展开节点日志名称->数据处理->导出->MaxCompute。单击开始投递。

(5)单击开启投递以进入 LogHub->数据投递页面。

image.png

(6)配置投递规则 LogHub->数据投递页面配置字段关联等相关内容。

a)自定义一个投递名称

b)MaxCompute 表名称请输入自定义的新建的 MaxCompute 表名称或者选择已有的 MaxCompute 表。

c)按序左边填写与 MaxCompute 表数据列相映射的日志服务字段名称右边填写或选择 MaxCompute 表的普通字段名称及字段类型。

d)_partition_time_格式将日志时间作为分区字段通过日期来筛选数据是 MaxCompute 常见的过滤数据方法。_partition_time_是根据日志_time_值计算得到(不是日志写入服务端时间也不是日志投递时间)结合分区时间格式向下取整。

(7)在投递管理页面单击修改即可针对之前的配置信息进行编辑。其中如果想新增列可以在大数据计算服务 MaxCompute 修改投递的数据表列信息则单击修改后会加载最新的数据表信息。

(8)投递任务管理。

在启动投递功能后,日志服务后台会定期启动离线投递任务,用户可以在控制台上看到这些投递任务的状态和错误信息。当投递任务发生错误时请查看错误信息,问题解决后可以通过云控制台中日志投递任务管理或 SDK 来重试失败任务。

(9)查看日志投递的运行状态。

当日志开始投递时,它的状态是运行中,数据行数为0,当数据投递成功时,数据行数变成11,状态由运行中变为成功,说明通过 LogHub 投递日志数据到 MaxCompute 中成功了。image.png

(10)日志投递 MaxCompute后,检查数据完整性。

a)通过控制台或 API/SDK 判断(推荐)

使用 API、SDK 或者控制台获取指定 Project/Logstore 投递任务列表。控制台会对该返回结果进行可视化展示。

b)通过 MaxCompute 分区粗略估计

比如在 MaxCompute 中以半小时做一次分区,投递任务为每30分钟一次,当表中包含以下分区

2019_10_25_10_00

2019_10_25_10_30

当发现分区2019_10_25_11_00出现时,说明11:00之前分区数据已经完整。

该方法有一定的缺陷,它不依赖 API判断方式简单但结果并不精确仅用作粗略估计。如果要进行检查,建议使用另一种方式,通过控制台或 API/SDK 进行判断。

(11)查询 MaxCompute 表中数据,确认数据是否导入成功

通过 LogHub 投递日志数据到 MaxCompute注意事项

(1)数加控制台创建、修改投递配置必须由主账号完成,不支持子账号操作。

(2)不同 Logstore 的数据请勿导入到同一个 MaxCompute 表中,否则会造成分区冲突、丢失数据等后果。

(3)MaxCompute 表至少包含一个数据列、一个分区列。

(4)MaxCompute 单表有分区数目6万的限制分区数超出后无法再写入数据,所以日志服务导入 MaxCompute 表至多支持3个分区列。请谨慎选择自定义字段作为分区列,保证其值是可枚举的。

(5)日志服务数据的一个字段最多允许映射到一个 MaxCompute 表的列(数据列或分区列),不支持字段冗余同一个字段名第二次使用时其投递的值为 null,如果 null 出现在分区列会导致数据无法被投递

(6)投递 MaxCompute 是批量任务,请谨慎设置分区列及其类型保证一个同步任务内处理的数据分区数小于512个用作分区列的字段值不能为空或包括/ MaxCompute 保留字段。

(7)不支持海外 Region  MaxCompute 投递海外 Region  MaxCompute 请使用DataWorks 进行数据同步。

4、方案三通过 LogHub 借助 DataWorks 投递日志数据到 MaxCompute

环境准备及步骤

(1)登录阿里云 LogHub 控制台,创建 Project

(2)登录 DataWorks 控制台,单击对应项目进入数据集成。

(3)进入同步资源管理->数据源页面,单击右上角的新增数据源。

(4)选择数据源类型为 LogHub,填写新增 LogHub 数据源对话框中的配置。需要填写环境、数据源名称、LogHub Endpoint、在 LogHub 中创建的 project,以及 AK 信息。

填写完成后,单击测试连通性。测试连通性通过后,说明数据源可以正确使用,单击确定。

(5)配置同步任务

可选择向导模式,通过简单便捷的可视化页面完成任务配置,注意填写日志开始的时间和日志结束的时间;或者选择脚本模式,深度自定义配置您的同步任务。

新建业务流程->数据集成->新建数据集成节点->数据同步进入数据同步任务配置页面。

image.png

日志开始时间数据消费的开始时间位点,为 yyyyMMddHHmmss 格式的时间字符串(比如20191025103000),精确到微秒,左闭右开。

日志结束时间数据消费的结束时间位点,为 yyyyMMddHHmmss 格式的时间字符串(比如20191025113000),左闭右开。

批量条数一次读取的数据条数,默认为256。

a)向导模式配置同步任务。

I)配置数据源及数据去向。数据来源就是 LogHub 填写 Logstore,配置日志开始时间、日志结束时间,数据去向就是选择 MaxCompute 中的数据源、选择要导入的表、填写分区信息、进行字段映射。

II)保存数据同步的配置。提交并运行。

III)查询 MaxCompute 表中数据,确保日志服务数据已经成功同步到 MaxComputeimage.png

b)脚本模式配置同步任务。

I)导入模板选择数据源和目标数据源。

II)编辑脚本。

"steps":[

{

"category":"reader",

"name":"Reader",

"parmeter":{

"batchSize":"256"

"beginDateTime":"20191105110000",//日志开始时间

"colum":[

"id",

"name",

"salenum"

]

"datasource":"weitest",//数据源名称

"encoding":"UTF-8",//编码格式

"endDateTime":"20191105235959",//日志结束时间

"fieldDellmlter":".",

"logstore":"loghub_ppt"//日志库

},

"stepType":"loglub"

},

{

"category":"writer",

"name":"writer",

"parmeter":{

"colum":[

"id",

"name",

"salenum"

]

"datasource":"odps_first",

"partition":"at_20191105"

"table":"loghub_odps_copy",

"truncate":true

},

"stepType":"odps"

}

],

III)保存脚本数据节点提交并运行。

IV)查询 MaxCompute 表中数据确保日志服务数据已经成功同步到 MaxCompute。

image.png

5、方案四通过 Kafka 投递日志数据到 MaxCompute

环境准备及步骤

(1)搭建 Kafka 集群。

(2)在控制台创建 Topic  Consumer Group。

(3)Flume 读取日志文件数据写入到 Kafka。

a)为 flume 构建 agent先进flume 下的配文件夹里面编写构建 agent 的配置文件。

b)启动 flume  agent

bin/flume-ng agent-c conf-f 配置文件夹名/配置文件名

-n a1 -Dflume.root.logger=INFOconsole

c)启动 kafka 的消费者 bin/kafka-console-consumer.sh

--zookeeper 主机名2181--topic kafka_odps

这样就开启了日志采集,文件会写入到 kafka 中。

(4)通过数据集成 DataWorks 同步数据到 MaxCompute。

新建业务流程->新建数据同步节点->转换为脚本->配置脚本。

(5) 配置脚本信息,运行脚本进行数据节点的同步。

红色框是 read 端配置,绿色框是 writer 端配置。read 端要注意配置 server,格式是 IP和端口号,框写的列是 Kafka 的属性列,然后再填写相应的 topic,还需要填写 beginOffset 数据消费的开始节点。writer 端需要填写 table 的表名、datasource,这些都是必须进行选择的。

脚本配置完成后,点击提交,然后运行,运行成功后日志数据便会投递到 MaxCompute 表中。

(6)查询 MaxCompute 表中数据,确保日志服务数据已经成功同步到 MaxCompute。下图表明数据投递成功。

image.png

注意事项:

(1)数据同步的脚本编写。如果脚本配置有误,数据运行将会失败,要注意关于 Reader、Writer的配置。

(2)Flume 采集日志数据中配置文件的配置。配置文件中的 topic 必须与 Kafka 创建的 topic 保持一致。

相关文章
|
2天前
|
机器学习/深度学习 前端开发 数据挖掘
工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断(下)
工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断
72 11
|
8天前
工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断2
工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断
15 0
|
8天前
|
机器学习/深度学习 前端开发 数据挖掘
R语言计量经济学:工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断
R语言计量经济学:工具变量法(两阶段最小二乘法2SLS)线性模型分析人均食品消费时间序列数据和回归诊断
39 0
|
11天前
|
数据采集 搜索推荐 大数据
大数据中的人为数据
【4月更文挑战第11天】人为数据,源于人类活动,如在线行为和社交互动,是大数据的关键部分,用于理解人类行为、预测趋势和策略制定。数据具多样性、实时性和动态性,广泛应用于市场营销和社交媒体分析。然而,数据真实性、用户隐私和处理复杂性构成挑战。解决策略包括数据质量控制、采用先进技术、强化数据安全和培养专业人才,以充分发挥其潜力。
14 3
|
13天前
|
运维 供应链 大数据
数据之势丨从“看数”到“用数”,百年制造企业用大数据实现“降本增效”
目前,松下中国旗下的64家法人公司已经有21家加入了新的IT架构中,为松下集团在中国及东北亚地区节约了超过30%的总成本,减少了近50%的交付时间,同时,大幅降低了系统的故障率。
|
1月前
|
分布式计算 DataWorks 关系型数据库
DataWorks报错问题之dataworks同步rds数据到maxcompute时报错如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
23天前
|
Java
使用Java代码打印log日志
使用Java代码打印log日志
77 1
|
25天前
|
Linux Shell
Linux手动清理Linux脚本日志定时清理日志和log文件执行表达式
Linux手动清理Linux脚本日志定时清理日志和log文件执行表达式
78 1
|
28天前
|
SQL 关系型数据库 MySQL
MySQL数据库,可以使用二进制日志(binary log)进行时间点恢复
对于MySQL数据库,可以使用二进制日志(binary log)进行时间点恢复。二进制日志是MySQL中记录所有数据库更改操作的日志文件。要进行时间点恢复,您需要执行以下步骤: 1. 确保MySQL配置文件中启用了二进制日志功能。在配置文件(通常是my.cnf或my.ini)中找到以下行,并确保没有被注释掉: Copy code log_bin = /path/to/binary/log/file 2. 在需要进行恢复的时间点之前创建一个数据库备份。这将作为恢复的基准。 3. 找到您要恢复到的时间点的二进制日志文件和位置。可以通过执行以下命令来查看当前的二进制日志文件和位
|
1月前
|
监控 Shell Linux
【Shell 命令集合 系统管理 】Linux 自动轮转(log rotation)日志文件 logrotate命令 使用指南
【Shell 命令集合 系统管理 】Linux 自动轮转(log rotation)日志文件 logrotate命令 使用指南
51 0

热门文章

最新文章