RPC接口测试技术-websocket 自动化测试实践

简介: RPC接口测试技术-websocket 自动化测试实践

WebSocket 是一种在单个 TCP 连接上进行全双工通信(Full Duplex 是通讯传输的一个术语。通信允许数据在两个方向上同时传输,它在能力上相当于两个单工通信方式的结合。全双工指可以同时(瞬时)进行信号的双向传输( A→B 且 B→A )。指 A→B 的同时 B→A,是瞬时同步的)的协议。

WebSocket 通信协议于 2011 年被 IETF 定为标准 RFC 6455,并由 RFC7936 补充规范。WebSocket API (WebSocket API 是一个使用WebSocket 协议的接口,通过它来建立全双工通道来收发消息) 也被 W3C 定为标准。

而 HTTP 协议就不支持持久连接,虽然在 HTTP1.1 中进行了改进,使得有一个 keep-alive,在一个 HTTP 连接中,可以发送多个 Request,接收多个 Response。

但是在 HTTP 中 Request = Response 永远是成立的,也就是说一个 request 只能有一个response。而且这个response也是被动的,不能主动发起。

websocket 常用于社交/订阅、多玩家游戏、协同办公/编辑、股市基金报价、体育实况播放、音视频聊天/视频会议/在线教育、智能家居与基于位置的应用。

websocket 接口不能使用 requests 直接进行接口的调用,可以依赖第三方库的方式来实现调用,以下内容介绍如何调用第三方库实现 websocket 的接口自动化测试。

实战
使用 python 语言实现 websocket 的接口自动化

环境准备
1.安装 pyhton3 环境下载需要的运行库
2.下载需要的运行库
pip install websocket-client

实战演示
连接 websoket 服务器
import logging
from websocket import create_connection
logger = logging.getLogger(__name__)
url = 'ws://echo.websocket.org/' #一个在线的回环websocket接口,必须以websocket的方式连接后访问,无法直接在网页端输入该地址访问
wss = create_connection(url, timeout=timeout)

发送 websocket 消息
wss.send('Hello World')

接收 websocket 消息
res = wss.recv()
logger.info(res)

关闭 websocket 连接
wss.close()

websocket 第三方库的调用不支持直接发送除字符串外的其他数据类型,所以在发送请求之前需要将 Python 结构化的格式,转换为成为字符串类型或者 json 字符串后,再发起 websocket 的接口请求

待发送的数据体格式为:

data= {

"a" : "abcd",
"b" : 123
}

发送前需要把数据处理成 json 字符串

new_data=json.dumps(data,ensure_ascii=False)
wss.send(new_data)

接收的数据体的处理:如果接口定义为 json 的话,由于数据的传输都是字符串格式的,需要对接收的数据体进行转换操作

接收的数据体的格式也为字符串

logger.info(type(res)) #

对于响应内容进行格式转换处理:

def load_json(base_str):

if isinstance(base_str, str):
    try:
        res = json.loads(base_str)
        return load_json(res)
    except JSONDecodeError:
        return base_str
elif isinstance(base_str, list):
    res = []
    for i in base_str:
        res.append(load_json(i))
    return res
elif isinstance(base_str, dict):
    for key, value in base_str.items():
        base_str[key] = load_json(value)
    return base_str
return base_str

websocket 接口自动化测试,二次封装 demo 展示
web_socket_util.py 封装 websocket 接口通用操作:
import logging
import json
from websocket import create_connection
logger = logging.getLogger(__name__)
class WebsocketUtil():

def conn(self, uri, timeout=3):
    '''
    连接web服务器
    :param uri: 服务的url
    :param timeout: 超时时间
    :return:
    '''
    self.wss = create_connection(uri, timeout=timeout)

def send(self, message):
    '''
    发送请求数据体
    :param message: 待发送的数据信息
    :return:
    '''
    if not isinstance(message, str):
        message = json.dumps(message)
    return self.wss.send(message)

def load_json(self, base_str):
    '''
    进行数据体的处理
    :param base_str: 待处理的数据体
    :return:
    '''
    if isinstance(base_str, str):
        try:
            res = json.loads(base_str)
            return self.load_json(res)
        except JSONDecodeError:
            return base_str
    elif isinstance(base_str, list):
        res = []
        for i in base_str:
            res.append(self.load_json(i))
        return res
    elif isinstance(base_str, dict):
        for key, value in base_str.items():
            base_str[key] = self.load_json(value)
        return base_str
    return base_str

def recv(self, timeout=3):
    '''
    接收数据体信息,并调用数据体处理方法处理响应体
    :param timeout: 超时时间
    :return:
    '''
    if isinstance(timeout, dict):
        timeout = timeout["timeout"]
    try:
        self.settimeout(timeout)
        recv_json = self.wss.recv()
        all_json_recv = self.load_json(recv_json)
        self._set_response(all_json_recv)
        return all_json_recv
    except WebSocketTimeoutException:
        logger.error(f"已经超过{timeout}秒没有接收数据啦")

def settimeout(self, timeout):
    '''
    设置超时时间
    :param timeout: 超时时间
    :return:
    '''
    self.wss.settimeout(timeout)

def recv_all(self, timeout=3):
    '''
    接收多个数据体信息,并调用数据体处理方法处理响应体
    :param timeout: 超时时间
    :return:
    '''
    if isinstance(timeout, dict):
        timeout = timeout["timeout"]
    recv_list = []
    while True:
        try:
            self.settimeout(timeout)
            recv_json = self.wss.recv()
            all_json_recv = self.load_json(recv_json)
            recv_list.append(all_json_recv)
            logger.info(f"all::::: {all_json_recv}")
        except WebSocketTimeoutException:
            logger.error(f"已经超过{timeout}秒没有接收数据啦")
            break
    self._set_response(recv_list)
    return recv_list

