隐语 Unbalanced PSI Benchmark 白皮书
原创王细亮、张磊隐语的小剧场 2023-03-22 16:06 发表于浙江
收录于合集
#隐语解读20个
#星河杯大赛专辑6个
导语
去年10月份,隐语发布了PSI的性能数据,引起了内部和外部用户的广泛关注,具体协议包括:ecdh/kkrt16/bc22协议,这些协议更适合双方数据量差别不大的场景,称为平衡PSI(Balanced PSI)。在现实的隐私求交场景中,有时双方数据量级差异很大,例如:百万vs十亿,2千万vs 20亿。针对这种场景,隐语实现并开源了专门的非平衡PSI(Unbalanced PSI)协议,能得到更好的性能。具体来讲:与ecdh-psi对比,ecdh-psi在大数据集上进行两次加密操作。隐语实现的非平衡PSI只在大数据集上进行一次加密操作,在大数据集与小数据集的体量相差非常大的时候,总体计算量和运行时间大约是ecdh-psi的1/2。非平衡PSI还把协议分成离线和在线(offline/online)两个阶段,在提前执行离线(offline)缓存的情形下,在线阶段只需10多分钟即可完成在线(online)协议,得到交集结果。本文给出隐语非平衡PSI协议(Unbalanced PSI)的具体测试环境、步骤、和数据,供大家参考。备注:后续本文有增删订正内容,请参见留言区。
复现路径
一、测试机型
- Python:3.8
- pip: >= 19.3
- OS: CentOS 7
- CPU/Memory: 推荐最低配置是 8C16G
- 硬盘:500G
二、安装conda
使用conda管理python环境,如果机器没有conda需要先安装。步骤如下:
#sudo apt-get install wget
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
#安装
bash Miniconda3-latest-Linux-x86_64.sh
# 一直按回车然后输入yes
please answer 'yes' or 'no':
>>> yes
# 选择安装路径, 文件名前加点号表示隐藏文件
Miniconda3 will now be installed into this location:
>>> ~/.miniconda3
# 添加配置信息到 ~/.bashrc文件
Do you wish the installer to initialize Miniconda3 by running conda init? [yes|no]
[no] >>> yes
#运行配置信息文件或重启电脑
source ~/.bashrc
#测试是否安装成功,有显示版本号表示安装成功
conda --version
三、安装secretflow
conda create -n sf-benchmark python=3.8 conda activate sf-benchmark pip install -U secretflow
四、创建节点并启动集群
- 创建ray header节点
创建ray header节点,选择一台机器为主机,在主机上执行如下命令,ip替换为主机的内网ip,命名为alice,端口选择一个空闲端口即可注意:192.168.0.1 ip为mock,请替换为实际的ip地址
RAY_DISABLE_REMOTE_CODE=true \ray start --head --node-ip-address="192.168.0.1" --port="9394" --resources='{"alice": 8}' --include-dashboard=False
- 创建从属节点
创建从属节点,在bob机器执行如下命令,ip依然填alice机器的内网ip,命名为bob,端口不变
RAY_DISABLE_REMOTE_CODE=true \ray start --address="192.168.0.1:9394" --resources='{"bob": 8}'
五、数据要求Alice方:2000万Bob方:20亿交集:1000万
六、Benchmark脚本
脚本分为offline和online,offline用于对大数据方的setup、online对小数据的执行基于ecdh的oprf协议。参考阅读:非平衡隐私集合求交(Unbalanced PSI)协议介绍。
- offline脚本:
import os import sys import time import logging import multiprocess from absl import app import spu import secretflow as sf #import random # init log logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) # SPU settings cluster_def = { 'nodes': [ # <<< !!! >>> replace <192.168.0.1:17268> to alice node's local ip & free port {'party': 'alice', 'id': 'local:0', 'address': f'192.168.0.1:17268'}, # <<< !!! >>> replace <192.168.0.2:17269> to bob node's local ip & free port {'party': 'bob', 'id': 'local:1', 'address': f'192.168.0.2:17269'}, ], 'runtime_config': { 'protocol': spu.spu_pb2.SEMI2K, 'field': spu.spu_pb2.FM128, }, } link_desc = { 'recv_timeout_ms': 3600000, } def main(_): # sf init # <<< !!! >>> replace <192.168.0.1:9394> to your ray head sf.shutdown() sf.init(['alice','bob'],address='192.168.0.1:9394',log_to_driver=True,omp_num_threads=multiprocess.cpu_count()) # init log logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) alice = sf.PYU('alice') bob = sf.PYU('bob') #carol = sf.PYU('carol') # <<< !!! >>> replace path to real parties local file path. offline_input_path = { alice: 'dummyalice.csv', bob: '/root/benchmark/unbalanced_200000w.csv', } select_keys = { alice: ['id'], bob: ['id'], } spu = sf.SPU(cluster_def, link_desc) # offline print("=====offline phase====") start = time.time() offline_output_path = { alice: "/data/unbalanced_2000w_out.csv", bob: "/data/unbalanced_200000w_out.csv", } offline_preprocess_path = "/root/benchmark/offline_out/offline_psi0107.csv" secret_key = "000102030405060708090a0b0c0d0e0ff0e0d0c0b0a090807060504030201000" secret_key_path = "/root/benchmark/secret_key.bin" with open(secret_key_path, 'wb') as f: f.write(bytes.fromhex(secret_key)) reports = spu.psi_csv( key=select_keys, input_path=offline_input_path, output_path=offline_output_path, receiver='alice', # if `broadcast_result=False`, only receiver can get output file. protocol='ECDH_OPRF_UB_PSI_2PC_OFFLINE', # psi protocol precheck_input=False, # will cost ext time if set True sort=True, # will cost ext time if set True broadcast_result=False, # will cost ext time if set True bucket_size=10000000, curve_type="CURVE_FOURQ", preprocess_path=offline_preprocess_path, ecdh_secret_key_path=secret_key_path, ) #print(f"psi reports: {reports}") logging.info(f"offline psi reports: {reports}") logging.info(f"cost time: {time.time() - start}") sf.shutdown() if __name__ == '__main__': app.run(main)
online脚本:
import os import sys import time # import random import logging import multiprocess from absl import app import spu import secretflow as sf # init log logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) # SPU settings cluster_def = { 'nodes': [ # <<< !!! >>> replace <192.168.0.1:13666> to alice node's local ip & free port {'party': 'alice', 'id': 'local:0', 'address': f'192.168.0.1:13666'}, # <<< !!! >>> replace <192.168.0.2:12946> to bob node's local ip & free port {'party': 'bob', 'id': 'local:1', 'address': f'192.168.0.1:13667'}, ], 'runtime_config': { 'protocol': spu.spu_pb2.SEMI2K, 'field': spu.spu_pb2.FM128, }, } link_desc = { 'recv_timeout_ms': 3600000, } def main(_): # sf init # <<< !!! >>> replace <192.168.0.1:9394> to your ray head sf.shutdown() sf.init(['alice','bob'],address='192.168.0.1:9394',log_to_driver=True,omp_num_threads=multiprocess.cpu_count()) # init log logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) alice = sf.PYU('alice') bob = sf.PYU('bob') # <<< !!! >>> replace path to real parties local file path. online_input_path = { alice: '/root/benchmark/unbalanced_2000w.csv', bob: 'dummy.bob.csv', } output_path = { alice: '/data/unbalanced_20000wvs2000w.csv', bob: '/data/unbalanced_20000wvs2000w.csv', } select_keys = { alice: ['id'], bob: ['id'], } spu = sf.SPU(cluster_def, link_desc) offline_preprocess_path = "/root/benchmark/offline_out/offline_psi0107.csv" secret_key_path = "/root/benchmark/secret_key.bin" # online print("=====online phase====") start = time.time() reports = spu.psi_csv( key=select_keys, input_path=online_input_path, output_path=output_path, receiver='alice', # if `broadcast_result=False`, only receiver can get output file. protocol='ECDH_OPRF_UB_PSI_2PC_ONLINE', # psi protocol precheck_input=True, # will cost ext time if set True sort=True, # will cost ext time if set True broadcast_result=False, # will cost ext time if set True bucket_size=100000000, curve_type="CURVE_FOURQ", preprocess_path=offline_preprocess_path, ecdh_secret_key_path=secret_key_path, ) #print(f"psi reports: {reports}") logging.info(f"online psi reports: {reports}") logging.info(f"cost time: {time.time() - start}") sf.shutdown() if __name__ == '__main__': app.run(main)
- Benchmark报告
备注:目前是小数据方到大数据方的交集结果,如果大数据方到小数据方的交集结果online运行时间会增加一倍 。
解读:• Unbalanced PSI的offline阶段,大数据方对大数据集(20亿)进行加密并发送(截取12B)到小数据方,加密(ecc点乘)可以进行并发计算,CPU的核数增大可以提高性能。可以看到性能数据基本和CPU的核数成线性关系,网络带宽消耗不大。• Unbalanced PSI的online阶段,可以划分为两部分子阶段,对小数据集数据执行ecdh-oprf得到小数据集的加密结果;小数据集加密结果和offline阶段的到大数据集加密数据进行比较的到交集。总体计算量和传输量不大,运行时间在10几分钟左右。
- Balanced PSI Benchmark报告
为了方便大家对比,同等硬件资源和数据规模下,对平衡PSI做了性能测试,报告如下:
解读:
Balanced PSI 跑20亿*2000万规模数据时,在8C16G资源下CPU成为瓶颈,机器资源提升到64C128G后LAN 环境 79分钟完成任务执行。
参考文章:1.非平衡隐私集合求交(Unbalanced PSI)协议介绍2.隐语 balanced psi Benchmark 白皮书