数据治理中 PyODPS 的正确使用方式

简介: 表饱和度(字段是否为空)、字段阈值(数值类字段取值是否超出有效边界)是评估数据质量的关键指标,由于是单表内字段级别的校验和统计,并且几乎涉及所有表,范围大、逻辑简单、重复性强,结合 Python 开发效率高的特点,很多数据工程师会使用 PyODPS 进行相关功能的开发。本文基于 PyODPS 分别使用 3 种方式实现了“饱和度统计”功能,展示了它们的执行效率,并分析了原因。

数据治理中 PyODPS 的正确使用方式

概述:表饱和度(字段是否为空)、字段阈值(数值类字段取值是否超出有效边界)是评估数据质量的关键指标,由于是单表内字段级别的校验和统计,并且几乎涉及所有表,范围大、逻辑简单、重复性强,结合 Python 开发效率高的特点,很多数据工程师会使用 PyODPS 进行相关功能的开发。本文基于 PyODPS 分别使用 3 种方式实现了“饱和度统计”功能,展示了它们的执行效率,并分析了原因。

结论:1. 除非数据量极少,否则要避免把数据拉取到本地处理;2. 执行 SQL 的方式效率最高,并且直观,如果只是饱和度的场景,推荐这种方式,但是受 SQL 语法的限制,不够灵活;3. DataFrame SDK 的方式效率虽然略低,但使用方式非常灵活,并且可以把常用处理逻辑封装成函数,代码复用率更高;

测试环境: Mac Book Pro | 4C/16G/512G

1. 通过 open_reader 和分区级别并发实现

测试表数据量 分区数 运行时间
332万 5 22分40秒

分析: 通过 open_reader 把表内数据拉取到本地进行检验和统计,虽然代码中使用了多线程,但是并没有“真正”的并发:1、没有利用 ODPS 的计算能力,而是使用了本地的计算能力;2、Python 的 GIL(全局解释器锁) 使线程之间在串行执行。如果数据量极少,这种方式的优势是节省了创建 ODPS 实例的时间和资源开销。

2. 通过 execute_sql 全表扫实现

测试表数据量 分区数 运行时间
6万 77 21秒
1600万 23 18秒
3.1亿 14 50秒

分析:与在 DataWorks 上面执行 SQL 情况相同,只要能把 SQL 拼出来,就能实现想要的功能。但是,如果所在 Project 限制了全表扫,则需要 set odps.sql.allow.fullscan=true; 操作。缺点是,检验逻辑在 SQL 中,是靠拼字符串拼出来的,代码很难复用。

3. 通过 DataFrame 实现

测试表数据量 分区数 运行时间
6万 77 53秒
1600万 23 39秒
3.1亿 14 5分钟56秒

分析:使用了 DataFrame 的 MapReduce API 与聚合函数 sum,DataFrame 会提交 1 个实例,把 DateFrame 操作转换为 UDF SQL,通过 Mapper 把原表数据转换为 int 类型的计数,然后进行 sum 操作。DataFrame 比 SQL 执行效率低的原因,在于自定义 Python 函数与内置函数的性能差异,例如,作者曾使用自己编写的 Reducer 代替 sum ,效率会大大降低。

代码

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import sys

reload(sys)
sys.setdefaultencoding("utf-8")

from odps import ODPS
from odps.models.resource import *
import datetime
from odps import options
from datetime import datetime, timedelta
from time import sleep
from odps.df import DataFrame
from odps.df import output
from queue import Queue
import threading

o = ODPS('xxx', 'xxx', 'xxx', endpoint='xxx')

options.interactive = True  # 用到 print 需要打开
options.verbose = True      # 输出进度信息和错误信息


def mapper(row):
    ret = [1]
    for i in range(len(row)):
        ret.append(1 if row[i] is None or str(row[i]).isspace() or len(str(row[i])) <= 0 else 0)
    yield tuple(ret)

