用过MaxComputer的同学,都知道MaxComputer SQL只支持常用DDLDML语句,不支持存储过程的开发,但我们常常为解决业务逻辑的时候需要使用循环或递归,那这个在MaxComputer里面该怎么实现呢?
常用的方式应该是脚本内嵌SQL,本文介绍的便是使用python内嵌SQL的方法实现这一过程。使用这一方法主要考虑到以下两点:
- MaxComputer 是支持Python SDK,PyODPS便是MaxCompute的Python版本的SDK,它提供了对MaxCompute对象的基本操作和DataFrame框架。
-
用户不需要手动定义ODPS入口。DataWorks的PyODPS节点中,将会包含一个全局的变量 odps或o,即ODPS入口。
print(o.exist_table('pyodps_iris')) --官方示例
场景描述
某公司产品经理需要根据客户的征信情况分析客户的风险属性,但是第三方提供的接口(exo_cust_credit_record)里面的查询结果(searchresult)是JSON字符串,产品经理无法根据这种数据分析客户的风险属性,需要将JSON内的每一条返回项输出到明细表(exo_cust_credit_detail)
场景样例
外部平台征信查询记录表
drop table if exists pwork.`exo_cust_credit_record`;
CREATE TABLE pwork.`exo_cust_credit_record` (
`id` BIGINT,
`username` STRING COMMENT '用户名',
`idnumber` STRING COMMENT '身份证号码',
`mobile` STRING COMMENT '手机号',
`searchresult` STRING COMMENT '查询结果',
`ctime` DATETIME COMMENT '创建时间',
`utime` DATETIME COMMENT '最近更新时间'
)
COMMENT '外部平台征信查询记录';
客户征信查询明细
drop table if exists pwork.`exo_cust_credit_detail`;
CREATE TABLE pwork.`exo_cust_credit_detail` (
`username` STRING COMMENT '用户名',
`idnumber` STRING COMMENT '身份证号码',
`mobile` STRING COMMENT '手机号',
`hit_type` STRING COMMENT '命中类型',
`is_hit` bigint COMMENT '是否命中'
)
COMMENT '客户征信查询明细';
配置表
drop table if exists pwork.`t99_hit_configer`;
CREATE TABLE pwork.`t99_hit_configer` (
id bigint
,hit_type string COMMENT '命中类型'
,`desc` STRING COMMENT '类型描述'
,column_name STRING COMMENT 'json列名'
)
COMMENT '配置表';
插入测试数据
insert overwrite table pwork.`t99_hit_configer`
values (1,'1001','身份证号查询高危行为','id_abnormal')
,(2,'1002','身份证号查询电信欠费','id_phone_overdue')
,(3,'1003','身份证号查询法院失信人','id_court_bad')
;
insert overwrite table pwork.`exo_cust_credit_record`
values (1,'张三','311121200001010001','13100000001','{"id_abnormal":0,"id_phone_overdue":1,"id_court_bad":0}',getdate(),getdate())
,(1,'李四','311121200001010002','13100000002','{"id_abnormal":1,"id_court_bad":0}',getdate(),getdate())
,(1,'王五','311121200001010003','13100000003','{}',getdate(),getdate())
;
创建PYODPS节点任务实现SQL循环
代码如下
i=1
with odps.execute_sql('select * from pwork.`t99_hit_configer`').open_reader() as reader:
for record in reader:
hit_type = record.hit_type
column_name = record.column_name
print hit_type,column_name
### 定义insert类型,覆盖原有数据
type = 'overwrite' if(i==1) else 'into'
i+=1
sql = '''
insert %s table pwork.`exo_cust_credit_detail`
select distinct
username
,idnumber
,mobile
,'%s' -- hit_type
,get_json_object(searchresult,'$.%s') -- hit_status
from pwork.`exo_cust_credit_record`
where get_json_object(searchresult,'$.%s') is not null
'''%(type,hit_type,column_name,column_name)
print (sql)
odps.execute_sql(sql)
print str(hit_type),'加载完成'
执行下代码,来看下exo_cust_credit_detail的内容:
select * from pwork.`exo_cust_credit_detail`
order by username,hit_type
limit 10;
参考文档:
MaxComputer用户指南:https://help.aliyun.com/product/27797.html?spm=a2c4g.11186623.3.1.2eaf304cXjjJ2i