pandas+odps实现批量化表信息解析

简介: 最近在做数仓建设,发现很多弊病都来自ods的数据没有探测好,之前都是人手工探查,看具体的字段注释是否是枚举或者最大值最小值等判定,再加上后续数据采集方式的调整,再次探查的成本会增大,我就寻思能够利用pandas来分析odps的数据。

首先是大致思路,如果是数值类型,则当做度量的数据,而如果是日期类型则计算月维度下的分组情况,如果是其他类型则默认为object;先计算缺失值和去重值,如果枚举值等于1个,则为唯一值,不参与计算,不超过三个,则认为其为维度字段,非数值类型的只计算五个枚举值,缺失值和量级;数值类型则当做度量参与到后续分组汇总计算。

汇总计算主要会集中在有维度,无维度以及日期为月上,求最大值,最小值,中位数,平均数,方差和标准差。

上面都是差不多的优点,下面说缺点:

  • 不支持复杂类型的判断
  • 不支持多维度的度量计算
  • 如果需要根据日期升频或者降频需要改代码
  • 速度转换慢,从odps的df到pd的df,没有采用to_pandas的方法,因为之前使用速度很慢,而是取消将数据写入到pd的df,但速度也仅仅是1000条/1秒
  • 输出格式未完成整理好,可能需要一定成本的整理
  • 如果想进行多表操作,需要改代码

使用前有三个地方需要写

main方法中的table_name用来写表名

odps的id,key以及project

写出文件的位置

# 1.判断数据类型# 2.如果是字符串类型,返回七个枚举值,一个枚举值如果长度超过10,后面的字符替换为...# 3.如果字符串类型的枚举值不超过五个,则默认为维度字段,参与到后续计算# 4.如果是日期类型,则取最大值和最小值,并取有维度时每个维度的最大值和最小值# 5.如果数字类型枚举值不超过三个,当做字符串类型计算# 6.如果是数字类型,则取最大值,最小值,中位数,平均数,方差和标准差;以及各维度状态下的...# 7.所有类型都统计缺失值,数据50%的数据# 8.读取odps的数据,解析odps的数据情况# 9.10万条数据解析耗时:# 开始运行 2022-12-14 10:49:47.627770# 开始读取odps 2022-12-14 10:49:48.162656# 开始转换df类型 2022-12-14 10:51:27.100467# 转换odps为pd完成 2022-12-14 10:51:29.562657# 已解析出度量和维度 2022-12-14 10:51:30.925895# 解析完成,开始写出 2022-12-14 10:51:33.160971# 写出成功 2022-12-14 10:51:33.165625fromcollectionsimportdefaultdictimportpandasaspdimportnumpyasnpimportdatetimefromodpsimportODPSfromodps.dfimportDataFrame# 配置odps参数o=ODPS(
access_id='',  # 登陆账号secret_access_key='',  # 登陆密码project='',  # odps上的项目名称endpoint='http://service.odps.aliyun.com/api'# 官方提供的接口)
# 获得数据类型defget_odps_schema(table_name):
df_odps=DataFrame(o.get_table(table_name))
returnlist(df_odps.dtypes)
# 获得原始数据的类型defclean_schema(list_schema):
# print(list_schema[1])end_list=list()
foriinrange(len(list_schema)):
# print(list_schema[i]) 替换脏数据tmp_list=str(list_schema[i]).replace("<column", "").replace("type ", "").replace(">", "").split(',')
end_list.append([tmp_list[0].strip(), tmp_list[1].strip()])
# print(end_list)returnend_list# 直接将maxcomputer的数据转换为pandas的dataframedefexe_sql(sql, schema_type):
data= []
count_i=1witho.execute_sql(sql).open_reader(tunnel=True) asreader:
d=defaultdict(list)  # collection默认一个dictforrecordinreader:
# 速度大概1000/1s 有点慢print("已解析:"+str(count_i), datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
count_i=count_i+1forresinrecord:
d[res[0]].append(res[1])  # 解析record中的每一个元组,存储方式为(k,v),以k作为key,存储每一列的内容;data=pd.DataFrame.from_dict(d, orient='index').T# 转换为数据框,并转置,不转置的话是横条数据print("开始转换df类型", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
forschema_item_typeinschema_type:
# print(schema_item_type)ifschema_item_type[1] =="datetime":
passelse:
data[schema_item_type[0]] =data[schema_item_type[0]].astype(schema_item_type[1])
# print(data.dtypes)returndata# 将df隐藏行解开pd.set_option('display.max_columns', None)
# from io import StringIOall_s=list()
end_path="end.txt"# 将数据写入数组 排序defprintx(str_value):
all_s.append(str_value+"\n")
# 数据写出defwrite_csv(all_s, end_path):
all_s.sort()
withopen(end_path, 'w', encoding='utf-8') asfile:
forsinall_s:
file.write(s)
# 获得列的名称和数据类型defgetType(obj, col):
obj_type=obj.dtype.namereturn [col, obj_type]
# 获得列空值率的数量defgetNone_count(data):
returndata.isnull().sum(axis=0)
# 返回数据去重后的数据情况defget_discount(data):
list_data=data.drop_duplicates().head(5).valuesreturnlist(list_data)
# 入参为列名,以及维度listdefget_weidu(col_name, weidu_list):
weidu_list.add(col_name)
# 获得去重后的数据量级defdistinct_num(data):
returndata.drop_duplicates().count()
# 获得日期的最大值和最小值defgetdate_fun(data, col_name, rq_list):
max_data=data.max()
min_data=data.min()
# gncd = getNone_count(data)gncds=str(getNone_count(data))
printx("日期列 "+col_name+" 的缺失量为:"+gncds)
printx("日期列 "+col_name+" 最大值为:"+str(max_data) +",最小值为:"+str(min_data))
get_weidu(col_name, rq_list)
# 分组求度量defget_group_num_fun(data, weidu):
# printx(data.head())max_data=data.groupby(weidu).max()
min_data=data.groupby(weidu).min()
# 中位数med_data=data.groupby(weidu).median()
# 平均数mea_data=data.groupby(weidu).mean()
# 方差var_data=data.groupby(weidu).var()
# 标准差std_data=data.groupby(weidu).std()
printx("维度为"+weidu+"时,度量数据如下"+str(
        [{"最大值": max_data}, {"最小值": min_data}, {"中位数": med_data}, {"平均数": mea_data}, {"方差": var_data},
         {"标准差": std_data}]))
