PyODPS实现MaxComputer表数据把控

简介: 该脚本的最初设计目标是根据ODPS表中的不同二级分区,统计每个字段的详细量级,以便为下游用户提供数据支持。这有助于对中台数据资产进行有效管理,并能够及时关注上游数据质量问题。

该脚本的最初设计目标是根据ODPS表中的不同二级分区,统计每个字段的详细量级,以便为下游用户提供数据支持。这有助于对中台数据资产进行有效管理,并能够及时关注上游数据质量问题。

实现逻辑

  • 首先,要获得表的元数据信息,以便动态捕捉字段的中文释义并生成新的表结构。
  • 其次,为新的表结构提前准备固定字段,因为需要按照这些字段进行分组取最大值。
  • 在统计过程中,采用了单纯的count计数方法,而没有采用去重计数。这是出于成本和效益考虑,特别是考虑到热点问题。如果需要实现去重计数,请考虑热点问题。
  • 设计了一个字段名称和表名组成的黑名单字典来剔除不需要统计的字段。
  • 由于很多表中有相同的字段,设计了一个单独的字段来确定该字段是表中原生的还是退化的维度。通过使用一个由表名和字段名称为key的字典来进行匹配。
  • 然后使用for循环对二级分区进行处理,当然也可以处理其他字段。根据英文标识拼凑一个后缀,从而生成新的表结构。
  • 创建结果表后,根据结果表的表结构获取查询语句所需的字段。
  • 接下来,通过循环表名和二级分区生成union拼凑成的查询语句。
  • 最后,根据固定字段进行分组查询,非固定字段使用MAX函数,将结果记录写入到结果表中。

代码实现

from odps import ODPS
from datetime import date, timedelta
# ODPS静态数据
AccessKeyID = "AKID"
AccessKeySecret = "AKS"
project_name = "项目空间名称"
endpoint='http://service.cn-beijing.maxcompute.aliyun.com/api'
# 表项目空间
tab_project = "查询的表所在的项目空间"
# 表名集合
tab_name_list = ["要查询的表名"]
# 触点集合
cd_name_list = ["二级分区名称"]
# 存储结果的项目空间
result_project = "结果表项目空间名称"
# 存储结果的表名
result_table = "结果表名称"
# 存储结果的分区
result_ds = "ds string"
# 固定字段,此处固定字段必须按照原始表的顺序写
din_col = ("data_type string,col_cn_name string,col_type bigint")
# 基本黑名单字段
black_col = ['ds']
# 表黑名单字段
tab_black_col = {
'字段名称':'要查询的表名'
}
# 获取当前日期
today = date.today()
# 计算昨天的日期
yesterday = (today - timedelta(days=1)).strftime('%Y%m%d')
# 基本过滤逻辑
filter_sql = f"ds={yesterday} and cd_partition="
# 表-数据枚举影射
map_table = {"要查询的表名":"字段归属类型"}
# 关联字段枚举影射
map_tab_col = {"要查询的表名.字段名称":1,"要查询的表名.字段名称":1}
# 分组SQL
def max_sql(end_sql,din_col,sql_basic_list):
    """
    @end_sql:生成的UNION——SQL
    @din_col:固定字段
    @sql_basic_list:结果表所有字段
    将end_sql根据固定字段分组取最大值
    """
    # 拆分固定字段
    din_list = [di_c.split(" ")[0] for di_c in din_col.split(",")]
    # 形成MAX查询
    o_sql = [f"max({i_o}) as {i_o}" for i_o in sql_basic_list]
    max_sql = f"""
    select {",".join(din_list)}
    ,{",".join(o_sql)}
    from ({end_sql})
    group by {",".join(din_list)}
    ;
    """
    # with open("output.txt", "w", encoding="utf-8") as f:
    #     f.write(max_sql)
    return max_sql
# 查询数据并写入结构表中
def read_write_table(odps,end_sql,result_project,result_table,yesterday):
    """
    @odps:ODPS
    @end_sql:执行SQL
    @result_project:结果表所在项目空间
    @result_table:结果表名称
    @yesterday:分区
    将数据写入到结果表对应分区
    """
    with odps.execute_sql(end_sql).open_reader() as reader:
        records = [record for record in reader]
        print("执行SQL已经生成完成,进入到数据查询阶段")
        # 写入数据
        odps.write_table(f"{result_project}.{result_table}",records,partition=f"ds='{yesterday}'",create_partition=True,overwrite=True)
    print("写入完成,请查询数据")
