计算出得到KL散度
由上式可知
相对熵KL(A||C) > KL(A||B) 说明A和B之间的概率分布在信息量角度更为接近 而通过概率分布可视化观察,也认为A和B更为接近,两者吻合
相对熵与PSI之间的关系
PSI公式变形
将PSI计算公式变形后可以分解为2项
结论
- PSI本质上是实际分布(A)与预期分布(E)的KL散度的一个对称化操作
- 双向计算相对熵,并把两部分相对熵相加,从而更为全面地描述两个分布的差异
PSI指标的业务应用
实际评估需要分不同粒度
- 时间粒度(按月、按样本集)
- 订单层次(放贷层、申请层)
- 人群(若没有分群建模,可忽略)
时间窗
时间窗尽可能至今为止 有可能建模时间窗稳定 但近期时间窗出现不稳定
评估过程
- 先在放贷样本上计算PSI,剔除不稳定的特征;
- 再对申请样本抽样(可能数据太大)
- 计算PSI再次筛选
建议先看变量数据分布(EDD)
PSI只是一个宏观的指标 建议先看变量数据分布(EDD) 看分位数跨时间变化来检验数据质量
PSI计算细节也予以保留
无法得知PSI上升时,数据分布是左偏还是右偏 建议把PSI计算细节也予以保留 便于在模型不稳定时,第一时间排查问题
PSI源码
https://gitee.com/pingfanrenbiji/population-stability-index-argo/blob/master/psi.py
结合具体业务实现需求
经过上面的学习 咱们已经知道了PSI是个什么玩意了 而且还有了实现好的算法源码 该算法需要2方面的数据 一个是实际数据(训练数据) 实际数据即是训练数据集中的标签列 比如贷款数据样本 标签列为是否按时还款 另一个是预期数据(预测数据根据模型得到的预测结果) 预期样本是 根据训练出来的模型 对于即将要贷款的用户进行预测是否会还款 接下来就要结合自己公司的业务来得到这块数据调用算法就可以了
搭建python服务
公司让我来做psi 因为我是用java开发的嘛 其实是想让我用java来实现psi 我当时就感觉不太合理 java不合适写算法 后来的调研也确认了这个想法 算法还是得需要用ptyhon来写 且都已经实现好了 理解了原理之后 直接就可以用了 用python写的话 对于写java的我来说 有点难度 但稍微克服一下就可以了
python web框架 flask
源码demo
https://gitee.com/pingfanrenbiji/simple-model-monitor-demo
基本语法记录一下
- restful api
from flask_restful import reqparse, abort, Api, Resource @app.route('/test/<jobid>', methods=['POST']) def test_psi_last(jobid): return "成功"
注意:参数用<> 而非{}
- sql查询语句执行 返回元组类型
import pymysql jobid = '1111111' # 创建数据库连接 conn = pymysql.connect( host=mysql_host, user=mysql_user, password=mysql_password, db=mysql_db, port=mysql_port ) # 获取操作游标,也就是开始操作 cur = conn.cursor() # 定义查询sql语句 该sql返回一条数据 sql = """ select id from studio_algor_param_record where job_no=%s order by createtime desc limit 1; """ #执行查询 cur.execute(sql, (jobid)) # 获取查询结果 mid = cur.fetchall() #关闭连接 conn.close() # mid是tuple元组数据类型((xxxx,)) res_list = [x[0] for x in mid] # 获取xxxx字符串 mid = res_list[0]
- 通过pandas执行sql查询返回dataframe类型
# 数据库连接的获取同上 # 查询sql语句 返回list集合 sql = 'select predict_result from studio_model_predict_result where mid =\'' + mid + '\'' # 通过pandas执行sql语句得到dataframe类型的数据结果 df = pd.read_sql(sql, con=conn)
- 插入sql
# 数据库连接的获取同上 # 获取操作游标,也就是开始操作 cur = conn.cursor() # 插入sql语句 sql = """ insert into studio_job_step_files (id,jobid,file_type,path,remark) values (%s,%s,%s,%s,%s) """ # 生成uuid唯一id id = str(uuid.uuid1()).replace('-', '') # 执行sql cur.execute(sql, (id, jobid, '100', py_test, result)) # 提交事务 conn.commit() # 关闭连接 conn.close()
- 使用alluxo(内存文件系统)
alluxio client源码
https://gitee.com/pingfanrenbiji/simple-model-monitor-demo/tree/master/alluxio
连接alluxio
import alluxio client = alluxio.Client(alluxio_ip, alluxio_port)
获取文件
got = None # alluxio文件路径 path = "/poc/d1e5a1efbe764f9d80b200d3a90c5bdf" with client.open(path, 'r') as f: got = f.read()
写入本地文件
# 本地文件路径 py_test_csv = "test.csv" # 将读取到的alluxio文件写入本地文件 with open(py_test_csv, 'w') as f: f.write(got.decode('utf-8')) 注:write方法需要字符串类型的参数 got是字节流类型 通过decode解码得到字符串类型
读取csv文件得到dataframe类型的数据
df = pd.read_csv(py_test_csv, sep=",") # 获取执行列 df['y']
上传文件
# 将dataframe类型的数据写入alluxio文件 # result_detail是dataframe类型的数据 with client.open(py_test, 'w') as f: f.write(bytes(result_detail.to_json().encode("utf8")))
后记
上述过程花了2天时间 从对PSI概念不了解到慢慢了解 没有写过Python工程的代码 连if else语法都要现学 到学会使用Python语言解决问题 所以呀 要相信自己的潜力 以解决问题为目标 不断学习相关的知识 想办法使用能够使用的资源 你一定可以战胜它的