# 返回数据最大值,最小值,中位数,平均数,方差和标准差defget_num_fun(data):
# print(data)max_data=data.max()
min_data=data.min()
# 中位数med_data=data.median()
# 平均数mea_data=data.mean()
# 方差var_data=data.var()
# 标准差std_data=data.std()
return [{"最大值": max_data}, {"最小值": min_data}, {"中位数": med_data}, {"平均数": mea_data}, {"方差": var_data},
            {"标准差": std_data}]
# 返回数字类型的最大值等defget_num(data, col_name, weidu_list, wy_list, dl_list):
# print(col_name)# 缺失值量gncd=getNone_count(data)
gncds=str(getNone_count(data))
# 去重值量dnd=distinct_num(data)
# print(gncd)gdd=str(get_discount(data))
six_num=get_num_fun(data.dropna())
ifdnd==1andgncd==0:
printx("唯一数值列 "+col_name+" 的值为:"+gdd)
get_weidu(col_name, wy_list)
elifdnd<=3:
printx("维度数值列 "+col_name+" 的缺失量为:"+gncds)
printx("维度数值列 "+col_name+" 的枚举值为:"+gdd)
get_weidu(col_name, weidu_list)
else:
printx("普通数值列 "+col_name+" 的缺失量为:"+gncds)
printx("普通数值列 "+col_name+" 的枚举值为:"+gdd)
printx("普通数值列 "+col_name+"数据情况:"+str(six_num))
get_weidu(col_name, dl_list)
# 字符串类型进入本循环defgetObjValue(data, col_name, weidu_list, wy_list, str_list):
# 缺失值量gncd=getNone_count(data)
gncds=str(getNone_count(data))
# 去重值量dnd=distinct_num(data)
gdd=str(get_discount(data))
ifdnd==1andgncd!=0:
printx("唯一字符串列 "+col_name+" 的值为:"+gdd)
# printx(get_discount(data))get_weidu(col_name, wy_list)
elifdnd<=3:
printx("维度字符串列 "+col_name+" 的缺失量为:"+gncds)
printx("维度字符串列 "+col_name+" 的枚举值为:"+gdd)
# printx(get_discount(data))get_weidu(col_name, weidu_list)
else:
printx("普通字符串列 "+col_name+" 的缺失量为:"+gncds)
printx("普通字符串列 "+col_name+" 的枚举值为:"+gdd)
# printx(get_discount(data))get_weidu(col_name, str_list)
defwaring_set(oth_list):
iflen(oth_list) >0:
printx("出现未分组数据,字段和类型如下"+str(oth_list))
else:
pass# 主方法if__name__=='__main__':
print("开始运行", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
# 原始数据table_name=""schema_type=clean_schema(get_odps_schema(table_name))
print("开始读取odps", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
df=exe_sql("select *  from "+table_name+" where ds='20221213' limit 100000", schema_type)
print("转换odps为pd完成", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
# df = pd.read_csv("testdata.csv", encoding='utf8')# print(df.dtypes)# 清除\N为None后的数据nodf=df.replace("\\N", np.NaN)
# print(nodf["updatetime"])# 将日期类型从object转化为datetime# nodf["updatetime"] = nodf["updatetime"].apply(lambda t: pd.to_datetime(t))# nodf["createtime"] = nodf["createtime"].apply(lambda t: pd.to_datetime(t))# 获得列名+类型col_name_type=list()
# 维度列weidu_list=set()
# 唯一值所在的列wy_list=set()
# 字符串所在的列str_list=set()
# 日期列rq_list=set()
# 其他列oth_list=set()
# 度量列dl_list=set()
# 获得列类型forcolinlist(nodf):
col_name_type.append(getType(nodf[col], col))
# 根据列类型进行分组分析forcol_nameincol_name_type:
ifcol_name[1] =='object'orcol_name[1] =='string':
getObjValue(nodf[col_name[0]], col_name[0], weidu_list, wy_list, str_list)
# for col_name in col_name_type:elifcol_name[1] =='int64'orcol_name[1] =='float64':
get_num(nodf[col_name[0]], col_name[0], weidu_list, wy_list, dl_list)
# for col_name in col_name_type:elifcol_name[1] =='datetime64[ns]':
getdate_fun(nodf[col_name[0]], col_name[0], rq_list)
else:
oth_list.add([col_name[0], col_name[1]])
waring_set(oth_list)
print("已解析出度量和维度", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
forweiduinweidu_list:
# 将维度增加到度量的set中dl_list.add(weidu)
get_group_num_fun(nodf[list(dl_list)], weidu)
# 将维度从度量的set中挪开dl_list.remove(weidu)
# 根据日期进行分组# print(rq_list)# print(dl_list)forrqinrq_list:
# print(rq)dl_list.add(rq)
rq_nodf=nodf[list(dl_list)]
rq_nodf_copy=rq_nodf.copy()
# 将时间置换为月rq_nodf_copy[rq] =rq_nodf[rq].dt.month# rq_nodf_copy.get_group_num_fun(rq_nodf_copy, rq)
# print(rq_nodf.std())# 将日期列置换为索引# rq_nodf.index = rq_nodf[rq]# 删除日期列# rq_nodf.drop(labels=rq, axis=1)# 索引替换为周# rq_nodf.index = rq_nodf.index.isocalendar().weekdl_list.remove(rq)
# print(rq_nodf.head(5))print("解析完成,开始写出", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
# 写出数据write_csv(all_s, end_path)
print("写出成功", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
9月前
|
数据采集 搜索推荐 算法
大数据信息SEO优化系统软件
大数据信息SEO优化系统软件(V1.0)是公司基于“驱动企业价值持续增长”战略,针对企业网站、电商平台及内容营销场景深度定制的智能化搜索引擎优化解决方案。该软件以“提升搜索排名、精准引流获客”为核心目标,通过整合全网数据采集、智能关键词挖掘、内容质量分析、外链健康度监测等功能模块,为企业构建从数据洞察到策略落地的全链路SEO优化体系,助力品牌高效提升搜索引擎曝光度,实现从流量获取到商业转化的价值升级。
203 2
|
9月前
|
存储 分布式计算 Hadoop
Hadoop框架解析:大数据处理的核心技术
组件是对数据和方法的封装,从用户角度看是实现特定功能的独立黑盒子,能够有效完成任务。组件,也常被称作封装体,是对数据和方法的简洁封装形式。从用户的角度来看,它就像是一个实现了特定功能的黑盒子,具备输入和输出接口,能够独立完成某些任务。
|
9月前
|
人工智能 分布式计算 DataWorks
多模态数据处理新趋势:阿里云ODPS技术栈深度解析与未来展望
阿里云ODPS技术栈通过MaxCompute、Object Table与MaxFrame等核心组件,实现了多模态数据的高效处理与智能分析。该架构支持结构化与非结构化数据的统一管理,并深度融合AI能力,显著降低了分布式计算门槛,推动企业数字化转型。未来,其在智慧城市、数字医疗、智能制造等领域具有广泛应用前景。
760 6
多模态数据处理新趋势:阿里云ODPS技术栈深度解析与未来展望
|
11月前
|
人工智能 分布式计算 大数据
MCP、MaxFrame与大数据技术全景解析
本文介绍了 MCP 协议、MaxFrame 分布式计算框架以及大数据基础设施建设的相关内容。MCP(Model Context Protocol)是一种开源协议,旨在解决 AI 大模型与外部数据源及工具的集成问题,被比喻为大模型的“USB 接口”,通过统一交互方式降低开发复杂度。其核心架构包括 Client、Server、Tool 和 Schema 四个关键概念,并在百炼平台中得到实践应用。MaxFrame 是基于 Python 的高性能分布式计算引擎,支持多模态数据处理与 AI 集成,结合 MaxCompute 提供端到端的数据处理能力。
|
存储 搜索推荐 大数据
数据大爆炸:解析大数据的起源及其对未来的启示
数据大爆炸:解析大数据的起源及其对未来的启示
860 15
数据大爆炸:解析大数据的起源及其对未来的启示
|
数据可视化 数据挖掘 定位技术
Pandas数据应用:地理信息系统
本文介绍如何使用Pandas结合地理信息系统(GIS)进行空间数据分析与可视化。Pandas是Python强大的数据处理库,而GIS用于捕获、存储和分析地理数据。通过安装`geopandas`、`matplotlib`等库,可以实现数据加载、转换、空间索引查询、投影变换及可视化等功能。文章详细讲解了常见问题及解决方案,并提供代码案例,帮助读者高效处理地理数据,支持决策分析。
344 26
|
存储 大数据 数据挖掘
Pandas高级数据处理:大数据集处理
Pandas 是强大的 Python 数据分析库,但在处理大规模数据集时可能遇到性能瓶颈和内存不足问题。本文介绍常见问题及解决方案,如分块读取、选择性读取列、数据类型优化、避免不必要的副本创建等技巧,并通过代码示例详细解释。同时,针对 `MemoryError`、`SettingWithCopyWarning` 和 `DtypeWarning` 等常见报错提供解决方法,帮助读者更高效地处理大数据集。
622 16
|
存储 分布式计算 大数据
大数据揭秘:从数据湖到数据仓库的全面解析
大数据揭秘:从数据湖到数据仓库的全面解析
427 19
|
Serverless 对象存储 人工智能
智能文件解析:体验阿里云多模态信息提取解决方案
在当今数据驱动的时代,信息的获取和处理效率直接影响着企业决策的速度和质量。然而,面对日益多样化的文件格式(文本、图像、音频、视频),传统的处理方法显然已经无法满足需求。
538 4
智能文件解析:体验阿里云多模态信息提取解决方案
|
文字识别 开发者 数据处理
多模态数据信息提取解决方案评测报告!
阿里云推出的《多模态数据信息提取》解决方案,利用AI技术从文本、图像、音频和视频中提取关键信息,支持多种应用场景,大幅提升数据处理效率。评测涵盖部署体验、文档清晰度、模板简化、示例验证及需求适配性等方面。方案表现出色,部署简单直观,功能强大,适合多种业务场景。建议增加交互提示、多语言支持及优化OCR和音频转写功能...
496 3
多模态数据信息提取解决方案评测报告!

推荐镜像

更多
  • DNS