开发者学堂课程【 SaaS 模式云数据仓库系列课程 —— 2021数仓必修课:PyODPS 基本操作】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/55/detail/1037
PyODPS 基本操作
内容简介:
一、开箱即用的环境
二、ODPS 入口
三、MaxComputer SQL
四、读写表数据
五、直接使用 tunnel 接口读写表数据
六、Pandas-like API
七、多后端支持:
八、PyODPS DataFrame 实现原理简介
PyODS 还提供了开箱即用的环境首先是 DataWorks,在DataWorks 上面新建一个数据开发就会有 PyODPS 节点,打开节点之后可以进行 python 代码;
第二个在节点上面会有 PyODPS 文档,在这个节点上面的好处就是不需要新建一个账号,可以直接查询,直接在 DataWorks 上面用一个全局变量 ODPS ,然后直接 SQL 默认没有开启 instance tunnel,DataFrame 需要 execute 等触发执行。
一、开箱即用的环境:PAI
数据——PyODPS(将来可能是notebook)
文档:
http://pyodps.alibaba.net/products-docs/zh-CN/latest/platform-pai-studio-int. html
交互式环境:标准 Jupyter notebook
二、ODPS入口
可以直接 from odps import OPDS 配置账号,也可以from odps.inter import setup, 提供 room 的方式配置到本地,这种方式会自动将信息输入到本地的目录。
Setup 在运行过程中分两种:
一种是 from odps.inter import enter 指定 room 或是不指定,采用插件的方式进入入口。
三、MaxComputeSQL
#异步
>>>inst=o. run sql('select*from dual')
>>>print(inst. get logview address()
>>>inst. wait for success()
#同步
>>>inst=o. execute sql('select*from dual/)
# ipython/jupyter
%load_ext_odps
%enter
%sql select * from dual
select sum(sepal_length)
from products_iris
group by category
#设置运行时参数
.>>>o.execute_sql(' select * from dual’,
hints={‘odps. sql. mapper. split. size':16})
#通过instance tunnel读取SQL执行结果,能读到全部数据
>>>inst=o. execute sql('select * from dual’)
>>>with inst. Open_reader() as reader:
for record in reader:
print(record)
#通过result接口读取SQL执行接口
>>>with o. execute sql('select * from
Dual '). open reader(use_tunnel=False);
for record in reader:
print(record)
五、读写表数据
>>>t=o. get _ table(‘pyodps_iris')
#读数据
>>>with t.open reader() as reader:
for record in reader:
print(record)
#写数据
>>>with t.open_writer() as writer?
record=t. new_record()
record[‘field1']='a'
record[‘field2’]=1
writer. write(record)
#写 pandas DataFrame
>>>import pandas as pd
>>>from odps. df import DataFrame
>>>df = pd.DataFrame([['a',1]], columns=[‘field1','field2']
>>> DataFrame(df). persist('my_odps_able’).//核心代码
但常见的问题是 pandas DataFrame 不能用 writer 写 DataFrame ,所以要写pandas DataFrame 需要 PyODPS 来进行一个接口,直接用 pandas DataFrame来创建一个 presist.
六、直接使用 tunnel 接口读写表数据
>>>from os. tunnel import Table Tunnel
>>>tunnel=Table Tunnel(o)
>>>table=o. get table('my table')
#写数据
>>>upload_session =tunnel. Create_upload _session(table. name)
>>>with upload_session.open_record_writer(0) as writer?
>>> record=table. new record(['a',l])
>>> writer. write(record)
>>>upload session. commit([0])
#读数据
>>> download_session = tunnel.create_download_session(table. name)
>>> withdownload_session.open_record_reader(0, download_session.count) as reader
>>> for record in reader:
>>> #处理record
也是可以直接裸露 tunnel 接口,需要创建一个 download_session,用 tunnel接口的好处是相比较 record_reader 有个细节是对分步式的写数据来说是简单的。
如果是用 DataWorks 写的话先要创建一个 download_session,再分发到每个机器上,在机器上打开相同 id 的 download_session,在把数据写进去,最后需要有一台机器汇总一下,record 后面有一个参数,例如是0的话,本地要有十个,那么第零个写第零个,第一个写第一个,第十个写第十个,那在最终会有一个写了多少数据, commite list 后面 就要写0,1,2·······……这样就可以直接裸露 tunnel 接口来完成分步式数据的上传。
但如果用 open table Reader 是很难做出分布式的效果
PyODPS DataFrame 框架:
提供一个功能非常强大的框架,这个框架是设计成了前后端的,简单来说是写了一个代码,可以在不同的后端执行,可以是本地、提交执行、数据库上。创建的代码方式是一样的只是执行不一样,比如用 table 创建了一个 PyODPS DataFrame 那就是在 MySQL 上执行,是可以很方便的进行一些调试
PoODPS DataFrame 的背景:
像传统 Scipy stack 的一些同学会更习惯 python 的数据分析的技术栈,让numpy、scipy 这种好用的数据库。
Pandas 的表达力非常好但是只能单机计算、数据大小要小于内存;ODPS(SQL) 是适合与大数据的分析但表达力(contro、flow等)不够好。结合这两者的优点,提供了 PoODPS DataFrame ,它具有 pandas-like 语法,还是基于 pandas 的语法但又有些不同,还是不能提供索引;具有拓展适合大数据语法:MapReduce API ;还可以利用 ODPS 的计算能力;以及根据数据量选择seahawkes 或者是 ODPS 的后端,虚线中的部分就是其功能。
七、Pandas-like API
列选择: iris['sepal width'. ‘sepal_length’]
过滤:
iris[(iris. species=='Iris-setosa')& (iris. sepal_length>5.0)]//这个且只能用&,或用|
计算: (iris. sepal_length*10). log(
自定义函数:iris. sepal_length. map(lambdax.x+1)
聚合: iris. sepal_length.mean()
分组聚合:
iris.groupby('species').agg( shortest=iris.petal_length.min().longest=iris.petal length.max0. average=iris,petal lenath. mean()).
……
Join,MapReduce……
好处在写代码过程中会有一些错误检查,所以对一些类型可以进行检查,不用等到运行
八、多后端支持
假设这里有一个代码就可以通过 Max Computer sql 或是 DataFrame 其他来完成,但编译出来的是不一样的,但结果是一样的。
MapReduce 等语法增强:
MapReduce 在大数据库里操作类型是非常的多,可以做很多的拓展,在这里是很轻松的写出来。
Mapper 对每一行进行处理 ,reduce 会进行分组把相同的字段的列分组到一起,reduce(key) 是指定一个字段进行聚合处理,这里创建一个 count,采用闭包的方式。
案例1:分组 kmeans
from opds.df import output
N_CENTER = 4
center_fields = [‘c_(0)_(1)’.format(i_center, i_field) for i_center in range (N_ CENTER) for i_field in range (3)]
#output(['label’]*center_fields,['int']+['float']*len(center_fields))
def train(keys):
import numpy as np
from aklearn.clustor import MiniBatchKMeans
model = MiniBatchKMeans(s_clusters=4,random_ atate=0batch size=10)
def h(row,done):
buffer.append(np. asarray(row(1:))
if len(buffer)>=10 or done:
model.partial_fit(no,asarray(but(or)?
del buffer[1]
if done:
centers=model.cluster_centers_
return[int(rowl))]+np.ravel(centers). tolist()
return h
libraries=['scipy.zip',’sklearn. zip']
df.map_reduce(reducer=train, group-[‘lable’]). execute
最佳实践
1.少用 open,reader,多用 DataFram e(从 Table 创建)处理数据
2.使用 pandas 后端本地调试
三方库
1.弄清楚函数在哪里运行:https://yq. aliyun. com/articles/704713
2.在 PyODPSDataFrame 的自定义函数里使用三方库:
pandas、scipy、scikit-learn:https://yq. aliyun. com/articles/591508
nltk:https://www. attatech. org/articles/97526
talib:https://www. attatech. org/articles/81863
XGBoost:https://www. attatch. org/articles/116529
PyODPS DataFrame 实现原理简介
首先在 PyODPS DataFrame 中写一个代码会有以下图的结构框架,但不会计算,等到真正计算的时候会根据不同的来源来做一个运行在执行之前会有可视化、Optimizer 操作合并的优化可以用 df.realize 来进行查看。
在真正执行的时候会根据不同的后端选择不同的执行器
>>>local_df=read cosv(-…). exclude(…)
>>>remote_df-DataFrame(odps. get_table('my_table'))
>>>remote_df2-remote_df. groupby(field'). agg(…). cache()
>>>remote_df3-remote_df2(remotedf2. field<10)
>>>df=local_df. join(remote_df3, on=‘id')