开发者学堂课程【SaaS 模式云数据仓库系列课程 —— 2021数仓必修课:Maxcompute tunnel 上传典型问题】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/55/detail/1044
Maxcompute tunnel 上传典型问题
内容简介:
一、Tunnel 上传功能概述
二、Tunnel 上传问题分类
一、Tunnel上传功能概述
Tunnel Upload 命令主要用于数据的上传
● 支持文件或目录(指一级目录)的上传,每一次上传只支持数上传到一张表或表的分区,有分区的表一定要指定上传的分区,多级分区一定要指定到末级分区。
● tunnel upload log.tbxt
test_project.test_table/pl="bl".p2="b2";
tunnel upload d:\data
sale_detail/sale_date=201312,region=hangzhou -s false;
-fd:本地数据文件的列分到符,默认为通号
-rd:本地救组文件的行分割符,默认为 \vrn
-bs:每次上传至 Tunnel 的败据块大小,默认 100MB
-dbr:是否忽随脏效据(多列、少列、列敛据类型不匹配等情况)
Tunnel Upload 命令上传限制
● 上传不设速度限制,上传速度的瓶颈在网络带宽以及服务器性能。
● 重传 retry 有次数的限制:当重传的次数超过5次,就会继续上传下 block 。上传完成后,可以通过 select count(“)语句,检查是否有效据丢失。
● Tunne 命令不支持上传下线 Array、Map 和 Struct 类型的数据。
● 每个 Tunnel 的 Session 在服务编的生命周期为24小时,的建后24小时内均可使用,也可以跨进程/线程共享使用。但是必须保证同 Blockld 没有重复使用。
● DatalWorks 数据开发不支持 upload 语句
二、Tunnel 上传问题分类
数据问题
如果数据里面有回车或空格如何处理?
如果数据中有回车或者空格,可以给数据设置不同于回车或者空格的分隔符,然后用 -rd 和 -fd 来指定对应的分隔符从而实现数据的上传,如果没有办法做数据的这个分隔符,可以将数据作为单独的一行来上传,然后再使用 utf 解析。
比如像 shop x x_id.100, 在第3条数据标红的这个数据中本身是包含了回车的,他的上传命令的行,它的默认的行分隔符是回车,可以在数据中去做处理,给每一行加这个@,每一条数据加@,然后指定上传的时候通过这个 -rd 行分隔符指定这个@作为行分隔符,这样数据中的回车就可以被上传到最后的这个表中了。
上传之前的目标表要注意是要事先创建好的。通过指定的行分隔符完成了数据中包含回车的上传。
如果这个数据本身是用比如用“,”做列分割,现在的这个 description 字段里本身数据是包含有“,”或者本身是用“|”来分割,但是的数据本身字段里本身是包含这个“|”符号的,这种情况下如果数据的字段本身是有“|”的,可以考虑去换数据的分隔符作为其他的符号,然后再通过 -fd 这个列分隔符指定这个列分隔符是其他的分隔符,然后再来做上传。
比如要上传 Excel 表格,用户有岗位需求的数据,在 Windows 环境上传这个表格内本身是包含这个“,”的。
首先是可以通过 Windows 环境来设置,如果他用的是 Excel 的话,可以设置在Excel 转为非文件的时候,这个分隔符是怎样的。系统会去给这个文件设置分隔符的,但这个分隔符怎么设置,不同的操作系统有不同的设置方式。可以自行搜索一下,这里 win7 的系统举个例子,这个系统来在控制面板\时钟-语言和区域中选择更改日期时间或者数字格式,然后再点击其他设置,么在这个界面就可以找到有列表分隔符这个选项。
可以根据自己的实际情况来设置。完成了这个设置了以后,在用 Excel 本身的功能把数据另存为 CSV 文件,然后再把它转码成utf八编码,正常情况下其实不一定需要转码,只是如果发现后来发现上传发现是乱码了以后,可以来做转码。
第三步就是用 TOP 命令去上传的数据,并且指定好这个分隔符, -FD 是上传,是指定上传时候的列分隔符,在这里去指定分隔符就可以解决的描述字段中数据有“,”的情况。
演示:
首先在这里新建了模拟的数据,这个数据是表示有一些工作岗位的信息,打开控制面板了以后到时钟语言和区域更改日期时间或数字格式,然后选到其他设置输入。
如果数据中使用空格作为列分隔符或者需要对数据做正则表达式过滤应该怎么处理。
正则表达式是非常常见的场景,因为目前命令本身是不支持正则表达式的,如果需要用正则表达式的话,可能需要自己去用 Python 或者 Java 写 TF 来做这个正则表达式过滤,所以遇到正则表达式的需求就可以借助If函数的功能来做。
定义原始数据,这个数据是用来采集访问比如阿里云这个帮助文档的时候,可以记录访问的客户端是什么,这个数据把它去上传的时候,最后想要提取的时候可能是比较麻烦的,因为这个分隔符是空格并且并不是完全的空格,中间还有引号或者横杠,但是只想要取到的这些里面的这些数据,比如访问的网页的URL或者访问的时间,访问的IP,访问的客户端类型,第一步把这个数据给传上去,其实是需要去把这个数据整个的传上去,这个时候可以把的数据作为整体,上传到的这个目前的项目空间中,它这个文件就叫这个 user log TXT 文件,么把这个数据上传上去的时候,为了让它是整列上传,可以指定不存在的列分隔符来上传,这个也是常见的技巧,可以通过指定完全不存在列分隔符,随意去命名。
来达到不去分割列的效果,然后上传完成了以后可以使用的这个 IDAE 编写Python UDF,编写过程可以参考 Max compute 的 SDK, Python SDK 或者参考章节来看怎样去使用的 Idea 或者是使用 Eclipse 来开发这个 Python 的 utf。
from odps.udf import annotate
from odps.udf import BaseUDTF
import re# 此处引入正则函数
regex = ([(\d\.])+) \(.*?)V-"(.*?)" (Nd+) (d+) (Nd+) (Nd+) "-" "(.?)"-(.?) --(.*?) (.?)----
# line -> ip,date,request,code,c1,c2,c3,ua,q1,q2,q3
@annotate('string·>string,string,string,string,string,string,string,string,string,string,string)
#请注意string数量和真实数据保持一致,本例中有11列。
class ParseAccessLog(BaseUDTF):
def process(self, line):
try
t= re.match(regex, line).groups(
self.fonward(t[0]. t[1]. t[2]. t[3], t[4]. t[5]. t[6]. t[7], t[8], t[9], t[10])
except:
pass
字段的这个类型的映射要注意要和取到的真实的数据保持一致,本例中的是有11个,如果这个不一致的话,最后运行出来会发现,取不到想要取得数据的。写好了之后要去把这个注册到 master 上,如果安装了的插件以后可以直接在栏里添加资源,添加好了以后去创建这个 utf ,然后创建 utf 填写这个刚才添加的资源的名称以及这个函数名类名,然后就可以直接在这个命令行中去使用这个ATF了。
utf 写好了以后找到 Max compute 的选择添加资源,然后添加到的项目中,然后这个地方的项目是需要首先在 project Explorer 这个地方添加好的项目,然后怎么去添加也可以参考一下的这个帮助文档中客户端与工具集栏中 Ida 中 Python 开发这一章节怎么去添加的这个项目,添加好了以后輸入现在的项目名称,添加好了以后点击 OK, py 这个脚本就已经存在了 Mask 之中了,然后存在以后就可以根据这个来创建函数。找到刚才上传的资源,函数名称这个地方可以自己命名后就可以选到这个类,选完了以后可以强制更新,这个函数就创建好了。
新建 table 之后,把刚才这个 user1 中存在的数据通过这个函数然后把它重新命名叫做 IP data request codec1 c2 c3 以及一些参数,然后把这些参数写入到 user2 这个表格中,然后选要执行的项目,右键选择就可以得到左下角这个命令行可以执行。
4.如何实玩批拾上传一个目录下的多个文件到同一张表,并且每个文件放在不同的分区内,可以利用Shell脚本实现上述功能,以在 Windows 环境下配合 odpscmd客户端使用 Shell 脚本举例,Linux 环境下原理相同。 Shell 本内容如下。
#!/bin/sh
C:/odpscmd public/bin/odpscmd.bat -e"create table user(data string)partitioned by(dt int);"//首先创建一个分区表user,分区关键字为dt,本例中odpscmd客户端的安装路径为C:/odpscmd_public/bin/odpscmd.bat,您可以根据您的实际环境调整路径。
dir=$(is C:/userlog)//定义变量dir,为存放文件的文件夹下所有文件的名称
pt=0//变量pt用于作为分区值,初始为1,每上传好一个文件+1,从而实现每个文件都存放在不同的分区
for i in Sdir //定义循环,迪历文件夹C:/userlog下的所有文件do
let pt=pt+1 //每次循环结束,变量pt+1
echo Si//显示文件名称
echo Spt//显示分区名称
C:/odpscmd_public/bin/odpscmd.bat -e "alter table user add partition (dt=$pt):tunnel upload
C:/userlog/Si user/dt=Spt-s false-fd“%”-rd“@":“//利用odpscmd首先添加
分区,然后向分区中上传文件done
。
首先创建表,表的名称叫 user ,然后他是来做分区的。
首先创建分区表,然后需要去遍历本地电脑上的路径下的所有的文件,然后把它完成上传而这里的文件是放在C盘下的文件夹,然后的文件都是存在这个C盘下的 user log 里的,然后可以给批定初始值,然后把它作为这个集体分区的值,然后初始为零,然后再写循环每次把这个P值给加1,然后这个地方 Dr 其实就是这个存放的文件的路径,就遍历这个目录下的所有的文件,增加分区的语句每次循环加 1,然后使用的命令去上传这个文件,完全上传了以后,其实是可以看到这个C盘的user log 这个路径下的所有的文件全部都被上传了同表中,并且每分文件里面的自带是在分区里的。
5. MaxCompute 使用 Tunnel Upload 命令上传数据时失败,内存溢出报锁java.lang.OutOfMemoryError.Java heap space 的原因。
从报错上看是数据上传的时候的内存溢出了。目前 TunnelUpload 命令是支持海量数据的上传的,如果出现内存溢出,可能是因为数据的行分隔符和列分隔符设置错误,导致整个文本会被认为是同一条数据,缓存到内存里再做 split,导致内存溢出报错。
这种情况下可以先拿少量的数据测试,把 -rd 及 -fd 调试过了后再上传拿全量的数据。
行分隔符或者列个分隔符设置错误了可能会导致你的整个文本会被认为是同一条数据,内存需要首先把你的整条数据缓存到的内存当中,然后再去做这个split切分,整条数据太大就会导致内存溢出的报错,可以先少量数据测试这个Rd和FD的值是可以使用的再去上传全量的数据。
6. MaxCompute 使用 Tunnel Upload 命令把一个目录下的所有文件上传到一个表里,并且想要自动建立分区具体的命令是是 tunnel upload/data/2018/20180813/*.json app_log/dt=20180813-fd@@’-acp true;,执行报错:
Unrecognized option: -acp
FAILED: error occurred while running tunnel command
出现这种报错通常是因为是用了不支持的命令或字符。MaxCompute 使用 Tunnel Upload 命令上传不支持通配符及正则表达式。
会有这种报错通常是因为使用了不支持的命令或者字符,这个上面这一段黄色的命令中其实就是使用目前不支持使用通配符或正则表达式的,而如果有正则表达式的这个需求的话也可以像刚才的一样写utf来进行处理。
7.MaxCompute 使用 Tunnel Upfoad 命令上传文件数据报错如下
java.io.IOException: RequestId=XXXXXXXX
ErrorCode=StatusConflict, ErrorMessage=You cannot complete the specified operation
under the current upload or download status.
At com.aliyun.odps.tunnel.io.TunnelRecordWriter.close(TunnelRecordWriter.java:93)
at com.xgoods.utils.aliyun.maxcompute.OdpsTunnel.upload(OdpsTunnel.java:92)
at com.xgoods.utils.aliyun.maxcompute.OdpsTunnel.upload(OdpsTunnel.java:45)
at com.xeshop.task.SaleStatFeedTask.doWork(SaleStatFeedTask.java:119)
at com.xgoods.main.AbstractTool.excute(AbstractTool.java:90)at com.xeshop.task.SaleStatFeedTask.main(SaleStatFeedTask.java:305)
A:这个错误的提示是当前已经是在上传或下载中,所以无法再操作。下面这个问题是使用的命令上传文件数据报错,然后由下面这段报错,所以这个报错的时候可能需要稍等一下。
8. MaxCompute 硬用 Tunnel Upload 命令上传传文件数据报错如下的原因。要一次性上传 8000W 的数据,最后在 odps tunnel recordWriter.close() 时报错,报错内容如下:
ErrorCode=StatusConflict, ErrorMessage=You cannot complete the specified operationunder the current upload or download status.
这个报错说明 session 的状态错误,建议重新创建个 session 重新上传一下数据。从报错上看,很可能是前面的操作里已经 close 了这个 session,或者已经commit 了。对于不同的分区,需要每个分区都是单独的一个 session。
为了防止多次 commit 导致的这种报错,可以先检查数据上传是否已经传成功,如果失败的话重新上传一次。
网络问题
Q:MaxCompute 使用 Tunnel Upload 命令上传散据时为什么报锚java.io.lOEsxception: Error writingrequcst body to server ?
这是一个上传数据到服务器的时的异常,通常是因为上传过程中的网络链接断开/超时导致的:
可能是用户的数据源并非是来自本地文件,而是需要从诸如数据库等一类的地方获取,导致效据在写入的过程中还需要等待数据获取导致的超时。目前UploadSession 在上传数据的过程中,如果600秒没有数据上传,则被认为超时。
用户通过公网的 Endpoint 来做数据上传,由于公网网络质量不稳定导致超时。
A:解决方法:
在上传的过程中,先获取数据,再调用 Tunnel SDK 上传数据。
一个 block 可以上传 64M-1G 的数据,好不要超过1万条数据以免带来重试的时间导致的超时。一个 Session 可以挂最多2万个block,
如果用户的数据是在 ECS 上,可以参考访问域名和数据中心配置合适的Endpoint ,可以提速井节省费用。
Q: MaxCompute 使用 Tunnel Upload 命令行上传数据,设置了经典网络的Endpoint ,但为什么会连接 外网的 Tunnel Endpoint ?
A:配置文件 odps_config.ini 里除了 endpoint 之外还需要配量tunnel_endpoint .请参考访问域名和数据中心进行配置。目前只有上海 region 不需要设置 tunnel endpoint。
Q:MaxCompute 使用 Tunnel Upload 命令行上传数据是否支持限速?上传速率太快的话可能会占用服身器过多的 I/O 性能。
A:目前 MaxCompute 使用 Tunnel Upload 命令行不支持限速,需妥通过SDK单独处理。
Q: MaxCompute 使用 Tunnel Upload 命令行上传数据太慢怎么办?
A:如果上传数据太慢,可以考虑使用 -threads 参数将数据切片上传,例如将文件切分为 10片 上传:
odps@ bigdata_DOC>tunnel upload C:\userlog.txt userlog1 -threads 10 -s false -fd "Lu0000"-rd "n";
计费问题
Q: Max compute 使用 Tunnel upload 命令行上传数据计费的带宽,是按照数据压缩钱还是压缩后的大小计费?
A:按照 Tunnel 压缩后的带宽计费