基于pyodps对MaxComputer表的数据探查

简介: 本次脚本设计是针对大批量表,并且没有明确业务支持下的数据探查,会根据不同的类型进行判定,根据结果值进行分析得出结论,并给出一定的建议,同时该脚本仅仅支持普通表,一级分区表和二级分区表;一级分区表的分区字段必须是ds或者pt。

注意:本次的pyodps的版本是0.10.7,别问,问就是dataworks是这个版本。

方法详解

首先是开始,导入的包分别是必备的odps,额外还有自带的os和csv,os和csv可以通过修改代码来替换,之后odps需要配置四个参数,分别是id,key,项目名称和endpoint,如果是初次学习者可以在maxcomputer官方文档搜索odpccmd_public,会有详细介绍。

如果你是在dataworks上跑,可以自行删除这一部分,因为其自带了这一部分。

下面是一个表集合,如果表过多,可以再建一个文件,读取文件来更改table_list的数据,再下面的txt_file是用来写入数据的,每一列的意思和上面注释相同。

第二块是对结果数据的一些判定,这一块可以根据自己的情况来选择不同的入参和出参,同时float的判定是因为大部分结果都是数字类型,为了不再额外进行类型判定以及避免开发者的类型选择有误特意进行的调整。

下面这一部分则是一个优化,如果csv已经创建,而且首行数据和代码相同,则不会删除表数据,而是追加,否则会覆盖重新写入表头,当然没有创建csv则会先创建csv写入表头。

在这里是一个写入的方法,这里接收的data会是一个元组,而且为了规避特殊字符对逗号的干扰,这里进行了一些replace,同时每次写入的时候会进行一个换行。

再下面的一个是判定表是否为分区表,没有dir到合适的方法,只能使用了一个try,然后下面是safeStr是在写代码的时候,方便复用二级分区表的方法,设定的一个数据,其实就是会给一级分区表或者普通表设定一个二级分区None。

再下面的是一个判定方法,根据不同列类型返回不同的值,方便后续维护。

如你所见,依旧是对于分区的判定,没有相应的api,使用起来真的很麻烦,如果有一天分区不是=加逗号分割,这也就废了,也有可能是太菜没有发现。

它又来了,还是一个分区方法,为了获得最大分区,我也是煞费苦心。

这是一个通用类型的方法,量级,null的空值率,空字符串的数量,最大值和最小值。

然后分别是针对不同类型的特色数据处理,这里的总和又加了try是因为decimal相加的精度问题,不知道为什么astype居然不能转类型,然后是标准差和平均数,最后的set其实也可以改成通用类型,就是取20个不重复的枚举值,然后再结合上面的问题方法来确定数据的之类问题,用write写出到csv。

部分缺陷

  1. 第一点肯定是空字符串的量级最后还是没有求出来,会一直显示是0;
  2. 为了代码的正常运行,进行了多次replace操作,影响性能而且也印象最后的数据呈现;
  3. 在max的时候,没有进行优化,可能出现数字比较但是结果是字典值比较的情况;
  4. 大量关于分区的方法,而不是直接使用的api;
  5. 部分写死的数据,如果要调整,比较麻烦。

完整代码

  1. 导入ODPS库和其他所需的库;
  2. 使用ODPS连接到阿里云MaxCompute,并设置数据表列表、数据显示的最大行数以及输出CSV文件的路径;
  3. 定义了一个名为is_float()的函数,用于判断字符串是否可以转换为浮点数;
  4. 定义了一个名为findQuestion()的函数,用于分析每个字段的统计信息,并返回是否存在问题的提示;
  5. 定义了一个名为breakWithCSV()的函数,用于读取CSV文件并检查其是否符合要求,如果符合要求,则返回文件中所有表的名称列表;
  6. 主函数中的代码实现了对每个表的字段的统计分析,并将结果保存到CSV文件中。在对每个字段进行分析时,使用了is_float()和findQuestion()函数,并将结果写入到CSV文件中。
