RPC接口测试技术-websocket 自动化测试实践

简介: RPC接口测试技术-websocket 自动化测试实践

WebSocket 是一种在单个 TCP 连接上进行全双工通信(Full Duplex 是通讯传输的一个术语。通信允许数据在两个方向上同时传输,它在能力上相当于两个单工通信方式的结合。全双工指可以同时(瞬时)进行信号的双向传输( A→B 且 B→A )。指 A→B 的同时 B→A,是瞬时同步的)的协议。

WebSocket 通信协议于 2011 年被 IETF 定为标准 RFC 6455,并由 RFC7936 补充规范。WebSocket API (WebSocket API 是一个使用WebSocket 协议的接口,通过它来建立全双工通道来收发消息) 也被 W3C 定为标准。

而 HTTP 协议就不支持持久连接,虽然在 HTTP1.1 中进行了改进,使得有一个 keep-alive,在一个 HTTP 连接中,可以发送多个 Request,接收多个 Response。

但是在 HTTP 中 Request = Response 永远是成立的,也就是说一个 request 只能有一个response。而且这个response也是被动的,不能主动发起。

websocket 常用于社交/订阅、多玩家游戏、协同办公/编辑、股市基金报价、体育实况播放、音视频聊天/视频会议/在线教育、智能家居与基于位置的应用。

websocket 接口不能使用 requests 直接进行接口的调用,可以依赖第三方库的方式来实现调用,以下内容介绍如何调用第三方库实现 websocket 的接口自动化测试。

实战
使用 python 语言实现 websocket 的接口自动化

环境准备
1.安装 pyhton3 环境下载需要的运行库
2.下载需要的运行库
pip install websocket-client

实战演示
连接 websoket 服务器
import logging
from websocket import create_connection
logger = logging.getLogger(__name__)
url = 'ws://echo.websocket.org/' #一个在线的回环websocket接口,必须以websocket的方式连接后访问,无法直接在网页端输入该地址访问
wss = create_connection(url, timeout=timeout)

发送 websocket 消息
wss.send('Hello World')

接收 websocket 消息
res = wss.recv()
logger.info(res)

关闭 websocket 连接
wss.close()

websocket 第三方库的调用不支持直接发送除字符串外的其他数据类型,所以在发送请求之前需要将 Python 结构化的格式,转换为成为字符串类型或者 json 字符串后,再发起 websocket 的接口请求

待发送的数据体格式为:

data= {

"a" : "abcd",
"b" : 123
}

发送前需要把数据处理成 json 字符串

new_data=json.dumps(data,ensure_ascii=False)
wss.send(new_data)

接收的数据体的处理:如果接口定义为 json 的话,由于数据的传输都是字符串格式的,需要对接收的数据体进行转换操作

接收的数据体的格式也为字符串

logger.info(type(res)) #

对于响应内容进行格式转换处理:

def load_json(base_str):

if isinstance(base_str, str):
    try:
        res = json.loads(base_str)
        return load_json(res)
    except JSONDecodeError:
        return base_str
elif isinstance(base_str, list):
    res = []
    for i in base_str:
        res.append(load_json(i))
    return res
elif isinstance(base_str, dict):
    for key, value in base_str.items():
        base_str[key] = load_json(value)
    return base_str
return base_str

websocket 接口自动化测试,二次封装 demo 展示
web_socket_util.py 封装 websocket 接口通用操作:
import logging
import json
from websocket import create_connection
logger = logging.getLogger(__name__)
class WebsocketUtil():

def conn(self, uri, timeout=3):
    '''
    连接web服务器
    :param uri: 服务的url
    :param timeout: 超时时间
    :return:
    '''
    self.wss = create_connection(uri, timeout=timeout)

def send(self, message):
    '''
    发送请求数据体
    :param message: 待发送的数据信息
    :return:
    '''
    if not isinstance(message, str):
        message = json.dumps(message)
    return self.wss.send(message)

def load_json(self, base_str):
    '''
    进行数据体的处理
    :param base_str: 待处理的数据体
    :return:
    '''
    if isinstance(base_str, str):
        try:
            res = json.loads(base_str)
            return self.load_json(res)
        except JSONDecodeError:
            return base_str
    elif isinstance(base_str, list):
        res = []
        for i in base_str:
            res.append(self.load_json(i))
        return res
    elif isinstance(base_str, dict):
        for key, value in base_str.items():
            base_str[key] = self.load_json(value)
        return base_str
    return base_str

def recv(self, timeout=3):
    '''
    接收数据体信息,并调用数据体处理方法处理响应体
    :param timeout: 超时时间
    :return:
    '''
    if isinstance(timeout, dict):
        timeout = timeout["timeout"]
    try:
        self.settimeout(timeout)
        recv_json = self.wss.recv()
        all_json_recv = self.load_json(recv_json)
        self._set_response(all_json_recv)
        return all_json_recv
    except WebSocketTimeoutException:
        logger.error(f"已经超过{timeout}秒没有接收数据啦")

