# coding=utf-8
"""
作者:gaojs
功能:
新增功能:
日期:2022/4/8 18:03
"""
import json
import os.path
import pprint
import time
import requests
# xmltodict parses XML into dicts; used for the XML-to-JSON conversion below
import xmltodict as xmltodict
# Suppress urllib3 warnings (e.g. InsecureRequestWarning) on the console
import urllib3
urllib3.disable_warnings()
class Stress:
    """Client for the bypass-authentication flow exercised by the stress test.

    The flow implemented here is: fetch a random challenge from the gateway
    (``/auth/getRandom``), have the local signing service listening on
    ``127.0.0.1:63451`` sign it, then submit the signature to
    ``/auth/authUser``.  Every request is sent with TLS verification disabled
    and with the proxy mapping stored in ``self.proxies``.
    """

    def __init__(self):
        """Create the shared HTTP session and the headers sent with requests."""
        self.headers = {
            'Content-Type': 'text/xml',
            'Connection': 'close',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.93 Safari/537.36'
        }
        self.session = requests.session()
        # NOTE(review): kept from the original; it is unclear whether requests
        # honours a `keep_alive` attribute -- the `Connection: close` header
        # above is what asks the server to drop the connection. Confirm.
        self.session.keep_alive = False
        # Explicit None entries, presumably to bypass any system-wide proxy.
        self.proxies = {"http": None, "https": None}

    @staticmethod
    def _to_standard_base64(signature):
        """Replace ``-`` with ``+`` and ``_`` with ``/`` in *signature*.

        The signing service returns the signature with ``-``/``_`` where the
        value expected downstream uses ``+``/``/``.
        """
        return signature.replace('-', '+').replace('_', '/')

    def _request(self, method, url, **kwargs):
        """Send a request through the shared session.

        :param method: name of the session method to call (``'get'``/``'post'``)
        :param url: target URL
        :param kwargs: extra keyword arguments forwarded to the session method
        :return: the response object returned by the session
        :raises requests.exceptions.ConnectionError: when the connection
            cannot be established; the message names the URL that failed.
        """
        send = getattr(self.session, method)
        try:
            return send(url, verify=False, proxies=self.proxies, **kwargs)
        except requests.exceptions.ConnectionError as err:
            # The original handler touched an unbound `rsp` here and crashed
            # with UnboundLocalError; surface the real failure instead.
            raise requests.exceptions.ConnectionError(
                f'Connection refused: {url}') from err

    def _sign(self, endpoint, PlainText, CertIndex, ukeyPIN, sm3HashNum,
              settle_seconds=0):
        """Call a signing endpoint of the local service and return the signature.

        :param endpoint: path component, e.g. ``NSSkfAttachedSign``
        :param PlainText: text to sign
        :param CertIndex: index of the certificate to sign with
        :param ukeyPIN: PIN of the USB key
        :param sm3HashNum: digest algorithm identifier
        :param settle_seconds: seconds to sleep between receiving the response
            and parsing it (0 disables the pause)
        :return: ``signedData`` of the first response entry with ``-``/``_``
            replaced by ``+``/``/``
        """
        # Passing the values via `params` lets requests URL-encode them
        # instead of splicing raw text into the query string.
        query = {
            'PlainText': PlainText,
            'CertIndex': CertIndex,
            'UsbKeyPin': ukeyPIN,
            'DigestArithmetic': sm3HashNum,
        }
        rsp = self._request('get', f'https://127.0.0.1:63451/{endpoint}',
                            params=query, headers=self.headers)
        if settle_seconds:
            time.sleep(settle_seconds)
        signature = json.loads(rsp.text)[0]['signedData']
        return self._to_standard_base64(signature)

    def xmltojson(self, xml):
        """Convert an XML document to a JSON string.

        :param xml: XML text
        :return: JSON text (indented by one space) of the parsed document
        """
        return json.dumps(xmltodict.parse(xml), indent=1)

    def post_random(self, vsiteIp, appId):
        """Request the random challenge value from the gateway.

        :param vsiteIp: host (IP) of the gateway
        :param appId: application id placed in the request body
        :return: the ``message/body/original`` value of the response
        :raises requests.exceptions.ConnectionError: if the gateway is
            unreachable
        """
        url = f'https://{vsiteIp}/auth/getRandom'
        data = '<?xml version="1.0" encoding="UTF-8"?>' \
               '<message>' \
               '<head>' \
               '<version>1.0</version>' \
               '<serviceType>OriginalService</serviceType>' \
               '</head>' \
               '<body>' \
               f'<appId>{appId}</appId>' \
               '</body>' \
               '</message>'
        rsp = self._request('post', url, data=data, headers=self.headers)
        reply = json.loads(self.xmltojson(rsp.text))
        return reply['message']['body']['original']

    def certificate_list(self):
        """Fetch the certificate list from the local signing service.

        :return: the parsed JSON response (the original parsed it but
            discarded the result)
        :raises requests.exceptions.ConnectionError: if the local service is
            unreachable
        """
        url = r'https://127.0.0.1:63451/NSSkfGetCertsListInfo?DllFilePath=D:\旁路认证\GM3000\mtoken_GM.dll'
        rsp = self._request('get', url)
        return json.loads(rsp.text)

    def skfAttach(self, PlainText, CertIndex0, ukeyPIN, sm3HashNum):
        """Produce an attached signature via the local signing service.

        :param PlainText: text to sign
        :param CertIndex0: index of the certificate to sign with
        :param ukeyPIN: PIN of the USB key
        :param sm3HashNum: digest algorithm identifier
        :return: the signature with ``-``/``_`` replaced by ``+``/``/``
        :raises requests.exceptions.ConnectionError: if the local service is
            unreachable
        """
        return self._sign('NSSkfAttachedSign', PlainText, CertIndex0,
                          ukeyPIN, sm3HashNum)

    def skfDetach(self, PlainText, CertIndex, ukeyPIN, sm3HashNum):
        """Produce a detached signature via the local signing service.

        Example values from the original notes: PlainText = the ten-digit
        random number, CertIndex = 1, sm3HashNum = 1.2.156.10197.1.401,
        ukeyPIN = 12345678.

        :param PlainText: text to sign
        :param CertIndex: index of the certificate to sign with
        :param ukeyPIN: PIN of the USB key
        :param sm3HashNum: digest algorithm identifier
        :return: the signature with ``-``/``_`` replaced by ``+``/``/``
        :raises requests.exceptions.ConnectionError: if the local service is
            unreachable
        """
        # The 3-second pause after the request is kept from the original.
        # NOTE(review): the reason is not stated in the code -- confirm it is
        # still needed.
        return self._sign('NSSkfDetachedSign', PlainText, CertIndex,
                          ukeyPIN, sm3HashNum, settle_seconds=3)

    def detach_auth(self, vsiteIp, appId):
        """Authenticate against the gateway with a freshly generated signature.

        Fetches the random challenge, queries the certificate list, signs the
        challenge with a detached signature and posts the result to
        ``/auth/authUser``.

        :param vsiteIp: host (IP) of the gateway
        :param appId: application id used for both the challenge and the
            authentication request
        :return: the authentication response converted to a JSON string
        """
        random_value = self.post_random(vsiteIp, appId)
        print(random_value)
        # Query the certificate list before signing (result is not used here).
        self.certificate_list()
        # Detached signature over the challenge.
        detach_data = self.skfDetach(PlainText=random_value, CertIndex='0', ukeyPIN='12345678', sm3HashNum='1.2.156.10197.1.401')
        url = f'https://{vsiteIp}/auth/authUser'
        # `appId` is interpolated below; the original hard-coded "T1" and
        # silently ignored the parameter.
        data = f'''<?xml version="1.0" encoding="utf-8"?>
<message>
<head>
<version>1.0</version>
<serviceType>authenService</serviceType>
</head>
<body>
<appId>{appId}</appId>
<authen>
<authCredential authMode="password">
<uname>test</uname>
<pwd>test</pwd>
</authCredential>
<authCredential authMode="cert">
<detach>{detach_data}</detach>
<original>{random_value}</original>
</authCredential>
</authen>
<attributes attributeType="portion">
<attr name="X509Certificate.SubjectDN"></attr>
</attributes>
</body>
</message>
'''
        rsp = self._request('post', url, headers=self.headers, data=data)
        json_data = self.xmltojson(rsp.text)
        json_body = json.loads(json_data)['message']['body']['attributes']['attr']['#text']
        print(json_body)
        return json_data
# Stress-test driver: run the complete authentication flow repeatedly,
# echoing every result to the console and appending it to log.txt.
test = Stress()
for i in range(100000):
    rsp = test.detach_auth(vsiteIp='192.168.120.209', appId='T1')
    # Pull the returned attribute text out of the JSON reply.
    attr = json.loads(rsp)['message']['body']['attributes']['attr']
    text = attr['#text']
    round_no = i + 1
    print(f'================================================第 {round_no} 次身份认证成功!===============================================')
    # Re-open the log in append mode on every round, as before.
    with open('log.txt', mode='a', encoding='utf-8') as f:
        f.write(f'===========================================第 {round_no} 次身份认证成功!============================================\n')
        f.write(text + '\n')
    print(text)