pandas+odps实现批量化表信息解析

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云解析 DNS,旗舰版 1个月
简介: 最近在做数仓建设,发现很多弊病都来自ods的数据没有探测好,之前都是人手工探查,看具体的字段注释是否是枚举或者最大值最小值等判定,再加上后续数据采集方式的调整,再次探查的成本会增大,我就寻思能够利用pandas来分析odps的数据。

首先是大致思路,如果是数值类型,则当做度量的数据,而如果是日期类型则计算月维度下的分组情况,如果是其他类型则默认为object;先计算缺失值和去重值,如果枚举值等于1个,则为唯一值,不参与计算,不超过三个,则认为其为维度字段,非数值类型的只计算五个枚举值,缺失值和量级;数值类型则当做度量参与到后续分组汇总计算。

汇总计算主要会集中在有维度,无维度以及日期为月上,求最大值,最小值,中位数,平均数,方差和标准差。

上面都是差不多的优点,下面说缺点:

  • 不支持复杂类型的判断
  • 不支持多维度的度量计算
  • 如果需要根据日期升频或者降频需要改代码
  • 速度转换慢,从odps的df到pd的df,没有采用to_pandas的方法,因为之前使用速度很慢,而是取消将数据写入到pd的df,但速度也仅仅是1000条/1秒
  • 输出格式未完成整理好,可能需要一定成本的整理
  • 如果想进行多表操作,需要改代码

使用前有三个地方需要写

main方法中的table_name用来写表名

odps的id,key以及project

写出文件的位置

# 1.判断数据类型# 2.如果是字符串类型,返回七个枚举值,一个枚举值如果长度超过10,后面的字符替换为...# 3.如果字符串类型的枚举值不超过五个,则默认为维度字段,参与到后续计算# 4.如果是日期类型,则取最大值和最小值,并取有维度时每个维度的最大值和最小值# 5.如果数字类型枚举值不超过三个,当做字符串类型计算# 6.如果是数字类型,则取最大值,最小值,中位数,平均数,方差和标准差;以及各维度状态下的...# 7.所有类型都统计缺失值,数据50%的数据# 8.读取odps的数据,解析odps的数据情况# 9.10万条数据解析耗时:# 开始运行 2022-12-14 10:49:47.627770# 开始读取odps 2022-12-14 10:49:48.162656# 开始转换df类型 2022-12-14 10:51:27.100467# 转换odps为pd完成 2022-12-14 10:51:29.562657# 已解析出度量和维度 2022-12-14 10:51:30.925895# 解析完成,开始写出 2022-12-14 10:51:33.160971# 写出成功 2022-12-14 10:51:33.165625fromcollectionsimportdefaultdictimportpandasaspdimportnumpyasnpimportdatetimefromodpsimportODPSfromodps.dfimportDataFrame# 配置odps参数o=ODPS(
access_id='',  # 登陆账号secret_access_key='',  # 登陆密码project='',  # odps上的项目名称endpoint='http://service.odps.aliyun.com/api'# 官方提供的接口)
# 获得数据类型defget_odps_schema(table_name):
df_odps=DataFrame(o.get_table(table_name))
returnlist(df_odps.dtypes)
# 获得原始数据的类型defclean_schema(list_schema):
# print(list_schema[1])end_list=list()
foriinrange(len(list_schema)):
# print(list_schema[i]) 替换脏数据tmp_list=str(list_schema[i]).replace("<column", "").replace("type ", "").replace(">", "").split(',')
end_list.append([tmp_list[0].strip(), tmp_list[1].strip()])
# print(end_list)returnend_list# 直接将maxcomputer的数据转换为pandas的dataframedefexe_sql(sql, schema_type):
data= []
count_i=1witho.execute_sql(sql).open_reader(tunnel=True) asreader:
d=defaultdict(list)  # collection默认一个dictforrecordinreader:
# 速度大概1000/1s 有点慢print("已解析:"+str(count_i), datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
count_i=count_i+1forresinrecord:
d[res[0]].append(res[1])  # 解析record中的每一个元组,存储方式为(k,v),以k作为key,存储每一列的内容;data=pd.DataFrame.from_dict(d, orient='index').T# 转换为数据框,并转置,不转置的话是横条数据print("开始转换df类型", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
forschema_item_typeinschema_type:
# print(schema_item_type)ifschema_item_type[1] =="datetime":
passelse:
data[schema_item_type[0]] =data[schema_item_type[0]].astype(schema_item_type[1])
# print(data.dtypes)returndata# 将df隐藏行解开pd.set_option('display.max_columns', None)
# from io import StringIOall_s=list()
end_path="end.txt"# 将数据写入数组 排序defprintx(str_value):
all_s.append(str_value+"\n")
# 数据写出defwrite_csv(all_s, end_path):
all_s.sort()
withopen(end_path, 'w', encoding='utf-8') asfile:
forsinall_s:
file.write(s)
# 获得列的名称和数据类型defgetType(obj, col):
obj_type=obj.dtype.namereturn [col, obj_type]
# 获得列空值率的数量defgetNone_count(data):
returndata.isnull().sum(axis=0)
# 返回数据去重后的数据情况defget_discount(data):
list_data=data.drop_duplicates().head(5).valuesreturnlist(list_data)
# 入参为列名,以及维度listdefget_weidu(col_name, weidu_list):
weidu_list.add(col_name)
# 获得去重后的数据量级defdistinct_num(data):
returndata.drop_duplicates().count()
# 获得日期的最大值和最小值defgetdate_fun(data, col_name, rq_list):
max_data=data.max()
min_data=data.min()
# gncd = getNone_count(data)gncds=str(getNone_count(data))
printx("日期列 "+col_name+" 的缺失量为:"+gncds)
printx("日期列 "+col_name+" 最大值为:"+str(max_data) +",最小值为:"+str(min_data))
get_weidu(col_name, rq_list)
# 分组求度量defget_group_num_fun(data, weidu):
# printx(data.head())max_data=data.groupby(weidu).max()
min_data=data.groupby(weidu).min()
# 中位数med_data=data.groupby(weidu).median()
# 平均数mea_data=data.groupby(weidu).mean()
# 方差var_data=data.groupby(weidu).var()
# 标准差std_data=data.groupby(weidu).std()
printx("维度为"+weidu+"时,度量数据如下"+str(
        [{"最大值": max_data}, {"最小值": min_data}, {"中位数": med_data}, {"平均数": mea_data}, {"方差": var_data},
         {"标准差": std_data}]))
