PyODPS实现MaxComputer表数据把控

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 该脚本的最初设计目标是根据ODPS表中的不同二级分区,统计每个字段的详细量级,以便为下游用户提供数据支持。这有助于对中台数据资产进行有效管理,并能够及时关注上游数据质量问题。

该脚本的最初设计目标是根据ODPS表中的不同二级分区,统计每个字段的详细量级,以便为下游用户提供数据支持。这有助于对中台数据资产进行有效管理,并能够及时关注上游数据质量问题。

实现逻辑

  • 首先,要获得表的元数据信息,以便动态捕捉字段的中文释义并生成新的表结构。
  • 其次,为新的表结构提前准备固定字段,因为需要按照这些字段进行分组取最大值。
  • 在统计过程中,采用了单纯的count计数方法,而没有采用去重计数。这是出于成本和效益考虑,特别是考虑到热点问题。如果需要实现去重计数,请考虑热点问题。
  • 设计了一个字段名称和表名组成的黑名单字典来剔除不需要统计的字段。
  • 由于很多表中有相同的字段,设计了一个单独的字段来确定该字段是表中原生的还是退化的维度。通过使用一个由表名和字段名称为key的字典来进行匹配。
  • 然后使用for循环对二级分区进行处理,当然也可以处理其他字段。根据英文标识拼凑一个后缀,从而生成新的表结构。
  • 创建结果表后,根据结果表的表结构获取查询语句所需的字段。
  • 接下来,通过循环表名和二级分区生成union拼凑成的查询语句。
  • 最后,根据固定字段进行分组查询,非固定字段使用MAX函数,将结果记录写入到结果表中。

代码实现

from odps import ODPS
from datetime import date, timedelta
# ODPS静态数据
AccessKeyID = "AKID"
AccessKeySecret = "AKS"
project_name = "项目空间名称"
endpoint='http://service.cn-beijing.maxcompute.aliyun.com/api'
# 表项目空间
tab_project = "查询的表所在的项目空间"
# 表名集合
tab_name_list = ["要查询的表名"]
# 触点集合
cd_name_list = ["二级分区名称"]
# 存储结果的项目空间
result_project = "结果表项目空间名称"
# 存储结果的表名
result_table = "结果表名称"
# 存储结果的分区
result_ds = "ds string"
# 固定字段,此处固定字段必须按照原始表的顺序写
din_col = ("data_type string,col_cn_name string,col_type bigint")
# 基本黑名单字段
black_col = ['ds']
# 表黑名单字段
tab_black_col = {
'字段名称':'要查询的表名'
}
# 获取当前日期
today = date.today()
# 计算昨天的日期
yesterday = (today - timedelta(days=1)).strftime('%Y%m%d')
# 基本过滤逻辑
filter_sql = f"ds={yesterday} and cd_partition="
# 表-数据枚举影射
map_table = {"要查询的表名":"字段归属类型"}
# 关联字段枚举影射
map_tab_col = {"要查询的表名.字段名称":1,"要查询的表名.字段名称":1}
# 分组SQL
def max_sql(end_sql,din_col,sql_basic_list):
    """
    @end_sql:生成的UNION——SQL
    @din_col:固定字段
    @sql_basic_list:结果表所有字段
    将end_sql根据固定字段分组取最大值
    """
    # 拆分固定字段
    din_list = [di_c.split(" ")[0] for di_c in din_col.split(",")]
    # 形成MAX查询
    o_sql = [f"max({i_o}) as {i_o}" for i_o in sql_basic_list]
    max_sql = f"""
    select {",".join(din_list)}
    ,{",".join(o_sql)}
    from ({end_sql})
    group by {",".join(din_list)}
    ;
    """
    # with open("output.txt", "w", encoding="utf-8") as f:
    #     f.write(max_sql)
    return max_sql
# 查询数据并写入结构表中
def read_write_table(odps,end_sql,result_project,result_table,yesterday):
    """
    @odps:ODPS
    @end_sql:执行SQL
    @result_project:结果表所在项目空间
    @result_table:结果表名称
    @yesterday:分区
    将数据写入到结果表对应分区
    """
    with odps.execute_sql(end_sql).open_reader() as reader:
        records = [record for record in reader]
        print("执行SQL已经生成完成,进入到数据查询阶段")
        # 写入数据
        odps.write_table(f"{result_project}.{result_table}",records,partition=f"ds='{yesterday}'",create_partition=True,overwrite=True)
    print("写入完成,请查询数据")
# 生成基本查询SQL
def make_sql_basic(odps,result_project,result_table,din_col):
    """
    @odps:ODPS
    @result_project:结果表项目空间
    @result_table:结果表表名
    返回查询语句的中间部分,包括结果表中所有字段,也可以从上面固定字段拿去,为了避免出现数据丢失,选择从ODPS结果表拿取
    """
    table_obj = odps.get_table(f"{result_project}.{result_table}")
    sql_basic_list = []
    for i_schema in table_obj.table_schema:
            # 还需要剔除固定字段,避免下面select查询写入同名字段
            if i_schema.name not in black_col and i_schema.name not in [din_col_i.split(" ")[0] for din_col_i in din_col.split(",")]:
                sql_basic_list.append(i_schema.name)
    return sql_basic_list
