该脚本的最初设计目标是根据ODPS表中的不同二级分区,统计每个字段的详细量级,以便为下游用户提供数据支持。这有助于对中台数据资产进行有效管理,并能够及时关注上游数据质量问题。
实现逻辑
- 首先,要获得表的元数据信息,以便动态捕捉字段的中文释义并生成新的表结构。
- 其次,为新的表结构提前准备固定字段,因为需要按照这些字段进行分组取最大值。
- 在统计过程中,采用了单纯的count计数方法,而没有采用去重计数。这是出于成本和效益考虑,特别是考虑到热点问题。如果需要实现去重计数,请考虑热点问题。
- 设计了一个字段名称和表名组成的黑名单字典来剔除不需要统计的字段。
- 由于很多表中有相同的字段,设计了一个单独的字段来确定该字段是表中原生的还是退化的维度。通过使用一个由表名和字段名称为key的字典来进行匹配。
- 然后使用for循环对二级分区进行处理,当然也可以处理其他字段。根据英文标识拼凑一个后缀,从而生成新的表结构。
- 创建结果表后,根据结果表的表结构获取查询语句所需的字段。
- 接下来,通过循环表名和二级分区生成union拼凑成的查询语句。
- 最后,根据固定字段进行分组查询,非固定字段使用MAX函数,将结果记录写入到结果表中。
代码实现
from odps import ODPS from datetime import date, timedelta # ODPS静态数据 AccessKeyID = "AKID" AccessKeySecret = "AKS" project_name = "项目空间名称" endpoint='http://service.cn-beijing.maxcompute.aliyun.com/api' # 表项目空间 tab_project = "查询的表所在的项目空间" # 表名集合 tab_name_list = ["要查询的表名"] # 触点集合 cd_name_list = ["二级分区名称"] # 存储结果的项目空间 result_project = "结果表项目空间名称" # 存储结果的表名 result_table = "结果表名称" # 存储结果的分区 result_ds = "ds string" # 固定字段,此处固定字段必须按照原始表的顺序写 din_col = ("data_type string,col_cn_name string,col_type bigint") # 基本黑名单字段 black_col = ['ds'] # 表黑名单字段 tab_black_col = { '字段名称':'要查询的表名' } # 获取当前日期 today = date.today() # 计算昨天的日期 yesterday = (today - timedelta(days=1)).strftime('%Y%m%d') # 基本过滤逻辑 filter_sql = f"ds={yesterday} and cd_partition=" # 表-数据枚举影射 map_table = {"要查询的表名":"字段归属类型"} # 关联字段枚举影射 map_tab_col = {"要查询的表名.字段名称":1,"要查询的表名.字段名称":1} # 分组SQL def max_sql(end_sql,din_col,sql_basic_list): """ @end_sql:生成的UNION——SQL @din_col:固定字段 @sql_basic_list:结果表所有字段 将end_sql根据固定字段分组取最大值 """ # 拆分固定字段 din_list = [di_c.split(" ")[0] for di_c in din_col.split(",")] # 形成MAX查询 o_sql = [f"max({i_o}) as {i_o}" for i_o in sql_basic_list] max_sql = f""" select {",".join(din_list)} ,{",".join(o_sql)} from ({end_sql}) group by {",".join(din_list)} ; """ # with open("output.txt", "w", encoding="utf-8") as f: # f.write(max_sql) return max_sql # 查询数据并写入结构表中 def read_write_table(odps,end_sql,result_project,result_table,yesterday): """ @odps:ODPS @end_sql:执行SQL @result_project:结果表所在项目空间 @result_table:结果表名称 @yesterday:分区 将数据写入到结果表对应分区 """ with odps.execute_sql(end_sql).open_reader() as reader: records = [record for record in reader] print("执行SQL已经生成完成,进入到数据查询阶段") # 写入数据 odps.write_table(f"{result_project}.{result_table}",records,partition=f"ds='{yesterday}'",create_partition=True,overwrite=True) print("写入完成,请查询数据") # 生成基本查询SQL def make_sql_basic(odps,result_project,result_table,din_col): """ @odps:ODPS @result_project:结果表项目空间 @result_table:结果表表名 返回查询语句的中间部分,包括结果表中所有字段,也可以从上面固定字段拿去,为了避免出现数据丢失,选择从ODPS结果表拿取 """ table_obj = odps.get_table(f"{result_project}.{result_table}") sql_basic_list = [] for i_schema in table_obj.table_schema: # 还需要剔除固定字段,避免下面select查询写入同名字段 if i_schema.name not in black_col and i_schema.name not in [din_col_i.split(" ")[0] for din_col_i in din_col.split(",")]: sql_basic_list.append(i_schema.name) return sql_basic_list # 生成数据查询语句 def make_sql(all_schema,sql_basic_list,cd_name_list,tab_project,filter_sql,map_table,map_tab_col): """ @all_schema:表元数据信息 @sql_basic_list:结果表表结构数据 @cd_name_list:触点组合 @tab_project:查询表的项目空间 @filter_sql:过滤语句 @map_table:表和数据类型的归属map @map_tab_col:字段和字段类型的归属map 返回一个union拼凑成的sql语句 """ all_sql = [] for t_schema in all_schema: for cd_name in cd_name_list: # 推导字段是count还是置空 count_all = [f"count({t_schema[1]}) as {sql_i}" if cd_name == sql_i.replace("_col_count","") else f"NULL as {sql_i}" for sql_i in sql_basic_list ] col_map_table = t_schema[0]+"."+t_schema[1] end_col_tab = "" try: end_col_tab = f"{map_tab_col[col_map_table]}" except Exception: end_col_tab = "0" sql_select = f"""select '{map_table[t_schema[0]]}' as data_type ,'{t_schema[2]}' as col_cn_name ,{end_col_tab} as col_type ,{",".join(count_all)} from {tab_project}.{t_schema[0]} where {filter_sql}'{cd_name}'""" # 将查询数据添加数据,后期union all_sql.append(sql_select) return "\nunion\n".join(all_sql) # 获取表元数据 def get_table_schema(odps,tab_name_list,black_col): """ @odps:ODPS @tab_name_list:表数组 @black_col:黑名单字段 返回一个三元组组成的数组,三元组组成为表名,字段名,字段注释 """ all_schema = [] for tab_name in tab_name_list: table_obj = odps.get_table(tab_name) for i_schema in table_obj.table_schema: # 默认0不是 is_black = "0" try: is_black = tab_black_col[i_schema.name] except Exception: is_black = "0" if i_schema.name not in black_col and is_black == "0": all_schema.append((tab_name,i_schema.name,i_schema.comment)) else: pass return all_schema # 建结果表 def create_table(din_col,cd_name_list): """ @cd_name_list:触点集合 返回格式data_type string,col_cn_name string,col_type string """ cd_make_col = ",".join([f"{cd_name}_col_count bigint" for cd_name in cd_name_list]) cd_make_col = din_col+","+cd_make_col return cd_make_col # 主函数 def main(): o = ODPS(AccessKeyID,AccessKeySecret,project_name,endpoint) cd_make_col = create_table(din_col,cd_name_list) # 删除结果表,谨慎解除注释 o.delete_table(f"{result_project}.{result_table}",if_exists=True) # 建分区结果表,生命周期七天 o.create_table(f"{result_project}.{result_table}",(f"{cd_make_col}",f"{result_ds}"),lifecycle=7,if_not_exists=True) # 返回所有表的元数据 all_schema = get_table_schema(o,tab_name_list,black_col) sql_basic_list = make_sql_basic(o,result_project,result_table,din_col) # 生成执行SQL end_sql = make_sql(all_schema,sql_basic_list,cd_name_list,tab_project,filter_sql,map_table,map_tab_col) read_write_table(o,max_sql(end_sql,din_col,sql_basic_list),result_project,result_table,yesterday) # 执行主函数 if __name__==("__main__"): main()
后续更新方向
- 首先是二级分区,目前我是写死的数组,因为项目不需要那么多,后续可以优化为从表中获取,同时设置分区的黑名单;
- 在写出的时候,我读取SQL的结果是record拼凑成的数组再写入,这样如果数据量大,影响还是会比较大的,但是我还没有想好怎么优化;
- 黑名单字段没有考虑相同KEY的情况,还是设计成二元组比较好;
- 静态数据还是写在代码中,没有单独拆成文件;