前期准备
import SparkApi
import os
from dotenv import load_dotenv, find_dotenv
#以下密钥信息从控制台获取
_=load_dotenv(find_dotenv())
appid = os.getenv("SPARK_APP_ID")
api_secret=os.getenv("SPARK_APP_SECRET")
api_key=os.getenv("SPARK_APP_KEY")
#用于配置大模型版本,默认“general/generalv2”
# domain = "general" # v1.5版本
domain = "generalv2" # v2.0版本
#云端环境的服务地址
# Spark_url = "ws://spark-api.xf-yun.com/v1.1/chat" # v1.5环境的地址
Spark_url = "ws://spark-api.xf-yun.com/v2.1/chat" # v2.0环境的地址ws(s)://spark-api.xf-yun.com/v2.1/chat
text =[]
Prompt 内容
def getText(role,content):
jsoncon = {}
jsoncon["role"] = role
jsoncon["content"] = content
text.append(jsoncon)
return text
def getlength(text):
length = 0
for content in text:
temp = content["content"]
leng = len(temp)
length += leng
return length
def checklen(text):
while (getlength(text) > 8000):
del text[0]
return text
if __name__ == '__main__':
text.clear
# 分隔符
delimiter = "####"
# 等价类划分法的Chain of Thought 的 prompt
ep_message=f"""{delimiter}等价类测试用例设计方法是把输入的参数域划分成若等价类,这些等价类包含了有效等价类和无效等价类,
有效等价类是指对于程序的规格说明来说是合理的,有意义的输入数据构成的集合,利用有效等价类可检验程序是否实现了规格说明中所规定的功能。
无效等价类是指对于程序的规格说明来说是不合理的,无意义的输入数据构成的集合,利用无效等价类可检验程序是否有效的避免了规格说明中所规定的功能以外的内容。
然后从每个等价类中选取少数代表性数据作为测试用例,每一类的代表性数据在测试中的作用等价于这一类中的其他值。
特别注意,一条测试用例可以覆盖多个有效等价类,一条测试用例只能覆盖一个无效等价类{delimiter}
使用等价类测试用例设计方法需要经过如下几步:{delimiter}
step1:{delimiter}对输入的参数进行等价类划分,在划分等价类的时候,应该遵从如下的一些原则:{delimiter}
在输入条件规定了输入值的集合或者规定了必须满足的条件的情况下,可确立一个有效等价类和一个无效等价类。
在输入条件是一个布尔量的情况下,可确定一个有效等价类和一个无效等价类。布尔量是一个二值枚举类型, 一个布尔量具有两种状态: true 和 false 。
在规定了输入数据的一组值(假定n个),并且程序要对每一个输入值分别处理的情况下,可确立n个有效等价类和一个无效等价类.例:输入条件说明输入字符为:中文、英文、阿拉伯文三种之一,则分别取这三种这三个值作为三个有效等价类,另外把三种字符之外的任何字符作为无效等价类。
在规定了输入数据必须遵守的规则的情况下,可确立一个有效等价类(符合规则)和若干个无效等价类(从不同角度违反规则)。
在确知已划分的等价类中各元素在程序处理中的方式不同的情况下,则应再将该等价类进一步的划分为更小的等价类{delimiter}
step2:{delimiter}将等价类转化成测试用例,按照[输入条件][有效等价类][无效等价类] 建立等价类表,等价类表可以用markdown的方式给出,列出所有划分出的等价类,为每一个等价类规定一个唯一的编号。
{delimiter}设计一个测试用例覆盖有效等价类的时候,需要这个测试用例使其尽可能多地覆盖尚未被覆盖地有效等价类,重复这一步。直到所有的有效等价类都被覆盖为止。
{delimiter}设计一个新的测试用例,使其仅覆盖一个尚未被覆盖的无效等价类,重复这一步.直到所有的无效等价类都被覆盖为止,测试用例用markdown 的的表格形式输出。{delimiter}
输出按照如下步骤输出:{delimiter}
step1:{delimiter} <step 1 reasoning >
step2:{delimiter} <step 2 reasoning >
测试用例:{delimiter} <response to customer>
"""
system_message = f"你是一名资深测试工程师,下面你会用等价类测试用例设计方法设计测试用例{ep_message},{delimiter},请根据下面的业务描述设计接口参数的入参:{delimiter}"
user_message = f"""被测系统是地铁车票自助购票软件系统需求,系统只接收 5元或10元纸币,一次只能使用一张纸币,车票只有两种面值 5 元或者 10 元。其中:
若投入5元纸币,并选择购买5元面值票,完成后出票,提示购票成功。
若投入5元纸币,并选择购买10元面值票,提示金额不足,并退回5元纸币。
若投入10元纸币,并选择购买5元面值票,完成后出票,提示购票成功,并找零5元。
若投入10元纸币,并选择购买10元面值票,完成购买后出票,提示购买成功。
若输入纸币后在规定时间内不选择票种类的按钮,退回的纸币,提示错误。
若选择购票按钮后不投入纸币,提示错误."""
question = checklen(getText("user",system_message+user_message))
SparkApi.main(appid,api_key,api_secret,Spark_url,domain,question)
输出结果:
据业务描述,我们可以将输入参数划分为以下等价类:
有效等价类 | 无效等价类 |
---|---|
投入5元纸币,并选择购买5元面值票 | 投入5元纸币,并选择购买10元面值票 |
投入5元纸币,并选择购买10元面值票 | 投入10元纸币,并选择购买5元面值票 |
投入10元纸币,并选择购买5元面值票 | 投入10元纸币,并选择购买10元面值票 |
输入纸币后在规定时间内不选择票种类的按钮 | 不选择购票按钮 |
选择购票按钮后不投入纸币 | 输入纸币后在规定时间内不选择票种类的按钮 |
接下来,我们根据等价类设计测试用例:
用例编号 | 输入参数 | 预期结果 |
---|---|---|
1 | 投入5元纸币,并选择购买5元面值票 | 提示购票成功 |
2 | 投入5元纸币,并选择购买10元面值票 | 提示金额不足,并退回5元纸币 |
3 | 投入10元纸币,并选择购买5元面值票 | 完成后出票,提示购票成功,并找零5元 |
4 | 投入10元纸币,并选择购买10元面值票 | 完成购买后出票,提示购买成功 |
5 | 输入纸币后在规定时间内不选择票种类的按钮 | 提示错误 |
6 | 选择购票按钮后不投入纸币 | 提示错误 |
引用SparkApi代码
本文中的大模型使用的是讯飞的 Spark,其中和 Spark 交互的代码如下:
import _thread as thread
import base64
import datetime
import hashlib
import hmac
import json
from urllib.parse import urlparse
import ssl
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import websocket # 使用websocket_client
answer = ""
class Ws_Param(object):
# 初始化
def __init__(self, APPID, APIKey, APISecret, Spark_url):
self.APPID = APPID
self.APIKey = APIKey
self.APISecret = APISecret
self.host = urlparse(Spark_url).netloc
self.path = urlparse(Spark_url).path
self.Spark_url = Spark_url
# 生成url
def create_url(self):
# 生成RFC1123格式的时间戳
now = datetime.now()
date = format_date_time(mktime(now.timetuple()))
# 拼接字符串
signature_origin = "host: " + self.host + "\n"
signature_origin += "date: " + date + "\n"
signature_origin += "GET " + self.path + " HTTP/1.1"
# 进行hmac-sha256进行加密
signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),
digestmod=hashlib.sha256).digest()
signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')
authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'
authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
# 将请求的鉴权参数组合为字典
v = {
"authorization": authorization,
"date": date,
"host": self.host
}
# 拼接鉴权参数,生成url
url = self.Spark_url + '?' + urlencode(v)
# 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致
return url
# 收到websocket错误的处理
def on_error(ws, error):
print("### error:", error)
# 收到websocket关闭的处理
def on_close(ws,one,two):
print(" ")
# 收到websocket连接建立的处理
def on_open(ws):
thread.start_new_thread(run, (ws,))
def run(ws, *args):
data = json.dumps(gen_params(appid=ws.appid, domain= ws.domain,question=ws.question))
ws.send(data)
# 收到websocket消息的处理
def on_message(ws, message):
# print(message)
data = json.loads(message)
code = data['header']['code']
if code != 0:
print(f'请求错误: {code}, {data}')
ws.close()
else:
choices = data["payload"]["choices"]
status = choices["status"]
content = choices["text"][0]["content"]
print(content,end ="")
global answer
answer += content
# print(1)
if status == 2:
ws.close()
def gen_params(appid, domain,question):
"""
通过appid和用户的提问来生成请参数
"""
data = {
"header": {
"app_id": appid,
"uid": "1234"
},
"parameter": {
"chat": {
"domain": domain,
"random_threshold": 0.5,
"max_tokens": 2048,
"auditing": "default"
}
},
"payload": {
"message": {
"text": question
}
}
}
return data
def main(appid, api_key, api_secret, Spark_url,domain, question):
# print("星火:")
wsParam = Ws_Param(appid, api_key, api_secret, Spark_url)
# websocket.enableTrace(False)
wsUrl = wsParam.create_url()
ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)
ws.appid = appid
ws.question = question
ws.domain = domain
ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})