首先是大致思路,如果是数值类型,则当做度量的数据,而如果是日期类型则计算月维度下的分组情况,如果是其他类型则默认为object;先计算缺失值和去重值,如果枚举值等于1个,则为唯一值,不参与计算,不超过三个,则认为其为维度字段,非数值类型的只计算五个枚举值,缺失值和量级;数值类型则当做度量参与到后续分组汇总计算。
汇总计算主要会集中在有维度,无维度以及日期为月上,求最大值,最小值,中位数,平均数,方差和标准差。
上面都是差不多的优点,下面说缺点:
- 不支持复杂类型的判断
- 不支持多维度的度量计算
- 如果需要根据日期升频或者降频需要改代码
- 速度转换慢,从odps的df到pd的df,没有采用to_pandas的方法,因为之前使用速度很慢,而是取消将数据写入到pd的df,但速度也仅仅是1000条/1秒
- 输出格式未完成整理好,可能需要一定成本的整理
- 如果想进行多表操作,需要改代码
使用前有三个地方需要写
main方法中的table_name用来写表名
odps的id,key以及project
写出文件的位置
# 1.判断数据类型# 2.如果是字符串类型,返回七个枚举值,一个枚举值如果长度超过10,后面的字符替换为...# 3.如果字符串类型的枚举值不超过五个,则默认为维度字段,参与到后续计算# 4.如果是日期类型,则取最大值和最小值,并取有维度时每个维度的最大值和最小值# 5.如果数字类型枚举值不超过三个,当做字符串类型计算# 6.如果是数字类型,则取最大值,最小值,中位数,平均数,方差和标准差;以及各维度状态下的...# 7.所有类型都统计缺失值,数据50%的数据# 8.读取odps的数据,解析odps的数据情况# 9.10万条数据解析耗时:# 开始运行 2022-12-14 10:49:47.627770# 开始读取odps 2022-12-14 10:49:48.162656# 开始转换df类型 2022-12-14 10:51:27.100467# 转换odps为pd完成 2022-12-14 10:51:29.562657# 已解析出度量和维度 2022-12-14 10:51:30.925895# 解析完成,开始写出 2022-12-14 10:51:33.160971# 写出成功 2022-12-14 10:51:33.165625fromcollectionsimportdefaultdictimportpandasaspdimportnumpyasnpimportdatetimefromodpsimportODPSfromodps.dfimportDataFrame# 配置odps参数o=ODPS( access_id='', # 登陆账号secret_access_key='', # 登陆密码project='', # odps上的项目名称endpoint='http://service.odps.aliyun.com/api'# 官方提供的接口) # 获得数据类型defget_odps_schema(table_name): df_odps=DataFrame(o.get_table(table_name)) returnlist(df_odps.dtypes) # 获得原始数据的类型defclean_schema(list_schema): # print(list_schema[1])end_list=list() foriinrange(len(list_schema)): # print(list_schema[i]) 替换脏数据tmp_list=str(list_schema[i]).replace("<column", "").replace("type ", "").replace(">", "").split(',') end_list.append([tmp_list[0].strip(), tmp_list[1].strip()]) # print(end_list)returnend_list# 直接将maxcomputer的数据转换为pandas的dataframedefexe_sql(sql, schema_type): data= [] count_i=1witho.execute_sql(sql).open_reader(tunnel=True) asreader: d=defaultdict(list) # collection默认一个dictforrecordinreader: # 速度大概1000/1s 有点慢print("已解析:"+str(count_i), datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')) count_i=count_i+1forresinrecord: d[res[0]].append(res[1]) # 解析record中的每一个元组,存储方式为(k,v),以k作为key,存储每一列的内容;data=pd.DataFrame.from_dict(d, orient='index').T# 转换为数据框,并转置,不转置的话是横条数据print("开始转换df类型", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')) forschema_item_typeinschema_type: # print(schema_item_type)ifschema_item_type[1] =="datetime": passelse: data[schema_item_type[0]] =data[schema_item_type[0]].astype(schema_item_type[1]) # print(data.dtypes)returndata# 将df隐藏行解开pd.set_option('display.max_columns', None) # from io import StringIOall_s=list() end_path="end.txt"# 将数据写入数组 排序defprintx(str_value): all_s.append(str_value+"\n") # 数据写出defwrite_csv(all_s, end_path): all_s.sort() withopen(end_path, 'w', encoding='utf-8') asfile: forsinall_s: file.write(s) # 获得列的名称和数据类型defgetType(obj, col): obj_type=obj.dtype.namereturn [col, obj_type] # 获得列空值率的数量defgetNone_count(data): returndata.isnull().sum(axis=0) # 返回数据去重后的数据情况defget_discount(data): list_data=data.drop_duplicates().head(5).valuesreturnlist(list_data) # 入参为列名,以及维度listdefget_weidu(col_name, weidu_list): weidu_list.add(col_name) # 获得去重后的数据量级defdistinct_num(data): returndata.drop_duplicates().count() # 获得日期的最大值和最小值defgetdate_fun(data, col_name, rq_list): max_data=data.max() min_data=data.min() # gncd = getNone_count(data)gncds=str(getNone_count(data)) printx("日期列 "+col_name+" 的缺失量为:"+gncds) printx("日期列 "+col_name+" 最大值为:"+str(max_data) +",最小值为:"+str(min_data)) get_weidu(col_name, rq_list) # 分组求度量defget_group_num_fun(data, weidu): # printx(data.head())max_data=data.groupby(weidu).max() min_data=data.groupby(weidu).min() # 中位数med_data=data.groupby(weidu).median() # 平均数mea_data=data.groupby(weidu).mean() # 方差var_data=data.groupby(weidu).var() # 标准差std_data=data.groupby(weidu).std() printx("维度为"+weidu+"时,度量数据如下"+str( [{"最大值": max_data}, {"最小值": min_data}, {"中位数": med_data}, {"平均数": mea_data}, {"方差": var_data}, {"标准差": std_data}])) # 返回数据最大值,最小值,中位数,平均数,方差和标准差defget_num_fun(data): # print(data)max_data=data.max() min_data=data.min() # 中位数med_data=data.median() # 平均数mea_data=data.mean() # 方差var_data=data.var() # 标准差std_data=data.std() return [{"最大值": max_data}, {"最小值": min_data}, {"中位数": med_data}, {"平均数": mea_data}, {"方差": var_data}, {"标准差": std_data}] # 返回数字类型的最大值等defget_num(data, col_name, weidu_list, wy_list, dl_list): # print(col_name)# 缺失值量gncd=getNone_count(data) gncds=str(getNone_count(data)) # 去重值量dnd=distinct_num(data) # print(gncd)gdd=str(get_discount(data)) six_num=get_num_fun(data.dropna()) ifdnd==1andgncd==0: printx("唯一数值列 "+col_name+" 的值为:"+gdd) get_weidu(col_name, wy_list) elifdnd<=3: printx("维度数值列 "+col_name+" 的缺失量为:"+gncds) printx("维度数值列 "+col_name+" 的枚举值为:"+gdd) get_weidu(col_name, weidu_list) else: printx("普通数值列 "+col_name+" 的缺失量为:"+gncds) printx("普通数值列 "+col_name+" 的枚举值为:"+gdd) printx("普通数值列 "+col_name+"数据情况:"+str(six_num)) get_weidu(col_name, dl_list) # 字符串类型进入本循环defgetObjValue(data, col_name, weidu_list, wy_list, str_list): # 缺失值量gncd=getNone_count(data) gncds=str(getNone_count(data)) # 去重值量dnd=distinct_num(data) gdd=str(get_discount(data)) ifdnd==1andgncd!=0: printx("唯一字符串列 "+col_name+" 的值为:"+gdd) # printx(get_discount(data))get_weidu(col_name, wy_list) elifdnd<=3: printx("维度字符串列 "+col_name+" 的缺失量为:"+gncds) printx("维度字符串列 "+col_name+" 的枚举值为:"+gdd) # printx(get_discount(data))get_weidu(col_name, weidu_list) else: printx("普通字符串列 "+col_name+" 的缺失量为:"+gncds) printx("普通字符串列 "+col_name+" 的枚举值为:"+gdd) # printx(get_discount(data))get_weidu(col_name, str_list) defwaring_set(oth_list): iflen(oth_list) >0: printx("出现未分组数据,字段和类型如下"+str(oth_list)) else: pass# 主方法if__name__=='__main__': print("开始运行", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')) # 原始数据table_name=""schema_type=clean_schema(get_odps_schema(table_name)) print("开始读取odps", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')) df=exe_sql("select * from "+table_name+" where ds='20221213' limit 100000", schema_type) print("转换odps为pd完成", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')) # df = pd.read_csv("testdata.csv", encoding='utf8')# print(df.dtypes)# 清除\N为None后的数据nodf=df.replace("\\N", np.NaN) # print(nodf["updatetime"])# 将日期类型从object转化为datetime# nodf["updatetime"] = nodf["updatetime"].apply(lambda t: pd.to_datetime(t))# nodf["createtime"] = nodf["createtime"].apply(lambda t: pd.to_datetime(t))# 获得列名+类型col_name_type=list() # 维度列weidu_list=set() # 唯一值所在的列wy_list=set() # 字符串所在的列str_list=set() # 日期列rq_list=set() # 其他列oth_list=set() # 度量列dl_list=set() # 获得列类型forcolinlist(nodf): col_name_type.append(getType(nodf[col], col)) # 根据列类型进行分组分析forcol_nameincol_name_type: ifcol_name[1] =='object'orcol_name[1] =='string': getObjValue(nodf[col_name[0]], col_name[0], weidu_list, wy_list, str_list) # for col_name in col_name_type:elifcol_name[1] =='int64'orcol_name[1] =='float64': get_num(nodf[col_name[0]], col_name[0], weidu_list, wy_list, dl_list) # for col_name in col_name_type:elifcol_name[1] =='datetime64[ns]': getdate_fun(nodf[col_name[0]], col_name[0], rq_list) else: oth_list.add([col_name[0], col_name[1]]) waring_set(oth_list) print("已解析出度量和维度", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')) forweiduinweidu_list: # 将维度增加到度量的set中dl_list.add(weidu) get_group_num_fun(nodf[list(dl_list)], weidu) # 将维度从度量的set中挪开dl_list.remove(weidu) # 根据日期进行分组# print(rq_list)# print(dl_list)forrqinrq_list: # print(rq)dl_list.add(rq) rq_nodf=nodf[list(dl_list)] rq_nodf_copy=rq_nodf.copy() # 将时间置换为月rq_nodf_copy[rq] =rq_nodf[rq].dt.month# rq_nodf_copy.get_group_num_fun(rq_nodf_copy, rq) # print(rq_nodf.std())# 将日期列置换为索引# rq_nodf.index = rq_nodf[rq]# 删除日期列# rq_nodf.drop(labels=rq, axis=1)# 索引替换为周# rq_nodf.index = rq_nodf.index.isocalendar().weekdl_list.remove(rq) # print(rq_nodf.head(5))print("解析完成,开始写出", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')) # 写出数据write_csv(all_s, end_path) print("写出成功", datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))