PyODPS 基本操作 | 学习笔记

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
DataWorks Serverless资源组免费试用套餐,300CU*H 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 快速学习 PyODPS 基本操作

开发者学堂课程【 SaaS  模式云数据仓库系列课程 —— 2021数仓必修课PyODPS  基本操作】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/55/detail/1037


PyODPS  基本操作


内容简介:

一、开箱即用的环境

二、ODPS 入口

三、MaxComputer SQL

四、读写表数据

五、直接使用  tunnel 接口读写表数据

六、Pandas-like API

七、多后端支持:

八、PyODPS DataFrame  实现原理简介


PyODS  还提供了开箱即用的环境首先是 DataWorks,在DataWorks  上面新建一个数据开发就会有  PyODPS  节点,打开节点之后可以进行  python  代码;

第二个在节点上面会有  PyODPS  文档,在这个节点上面的好处就是不需要新建一个账号,可以直接查询,直接在  DataWorks  上面用一个全局变量  ODPS  ,然后直接  SQL  默认没有开启  instance tunnel,DataFrame  需要  execute  等触发执行。


一、开箱即用的环境:PAI

数据——PyODPS(将来可能是notebook)

文档:

http://pyodps.alibaba.net/products-docs/zh-CN/latest/platform-pai-studio-int.  html

交互式环境:标准  Jupyter notebook


二、ODPS入口

可以直接  from odps import OPDS  配置账号,也可以from odps.inter import setup,  提供  room  的方式配置到本地,这种方式会自动将信息输入到本地的目录。

Setup  在运行过程中分两种:

一种是 from odps.inter import enter  指定  room  或是不指定,采用插件的方式进入入口。

image.png



三、MaxComputeSQL

#异步

>>>inst=o. run sql('select*from dual')