# 生成基本查询SQL
def make_sql_basic(odps,result_project,result_table,din_col):
    """
    @odps:ODPS
    @result_project:结果表项目空间
    @result_table:结果表表名
    返回查询语句的中间部分,包括结果表中所有字段,也可以从上面固定字段拿去,为了避免出现数据丢失,选择从ODPS结果表拿取
    """
    table_obj = odps.get_table(f"{result_project}.{result_table}")
    sql_basic_list = []
    for i_schema in table_obj.table_schema:
            # 还需要剔除固定字段,避免下面select查询写入同名字段
            if i_schema.name not in black_col and i_schema.name not in [din_col_i.split(" ")[0] for din_col_i in din_col.split(",")]:
                sql_basic_list.append(i_schema.name)
    return sql_basic_list
# 生成数据查询语句
def make_sql(all_schema,sql_basic_list,cd_name_list,tab_project,filter_sql,map_table,map_tab_col):
    """
    @all_schema:表元数据信息
    @sql_basic_list:结果表表结构数据
    @cd_name_list:触点组合
    @tab_project:查询表的项目空间
    @filter_sql:过滤语句
    @map_table:表和数据类型的归属map
    @map_tab_col:字段和字段类型的归属map
    返回一个union拼凑成的sql语句
    """
    all_sql = []
    for t_schema in all_schema:
        for cd_name in cd_name_list:
            # 推导字段是count还是置空
            count_all = [f"count({t_schema[1]}) as {sql_i}" 
                         if cd_name == sql_i.replace("_col_count","")
                         else f"NULL as {sql_i}"
                         for sql_i in sql_basic_list ]
            col_map_table = t_schema[0]+"."+t_schema[1]
            end_col_tab = ""
            try:
                end_col_tab = f"{map_tab_col[col_map_table]}"
            except Exception:
                end_col_tab = "0"
            sql_select = f"""select 
            '{map_table[t_schema[0]]}' as data_type
            ,'{t_schema[2]}' as col_cn_name
            ,{end_col_tab} as col_type
            ,{",".join(count_all)}
            from {tab_project}.{t_schema[0]} 
            where {filter_sql}'{cd_name}'"""
            # 将查询数据添加数据,后期union
            all_sql.append(sql_select)
    return "\nunion\n".join(all_sql) 
# 获取表元数据
def get_table_schema(odps,tab_name_list,black_col):
    """
    @odps:ODPS
    @tab_name_list:表数组
    @black_col:黑名单字段
    返回一个三元组组成的数组,三元组组成为表名,字段名,字段注释
    """
    all_schema = []
    for tab_name in tab_name_list:
        table_obj = odps.get_table(tab_name)
        for i_schema in table_obj.table_schema:
              # 默认0不是
            is_black = "0"
            try:
                is_black = tab_black_col[i_schema.name]
            except Exception:
                is_black = "0"
            if i_schema.name not in black_col and is_black == "0":
                all_schema.append((tab_name,i_schema.name,i_schema.comment))
            else:
                pass
    return all_schema
# 建结果表
def create_table(din_col,cd_name_list):
    """
    @cd_name_list:触点集合
    返回格式data_type string,col_cn_name string,col_type string
    """
    cd_make_col = ",".join([f"{cd_name}_col_count bigint" for cd_name in cd_name_list])
    cd_make_col = din_col+","+cd_make_col
    return cd_make_col
# 主函数
def main():
    o = ODPS(AccessKeyID,AccessKeySecret,project_name,endpoint)
    cd_make_col = create_table(din_col,cd_name_list)
    # 删除结果表,谨慎解除注释
    o.delete_table(f"{result_project}.{result_table}",if_exists=True)
    # 建分区结果表,生命周期七天
    o.create_table(f"{result_project}.{result_table}",(f"{cd_make_col}",f"{result_ds}"),lifecycle=7,if_not_exists=True)
    # 返回所有表的元数据
    all_schema = get_table_schema(o,tab_name_list,black_col)
    sql_basic_list = make_sql_basic(o,result_project,result_table,din_col)
    # 生成执行SQL
    end_sql = make_sql(all_schema,sql_basic_list,cd_name_list,tab_project,filter_sql,map_table,map_tab_col)
    read_write_table(o,max_sql(end_sql,din_col,sql_basic_list),result_project,result_table,yesterday)
