CodeSample小助手 2019-12-31
使用KMS创建一个主密钥,使用主密钥生成一个数据密钥,再使用数据密钥在本地加解密数据。这种场景适用于大量数据的加解密。具体架构如下所示:
图例 | 含义 |
---|---|
用户主密钥(CMK) | |
明文数据密钥 | |
密文数据密钥 | |
明文文件 | |
密文文件 |
您可以调用以下KMS API,在本地对数据进行加解密。
API名称 | 说明 |
---|---|
CreateKey | 创建用户主密钥(CMK)。 |
CreateAlias | 为指定用户主密钥创建一个别名。 |
GenerateDataKey | 在线生成数据密钥,用指定CMK加密数据密钥后,返回数据密钥的密文和明文。 |
Decrypt | 解密KMS直接加密的数据(包括GenerateDataKey产生的数据密钥的密文),不需要指定CMK。 |
$ aliyun kms CreateKey
{
"KeyMetadata": {
"CreationDate": "2019-04-08T07:45:54Z",
"Description": "",
"KeyId": "1234abcd-12ab-34cd-56ef-12345678****",
"KeyState": "Enabled",
"KeyUsage": "ENCRYPT/DECRYPT",
"DeleteDate": "",
"Creator": "111122223333",
"Arn": "acs:kms:cn-hangzhou:111122223333:key/1234abcd-12ab-34cd-56ef-12345678****",
"Origin": "Aliyun_KMS",
"MaterialExpireTime": ""
},
"RequestId": "2a37b168-9fa0-4d71-aba4-2077dd9e80df"
}
给主密钥添加别名(可选)
$ aliyun kms CreateAlias --AliasName alias/Apollo/WorkKey --KeyId 1234abcd-12ab-34cd-56ef-1234567890ab
alias/Apollo/WorkKey
#!/usr/bin/env python
#coding=utf-8
import json
import base64
from Crypto.Cipher import AES
from aliyunsdkcore import client
from aliyunsdkkms.request.v20160120 import GenerateDataKeyRequest
def KmsGenerateDataKey(client, key_alias):
request = GenerateDataKeyRequest.GenerateDataKeyRequest()
request.set_accept_format('JSON')
request.set_KeyId(key_alias)
request.set_NumberOfBytes(32)
response = json.loads(client.do_action(request))
datakey_encrypted = response["CiphertextBlob"]
datakey_plaintext = response["Plaintext"]
return (datakey_plaintext, datakey_encrypted)
def ReadTextFile(in_file):
file = open(in_file, 'r')
content = file.read()
file.close()
return content
def WriteTextFile(out_file, lines):
file = open(out_file, 'w')
for ln in lines:
file.write(ln)
file.write('\n')
file.close()
# Out file format (text)
# Line 1: b64 encoded data key
# Line 2: b64 encoded IV
# Line 3: b64 encoded ciphertext
# Line 4: b64 encoded authentication tag
def LocalEncrypt(datakey_plaintext, datakey_encrypted, in_file, out_file):
data_key_binary = base64.b64decode(datakey_plaintext)
cipher = AES.new(data_key_binary, AES.MODE_EAX)
in_content = ReadTextFile(in_file)
ciphertext, tag = cipher.encrypt_and_digest(in_content)
lines = [datakey_encrypted, base64.b64encode(cipher.nonce), base64.b64encode(ciphertext), base64.b64encode(tag)];
WriteTextFile(out_file, lines)
clt = client.AcsClient('Access-Key-Id','Access-Key-Secret','Region-Id')
key_alias = 'alias/Apollo/WorkKey'
in_file = './data/sales.csv'
out_file = './data/sales.csv.cipher'
# Generate Data Key
datakey = KmsGenerateDataKey(clt, key_alias)
# Locally Encrypt the sales record
LocalEncrypt(datakey[0], datakey[1], in_file, out_file)
解密本地文件(对应架构图中的第3步,用明文密钥为本地数据解密。)
#!/usr/bin/env python
#coding=utf-8
import json
import base64
from Crypto.Cipher import AES
from aliyunsdkcore import client
from aliyunsdkkms.request.v20160120 import DecryptRequest
def KmsDecrypt(client, ciphertext):
request = DecryptRequest.DecryptRequest()
request.set_accept_format('JSON')
request.set_CiphertextBlob(ciphertext)
response = json.loads(clt.do_action(request))
return response.get("Plaintext")
def ReadTextFile(in_file):
file = open(in_file, 'r')
lines = []
for ln in file:
lines.append(ln)
file.close()
return lines
def WriteTextFile(out_file, content):
file = open(out_file, 'w')
file.write(content)
file.close()
def LocalDecrypt(datakey, iv, ciphertext, tag, out_file):
cipher = AES.new(datakey, AES.MODE_EAX, iv)
data = cipher.decrypt_and_verify(ciphertext, tag).decode('utf-8')
WriteTextFile(out_file, data)
clt = client.AcsClient('Access-Key-Id','Access-Key-Secret','Region-Id')
in_file = './data/sales.csv.cipher'
out_file = './data/decrypted_sales.csv'
# Read encrypted file
in_lines = ReadTextFile(in_file)
# Decrypt data key
datakey = KmsDecrypt(clt, in_lines[0])
# Locally decrypt the sales record
LocalDecrypt(
base64.b64decode(datakey),
base64.b64decode(in_lines[1]), # IV
base64.b64decode(in_lines[2]), # Ciphertext
base64.b64decode(in_lines[3]), # Authentication tag
out_file
)