>>>print(inst. get logview address()

>>>inst. wait for success()

#同步

>>>inst=o. execute sql('select*from dual/)

# ipython/jupyter

%load_ext_odps

%enter

%sql select * from dual

select sum(sepal_length)

from products_iris

group by category

#设置运行时参数

.>>>o.execute_sql(' select * from  dual’,

hints={‘odps. sql. mapper. split. size':16})

#通过instance tunnel读取SQL执行结果,能读到全部数据

>>>inst=o. execute sql('select * from dual’)

>>>with inst. Open_reader() as reader:

for record in reader:

print(record)

#通过result接口读取SQL执行接口

>>>with o. execute sql('select * from

Dual '). open reader(use_tunnel=False);

for record in reader:

print(record)

image.png


五、读写表数据

>>>t=o. get _ table(‘pyodps_iris')

#读数据

>>>with t.open reader() as reader:

for record in reader:

print(record)

#写数据

>>>with t.open_writer() as writer?

record=t. new_record()

record[‘field1']='a'

record[‘field2’]=1

writer. write(record)

#写 pandas DataFrame

>>>import pandas as pd

>>>from odps. df import DataFrame

>>>df = pd.DataFrame([['a',1]], columns=[‘field1','field2']

>>> DataFrame(df). persist('my_odps_able’).//核心代码

但常见的问题是  pandas DataFrame  不能用 writer  写  DataFrame  ,所以要写pandas DataFrame  需要  PyODPS  来进行一个接口,直接用  pandas DataFrame来创建一个 presist.

image.png



六、直接使用  tunnel  接口读写表数据

>>>from os. tunnel import Table Tunnel

>>>tunnel=Table Tunnel(o)

>>>table=o. get table('my table')

#写数据

>>>upload_session =tunnel. Create_upload _session(table. name)

>>>with upload_session.open_record_writer(0) as writer?

>>> record=table. new record(['a',l])

>>> writer. write(record)

>>>upload session. commit([0])

#读数据

>>> download_session = tunnel.create_download_session(table. name)

>>> withdownload_session.open_record_reader(0, download_session.count) as reader

>>> for record in reader:

>>> #处理record

也是可以直接裸露  tunnel  接口,需要创建一个  download_session,用  tunnel接口的好处是相比较  record_reader  有个细节是对分步式的写数据来说是简单的。

如果是用  DataWorks  写的话先要创建一个  download_session,再分发到每个机器上,在机器上打开相同  id  的   download_session,在把数据写进去,最后需要有一台机器汇总一下,record  后面有一个参数,例如是0的话,本地要有十个,那么第零个写第零个,第一个写第一个,第十个写第十个,那在最终会有一个写了多少数据, commite list   后面 就要写0,1,2·······……这样就可以直接裸露 tunnel 接口来完成分步式数据的上传。

但如果用    open table Reader   是很难做出分布式的效果

image.png

PyODPS DataFrame  框架:

提供一个功能非常强大的框架,这个框架是设计成了前后端的,简单来说是写了一个代码,可以在不同的后端执行,可以是本地、提交执行、数据库上。创建的代码方式是一样的只是执行不一样,比如用  table  创建了一个  PyODPS DataFrame  那就是在  MySQL  上执行,是可以很方便的进行一些调试

image.png

PoODPS DataFrame  的背景:

像传统  Scipy stack  的一些同学会更习惯  python  的数据分析的技术栈,让numpy、scipy  这种好用的数据库。

Pandas   的表达力非常好但是只能单机计算、数据大小要小于内存;ODPS(SQL)  是适合与大数据的分析但表达力(contro、flow等)不够好。结合这两者的优点,提供了  PoODPS DataFrame  ,它具有 pandas-like 语法,还是基于  pandas  的语法但又有些不同,还是不能提供索引;具有拓展适合大数据语法:MapReduce API  ;还可以利用  ODPS   的计算能力;以及根据数据量选择seahawkes 或者是  ODPS   的后端,虚线中的部分就是其功能。

image.png



七、Pandas-like API

列选择: iris['sepal width'. ‘sepal_length’]

过滤:

iris[(iris. species=='Iris-setosa')& (iris. sepal_length>5.0)]//这个且只能用&,或用|

计算: (iris. sepal_length*10). log(

自定义函数:iris. sepal_length. map(lambdax.x+1)

聚合: iris. sepal_length.mean()

分组聚合:

iris.groupby('species').agg( shortest=iris.petal_length.min().longest=iris.petal length.max0. average=iris,petal lenath. mean()).

……

Join,MapReduce……

好处在写代码过程中会有一些错误检查,所以对一些类型可以进行检查,不用等到运行

image.png



八、多后端支持

假设这里有一个代码就可以通过  Max Computer sql  或是  DataFrame  其他来完成,但编译出来的是不一样的,但结果是一样的。

image.png

MapReduce  等语法增强:

MapReduce  在大数据库里操作类型是非常的多,可以做很多的拓展,在这里是很轻松的写出来。

Mapper  对每一行进行处理  ,reduce  会进行分组把相同的字段的列分组到一起,reduce(key)  是指定一个字段进行聚合处理,这里创建一个  count,采用闭包的方式。

image.png

案例1:分组  kmeans

from opds.df import output

N_CENTER = 4center_fields = [‘c_(0)_(1)’.format(i_center, i_field) for i_center in range (N_ CENTER)  for i_field in range (3)]

#output(['label’]*center_fields,['int']+['float']*len(center_fields))

def train(keys):

import numpy as np

from aklearn.clustor import MiniBatchKMeansmodel = MiniBatchKMeans(s_clusters=4,random_ atate=0batch size=10)

def h(row,done):

buffer.append(np. asarray(row(1:))

if len(buffer)>=10 or done:

model.partial_fit(no,asarray(but(or)?

del buffer[1]

if done:

centers=model.cluster_centers_

return[int(rowl))]+np.ravel(centers). tolist()

return h

libraries=['scipy.zip',’sklearn. zip']

df.map_reduce(reducer=train, group-[‘lable’]). execute

最佳实践

1.少用  open,reader,多用  DataFram e(从  Table  创建)处理数据

2.使用 pandas  后端本地调试

三方库

1.弄清楚函数在哪里运行:https://yq. aliyun. com/articles/704713

2.在  PyODPSDataFrame 的自定义函数里使用三方库:

pandas、scipy、scikit-learn:https://yq. aliyun. com/articles/591508

nltk:https://www. attatech. org/articles/97526

talib:https://www. attatech. org/articles/81863

XGBoost:https://www. attatch. org/articles/116529

PyODPS DataFrame  实现原理简介

首先在  PyODPS DataFrame  中写一个代码会有以下图的结构框架,但不会计算,等到真正计算的时候会根据不同的来源来做一个运行在执行之前会有可视化、Optimizer  操作合并的优化可以用  df.realize  来进行查看。

在真正执行的时候会根据不同的后端选择不同的执行器

>>>local_df=read cosv(-…). exclude(…)

>>>remote_df-DataFrame(odps. get_table('my_table'))

>>>remote_df2-remote_df. groupby(field'). agg(…). cache()

>>>remote_df3-remote_df2(remotedf2. field<10)

>>>df=local_df. join(remote_df3, on=‘id')

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
一站式大数据开发治理平台DataWorks初级课程
DataWorks 从 2009 年开始,十ー年里一直支持阿里巴巴集团内部数据中台的建设,2019 年双 11 稳定支撑每日千万级的任务调度。每天阿里巴巴内部有数万名数据和算法工程师正在使用DataWorks,承了阿里巴巴 99%的据业务构建。本课程主要介绍了阿里巴巴大数据技术发展历程与 DataWorks 几大模块的基本能力。 课程目标 &nbsp;通过讲师的详细讲解与实际演示,学员可以一边学习一边进行实际操作,可以深入了解DataWorks各大模块的使用方式和具体功能,让学员对DataWorks数据集成、开发、分析、运维、安全、治理等方面有深刻的了解,加深对阿里云大数据产品体系的理解与认识。 适合人群 &nbsp;企业数据仓库开发人员 &nbsp;大数据平台开发人员 &nbsp;数据分析师 &nbsp;大数据运维人员 &nbsp;对于大数据平台、数据中台产品感兴趣的开发者
相关文章
|
SQL 数据采集 分布式计算
DataWorks 基本操作演示|学习笔记
快速学习 DataWorks 基本操作演示
4313 0
DataWorks 基本操作演示|学习笔记
|
5月前
|
JSON Shell API
3.基本操作
3.基本操作
|
3月前
|
SQL 分布式计算 数据挖掘
PyODPS
【7月更文挑战第19天】
98 2
|
4月前
|
分布式计算 DataWorks 大数据
MaxCompute操作报错合集之pyODPS导入python包的时候报错,该怎么办
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
5月前
|
分布式计算 DataWorks 关系型数据库
MaxCompute产品使用合集之我需要在MaxCompute客户端添加Python第三方包,我该怎么操作
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
11月前
|
SQL
数据的基本操作
数据的基本操作。
38 1
|
数据处理 Python
|
SQL 数据挖掘 数据格式
Python数据分析(二):DataFrame基本操作
查看数据(查看对象的方法对于Series来说同样适用)
1147 0
|
分布式计算 MaxCompute
PyODPS
PyODPS
157 1
|
数据库
Bartender基本操作
本教程使用的是Bartender10,其他版本的Bartender使用上差不多。