# -*- coding: utf-8 -*-# pip install odpsfromodpsimportODPSfromodpsimportoptionsimportosimportcsv# 注意utf-8和gbk的切换# 数据域   触点名称    表中文名    表名称 字段名称 字段备注 字段数据量 空值率 空字符串量级 最大值 最小值 总和 标准差 平均值 字典值枚举 是否存在问题 存在问题# access_id + secret_access_key+项目名称 自行填写odps=ODPS('', '', '',
endpoint='http://service.cn-beijing.maxcompute.aliyun.com/api')
# 表集合table_list= [
"",
]
# 将max_rows设置为Noneoptions.display.max_rows=Nonetxt_file="./1.csv"# 判断是否可以转化为float类型defis_float(s):
try:
float(s)
returnTrueexceptException:
returnFalse# 返回存在问题 字段数据量 is_num传入为True或者FalsedeffindQuestion(num_col, is_null_col, is_0_col, max_col, min_col, std_col, avg_col, is_num):
re_str=""ifis_float(num_col) andfloat(num_col) <=100:
re_str+=" 该字段数据量过少;建议确定"ifis_float(is_null_col) andfloat(is_null_col) >0:
re_str+=" 该字段存在空值;是否需要填充"ifis_float(is_0_col) andfloat(is_0_col) >0:
re_str+=" 该字段存在空字符串;建议清洗"ifis_numandis_float(max_col) andis_float(min_col):
re_str+=" 该字段类型可能应该为数字类型;建议更换类型"ifis_float(max_col) andis_float(min_col) and (float(max_col) <0orfloat(min_col) <0):
re_str+=" 该字段金额存在负数;先确定再清洗"ifis_float(std_col) andis_float(avg_col) and (float(std_col) >float(avg_col)):
re_str+=" 该字段离散程度过大;建议确定是否有测试数据或者是否数据缺失"returnre_str# 如果csv文件已经有对应信息不再进行处理defbreakWithCSV():
csv_col_name='数据域,触点名称,表中文名,表名称,字段名称,字段备注,字段数据量,空值率,空字符串量级,最大值,最小值,总和,标准差,平均值,字典值枚举,是否存在问题,存在问题'# 文件不存在 先创建文件ifnotos.path.exists(txt_file):
withopen(txt_file, "w", encoding='utf-8') asf:
f.write(
csv_col_name            )
# 如果文件列和字符串不同则创建新的withopen(txt_file, "r", encoding='utf-8') asread_csv:
try:
# 如果首行和默认不同 则重新创建文件reader=csv.reader(read_csv)
csv_one=next(reader)
read_csv.close()
# 不相等 复写文件if','.join(csv_one) !=csv_col_name:
withopen(txt_file, "w") asf:
f.write(
csv_col_name                    )
exceptException:
# 此处表示文件为空 创建文件withopen(txt_file, "w", encoding='utf-8') asf:
f.write(
csv_col_name                )
# 表名集合all_list= []
withopen("./1.csv", "r", encoding="utf-8") asread_csv:
# 跳过首行csv_data=read_csv.readlines()[1:]
forline_dataincsv_data:
# 如果行没有mn字样,则跳过if"mn"notinline_dataorline_dataisNone:
passelse:
all_list.append(
str(line_data.split(",")[1]) +str(line_data.split(",")[3]) +str(line_data.split(",")[4]))
returnall_list# 写入文件defwrite(data):
# 打开文件,设置追加模式withopen(txt_file, 'a', encoding='utf-8') asfile:
# 如果字符串中有"会导致,失灵,将单引号转化为中文“row=','.join(str(value).replace(",", ";").replace("\"", "“") forvalueindata)
# 写入数据file.write(row+'\n')
# 刷数file.flush()
# 返回类型枚举值关键字defmemString():
return ["方式", "类型", "状态", "类别", "范围"]
# 判断是否是分区表defisNotPartition(table_obj):
# 如果报错 返回Falsetry:
is_or_not=table_obj.exist_partitions()
returnis_or_notexceptException:
returnFalse# 确保拆分字符串为NonedefsafeStr():
return"None,None"# 分区表 输入不同字符,返回不同字符的数组或者字符defgetType(type_name, partition_name):
iftype_name=="1":
# 数字类型return ["double", "bigint", "int"]
eliftype_name=="2":
# decimal类型 主要进行模糊匹配return"decimal"eliftype_name=="3":
# 日期类型return ["datetime", "timestamp", "date"]
else:
# 此处返回分区列名returnpartition_name# 非分区表 输入不同字符,返回不同字符的数组或者字符defgetType_notWith(type_name):
iftype_name=="1":
# 数字类型return ["double", "bigint", "int"]
eliftype_name=="2":
# decimal类型 主要进行模糊匹配return"decimal"eliftype_name=="3":
# 日期类型return ["datetime", "timestamp", "date"]
else:
pass# 获得分区名称 ds cd_partitiondefgetPartitionName(tree_name):
return [list(tree_name)[0].split(",")[0].split("=")[0],
list(tree_name)[0].split(",")[1].split("=")[0].replace('\'', '').replace(
"\"", "")]
# 判断表的分区字段有多少个defnumReturnPartition(table_obj):
returnlen(str(table_obj.partitions.partitions[0]).split(","))
# 返回所有分区目录defgetPartitonData(table_obj):
list_ds= []
fords_dataintable_obj.partitions:
list_ds.append(str(ds_data))
returnlist_ds# 获得最大分区和下面的二级分区 list_ds = getPartitonDatadefgetMaxPartiton(list_ds):
ds_set=set()
foritem_dsinlist_ds:
ds_set.add(item_ds.split(",")[0].split("=")[1].replace('\'', '').replace("\"", ""))
# 最大一级分区max_ds=max(ds_set)
end_list=set()
# 返回最大分区下面的所有一级+二级分区 eg:ds='20230227',cd_partition='yd'foritem_dsinlist_ds:
ifmax_dsinitem_ds:
end_list.add(item_ds)
returnend_list# 获得最大长度defget_length(x):
returnlen(str(x))
# 通用类型defusePrint(col_name, df):
# 总列数量col_all_num=df[col_name].count().execute()
# 不为Null列数量col_num=df.filter(df[col_name].notnull())[col_name].count().execute()
# 列空置率col_isnull=df[col_name].isnull().sum().execute() /col_all_num# 列空字符串的量级col_len0= ((df[col_name].notnull()) & (df[col_name].equals(""))).sum().execute()
# 需要转化为pandas才能对分区数据进行iloc操作,但会有性能问题# pandas_df = df.to_pandas()# 最大值 因为此处会有部分值为数字,判断规则改为长度最长值# col_max = pandas_df.iloc[df[col_name].apply(get_length).idxmax()][col_name]col_max=df[col_name].max().execute()
# 最小值col_min=df[col_name].min().execute()
returnstr(col_num), str(col_isnull), str(col_max).replace("\n", ""), str(col_min).replace("\n", ""), col_len0# 数字类型的通用查询defmoneyPrint(col_name, df, first_item, table, table_name):
# 列名col_name_item=col_name# 列数量 列空置率 列长度为0 最大值 最小值col_num, col_isnull, col_max, col_min, col_len0=usePrint(col_name, df)
# 总和try:
col_sum=df[col_name].sum().execute()
exceptException:
col_sum="该字段存在精度问题,无法聚合"# 标准差col_std=df[col_name].std().execute()
# 平均数col_avg=df[col_name].mean().execute()
memString()
# 枚举值 只要20个 不为decimal和double进入判定col_set=df[col_name].unique().head(20).values# 问题情况question_str=findQuestion(col_num, col_isnull, col_len0, col_max, col_min, col_std, col_avg, False)
write((
str(table_name).split(".")[1].split("_")[0],
str(first_item).split(",")[1].replace("cd_partition=", "").replace("\'", ""),
table.comment,
table_name,
col_name_item,
col_comment,
col_num,
col_isnull,
col_len0,
col_max,
col_min,
str(col_sum),
str(col_std),
str(col_avg),
str(col_set).replace("\n", "").replace("dict_values([", "").replace("])", "").replace(",", " "),
"否"ifquestion_str==""else"是",
question_str)
    )
