MaxComputer-Pyodps之使用pyodps实现sql的循环

简介: 用过MaxComputer的同学,都知道MaxComputer SQL只支持常用DDL\DML语句,不支持存储过程的开发,但我们常常为解决业务逻辑的时候需要使用循环或递归,那这个在MaxComputer里面该怎么实现呢?

用过MaxComputer的同学,都知道MaxComputer SQL只支持常用DDLDML语句,不支持存储过程的开发,但我们常常为解决业务逻辑的时候需要使用循环或递归,那这个在MaxComputer里面该怎么实现呢?

常用的方式应该是脚本内嵌SQL,本文介绍的便是使用python内嵌SQL的方法实现这一过程。使用这一方法主要考虑到以下两点:

  • MaxComputer 是支持Python SDK,PyODPS便是MaxCompute的Python版本的SDK,它提供了对MaxCompute对象的基本操作和DataFrame框架。
  • 用户不需要手动定义ODPS入口。DataWorks的PyODPS节点中,将会包含一个全局的变量 odps或o,即ODPS入口。

        print(o.exist_table('pyodps_iris'))  --官方示例
    

场景描述

某公司产品经理需要根据客户的征信情况分析客户的风险属性,但是第三方提供的接口(exo_cust_credit_record)里面的查询结果(searchresult)是JSON字符串,产品经理无法根据这种数据分析客户的风险属性,需要将JSON内的每一条返回项输出到明细表(exo_cust_credit_detail)

场景样例

外部平台征信查询记录表

    drop table if exists pwork.`exo_cust_credit_record`;
    CREATE TABLE pwork.`exo_cust_credit_record` (
        `id` BIGINT,
        `username` STRING COMMENT '用户名',
        `idnumber` STRING COMMENT '身份证号码',
        `mobile`   STRING COMMENT '手机号',
        `searchresult` STRING COMMENT '查询结果',
        `ctime` DATETIME COMMENT '创建时间',
        `utime` DATETIME COMMENT '最近更新时间'
    )
    COMMENT '外部平台征信查询记录';

客户征信查询明细

    drop table if exists pwork.`exo_cust_credit_detail`;
    CREATE TABLE pwork.`exo_cust_credit_detail` ( 
        `username` STRING COMMENT '用户名',
        `idnumber` STRING COMMENT '身份证号码',
        `mobile`   STRING COMMENT '手机号',
        `hit_type` STRING COMMENT '命中类型',
        `is_hit`   bigint COMMENT '是否命中'
    )
    COMMENT '客户征信查询明细';

配置表

    drop table if exists pwork.`t99_hit_configer`;
    CREATE TABLE pwork.`t99_hit_configer` ( 
         id  bigint
          ,hit_type string COMMENT '命中类型'
          ,`desc`   STRING COMMENT '类型描述'
        ,column_name STRING COMMENT 'json列名'
    )
    COMMENT '配置表';

插入测试数据

    insert overwrite table pwork.`t99_hit_configer`
    values (1,'1001','身份证号查询高危行为','id_abnormal')
    ,(2,'1002','身份证号查询电信欠费','id_phone_overdue')
    ,(3,'1003','身份证号查询法院失信人','id_court_bad')
    ;
    
    insert overwrite table pwork.`exo_cust_credit_record`
    values (1,'张三','311121200001010001','13100000001','{"id_abnormal":0,"id_phone_overdue":1,"id_court_bad":0}',getdate(),getdate())
    ,(1,'李四','311121200001010002','13100000002','{"id_abnormal":1,"id_court_bad":0}',getdate(),getdate())
    ,(1,'王五','311121200001010003','13100000003','{}',getdate(),getdate())
    ;

创建PYODPS节点任务实现SQL循环

代码如下

    i=1
    with odps.execute_sql('select * from pwork.`t99_hit_configer`').open_reader() as reader:
        for record in reader:
            
            hit_type = record.hit_type
            column_name = record.column_name 
            
            print hit_type,column_name
    
        ### 定义insert类型,覆盖原有数据
            type = 'overwrite' if(i==1) else 'into'
            i+=1
    
            sql = '''
                insert %s table pwork.`exo_cust_credit_detail` 
                select  distinct
                          username
                         ,idnumber
                         ,mobile
                         ,'%s'  -- hit_type
                         ,get_json_object(searchresult,'$.%s')    -- hit_status 
                from pwork.`exo_cust_credit_record`
                where  get_json_object(searchresult,'$.%s') is not null
                '''%(type,hit_type,column_name,column_name)
            print (sql)
            odps.execute_sql(sql)
            print str(hit_type),'加载完成'

执行下代码,来看下exo_cust_credit_detail的内容:

    select * from pwork.`exo_cust_credit_detail`
    order by username,hit_type
    limit 10;

image

参考文档:

MaxComputer用户指南:https://help.aliyun.com/product/27797.html?spm=a2c4g.11186623.3.1.2eaf304cXjjJ2i

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
相关文章
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之如何在SQL语句里使用CASE WHEN语句
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
435 2
|
SQL 分布式计算 DataWorks
DataWorks操作报错合集之新建项目的元数据的sql报错,如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
185 0
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之使用API调用ODPS SQL时,出现资源被定时任务抢占,该怎么办
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
237 32
|
SQL 分布式计算 DataWorks
DataWorks产品使用合集之未保存的ODPS SQL语句该如何找回
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
176 2
|
SQL 关系型数据库 MySQL
INSERT INTO t_a.tableName SELECT * FROM t_b.tableName 如何通过定义一个list对象,包含多个tableName,循环执行前面的sql,用MySQL的语法写
【8月更文挑战第7天】INSERT INTO t_a.tableName SELECT * FROM t_b.tableName 如何通过定义一个list对象,包含多个tableName,循环执行前面的sql,用MySQL的语法写
198 5
|
SQL 分布式计算 DataWorks
DataWorks操作报错合集之使用sql查询报错无权限,是什么原因
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
SQL 运维 分布式计算
DataWorks产品使用合集之ODPPS中如何使用SQL查询从表中获取值并将其赋值给临时变量以供后续使用
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
SQL DataWorks 安全
DataWorks产品使用合集之是否支持调度StarRocks的SQL任务
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
SQL DataWorks 关系型数据库
DataWorks产品使用合集之数据集成时源头提供数据库自定义函数调用返回数据,数据源端是否可以写自定义SQL实现
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
SQL 分布式计算 DataWorks
DataWorks操作报错合集之在执行SQL查询时遇到报错,代码为[XX000],该怎么解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。