def data_frame(table_name):
    findnull = DataFrame(o.get_table(table_name))
    col_num = len(findnull.dtypes)
    output_types = ['int' for i in range(col_num + 1)]
    output_names = [findnull.dtypes.columns[i].name for i in range(col_num)]
    output_names.insert(0, 'total_cnt')
    table = findnull.map_reduce(mapper, mapper_output_names=output_names, mapper_output_types=output_types)
    print(table.sum())

def check_data_by_execute_sql(table_name):
    ta = o.get_table(table_name)
    data_count = {}
    table_count = 0
    sql_str = 'select \n'
    for col in ta.schema.columns:
        col_name = col.name
        sql_str += "sum(case when (%s is null) or (%s = '') or (trim(%s) = '') then 1 else 0 end) as %s_yx,\n" % (col_name, col_name, col_name, col_name)
    sql_str += "count(1) as total_cnt \nfrom %s " %(table_name)
    print(sql_str)
    with o.execute_sql(sql_str).open_reader() as rec:
        for r in rec:
            for col in ta.schema.columns:
                print("%s\t\t%d" % (col.name, r.get_by_name(col.name + '_yx')))
            print("%s\t\t%d" % ('total_cnt', r.get_by_name('total_cnt')))

def get_last_day():
    today = datetime.today()
    last_day = today + timedelta(days=-1)
    return last_day.strftime('%Y%m%d')

count_queue = Queue()
threads = []

def check_data_by_open_reader(table_name, pt):
    ta = o.get_table(table_name)
    data_count = {}
    print(table_name + "\t:\t" + str(pt) + " STARTED")
    rec = ta.open_reader(partition=str(pt))
    table_count = rec.count
    for r in rec:
        for col in ta.schema:
            col_value = r.get_by_name(col.name)
            if col.name not in data_count:
                data_count[col.name] = 0
            if col_value == None or str(col_value).isspace() or len(str(col_value)) <= 0:
                data_count[col.name] += 1
    count_queue.put((data_count, table_count))
    print(table_name + "\t:\t" + str(pt) + " DONE")

# 假设 dt 为分区字段
def check_data(table_name):
    table_tocheck = o.get_table(table_name)
    for pt in table_tocheck.iterate_partitions("dt='" + get_last_day() + "'"):  
        t = threading.Thread(target=check_data_by_open_reader, args=(table_name, pt))
        t.setDaemon(True)
        t.start()
        threads.append(t)

    print("线程数共:" + str(len(threads)))

    while True:
        thread_num = len(threading.enumerate()) - 1
        print("线程数量是%d" % thread_num)
        if thread_num <= 0:
            break
        sleep(10)

    total_cnt = 0
    total_data_cnt = {}
    while not count_queue.empty():
        pt_data = count_queue.get()
        data_count = pt_data[0]
        total_cnt += pt_data[1]
        for col_name in data_count.keys():
            if col_name not in total_data_cnt:
                total_data_cnt[col_name] = 0
            total_data_cnt[col_name] += data_count[col_name]

    print(total_cnt, total_data_cnt)

if __name__ == '__main__':
    table_name = 'xxxx' 

    if len(sys.argv) == 2:
        if sys.argv[1] not in ('1', '2', '3'):
            print("ARG ERROR: %s 1|2|3" % sys.argv[0])
            exit()
        print(datetime.now().strftime('%Y-%m-%d %H:%M:%S  BEGIN with ' + table_name))
        if sys.argv[1] == '1':
            check_data(table_name)
        elif sys.argv[1] == '2':
            check_data_by_execute_sql(table_name)
        elif sys.argv[1] == '3':
            data_frame(table_name)
        print(datetime.now().strftime('%Y-%m-%d %H:%M:%S  DONE with ' + table_name))
    else:
        print("ARG ERROR: %s 1|2|3" % sys.argv[0])