# 日期类型的通用查询defdataPrint(col_name, df, first_item, table, table_name):
# 列名col_name_item=col_name# 列数量 列空置率 列长度为0 最大值 最小值col_num, col_isnull, col_max, col_min, col_len0=usePrint(col_name, df)
# 总col_sum="None"# 标准差col_std="None"# 平均数col_avg="None"# 枚举值 只要20个col_set=df[col_name].unique().head(20).values# 问题情况question_str=findQuestion(col_num, col_isnull, col_len0, col_max, col_min, col_std, col_avg, False)
write((
str(table_name).split(".")[1].split("_")[0],
str(first_item).split(",")[1].replace("cd_partition=", "").replace("\'", ""),
table.comment,
table_name,
col_name_item,
col_comment,
col_num,
col_isnull,
col_len0,
col_max,
col_min,
col_sum,
col_std,
col_avg,
str(col_set).replace("\n", "").replace("dict_values([", "").replace("])", "").replace(",", " "),
"否"ifquestion_str==""else"是",
question_str)
    )
defstringPrint(col_name, df, first_item, table, table_name):
# 列名col_name_item=col_name# 列数量 列空置率 列长度为0 最大值 最小值col_num, col_isnull, col_max, col_min, col_len0=usePrint(col_name, df)
# 总和col_sum="None"# 标准差col_std="None"# 平均数col_avg="None"# 枚举值 只要20个 不为decimal和double进入判定col_set=df[col_name].unique().head(20).values# 问题情况question_str=findQuestion(col_num, col_isnull, col_len0, col_max, col_min, col_std, col_avg, True)
write((
str(table_name).split(".")[1].split("_")[0],
str(first_item).split(",")[1].replace("cd_partition=", "").replace("\'", ""),
table.comment,
table_name,
col_name_item,
col_comment,
col_num,
col_isnull,
# 空字符串量级col_len0,
col_max,
col_miniflen(col_num) >0else"该字段存在空字符串",
col_sum,
col_std,
col_avg,
str(col_set).replace("\n", "").replace("dict_values([", "").replace("])", "").replace(",", " "),
"否"ifquestion_str==""else"是",
question_str)
    )
