PyODPS实现MaxComputer表数据把控

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 该脚本的最初设计目标是根据ODPS表中的不同二级分区,统计每个字段的详细量级,以便为下游用户提供数据支持。这有助于对中台数据资产进行有效管理,并能够及时关注上游数据质量问题。

该脚本的最初设计目标是根据ODPS表中的不同二级分区,统计每个字段的详细量级,以便为下游用户提供数据支持。这有助于对中台数据资产进行有效管理,并能够及时关注上游数据质量问题。

实现逻辑

  • 首先,要获得表的元数据信息,以便动态捕捉字段的中文释义并生成新的表结构。
  • 其次,为新的表结构提前准备固定字段,因为需要按照这些字段进行分组取最大值。
  • 在统计过程中,采用了单纯的count计数方法,而没有采用去重计数。这是出于成本和效益考虑,特别是考虑到热点问题。如果需要实现去重计数,请考虑热点问题。
  • 设计了一个字段名称和表名组成的黑名单字典来剔除不需要统计的字段。
  • 由于很多表中有相同的字段,设计了一个单独的字段来确定该字段是表中原生的还是退化的维度。通过使用一个由表名和字段名称为key的字典来进行匹配。
  • 然后使用for循环对二级分区进行处理,当然也可以处理其他字段。根据英文标识拼凑一个后缀,从而生成新的表结构。
  • 创建结果表后,根据结果表的表结构获取查询语句所需的字段。
  • 接下来,通过循环表名和二级分区生成union拼凑成的查询语句。
  • 最后,根据固定字段进行分组查询,非固定字段使用MAX函数,将结果记录写入到结果表中。

代码实现

from odps import ODPS
from datetime import date, timedelta
# ODPS静态数据
AccessKeyID = "AKID"
AccessKeySecret = "AKS"
project_name = "项目空间名称"
endpoint='http://service.cn-beijing.maxcompute.aliyun.com/api'
# 表项目空间
tab_project = "查询的表所在的项目空间"
# 表名集合
tab_name_list = ["要查询的表名"]
# 触点集合
cd_name_list = ["二级分区名称"]
# 存储结果的项目空间
result_project = "结果表项目空间名称"
# 存储结果的表名
result_table = "结果表名称"
# 存储结果的分区
result_ds = "ds string"
# 固定字段,此处固定字段必须按照原始表的顺序写
din_col = ("data_type string,col_cn_name string,col_type bigint")
# 基本黑名单字段
black_col = ['ds']
# 表黑名单字段
tab_black_col = {
'字段名称':'要查询的表名'
}
# 获取当前日期
today = date.today()
# 计算昨天的日期
yesterday = (today - timedelta(days=1)).strftime('%Y%m%d')
# 基本过滤逻辑
filter_sql = f"ds={yesterday} and cd_partition="
# 表-数据枚举影射
map_table = {"要查询的表名":"字段归属类型"}
# 关联字段枚举影射
map_tab_col = {"要查询的表名.字段名称":1,"要查询的表名.字段名称":1}
# 分组SQL
def max_sql(end_sql,din_col,sql_basic_list):
    """
    @end_sql:生成的UNION——SQL
    @din_col:固定字段
    @sql_basic_list:结果表所有字段
    将end_sql根据固定字段分组取最大值
    """
    # 拆分固定字段
    din_list = [di_c.split(" ")[0] for di_c in din_col.split(",")]
    # 形成MAX查询
    o_sql = [f"max({i_o}) as {i_o}" for i_o in sql_basic_list]
    max_sql = f"""
    select {",".join(din_list)}
    ,{",".join(o_sql)}
    from ({end_sql})
    group by {",".join(din_list)}
    ;
    """
    # with open("output.txt", "w", encoding="utf-8") as f:
    #     f.write(max_sql)
    return max_sql
# 查询数据并写入结构表中
def read_write_table(odps,end_sql,result_project,result_table,yesterday):
    """
    @odps:ODPS
    @end_sql:执行SQL
    @result_project:结果表所在项目空间
    @result_table:结果表名称
    @yesterday:分区
    将数据写入到结果表对应分区
    """
    with odps.execute_sql(end_sql).open_reader() as reader:
        records = [record for record in reader]
        print("执行SQL已经生成完成,进入到数据查询阶段")
        # 写入数据
        odps.write_table(f"{result_project}.{result_table}",records,partition=f"ds='{yesterday}'",create_partition=True,overwrite=True)
    print("写入完成,请查询数据")
# 生成基本查询SQL
def make_sql_basic(odps,result_project,result_table,din_col):
    """
    @odps:ODPS
    @result_project:结果表项目空间
    @result_table:结果表表名
    返回查询语句的中间部分,包括结果表中所有字段,也可以从上面固定字段拿去,为了避免出现数据丢失,选择从ODPS结果表拿取
    """
    table_obj = odps.get_table(f"{result_project}.{result_table}")
    sql_basic_list = []
    for i_schema in table_obj.table_schema:
            # 还需要剔除固定字段,避免下面select查询写入同名字段
            if i_schema.name not in black_col and i_schema.name not in [din_col_i.split(" ")[0] for din_col_i in din_col.split(",")]:
                sql_basic_list.append(i_schema.name)
    return sql_basic_list