相关文章
|
4月前
|
SQL DataWorks 监控
DataWorks产品使用合集之怎么针对表中已经存在的数据进行更新
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
DataWorks产品使用合集之怎么针对表中已经存在的数据进行更新
|
24天前
|
DataWorks 搜索推荐 数据挖掘
DataWorks: 驾驭数据浪潮,解锁用户画像分析新纪元
本文详细评测了DataWorks产品,涵盖最佳实践、用户体验、与其他工具对比及Data Studio新功能。内容涉及用户画像分析、数据管理作用、使用过程中的问题与改进建议,以及Data Studio的新版Notebook环境和智能助手Copilot的体验。整体评价肯定了DataWorks在数据处理和分析上的优势,同时也指出了需要优化的地方。
95 24
|
4月前
|
数据采集 存储 DataWorks
DataWorks产品使用合集之如何查看数据质量中心(DQC)的规则执行记录
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
DataWorks 监控 安全
DataWorks产品使用合集之使用数据洞察过程中经常出现超时是什么导致的
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
存储 DataWorks 安全
DataWorks产品使用合集之数据视图如何创建
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
数据采集 DataWorks 数据挖掘
提升数据分析效率:DataWorks在企业级数据治理中的应用
【8月更文第25天】本文将探讨阿里巴巴云的DataWorks平台如何通过建立统一的数据标准、规范以及实现数据质量监控和元数据管理来提高企业的数据分析效率。我们将通过具体的案例研究和技术实践来展示DataWorks如何简化数据处理流程,减少成本,并加速业务决策。
562 54
|
4月前
|
SQL 分布式计算 DataWorks
利用DataWorks构建高效数据管道
【8月更文第25天】本文将详细介绍如何使用阿里云 DataWorks 的数据集成服务来高效地收集、清洗、转换和加载数据。我们将通过实际的代码示例和最佳实践来展示如何快速构建 ETL 流程,并确保数据管道的稳定性和可靠性。
225 56
|
4月前
|
数据采集 DataWorks 安全
DataWorks产品使用合集之怎么配置每天只导入10条数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之ODPS数据怎么Merge到MySQL数据库
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
4月前
|
DataWorks 关系型数据库 MySQL
DataWorks产品使用合集之mysql节点如何插入数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。

热门文章

最新文章

  • 1
    DataWorks操作报错合集之DataWorks任务异常 报错: GET_GROUP_SLOT_EXCEPTION 该怎么处理
    126
  • 2
    DataWorks操作报错合集之DataWorksUDF 报错:evaluate for user defined function xxx cannot be loaded from any resources,该怎么处理
    123
  • 3
    DataWorks操作报错合集之在DataWorks中,任务流在调度时间到达时停止运行,是什么原因导致的
    117
  • 4
    DataWorks操作报错合集之DataWorks ODPS数据同步后,timesramp遇到时区问题,解决方法是什么
    101
  • 5
    DataWorks操作报错合集之DataWorks配置参数在开发环境进行调度,参数解析不出来,收到了 "Table does not exist" 的错误,该怎么处理
    110
  • 6
    DataWorks操作报错合集之DataWorks中udf开发完后,本地和在MaxCompute的工作区可以执行函数查询,但是在datawork里报错FAILED: ODPS-0130071:[2,5],是什么原因
    123
  • 7
    DataWorks操作报错合集之DataWorks提交失败: 提交节点的源码内容到TSP(代码库)失败:"skynet_packageid is null,该怎么解决
    129
  • 8
    DataWorks操作报错合集之DataWorks在同步mysql时报错Code:[Framework-02],mysql里面有个json类型字段,是什么原因导致的
    170
  • 9
    DataWorks操作报错合集之DataWorks集成实例绑定到同一个vpc下面,也添加了RDS的IP白名单报错:数据源配置有误,请检查,该怎么处理
    93
  • 10
    DataWorks操作报错合集之在 DataWorks 中运行了一个 Hologres 表的任务并完成了执行,但是在 Hologres 表中没有看到数据,该怎么解决
    133