RPC接口测试技术-websocket 自动化测试实践

简介: RPC接口测试技术-websocket 自动化测试实践

WebSocket 是一种在单个 TCP 连接上进行全双工通信(Full Duplex 是通讯传输的一个术语。通信允许数据在两个方向上同时传输,它在能力上相当于两个单工通信方式的结合。全双工指可以同时(瞬时)进行信号的双向传输( A→B 且 B→A )。指 A→B 的同时 B→A,是瞬时同步的)的协议。

WebSocket 通信协议于 2011 年被 IETF 定为标准 RFC 6455,并由 RFC7936 补充规范。WebSocket API (WebSocket API 是一个使用WebSocket 协议的接口,通过它来建立全双工通道来收发消息) 也被 W3C 定为标准。

而 HTTP 协议就不支持持久连接,虽然在 HTTP1.1 中进行了改进,使得有一个 keep-alive,在一个 HTTP 连接中,可以发送多个 Request,接收多个 Response。

但是在 HTTP 中 Request = Response 永远是成立的,也就是说一个 request 只能有一个response。而且这个response也是被动的,不能主动发起。

websocket 常用于社交/订阅、多玩家游戏、协同办公/编辑、股市基金报价、体育实况播放、音视频聊天/视频会议/在线教育、智能家居与基于位置的应用。

websocket 接口不能使用 requests 直接进行接口的调用,可以依赖第三方库的方式来实现调用,以下内容介绍如何调用第三方库实现 websocket 的接口自动化测试。

实战

使用 python 语言实现 websocket 的接口自动化

环境准备

1.安装 pyhton3 环境下载需要的运行库

2.下载需要的运行库

pip install websocket-client

实战演示

  • 连接 websoket 服务器
import logging
from websocket import create_connection
logger = logging.getLogger(__name__)
url = 'ws://echo.websocket.org/' #一个在线的回环websocket接口,必须以websocket的方式连接后访问,无法直接在网页端输入该地址访问
wss = create_connection(url, timeout=timeout)
  • 发送 websocket 消息
wss.send('Hello World')
  • 接收 websocket 消息
res = wss.recv()
logger.info(res)
  • 关闭 websocket 连接
wss.close()
  • websocket 第三方库的调用不支持直接发送除字符串外的其他数据类型,所以在发送请求之前需要将 Python 结构化的格式,转换为成为字符串类型或者 json 字符串后,再发起 websocket 的接口请求
#待发送的数据体格式为:
data= {
    "a" : "abcd",
    "b" : 123
    }
# 发送前需要把数据处理成 json 字符串
new_data=json.dumps(data,ensure_ascii=False)
wss.send(new_data)
  • 接收的数据体的处理:如果接口定义为 json 的话,由于数据的传输都是字符串格式的,需要对接收的数据体进行转换操作
#    接收的数据体的格式也为字符串
logger.info(type(res)) # <class 'str'>

对于响应内容进行格式转换处理:

def load_json(base_str):
    if isinstance(base_str, str):
        try:
            res = json.loads(base_str)
            return load_json(res)
        except JSONDecodeError:
            return base_str
    elif isinstance(base_str, list):
        res = []
        for i in base_str:
            res.append(load_json(i))
        return res
    elif isinstance(base_str, dict):
        for key, value in base_str.items():
            base_str[key] = load_json(value)
        return base_str
    return base_str
  • websocket 接口自动化测试,二次封装 demo 展示
    web_socket_util.py 封装 websocket 接口通用操作:
import logging
import json
from websocket import create_connection
logger = logging.getLogger(__name__)
class WebsocketUtil():
    def conn(self, uri, timeout=3):
        '''
        连接web服务器
        :param uri: 服务的url
        :param timeout: 超时时间
        :return:
        '''
        self.wss = create_connection(uri, timeout=timeout)
    def send(self, message):
        '''
        发送请求数据体
        :param message: 待发送的数据信息
        :return:
        '''
        if not isinstance(message, str):
            message = json.dumps(message)
        return self.wss.send(message)
    def load_json(self, base_str):
        '''
        进行数据体的处理
        :param base_str: 待处理的数据体
        :return:
        '''
        if isinstance(base_str, str):
            try:
                res = json.loads(base_str)
                return self.load_json(res)
            except JSONDecodeError:
                return base_str
        elif isinstance(base_str, list):
            res = []
            for i in base_str:
                res.append(self.load_json(i))
            return res
        elif isinstance(base_str, dict):
            for key, value in base_str.items():
                base_str[key] = self.load_json(value)
            return base_str
        return base_str
    def recv(self, timeout=3):
        '''
        接收数据体信息,并调用数据体处理方法处理响应体
        :param timeout: 超时时间
        :return:
        '''
        if isinstance(timeout, dict):
            timeout = timeout["timeout"]
        try:
            self.settimeout(timeout)
            recv_json = self.wss.recv()
            all_json_recv = self.load_json(recv_json)
            self._set_response(all_json_recv)
            return all_json_recv
        except WebSocketTimeoutException:
            logger.error(f"已经超过{timeout}秒没有接收数据啦")
    def settimeout(self, timeout):
        '''
        设置超时时间
        :param timeout: 超时时间
        :return:
        '''
        self.wss.settimeout(timeout)
    def recv_all(self, timeout=3):
        '''
        接收多个数据体信息,并调用数据体处理方法处理响应体
        :param timeout: 超时时间
        :return:
        '''
        if isinstance(timeout, dict):
            timeout = timeout["timeout"]
        recv_list = []
        while True:
            try:
                self.settimeout(timeout)
                recv_json = self.wss.recv()
                all_json_recv = self.load_json(recv_json)
                recv_list.append(all_json_recv)
                logger.info(f"all::::: {all_json_recv}")
            except WebSocketTimeoutException:
                logger.error(f"已经超过{timeout}秒没有接收数据啦")
                break
        self._set_response(recv_list)
        return recv_list
    def close(self):
        '''
        关闭连接
        :return:
        '''
        return self.wss.close()
    def _set_response(self, response):
        self.response = response
    def _get_response(self) -> list:
        return self.response