# 生成数据查询语句
def make_sql(all_schema,sql_basic_list,cd_name_list,tab_project,filter_sql,map_table,map_tab_col):
    """
    @all_schema:表元数据信息
    @sql_basic_list:结果表表结构数据
    @cd_name_list:触点组合
    @tab_project:查询表的项目空间
    @filter_sql:过滤语句
    @map_table:表和数据类型的归属map
    @map_tab_col:字段和字段类型的归属map
    返回一个union拼凑成的sql语句
    """
    all_sql = []
    for t_schema in all_schema:
        for cd_name in cd_name_list:
            # 推导字段是count还是置空
            count_all = [f"count({t_schema[1]}) as {sql_i}" 
                         if cd_name == sql_i.replace("_col_count","")
                         else f"NULL as {sql_i}"
                         for sql_i in sql_basic_list ]
            col_map_table = t_schema[0]+"."+t_schema[1]
            end_col_tab = ""
            try:
                end_col_tab = f"{map_tab_col[col_map_table]}"
            except Exception:
                end_col_tab = "0"
            sql_select = f"""select 
            '{map_table[t_schema[0]]}' as data_type
            ,'{t_schema[2]}' as col_cn_name
            ,{end_col_tab} as col_type
            ,{",".join(count_all)}
            from {tab_project}.{t_schema[0]} 
            where {filter_sql}'{cd_name}'"""
            # 将查询数据添加数据,后期union
            all_sql.append(sql_select)
    return "\nunion\n".join(all_sql) 
# 获取表元数据
def get_table_schema(odps,tab_name_list,black_col):
    """
    @odps:ODPS
    @tab_name_list:表数组
    @black_col:黑名单字段
    返回一个三元组组成的数组,三元组组成为表名,字段名,字段注释
    """
    all_schema = []
    for tab_name in tab_name_list:
        table_obj = odps.get_table(tab_name)
        for i_schema in table_obj.table_schema:
              # 默认0不是
            is_black = "0"
            try:
                is_black = tab_black_col[i_schema.name]
            except Exception:
                is_black = "0"
            if i_schema.name not in black_col and is_black == "0":
                all_schema.append((tab_name,i_schema.name,i_schema.comment))
            else:
                pass
    return all_schema
# 建结果表
def create_table(din_col,cd_name_list):
    """
    @cd_name_list:触点集合
    返回格式data_type string,col_cn_name string,col_type string
    """
    cd_make_col = ",".join([f"{cd_name}_col_count bigint" for cd_name in cd_name_list])
    cd_make_col = din_col+","+cd_make_col
    return cd_make_col
# 主函数
def main():
    o = ODPS(AccessKeyID,AccessKeySecret,project_name,endpoint)
    cd_make_col = create_table(din_col,cd_name_list)
    # 删除结果表,谨慎解除注释
    o.delete_table(f"{result_project}.{result_table}",if_exists=True)
    # 建分区结果表,生命周期七天
    o.create_table(f"{result_project}.{result_table}",(f"{cd_make_col}",f"{result_ds}"),lifecycle=7,if_not_exists=True)
    # 返回所有表的元数据
    all_schema = get_table_schema(o,tab_name_list,black_col)
    sql_basic_list = make_sql_basic(o,result_project,result_table,din_col)
    # 生成执行SQL
    end_sql = make_sql(all_schema,sql_basic_list,cd_name_list,tab_project,filter_sql,map_table,map_tab_col)
    read_write_table(o,max_sql(end_sql,din_col,sql_basic_list),result_project,result_table,yesterday)
# 执行主函数
if __name__==("__main__"):
    main()

后续更新方向

  • 首先是二级分区,目前我是写死的数组,因为项目不需要那么多,后续可以优化为从表中获取,同时设置分区的黑名单;
  • 在写出的时候,我读取SQL的结果是record拼凑成的数组再写入,这样如果数据量大,影响还是会比较大的,但是我还没有想好怎么优化;
  • 黑名单字段没有考虑相同KEY的情况,还是设计成二元组比较好;
  • 静态数据还是写在代码中,没有单独拆成文件;
相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
5月前
|
机器学习/深度学习 分布式计算 安全
MaxCompute产品使用合集之如何实现对ODPS中表的变动进行主动获取
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5月前
|
数据采集 分布式计算 DataWorks
DataWorks产品使用合集之运行MR任务读取源表数据并写入新表的过程,有哪些限制
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
SQL 分布式计算 数据安全/隐私保护
实时数仓 Hologres产品使用合集之重建表的索引后,如何将数据导入新表
实时数仓Hologres是阿里云推出的一款高性能、实时分析的数据库服务,专为大数据分析和复杂查询场景设计。使用Hologres,企业能够打破传统数据仓库的延迟瓶颈,实现数据到决策的无缝衔接,加速业务创新和响应速度。以下是Hologres产品的一些典型使用场景合集。
|
4月前
|
数据采集 DataWorks 安全
DataWorks产品使用合集之单表最大分区是多少
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
分布式计算 DataWorks 数据管理
DataWorks操作报错合集之写入ODPS目的表时遇到脏数据报错,该怎么解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
116 0
|
6月前
|
分布式计算 DataWorks 安全
DataWorks产品使用合集之将多业务分表同步到odps的一个三级分区表中,每级分区怎么赋值
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
46 4
|
5月前
|
分布式计算 DataWorks 关系型数据库
MaxCompute产品使用合集之如何在DataWorks中实现离线同步多个分表到MC的多级分区表
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5月前
|
SQL 分布式计算 大数据
MaxCompute产品使用合集之如何恢复删除了的表
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
5月前
|
SQL 存储 分布式计算
MaxCompute产品使用合集之使用pyodps读取数据表时,可以通过什么方法来加速读取效率
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
6月前
|
机器学习/深度学习 分布式计算 DataWorks
MaxCompute产品使用合集之离线同步到实时同步表时,数据应该如何处理
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
下一篇
DataWorks