# 生成数据查询语句
def make_sql(all_schema,sql_basic_list,cd_name_list,tab_project,filter_sql,map_table,map_tab_col):
    """
    @all_schema:表元数据信息
    @sql_basic_list:结果表表结构数据
    @cd_name_list:触点组合
    @tab_project:查询表的项目空间
    @filter_sql:过滤语句
    @map_table:表和数据类型的归属map
    @map_tab_col:字段和字段类型的归属map
    返回一个union拼凑成的sql语句
    """
    all_sql = []
    for t_schema in all_schema:
        for cd_name in cd_name_list:
            # 推导字段是count还是置空
            count_all = [f"count({t_schema[1]}) as {sql_i}" 
                         if cd_name == sql_i.replace("_col_count","")
                         else f"NULL as {sql_i}"
                         for sql_i in sql_basic_list ]
            col_map_table = t_schema[0]+"."+t_schema[1]
            end_col_tab = ""
            try:
                end_col_tab = f"{map_tab_col[col_map_table]}"
            except Exception:
                end_col_tab = "0"
            sql_select = f"""select 
            '{map_table[t_schema[0]]}' as data_type
            ,'{t_schema[2]}' as col_cn_name
            ,{end_col_tab} as col_type
            ,{",".join(count_all)}
            from {tab_project}.{t_schema[0]} 
            where {filter_sql}'{cd_name}'"""
            # 将查询数据添加数据,后期union
            all_sql.append(sql_select)
    return "\nunion\n".join(all_sql) 
# 获取表元数据
def get_table_schema(odps,tab_name_list,black_col):
    """
    @odps:ODPS
    @tab_name_list:表数组
    @black_col:黑名单字段
    返回一个三元组组成的数组,三元组组成为表名,字段名,字段注释
    """
    all_schema = []
    for tab_name in tab_name_list:
        table_obj = odps.get_table(tab_name)
        for i_schema in table_obj.table_schema:
              # 默认0不是
            is_black = "0"
            try:
                is_black = tab_black_col[i_schema.name]
            except Exception:
                is_black = "0"
            if i_schema.name not in black_col and is_black == "0":
                all_schema.append((tab_name,i_schema.name,i_schema.comment))
            else:
                pass
    return all_schema
# 建结果表
def create_table(din_col,cd_name_list):
    """
    @cd_name_list:触点集合
    返回格式data_type string,col_cn_name string,col_type string
    """
    cd_make_col = ",".join([f"{cd_name}_col_count bigint" for cd_name in cd_name_list])
    cd_make_col = din_col+","+cd_make_col
    return cd_make_col
# 主函数
def main():
    o = ODPS(AccessKeyID,AccessKeySecret,project_name,endpoint)
    cd_make_col = create_table(din_col,cd_name_list)
    # 删除结果表,谨慎解除注释
    o.delete_table(f"{result_project}.{result_table}",if_exists=True)
    # 建分区结果表,生命周期七天
    o.create_table(f"{result_project}.{result_table}",(f"{cd_make_col}",f"{result_ds}"),lifecycle=7,if_not_exists=True)
    # 返回所有表的元数据
    all_schema = get_table_schema(o,tab_name_list,black_col)
    sql_basic_list = make_sql_basic(o,result_project,result_table,din_col)
    # 生成执行SQL
    end_sql = make_sql(all_schema,sql_basic_list,cd_name_list,tab_project,filter_sql,map_table,map_tab_col)
    read_write_table(o,max_sql(end_sql,din_col,sql_basic_list),result_project,result_table,yesterday)
# 执行主函数
if __name__==("__main__"):
    main()

后续更新方向

  • 首先是二级分区,目前我是写死的数组,因为项目不需要那么多,后续可以优化为从表中获取,同时设置分区的黑名单;
  • 在写出的时候,我读取SQL的结果是record拼凑成的数组再写入,这样如果数据量大,影响还是会比较大的,但是我还没有想好怎么优化;
  • 黑名单字段没有考虑相同KEY的情况,还是设计成二元组比较好;
  • 静态数据还是写在代码中,没有单独拆成文件;
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
目录
相关文章
|
2月前
|
分布式计算 安全 大数据
maxcomputer的介绍
maxcomputer的介绍
370 3
|
2月前
|
分布式计算 运维 数据挖掘
maxcomputer
maxcomputer
759 2
|
10天前
|
存储 分布式计算 关系型数据库
Dataphin中如何使用Hologres外表查询MaxCompute
Hologres支持通过创建外部表来加速MaxCompute数据的查询,此方法用户直接在Hologres环境中访问和分析存储在MaxCompute中的数据,从而提高查询效率并简化数据处理流程。本文将介绍在 Dataphin 产品中如何实现这一操作。
|
7天前
|
分布式计算 DataWorks 大数据
MaxCompute产品使用问题之DataWorks整库全增量同步任务的源库如果新增了表,如何能将这个表快速同步进maxcompute
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
20天前
|
SQL 分布式计算 大数据
MaxCompute产品使用合集之查看表的血缘关系有哪些方法
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
20小时前
|
分布式计算 DataWorks Java
DataWorks产品使用合集之数据集成如何按照分表导入多分区
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
5 0
|
23天前
|
机器学习/深度学习 分布式计算 DataWorks
MaxCompute产品使用合集之离线同步到实时同步表时,数据应该如何处理
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
7天前
|
机器学习/深度学习 分布式计算 大数据
MaxCompute产品使用问题之整库批量实时同步是什么意思
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
7天前
|
分布式计算 运维 大数据
MaxCompute产品使用问题之数据集成任务有脏数据,如何快速定位哪些字段有问题
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2月前
|
DataWorks Shell 对象存储
DataWorks产品使用合集之在 DataWorks 中,有一个 MySQL 数据表,数据量非常大且数据会不断更新将这些数据同步到 DataWorks如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
44 3

热门文章

最新文章