test_case.py websocket 接口自动化测试用例:

class TestWsDemo:
    def setup(self):
        url = 'ws://echo.websocket.org/'
        self.wss = WebsocketUtil()
        self.wss.conn(url)
    def teardown(self):
        self.wss.close()
    def test_demo(self):
        data = {"a": "hello", "b": "world"}
        self.wss.send(data)
        res = self.wss.recv()
        assert 'hello' == res['a']

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。

更多技术文章

相关文章
|
12天前
|
敏捷开发 人工智能 Devops
探索自动化测试的高效策略与实践###
当今软件开发生命周期中,自动化测试已成为提升效率、保障质量的关键工具。本文深入剖析了自动化测试的核心价值,探讨了一系列高效策略,包括选择合适的自动化框架、设计可维护的测试脚本、集成持续集成/持续部署(CI/CD)流程,以及有效管理和维护测试用例库。通过具体案例分析,揭示了这些策略在实际应用中的成效,为软件测试人员提供了宝贵的经验分享和实践指导。 ###
|
11天前
|
机器学习/深度学习 人工智能 jenkins
软件测试中的自动化与持续集成实践
在快速迭代的软件开发过程中,自动化测试和持续集成(CI)是确保代码质量和加速产品上市的关键。本文探讨了自动化测试的重要性、常见的自动化测试工具以及如何将自动化测试整合到持续集成流程中,以提高软件测试的效率和可靠性。通过案例分析,展示了自动化测试和持续集成在实际项目中的应用效果,并提供了实施建议。
|
12天前
|
Java 测试技术 持续交付
探索自动化测试在软件开发中的关键作用与实践
在现代软件开发流程中,自动化测试已成为提升产品质量、加速交付速度的不可或缺的一环。本文深入探讨了自动化测试的重要性,分析了其在不同阶段的应用价值,并结合实际案例阐述了如何有效实施自动化测试策略,以期为读者提供一套可操作的实践指南。
|
14天前
|
前端开发 JavaScript 测试技术
前端测试技术中,如何提高集成测试的效率?
前端测试技术中,如何提高集成测试的效率?
|
2天前
|
敏捷开发 前端开发 Java
软件测试中的自动化测试框架选择与实践
在当今软件开发生命周期中,自动化测试已成为提升软件质量和开发效率的关键手段。本文旨在探讨自动化测试框架的选择标准及其在实际项目中的应用实践。通过对主流自动化测试框架的分析比较,结合具体案例,本文将阐述如何根据项目需求和团队特点选择合适的自动化测试工具,并分享实施过程中的经验教训。
9 1
|
10天前
|
Devops jenkins 测试技术
DevOps实践:自动化部署与持续集成的融合之旅
【10月更文挑战第41天】在软件开发的世界中,快速迭代和高效交付是企业竞争力的关键。本文将带你走进DevOps的核心实践——自动化部署与持续集成,揭示如何通过它们提升开发流程的效率与质量。我们将从DevOps的基本理念出发,逐步深入到具体的技术实现,最终展示一个实际的代码示例,让理论与实践相结合,为你的开发旅程提供清晰的指引。
21 4
|
12天前
|
Web App开发 敏捷开发 测试技术
探索自动化测试的奥秘:从理论到实践
【10月更文挑战第39天】在软件质量保障的战场上,自动化测试是提升效率和准确性的利器。本文将深入浅出地介绍自动化测试的基本概念、必要性以及如何实施自动化测试。我们将通过一个实际案例,展示如何利用流行的自动化测试工具Selenium进行网页测试,并分享一些实用的技巧和最佳实践。无论你是新手还是有经验的测试工程师,这篇文章都将为你提供宝贵的知识,帮助你在自动化测试的道路上更进一步。
|
12天前
|
敏捷开发 Java 测试技术
探索自动化测试:从理论到实践
【10月更文挑战第39天】在软件开发的海洋中,自动化测试是一艘能够带领团队高效航行的船只。本文将作为你的航海图,指引你理解自动化测试的核心概念,并分享一段实际的代码旅程,让你领略自动化测试的魅力和力量。准备好了吗?让我们启航!
|
13天前
|
机器学习/深度学习 数据采集 人工智能
智能运维:从自动化到AIOps的演进与实践####
本文探讨了智能运维(AIOps)的兴起背景、核心组件及其在现代IT运维中的应用。通过对比传统运维模式,阐述了AIOps如何利用机器学习、大数据分析等技术,实现故障预测、根因分析、自动化修复等功能,从而提升系统稳定性和运维效率。文章还深入分析了实施AIOps面临的挑战与解决方案,并展望了其未来发展趋势。 ####
|
14天前
|
数据采集 前端开发 安全
前端测试技术
前端测试是确保前端应用程序质量和性能的重要环节,涵盖了多种技术和方法
下一篇
无影云桌面