def settimeout(self, timeout):
    '''
    设置超时时间
    :param timeout: 超时时间
    :return:
    '''
    self.wss.settimeout(timeout)

def recv_all(self, timeout=3):
    '''
    接收多个数据体信息,并调用数据体处理方法处理响应体
    :param timeout: 超时时间
    :return:
    '''
    if isinstance(timeout, dict):
        timeout = timeout["timeout"]
    recv_list = []
    while True:
        try:
            self.settimeout(timeout)
            recv_json = self.wss.recv()
            all_json_recv = self.load_json(recv_json)
            recv_list.append(all_json_recv)
            logger.info(f"all::::: {all_json_recv}")
        except WebSocketTimeoutException:
            logger.error(f"已经超过{timeout}秒没有接收数据啦")
            break
    self._set_response(recv_list)
    return recv_list

def close(self):
    '''
    关闭连接
    :return:
    '''
    return self.wss.close()

def _set_response(self, response):
    self.response = response

def _get_response(self) -> list:
    return self.response

test_case.py websocket 接口自动化测试用例:

class TestWsDemo:

def setup(self):
    url = 'ws://echo.websocket.org/'
    self.wss = WebsocketUtil()
    self.wss.conn(url)

def teardown(self):
    self.wss.close()

def test_demo(self):
    data = {"a": "hello", "b": "world"}
    self.wss.send(data)
    res = self.wss.recv()
    assert 'hello' == res['a']

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。

在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

相关文章
|
6天前
|
JSON Java Maven
使用`MockMvc`来测试带有单个和多个请求参数的`GET`和`POST`接口
使用`MockMvc`来测试带有单个和多个请求参数的`GET`和`POST`接口
19 3
|
1天前
|
机器学习/深度学习 人工智能 测试技术
自动化测试框架的演进与实践
【6月更文挑战第23天】在软件工程领域,自动化测试框架的发展不断推动着质量保证的效率和效果。本文将探讨自动化测试框架从简单脚本到复杂集成系统的演变过程,并分析当前流行的框架如Selenium、Appium以及新兴的AI驱动测试工具。我们将通过具体案例,展示如何在现代软件开发实践中有效应用这些框架以提升测试覆盖率和准确性。
|
4天前
|
机器学习/深度学习 人工智能 测试技术
探索自动化测试的前沿技术与实践
自动化测试作为提升软件开发效率和质量的关键工具,正经历着前所未有的变革。随着人工智能、机器学习、云计算等技术的融合与创新,自动化测试不断突破传统界限,展现出更智能、更高效、更灵活的发展趋势。本文将深入探讨自动化测试领域的最新技术进展,分析其在现代软件开发中的应用,并讨论如何有效整合这些技术以最大化测试效率和准确性。
|
7天前
|
监控 前端开发 测试技术
postman接口测试工具详解
postman接口测试工具详解
35 7
|
7天前
|
监控 JavaScript 前端开发
postman接口测试工具详解
postman接口测试工具详解
20 6
|
6天前
|
数据可视化 前端开发 Java
自动化测试框架的选择与实践: Selenium vs. TestComplete
【6月更文挑战第18天】在软件开发的海洋中,自动化测试是一艘能够确保产品质量和效率的坚固船只。本文将深入探讨两种流行的自动化测试框架——Selenium和TestComplete,从它们的优势、局限性到适用场景进行对比分析。我们将通过实际案例来揭示如何根据项目需求选择最合适的测试工具,并提供一些实用的实施建议。文章旨在为读者提供清晰的指导,帮助他们在自动化测试的旅程中做出明智的决定。
12 3
|
1天前
|
监控 druid Java
Springboot用JUnit测试接口时报错Failed to determine a suitable driver class configure a DataSource: ‘url‘
Springboot用JUnit测试接口时报错Failed to determine a suitable driver class configure a DataSource: ‘url‘
6 0
|
7天前
|
前端开发 测试技术
接口测试:Mock 的价值与意义
Mock测试用于替代复杂或不可用的对象,常见于前后端交互、第三方系统及硬件解耦。它不依赖真实数据,节省工作量和联调时间。核心包括匹配规则(决定修改哪个接口)和模拟响应(设计篡改内容以符合测试用例)。
8 0
|
消息中间件 网络协议 前端开发
SpringBoot轻松整合WebSocket,实现Web在线聊天室
前面为大家讲述了 Spring Boot的整合Redis、RabbitMQ、Elasticsearch等各种框架组件;随着移动互联网的发展,服务端消息数据推送已经是一个非常重要、非常普遍的基础功能。今天就和大家聊聊在SpringBoot轻松整合WebSocket,实现Web在线聊天室,希望能对大家有所帮助。
SpringBoot轻松整合WebSocket,实现Web在线聊天室
|
存储 JavaScript 开发者
Vue合理配置WebSocket并实现群聊
Vue合理配置WebSocket并实现群聊
Vue合理配置WebSocket并实现群聊

热门文章

最新文章