if__name__=='__main__':
all_list=breakWithCSV()
withopen(txt_file, 'a') asfile:
# 每次新写入数据加一个空白行file.write('\n')
fortable_nameintable_list:
# 获得tabletable=odps.get_table(table_name)
# 获得schemaschema=table.schemaifisNotPartition(table):
# 获得分区个数num_partition_col=numReturnPartition(table)
# 如果有两个分区字段ifnum_partition_col==2:
# 获得分区目录ds_tree=getPartitonData(table)
# 获得最大分区和其二级分区ds_max_tree=getMaxPartiton(ds_tree)
# 获得分区名称-colpartition_col_name=getPartitionName(ds_max_tree)
forfirst_iteminds_max_tree:
df=table.get_partition(f"{first_item}").to_df()
df_col=schema.columns# print(f"进入表{table_name}的循环中")# 遍历列名 列类型 列注释forcol_dataindf_col:
col_name=str(col_data.name)
# 将类型值转为小写col_type=str(col_data.type).lower()
col_comment=col_data.comment# 如果表名+触点名称+列名已经存在文件,直接终止本次循环cd_name_value=first_item.split(",")[1].split("=")[1].replace("'", "")
ifcd_name_value+table_name+col_nameinall_list:
print("丢弃"+cd_name_value+" "+table_name+" "+col_name)
continueelifcol_typeingetType("1", partition_col_name) orgetType("2",
partition_col_name) incol_type:
# 调用数字类型print("未丢弃"+cd_name_value+" "+table_name+" "+col_name)
moneyPrint(col_name, df, first_item, table, table_name)
elifcol_typeingetType("3", partition_col_name):
# 调用日期类型print("未丢弃"+cd_name_value+" "+table_name+" "+col_name)
dataPrint(col_name, df, first_item, table, table_name)
elifcol_namenotingetType("4", partition_col_name):
# 调用不是分区字段的其他类型print("未丢弃"+cd_name_value+" "+table_name+" "+col_name)
stringPrint(col_name, df, first_item, table, table_name)
else:
pass# 如果是只有一个分区字段elifnum_partition_col==1:
# 获得分区目录ds_tree=getPartitonData(table)
# 获得最大分区---ds_max_tree=getMaxPartiton(ds_tree)
# 设置分区名称-colpartition_col_name= ["ds", "DS", "pt", "PT"]
forfirst_iteminds_max_tree:
df=table.get_partition(f"{first_item}").to_df()
df_col=schema.columns# print(f"进入表{table_name}的循环中")# 遍历列名 列类型 列注释forcol_dataindf_col:
col_name=str(col_data.name)
# 将类型值转为小写col_type=str(col_data.type).lower()
col_comment=col_data.comment# 如果表名+触点名称+列名已经存在文件,直接终止本次循环cd_name_value="None"ifcd_name_value+table_name+col_nameinall_list:
print("丢弃"+cd_name_value+" "+table_name+" "+col_name)
continueelifcol_typeingetType("1", partition_col_name) orgetType("2",
partition_col_name) incol_type:
# 调用数字类型print("未丢弃"+cd_name_value+" "+table_name+" "+col_name)
# 调用数字类型moneyPrint(col_name, df, safeStr(), table, table_name)
elifcol_typeingetType("3", partition_col_name):
# 调用数字类型print("未丢弃"+cd_name_value+" "+table_name+" "+col_name)
# 调用日期类型dataPrint(col_name, df, safeStr(), table, table_name)
elifcol_namenotingetType("4", partition_col_name):
# 调用数字类型print("未丢弃"+cd_name_value+" "+table_name+" "+col_name)
# 调用不是分区字段的其他类型stringPrint(col_name, df, safeStr(), table, table_name)
else:
passelse:
print(f"表{table_name}分区字段过多,请考虑优化脚本!!!")
else:
# 非分区的表df=table.get_partition.to_df()
df_col=schema.columns# print(f"进入表{table_name}的循环中")# 遍历列名 列类型 列注释forcol_dataindf_col:
col_name=str(col_data.name)
# 将类型值转为小写col_type=str(col_data.type).lower()
col_comment=col_data.comment# 如果表名+触点名称+列名已经存在文件,直接终止本次循环cd_name_value="None"ifcd_name_value+table_name+col_nameinall_list:
print("丢弃"+cd_name_value+" "+table_name+" "+col_name)
continueelifcol_typeingetType_notWith("1") orgetType_notWith("2") incol_type:
# 调用数字类型print("未丢弃"+cd_name_value+" "+table_name+" "+col_name)
# 调用数字类型moneyPrint(col_name, df, safeStr(), table, table_name)
elifcol_typeingetType_notWith("3"):
# 调用数字类型print("未丢弃"+cd_name_value+" "+table_name+" "+col_name)
# 调用日期类型dataPrint(col_name, df, safeStr(), table, table_name)
elifcol_namenotingetType_notWith("4"):
# 调用数字类型print("未丢弃"+cd_name_value+" "+table_name+" "+col_name)
# 调用不是分区字段的其他类型stringPrint(col_name, df, safeStr(), table, table_name)
else:
pass
相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
1月前
|
SQL 存储 DataWorks
DataWorks常见问题之dataworks 表授权失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
1月前
|
SQL DataWorks 关系型数据库
DataWorks报错问题之写入数据时报‘http.client.ResponseNotReady’如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
1月前
|
消息中间件 分布式计算 DataWorks
DataWorks常见问题之查看数据地图模块总的存储大小失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
1月前
|
分布式计算 DataWorks 调度
DataWorks报错问题之dataworks同步clickhouse数据报错如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
1月前
|
存储 分布式计算 DataWorks
DataWorks常见问题之目标表建立失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
1月前
|
分布式计算 DataWorks 算法
DataWorks常见问题之更改odps表生命周期失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
1月前
|
分布式计算 DataWorks 数据处理
DataWorks如何更改odps表生命周期为永久?
【2月更文挑战第14天】DataWorks如何更改odps表生命周期为永久?
96 2
|
1月前
|
分布式计算 DataWorks NoSQL
DataWorks报错问题之dataworks数据异常如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
1月前
|
SQL 分布式计算 DataWorks
在DataWorks中,将MaxCompute的表映射成Hologres(Holo)外部表的语句
【2月更文挑战第32天】在DataWorks中,将MaxCompute的表映射成Hologres(Holo)外部表的语句
23 1
|
1月前
|
DataWorks 监控 数据可视化