注意:本次的pyodps的版本是0.10.7,别问,问就是dataworks是这个版本。
方法详解
首先是开始,导入的包分别是必备的odps,额外还有自带的os和csv,os和csv可以通过修改代码来替换,之后odps需要配置四个参数,分别是id,key,项目名称和endpoint,如果是初次学习者可以在maxcomputer官方文档搜索odpccmd_public,会有详细介绍。
如果你是在dataworks上跑,可以自行删除这一部分,因为其自带了这一部分。
下面是一个表集合,如果表过多,可以再建一个文件,读取文件来更改table_list的数据,再下面的txt_file是用来写入数据的,每一列的意思和上面注释相同。
第二块是对结果数据的一些判定,这一块可以根据自己的情况来选择不同的入参和出参,同时float的判定是因为大部分结果都是数字类型,为了不再额外进行类型判定以及避免开发者的类型选择有误特意进行的调整。
下面这一部分则是一个优化,如果csv已经创建,而且首行数据和代码相同,则不会删除表数据,而是追加,否则会覆盖重新写入表头,当然没有创建csv则会先创建csv写入表头。
在这里是一个写入的方法,这里接收的data会是一个元组,而且为了规避特殊字符对逗号的干扰,这里进行了一些replace,同时每次写入的时候会进行一个换行。
再下面的一个是判定表是否为分区表,没有dir到合适的方法,只能使用了一个try,然后下面是safeStr是在写代码的时候,方便复用二级分区表的方法,设定的一个数据,其实就是会给一级分区表或者普通表设定一个二级分区None。
再下面的是一个判定方法,根据不同列类型返回不同的值,方便后续维护。
如你所见,依旧是对于分区的判定,没有相应的api,使用起来真的很麻烦,如果有一天分区不是=加逗号分割,这也就废了,也有可能是太菜没有发现。
它又来了,还是一个分区方法,为了获得最大分区,我也是煞费苦心。
这是一个通用类型的方法,量级,null的空值率,空字符串的数量,最大值和最小值。
然后分别是针对不同类型的特色数据处理,这里的总和又加了try是因为decimal相加的精度问题,不知道为什么astype居然不能转类型,然后是标准差和平均数,最后的set其实也可以改成通用类型,就是取20个不重复的枚举值,然后再结合上面的问题方法来确定数据的之类问题,用write写出到csv。
部分缺陷
- 第一点肯定是空字符串的量级最后还是没有求出来,会一直显示是0;
- 为了代码的正常运行,进行了多次replace操作,影响性能而且也印象最后的数据呈现;
- 在max的时候,没有进行优化,可能出现数字比较但是结果是字典值比较的情况;
- 大量关于分区的方法,而不是直接使用的api;
- 部分写死的数据,如果要调整,比较麻烦。
完整代码
- 导入ODPS库和其他所需的库;
- 使用ODPS连接到阿里云MaxCompute,并设置数据表列表、数据显示的最大行数以及输出CSV文件的路径;
- 定义了一个名为is_float()的函数,用于判断字符串是否可以转换为浮点数;
- 定义了一个名为findQuestion()的函数,用于分析每个字段的统计信息,并返回是否存在问题的提示;
- 定义了一个名为breakWithCSV()的函数,用于读取CSV文件并检查其是否符合要求,如果符合要求,则返回文件中所有表的名称列表;
- 主函数中的代码实现了对每个表的字段的统计分析,并将结果保存到CSV文件中。在对每个字段进行分析时,使用了is_float()和findQuestion()函数,并将结果写入到CSV文件中。
# -*- coding: utf-8 -*-# pip install odpsfromodpsimportODPSfromodpsimportoptionsimportosimportcsv# 注意utf-8和gbk的切换# 数据域 触点名称 表中文名 表名称 字段名称 字段备注 字段数据量 空值率 空字符串量级 最大值 最小值 总和 标准差 平均值 字典值枚举 是否存在问题 存在问题# access_id + secret_access_key+项目名称 自行填写odps=ODPS('', '', '', endpoint='http://service.cn-beijing.maxcompute.aliyun.com/api') # 表集合table_list= [ "", ] # 将max_rows设置为Noneoptions.display.max_rows=Nonetxt_file="./1.csv"# 判断是否可以转化为float类型defis_float(s): try: float(s) returnTrueexceptException: returnFalse# 返回存在问题 字段数据量 is_num传入为True或者FalsedeffindQuestion(num_col, is_null_col, is_0_col, max_col, min_col, std_col, avg_col, is_num): re_str=""ifis_float(num_col) andfloat(num_col) <=100: re_str+=" 该字段数据量过少;建议确定"ifis_float(is_null_col) andfloat(is_null_col) >0: re_str+=" 该字段存在空值;是否需要填充"ifis_float(is_0_col) andfloat(is_0_col) >0: re_str+=" 该字段存在空字符串;建议清洗"ifis_numandis_float(max_col) andis_float(min_col): re_str+=" 该字段类型可能应该为数字类型;建议更换类型"ifis_float(max_col) andis_float(min_col) and (float(max_col) <0orfloat(min_col) <0): re_str+=" 该字段金额存在负数;先确定再清洗"ifis_float(std_col) andis_float(avg_col) and (float(std_col) >float(avg_col)): re_str+=" 该字段离散程度过大;建议确定是否有测试数据或者是否数据缺失"returnre_str# 如果csv文件已经有对应信息不再进行处理defbreakWithCSV(): csv_col_name='数据域,触点名称,表中文名,表名称,字段名称,字段备注,字段数据量,空值率,空字符串量级,最大值,最小值,总和,标准差,平均值,字典值枚举,是否存在问题,存在问题'# 文件不存在 先创建文件ifnotos.path.exists(txt_file): withopen(txt_file, "w", encoding='utf-8') asf: f.write( csv_col_name ) # 如果文件列和字符串不同则创建新的withopen(txt_file, "r", encoding='utf-8') asread_csv: try: # 如果首行和默认不同 则重新创建文件reader=csv.reader(read_csv) csv_one=next(reader) read_csv.close() # 不相等 复写文件if','.join(csv_one) !=csv_col_name: withopen(txt_file, "w") asf: f.write( csv_col_name ) exceptException: # 此处表示文件为空 创建文件withopen(txt_file, "w", encoding='utf-8') asf: f.write( csv_col_name ) # 表名集合all_list= [] withopen("./1.csv", "r", encoding="utf-8") asread_csv: # 跳过首行csv_data=read_csv.readlines()[1:] forline_dataincsv_data: # 如果行没有mn字样,则跳过if"mn"notinline_dataorline_dataisNone: passelse: all_list.append( str(line_data.split(",")[1]) +str(line_data.split(",")[3]) +str(line_data.split(",")[4])) returnall_list# 写入文件defwrite(data): # 打开文件,设置追加模式withopen(txt_file, 'a', encoding='utf-8') asfile: # 如果字符串中有"会导致,失灵,将单引号转化为中文“row=','.join(str(value).replace(",", ";").replace("\"", "“") forvalueindata) # 写入数据file.write(row+'\n') # 刷数file.flush() # 返回类型枚举值关键字defmemString(): return ["方式", "类型", "状态", "类别", "范围"] # 判断是否是分区表defisNotPartition(table_obj): # 如果报错 返回Falsetry: is_or_not=table_obj.exist_partitions() returnis_or_notexceptException: returnFalse# 确保拆分字符串为NonedefsafeStr(): return"None,None"# 分区表 输入不同字符,返回不同字符的数组或者字符defgetType(type_name, partition_name): iftype_name=="1": # 数字类型return ["double", "bigint", "int"] eliftype_name=="2": # decimal类型 主要进行模糊匹配return"decimal"eliftype_name=="3": # 日期类型return ["datetime", "timestamp", "date"] else: # 此处返回分区列名returnpartition_name# 非分区表 输入不同字符,返回不同字符的数组或者字符defgetType_notWith(type_name): iftype_name=="1": # 数字类型return ["double", "bigint", "int"] eliftype_name=="2": # decimal类型 主要进行模糊匹配return"decimal"eliftype_name=="3": # 日期类型return ["datetime", "timestamp", "date"] else: pass# 获得分区名称 ds cd_partitiondefgetPartitionName(tree_name): return [list(tree_name)[0].split(",")[0].split("=")[0], list(tree_name)[0].split(",")[1].split("=")[0].replace('\'', '').replace( "\"", "")] # 判断表的分区字段有多少个defnumReturnPartition(table_obj): returnlen(str(table_obj.partitions.partitions[0]).split(",")) # 返回所有分区目录defgetPartitonData(table_obj): list_ds= [] fords_dataintable_obj.partitions: list_ds.append(str(ds_data)) returnlist_ds# 获得最大分区和下面的二级分区 list_ds = getPartitonDatadefgetMaxPartiton(list_ds): ds_set=set() foritem_dsinlist_ds: ds_set.add(item_ds.split(",")[0].split("=")[1].replace('\'', '').replace("\"", "")) # 最大一级分区max_ds=max(ds_set) end_list=set() # 返回最大分区下面的所有一级+二级分区 eg:ds='20230227',cd_partition='yd'foritem_dsinlist_ds: ifmax_dsinitem_ds: end_list.add(item_ds) returnend_list# 获得最大长度defget_length(x): returnlen(str(x)) # 通用类型defusePrint(col_name, df): # 总列数量col_all_num=df[col_name].count().execute() # 不为Null列数量col_num=df.filter(df[col_name].notnull())[col_name].count().execute() # 列空置率col_isnull=df[col_name].isnull().sum().execute() /col_all_num# 列空字符串的量级col_len0= ((df[col_name].notnull()) & (df[col_name].equals(""))).sum().execute() # 需要转化为pandas才能对分区数据进行iloc操作,但会有性能问题# pandas_df = df.to_pandas()# 最大值 因为此处会有部分值为数字,判断规则改为长度最长值# col_max = pandas_df.iloc[df[col_name].apply(get_length).idxmax()][col_name]col_max=df[col_name].max().execute() # 最小值col_min=df[col_name].min().execute() returnstr(col_num), str(col_isnull), str(col_max).replace("\n", ""), str(col_min).replace("\n", ""), col_len0# 数字类型的通用查询defmoneyPrint(col_name, df, first_item, table, table_name): # 列名col_name_item=col_name# 列数量 列空置率 列长度为0 最大值 最小值col_num, col_isnull, col_max, col_min, col_len0=usePrint(col_name, df) # 总和try: col_sum=df[col_name].sum().execute() exceptException: col_sum="该字段存在精度问题,无法聚合"# 标准差col_std=df[col_name].std().execute() # 平均数col_avg=df[col_name].mean().execute() memString() # 枚举值 只要20个 不为decimal和double进入判定col_set=df[col_name].unique().head(20).values# 问题情况question_str=findQuestion(col_num, col_isnull, col_len0, col_max, col_min, col_std, col_avg, False) write(( str(table_name).split(".")[1].split("_")[0], str(first_item).split(",")[1].replace("cd_partition=", "").replace("\'", ""), table.comment, table_name, col_name_item, col_comment, col_num, col_isnull, col_len0, col_max, col_min, str(col_sum), str(col_std), str(col_avg), str(col_set).replace("\n", "").replace("dict_values([", "").replace("])", "").replace(",", " "), "否"ifquestion_str==""else"是", question_str) ) # 日期类型的通用查询defdataPrint(col_name, df, first_item, table, table_name): # 列名col_name_item=col_name# 列数量 列空置率 列长度为0 最大值 最小值col_num, col_isnull, col_max, col_min, col_len0=usePrint(col_name, df) # 总col_sum="None"# 标准差col_std="None"# 平均数col_avg="None"# 枚举值 只要20个col_set=df[col_name].unique().head(20).values# 问题情况question_str=findQuestion(col_num, col_isnull, col_len0, col_max, col_min, col_std, col_avg, False) write(( str(table_name).split(".")[1].split("_")[0], str(first_item).split(",")[1].replace("cd_partition=", "").replace("\'", ""), table.comment, table_name, col_name_item, col_comment, col_num, col_isnull, col_len0, col_max, col_min, col_sum, col_std, col_avg, str(col_set).replace("\n", "").replace("dict_values([", "").replace("])", "").replace(",", " "), "否"ifquestion_str==""else"是", question_str) ) defstringPrint(col_name, df, first_item, table, table_name): # 列名col_name_item=col_name# 列数量 列空置率 列长度为0 最大值 最小值col_num, col_isnull, col_max, col_min, col_len0=usePrint(col_name, df) # 总和col_sum="None"# 标准差col_std="None"# 平均数col_avg="None"# 枚举值 只要20个 不为decimal和double进入判定col_set=df[col_name].unique().head(20).values# 问题情况question_str=findQuestion(col_num, col_isnull, col_len0, col_max, col_min, col_std, col_avg, True) write(( str(table_name).split(".")[1].split("_")[0], str(first_item).split(",")[1].replace("cd_partition=", "").replace("\'", ""), table.comment, table_name, col_name_item, col_comment, col_num, col_isnull, # 空字符串量级col_len0, col_max, col_miniflen(col_num) >0else"该字段存在空字符串", col_sum, col_std, col_avg, str(col_set).replace("\n", "").replace("dict_values([", "").replace("])", "").replace(",", " "), "否"ifquestion_str==""else"是", question_str) ) if__name__=='__main__': all_list=breakWithCSV() withopen(txt_file, 'a') asfile: # 每次新写入数据加一个空白行file.write('\n') fortable_nameintable_list: # 获得tabletable=odps.get_table(table_name) # 获得schemaschema=table.schemaifisNotPartition(table): # 获得分区个数num_partition_col=numReturnPartition(table) # 如果有两个分区字段ifnum_partition_col==2: # 获得分区目录ds_tree=getPartitonData(table) # 获得最大分区和其二级分区ds_max_tree=getMaxPartiton(ds_tree) # 获得分区名称-colpartition_col_name=getPartitionName(ds_max_tree) forfirst_iteminds_max_tree: df=table.get_partition(f"{first_item}").to_df() df_col=schema.columns# print(f"进入表{table_name}的循环中")# 遍历列名 列类型 列注释forcol_dataindf_col: col_name=str(col_data.name) # 将类型值转为小写col_type=str(col_data.type).lower() col_comment=col_data.comment# 如果表名+触点名称+列名已经存在文件,直接终止本次循环cd_name_value=first_item.split(",")[1].split("=")[1].replace("'", "") ifcd_name_value+table_name+col_nameinall_list: print("丢弃"+cd_name_value+" "+table_name+" "+col_name) continueelifcol_typeingetType("1", partition_col_name) orgetType("2", partition_col_name) incol_type: # 调用数字类型print("未丢弃"+cd_name_value+" "+table_name+" "+col_name) moneyPrint(col_name, df, first_item, table, table_name) elifcol_typeingetType("3", partition_col_name): # 调用日期类型print("未丢弃"+cd_name_value+" "+table_name+" "+col_name) dataPrint(col_name, df, first_item, table, table_name) elifcol_namenotingetType("4", partition_col_name): # 调用不是分区字段的其他类型print("未丢弃"+cd_name_value+" "+table_name+" "+col_name) stringPrint(col_name, df, first_item, table, table_name) else: pass# 如果是只有一个分区字段elifnum_partition_col==1: # 获得分区目录ds_tree=getPartitonData(table) # 获得最大分区---ds_max_tree=getMaxPartiton(ds_tree) # 设置分区名称-colpartition_col_name= ["ds", "DS", "pt", "PT"] forfirst_iteminds_max_tree: df=table.get_partition(f"{first_item}").to_df() df_col=schema.columns# print(f"进入表{table_name}的循环中")# 遍历列名 列类型 列注释forcol_dataindf_col: col_name=str(col_data.name) # 将类型值转为小写col_type=str(col_data.type).lower() col_comment=col_data.comment# 如果表名+触点名称+列名已经存在文件,直接终止本次循环cd_name_value="None"ifcd_name_value+table_name+col_nameinall_list: print("丢弃"+cd_name_value+" "+table_name+" "+col_name) continueelifcol_typeingetType("1", partition_col_name) orgetType("2", partition_col_name) incol_type: # 调用数字类型print("未丢弃"+cd_name_value+" "+table_name+" "+col_name) # 调用数字类型moneyPrint(col_name, df, safeStr(), table, table_name) elifcol_typeingetType("3", partition_col_name): # 调用数字类型print("未丢弃"+cd_name_value+" "+table_name+" "+col_name) # 调用日期类型dataPrint(col_name, df, safeStr(), table, table_name) elifcol_namenotingetType("4", partition_col_name): # 调用数字类型print("未丢弃"+cd_name_value+" "+table_name+" "+col_name) # 调用不是分区字段的其他类型stringPrint(col_name, df, safeStr(), table, table_name) else: passelse: print(f"表{table_name}分区字段过多,请考虑优化脚本!!!") else: # 非分区的表df=table.get_partition.to_df() df_col=schema.columns# print(f"进入表{table_name}的循环中")# 遍历列名 列类型 列注释forcol_dataindf_col: col_name=str(col_data.name) # 将类型值转为小写col_type=str(col_data.type).lower() col_comment=col_data.comment# 如果表名+触点名称+列名已经存在文件,直接终止本次循环cd_name_value="None"ifcd_name_value+table_name+col_nameinall_list: print("丢弃"+cd_name_value+" "+table_name+" "+col_name) continueelifcol_typeingetType_notWith("1") orgetType_notWith("2") incol_type: # 调用数字类型print("未丢弃"+cd_name_value+" "+table_name+" "+col_name) # 调用数字类型moneyPrint(col_name, df, safeStr(), table, table_name) elifcol_typeingetType_notWith("3"): # 调用数字类型print("未丢弃"+cd_name_value+" "+table_name+" "+col_name) # 调用日期类型dataPrint(col_name, df, safeStr(), table, table_name) elifcol_namenotingetType_notWith("4"): # 调用数字类型print("未丢弃"+cd_name_value+" "+table_name+" "+col_name) # 调用不是分区字段的其他类型stringPrint(col_name, df, safeStr(), table, table_name) else: pass