# 返回数据最大值,最小值,中位数,平均数,方差和标准差defget_num_fun(data):
# print(data)max_data=data.max()
min_data=data.min()
# 中位数med_data=data.median()
# 平均数mea_data=data.mean()
# 方差var_data=data.var()
# 标准差std_data=data.std()
return [{"最大值": max_data}, {"最小值": min_data}, {"中位数": med_data}, {"平均数": mea_data}, {"方差": var_data},
            {"标准差": std_data}]
# 返回数字类型的最大值等defget_num(data, col_name, weidu_list, wy_list, dl_list):
# print(col_name)# 缺失值量gncd=getNone_count(data)
gncds=str(getNone_count(data))
# 去重值量dnd=distinct_num(data)
# print(gncd)gdd=str(get_discount(data))
six_num=get_num_fun(data.dropna())
ifdnd==1andgncd==0:
printx("唯一数值列 "+col_name+" 的值为:"+gdd)
get_weidu(col_name, wy_list)
elifdnd<=3:
printx("维度数值列 "+col_name+" 的缺失量为:"+gncds)
printx("维度数值列 "+col_name+" 的枚举值为:"+gdd)
get_weidu(col_name, weidu_list)
else:
printx("普通数值列 "+col_name+" 的缺失量为:"+gncds)
printx("普通数值列 "+col_name+" 的枚举值为:"+gdd)
printx("普通数值列 "+col_name+"数据情况:"+str(six_num))
get_weidu(col_name, dl_list)
# 字符串类型进入本循环defgetObjValue(data, col_name, weidu_list, wy_list, str_list):
# 缺失值量gncd=getNone_count(data)
gncds=str(getNone_count(data))
# 去重值量dnd=distinct_num(data)
gdd=str(get_discount(data))
ifdnd==1andgncd!=0:
printx("唯一字符串列 "+col_name+" 的值为:"+gdd)
# printx(get_discount(data))get_weidu(col_name, wy_list)
elifdnd<=3:
printx("维度字符串列 "+col_name+" 的缺失量为:"+gncds)
printx("维度字符串列 "+col_name+" 的枚举值为:"+gdd)
# printx(get_discount(data))get_weidu(col_name, weidu_list)
else:
printx("普通字符串列 "+col_name+" 的缺失量为:"+gncds)
printx("普通字符串列 "+col_name+" 的枚举值为:"+gdd)
# printx(get_discount(data))get_weidu(col_name, str_list)
defwaring_set(oth_list):
iflen(oth_list) >0:
printx("出现未分组数据,字段和类型如下"+str(oth_list))
else:
pass# 主方法if__name__=='__main__':
print("开始运行", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
# 原始数据table_name=""schema_type=clean_schema(get_odps_schema(table_name))
print("开始读取odps", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
df=exe_sql("select *  from "+table_name+" where ds='20221213' limit 100000", schema_type)
print("转换odps为pd完成", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
# df = pd.read_csv("testdata.csv", encoding='utf8')# print(df.dtypes)# 清除\N为None后的数据nodf=df.replace("\\N", np.NaN)
# print(nodf["updatetime"])# 将日期类型从object转化为datetime# nodf["updatetime"] = nodf["updatetime"].apply(lambda t: pd.to_datetime(t))# nodf["createtime"] = nodf["createtime"].apply(lambda t: pd.to_datetime(t))# 获得列名+类型col_name_type=list()
# 维度列weidu_list=set()
# 唯一值所在的列wy_list=set()
# 字符串所在的列str_list=set()
# 日期列rq_list=set()
# 其他列oth_list=set()
# 度量列dl_list=set()
# 获得列类型forcolinlist(nodf):
col_name_type.append(getType(nodf[col], col))
# 根据列类型进行分组分析forcol_nameincol_name_type:
ifcol_name[1] =='object'orcol_name[1] =='string':
getObjValue(nodf[col_name[0]], col_name[0], weidu_list, wy_list, str_list)
# for col_name in col_name_type:elifcol_name[1] =='int64'orcol_name[1] =='float64':
get_num(nodf[col_name[0]], col_name[0], weidu_list, wy_list, dl_list)
# for col_name in col_name_type:elifcol_name[1] =='datetime64[ns]':
getdate_fun(nodf[col_name[0]], col_name[0], rq_list)
else:
oth_list.add([col_name[0], col_name[1]])
waring_set(oth_list)
print("已解析出度量和维度", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
forweiduinweidu_list:
# 将维度增加到度量的set中dl_list.add(weidu)
get_group_num_fun(nodf[list(dl_list)], weidu)
# 将维度从度量的set中挪开dl_list.remove(weidu)
# 根据日期进行分组# print(rq_list)# print(dl_list)forrqinrq_list:
# print(rq)dl_list.add(rq)
rq_nodf=nodf[list(dl_list)]
rq_nodf_copy=rq_nodf.copy()
# 将时间置换为月rq_nodf_copy[rq] =rq_nodf[rq].dt.month# rq_nodf_copy.get_group_num_fun(rq_nodf_copy, rq)
# print(rq_nodf.std())# 将日期列置换为索引# rq_nodf.index = rq_nodf[rq]# 删除日期列# rq_nodf.drop(labels=rq, axis=1)# 索引替换为周# rq_nodf.index = rq_nodf.index.isocalendar().weekdl_list.remove(rq)
# print(rq_nodf.head(5))print("解析完成,开始写出", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
# 写出数据write_csv(all_s, end_path)
print("写出成功", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
3月前
|
DataWorks 安全 定位技术
DataWorks产品使用合集之数据地图分区信息记录数都是-1是什么导致的
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
21天前
|
自然语言处理 数据可视化 前端开发
从数据提取到管理:合合信息的智能文档处理全方位解析【合合信息智能文档处理百宝箱】
合合信息的智能文档处理“百宝箱”涵盖文档解析、向量化模型、测评工具等,解决了复杂文档解析、大模型问答幻觉、文档解析效果评估、知识库搭建、多语言文档翻译等问题。通过可视化解析工具 TextIn ParseX、向量化模型 acge-embedding 和文档解析测评工具 markdown_tester,百宝箱提升了文档处理的效率和精确度,适用于多种文档格式和语言环境,助力企业实现高效的信息管理和业务支持。
3960 5
从数据提取到管理:合合信息的智能文档处理全方位解析【合合信息智能文档处理百宝箱】
|
3月前
|
机器学习/深度学习 供应链 大数据
【2023Mathorcup大数据】B题 电商零售商家需求预测及库存优化问题 python代码解析
本文提供了2023年MathorCup大数据竞赛B题的电商零售商家需求预测及库存优化问题的Python代码解析,涉及数据预处理、特征工程、时间序列预测、聚类分析以及模型预测性能评价等步骤。
180 0
|
1月前
|
机器学习/深度学习 自然语言处理 JavaScript
信息论、机器学习的核心概念:熵、KL散度、JS散度和Renyi散度的深度解析及应用
在信息论、机器学习和统计学领域中,KL散度(Kullback-Leibler散度)是量化概率分布差异的关键概念。本文深入探讨了KL散度及其相关概念,包括Jensen-Shannon散度和Renyi散度。KL散度用于衡量两个概率分布之间的差异,而Jensen-Shannon散度则提供了一种对称的度量方式。Renyi散度通过可调参数α,提供了更灵活的散度度量。这些概念不仅在理论研究中至关重要,在实际应用中也广泛用于数据压缩、变分自编码器、强化学习等领域。通过分析电子商务中的数据漂移实例,展示了这些散度指标在捕捉数据分布变化方面的独特优势,为企业提供了数据驱动的决策支持。
61 2
信息论、机器学习的核心概念:熵、KL散度、JS散度和Renyi散度的深度解析及应用
|
1月前
|
机器学习/深度学习 搜索推荐 大数据
深度解析:如何通过精妙的特征工程与创新模型结构大幅提升推荐系统中的召回率,带你一步步攻克大数据检索难题
【10月更文挑战第2天】在处理大规模数据集的推荐系统项目时,提高检索模型的召回率成为关键挑战。本文分享了通过改进特征工程(如加入用户活跃时段和物品相似度)和优化模型结构(引入注意力机制)来提升召回率的具体策略与实现代码。严格的A/B测试验证了新模型的有效性,为改善用户体验奠定了基础。这次实践加深了对特征工程与模型优化的理解,并为未来的技术探索提供了方向。
90 2
深度解析:如何通过精妙的特征工程与创新模型结构大幅提升推荐系统中的召回率,带你一步步攻克大数据检索难题
|
21天前
|
存储 机器学习/深度学习 大数据
量子计算与大数据:处理海量信息的新方法
【10月更文挑战第31天】量子计算凭借其独特的量子比特和量子门技术,为大数据处理带来了革命性的变革。相比传统计算机,量子计算在计算效率、存储容量及并行处理能力上具有显著优势,能有效应对信息爆炸带来的挑战。本文探讨了量子计算如何通过量子叠加和纠缠等原理,加速数据处理过程,提升计算效率,特别是在金融、医疗和物流等领域中的具体应用案例,同时也指出了量子计算目前面临的挑战及其未来的发展方向。
|
29天前
|
人工智能 前端开发 JavaScript
拿下奇怪的前端报错(一):报错信息是一个看不懂的数字数组Buffer(475) [Uint8Array],让AI大模型帮忙解析
本文介绍了前端开发中遇到的奇怪报错问题,特别是当错误信息不明确时的处理方法。作者分享了自己通过还原代码、试错等方式解决问题的经验,并以一个Vue3+TypeScript项目的构建失败为例,详细解析了如何从错误信息中定位问题,最终通过解读错误信息中的ASCII码找到了具体的错误文件。文章强调了基础知识的重要性,并鼓励读者遇到类似问题时不要慌张,耐心分析。
|
7天前
|
并行计算 数据挖掘 大数据
Python数据分析实战:利用Pandas处理大数据集
Python数据分析实战:利用Pandas处理大数据集
|
1月前
|
存储 SQL 分布式计算
湖仓一体架构深度解析:构建企业级数据管理与分析的新基石
【10月更文挑战第7天】湖仓一体架构深度解析:构建企业级数据管理与分析的新基石
64 1
|
1月前
|
数据采集 监控 数据挖掘
CSV文件自动化生成:用Pandas与Datetime高效处理京东商品信息
在电商竞争激烈的背景下,实时掌握商品价格和库存信息至关重要。本文介绍如何使用Python的`pandas`和`datetime`库从京东抓取商品名称、价格等信息,并生成CSV文件。结合代理IP技术,提升爬取效率和稳定性。通过设置请求头、使用代理IP和多线程技术,确保数据抓取的连续性和成功率。最终,数据将以带时间戳的CSV文件形式保存,方便后续分析。

热门文章

最新文章

推荐镜像

更多