# 执行主函数
if __name__==("__main__"):
    main()

后续更新方向

  • 首先是二级分区,目前我是写死的数组,因为项目不需要那么多,后续可以优化为从表中获取,同时设置分区的黑名单;
  • 在写出的时候,我读取SQL的结果是record拼凑成的数组再写入,这样如果数据量大,影响还是会比较大的,但是我还没有想好怎么优化;
  • 黑名单字段没有考虑相同KEY的情况,还是设计成二元组比较好;
  • 静态数据还是写在代码中,没有单独拆成文件;
相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
11天前
|
分布式计算 关系型数据库 数据库连接
MaxCompute数据问题之数据迁移如何解决
MaxCompute数据包含存储在MaxCompute服务中的表、分区以及其他数据结构;本合集将提供MaxCompute数据的管理和优化指南,以及数据操作中的常见问题和解决策略。
19 0
|
10天前
|
SQL DataWorks 关系型数据库
dataworks数据集问题之sql查询报错如何解决
DataWorks数据集是指在阿里云DataWorks平台内创建、管理的数据集合;本合集将介绍DataWorks数据集的创建和使用方法,以及常见的配置问题和解决方法。
24 1
|
11天前
|
存储 分布式计算 DataWorks
MaxCompute数据问题之数据不一致如何解决
MaxCompute数据包含存储在MaxCompute服务中的表、分区以及其他数据结构;本合集将提供MaxCompute数据的管理和优化指南,以及数据操作中的常见问题和解决策略。
22 0
|
11天前
|
存储 分布式计算 DataWorks
MaxCompute数据之数据不一致如何解决
MaxCompute数据包含存储在MaxCompute服务中的表、分区以及其他数据结构;本合集将提供MaxCompute数据的管理和优化指南,以及数据操作中的常见问题和解决策略。
16 0
|
3月前
|
分布式计算 数据可视化 数据挖掘
对maxcompute的数据进行临时分析 比如数据分布什么的 用什么工具比较好?
对maxcompute的数据进行临时分析 比如数据分布什么的 用什么工具比较好?
49 3
|
5月前
|
SQL DataWorks 数据处理
在DataWorks中,您可以将抽取数据的SQL和数据集成任务放在一起
在DataWorks中,您可以将抽取数据的SQL和数据集成任务放在一起
38 4
|
6月前
|
SQL 分布式计算 资源调度
阿里云MaxCompute-Hive作业迁移语法兼容性踩坑记录
阿里云MaxCompute-Hive作业迁移语法兼容性踩坑记录
|
7月前
|
SQL 存储 分布式计算
【深入MaxCompute】人力家:用MaxCompute 事务表2.0主键模型去重数据持续降本增效
MaxCompute新增Transaction Table2.0(下文简称事务表2.0)表类型在2023年6月27日开始邀测,支持基于事务表2.0实现近实时的增全量一体的数据存储、计算解决方案。
403 0
|
分布式计算 DataWorks API
基于pyodps对MaxComputer表的数据探查
本次脚本设计是针对大批量表,并且没有明确业务支持下的数据探查,会根据不同的类型进行判定,根据结果值进行分析得出结论,并给出一定的建议,同时该脚本仅仅支持普通表,一级分区表和二级分区表;一级分区表的分区字段必须是ds或者pt。
818 0
|
SQL 数据采集 分布式计算
数据治理中 PyODPS 的正确使用方式
表饱和度(字段是否为空)、字段阈值(数值类字段取值是否超出有效边界)是评估数据质量的关键指标,由于是单表内字段级别的校验和统计,并且几乎涉及所有表,范围大、逻辑简单、重复性强,结合 Python 开发效率高的特点,很多数据工程师会使用 PyODPS 进行相关功能的开发。本文基于 PyODPS 分别使用 3 种方式实现了“饱和度统计”功能,展示了它们的执行效率,并分析了原因。
870 0

相关产品

  • 云原生大数据计算服务 MaxCompute