def close(self):
    '''
    关闭连接
    :return:
    '''
    return self.wss.close()

def _set_response(self, response):
    self.response = response

def _get_response(self) -> list:
    return self.response

test_case.py websocket 接口自动化测试用例:

class TestWsDemo:

def setup(self):
    url = 'ws://echo.websocket.org/'
    self.wss = WebsocketUtil()
    self.wss.conn(url)

def teardown(self):
    self.wss.close()

def test_demo(self):
    data = {"a": "hello", "b": "world"}
    self.wss.send(data)
    res = self.wss.recv()
    assert 'hello' == res['a']

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。

在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

相关文章
|
1月前
|
Java 测试技术 容器
Jmeter工具使用:HTTP接口性能测试实战
希望这篇文章能够帮助你初步理解如何使用JMeter进行HTTP接口性能测试,有兴趣的话,你可以研究更多关于JMeter的内容。记住,只有理解并掌握了这些工具,你才能充分利用它们发挥其应有的价值。+
291 23
|
1月前
|
安全 测试技术 持续交付
软考软件评测师——基于风险的测试技术
本文详细阐述了测试计划的核心要素与制定流程,涵盖测试范围界定、实施策略规划、资源配置及风险管理机制。通过风险识别方法论和评估模型,构建了完整的质量保障体系。同时,针对不同测试级别与类型提供具体配置建议,并提出技术选型原则与实施规范,确保测试活动高效有序开展,为项目成功奠定基础。内容结合实际经验,具有较强指导意义。
|
3月前
|
SQL 安全 测试技术
2025接口测试全攻略:高并发、安全防护与六大工具实战指南
本文探讨高并发稳定性验证、安全防护实战及六大工具(Postman、RunnerGo、Apipost、JMeter、SoapUI、Fiddler)选型指南,助力构建未来接口测试体系。接口测试旨在验证数据传输、参数合法性、错误处理能力及性能安全性,其重要性体现在早期发现问题、保障系统稳定和支撑持续集成。常用方法包括功能、性能、安全性及兼容性测试,典型场景涵盖前后端分离开发、第三方服务集成与数据一致性检查。选择合适的工具需综合考虑需求与团队协作等因素。
333 24
|
3月前
|
XML JSON 测试技术
如何使用 Postman 发送和测试 WebSocket
WebSocket 促进客户端和服务器之间通过单个持久连接进行实时、双向通信。 需要使用 Postman 建立 WebSocket 连接吗? 请查看我们简洁的循序渐进指南!
|
3月前
|
SQL 测试技术
除了postman还有什么接口测试工具
最好还是使用国内的接口测试软件,其实国内替换postman的软件有很多,这里我推荐使用yunedit-post这款接口测试工具来代替postman,因为它除了接口测试功能外,在动态参数的支持、后置处理执行sql语句等支持方面做得比较好。而且还有接口分享功能,可以生成接口文档给团队在线浏览。
165 2
|
4月前
|
监控 API 开发工具
Socket.IO介绍,以及怎么连接测试Socket.IO接口?
Socket.IO 是一个用于浏览器和服务器间实时双向通信的库,支持低延迟消息传递、跨平台运行及自动重连。文章介绍了其特点与调试需求,并详细说明如何使用 Apifox 工具创建、连接、发送/接收 Socket.IO 事件,以及团队协作和调试技巧。掌握这些技能可提升实时应用开发效率与质量。
|
设计模式 负载均衡 网络协议
【分布式技术专题】「分布式技术架构」实践见真知,手把手教你如何实现一个属于自己的RPC框架(架构技术引导篇)
【分布式技术专题】「分布式技术架构」实践见真知,手把手教你如何实现一个属于自己的RPC框架(架构技术引导篇)
564 0
|
存储 缓存 Linux
【实战指南】嵌入式RPC框架设计实践:六大核心类构建高效RPC框架
在先前的文章基础上,本文讨论如何通过分层封装提升一个针对嵌入式Linux的RPC框架的易用性。设计包括自动服务注册、高性能通信、泛型序列化和简洁API。框架分为6个关键类:BindingHub、SharedRingBuffer、Parcel、Binder、IBinder和BindInterface。BindingHub负责服务注册,SharedRingBuffer实现高效数据传输,Parcel处理序列化,而Binder和IBinder分别用于服务端和客户端交互。BindInterface提供简单的初始化接口,简化应用集成。测试案例展示了客户端和服务端的交互,验证了RPC功能的有效性。
613 53
|
8月前
|
自然语言处理 负载均衡 API
gRPC 一种现代、开源、高性能的远程过程调用 (RPC) 可以在任何地方运行的框架
gRPC 是一种现代开源高性能远程过程调用(RPC)框架,支持多种编程语言,可在任何环境中运行。它通过高效的连接方式,支持负载平衡、跟踪、健康检查和身份验证,适用于微服务架构、移动设备和浏览器客户端连接后端服务等场景。gRPC 使用 Protocol Buffers 作为接口定义语言,支持四种服务方法:一元 RPC、服务器流式处理、客户端流式处理和双向流式处理。
|
11月前
|
Dubbo 网络协议 Java
RPC框架:一文带你搞懂RPC
这篇文章全面介绍了RPC(远程过程调用)的概念、原理和应用场景,解释了RPC如何工作以及为什么在分布式系统中广泛使用,并探讨了几种常用的RPC框架如Thrift、gRPC、Dubbo和Spring Cloud,同时详细阐述了RPC调用流程和实现透明化远程服务调用的关键技术,包括动态代理和消息的编码解码过程。
RPC框架:一文带你搞懂RPC