我们需要安装`websockets`库(如果尚未安装)

本文涉及的产品
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
简介: 我们需要安装`websockets`库(如果尚未安装)

WebSocket服务器

首先,我们需要安装websockets库(如果尚未安装)。可以通过pip进行安装:

pip install websockets

然后,我们可以编写一个简单的WebSocket服务器:

import asyncio
import websockets

async def echo(websocket, path):
    async for message in websocket:
        print(f"Received: {message}")
        await websocket.send(f"Echo: {message}")

start_server = websockets.serve(echo, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

解释

  1. 导入模块:我们导入了asynciowebsockets模块。asyncio是Python的异步I/O框架,而websockets提供了WebSocket的实现。
  2. 定义echo函数:这是一个异步函数,它接受两个参数:websocketpathwebsocket是一个WebSocket连接对象,而path是客户端连接的路径(在这个例子中我们不会使用它)。
* 使用`async for`循环,我们可以异步地从WebSocket连接中读取消息。每当从客户端接收到消息时,循环就会迭代一次。
* 接收到的消息被打印到控制台,并附加一个前缀"Received: "。
* 然后,服务器将一个带有前缀"Echo: "的相同消息发送回客户端。这是通过调用`websocket.send()`方法实现的。
  1. 启动服务器:我们使用websockets.serve()函数来启动WebSocket服务器。这个函数接受三个参数:处理函数(在这里是echo函数)、主机名(在这里是"localhost")和端口号(在这里是8765)。这个函数返回一个服务器对象,但我们在这里没有使用它。
  2. 运行事件循环:我们使用asyncio.get_event_loop()来获取当前的事件循环,并使用run_until_complete()方法来运行服务器直到它完成(但实际上,由于服务器是无限循环的,所以它永远不会完成)。然后,我们使用run_forever()方法来保持事件循环运行,以便服务器可以持续接受和处理连接。

WebSocket客户端

接下来,我们可以编写一个简单的WebSocket客户端来与服务器进行交互:

import asyncio
import websockets

async def client():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        await websocket.send("Hello, Server!")
        response = await websocket.recv()
        print(f"Received: {response}")

asyncio.get_event_loop().run_until_complete(client())

解释

  1. 导入模块:与服务器代码相同,我们导入了asynciowebsockets模块。
  2. 定义client函数:这是一个异步函数,它不接受任何参数。
* 我们首先定义了要连接的WebSocket URI(在这里是"ws://localhost:8765")。
* 使用`async with`语句和`websockets.connect()`函数,我们建立了一个到服务器的WebSocket连接。这个连接对象被赋值给`websocket`变量,并在`async with`块的范围内保持打开状态。
* 然后,我们使用`websocket.send()`方法向服务器发送一条消息("Hello, Server!")。
* 接下来,我们使用`websocket.recv()`方法从服务器接收一条消息,并将其赋值给`response`变量。注意,由于`recv()`方法也是异步的,所以我们使用`await`来等待它完成。
* 最后,我们打印出从服务器接收到的消息。
  1. 运行事件循环:与服务器代码相同,我们使用asyncio.get_event_loop()来获取当前的事件循环,并使用run_until_complete()方法来运行客户端函数。

运行和测试

现在,你可以分别运行服务器和客户端代码。首先运行服务器代码,然后在一个新的终端窗口中运行客户端代码。你应该会看到以下输出:

服务器输出

Received: Hello, Server!

客户端输出

Received: Echo: Hello, Server!

这表明客户端成功地向服务器发送了一条消息,并且服务器也成功地将该消息发送回客户端。

扩展和深入

虽然这个示例很简单,但它展示了WebSocket的基本工作原理。在实际应用中,WebSocket可以用于实现
处理结果:

WebSocket服务器

首先,我们需要安装websockets库(如果尚未安装)。可以通过pip进行安装:

```python
async def echo(websocket, path)_
async for message in websocket_
print(f"Received_ {message}")
await websocket.send(f"Echo_ {message}")
start_server = websockets.serve(echo, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
1. **导入模块**:我们导入了`asyncio``websockets`模块。`asyncio`是Python的异步I_O框架,而`websockets`提供了WebSocket的实现。
**定义echo函数**:这是一个异步函数,它接受两个参数:`websocket``path``websocket`是一个WebSocket连接对象,而`path`是客户端连接的路径(在这个例子中我们不会使用它)。
* 使用`async for`循环,我们可以异步地从WebSocket连接中读取消息。每当从客户端接收到消息时,循环就会迭代一次。
* 接收到的消息被打印到控制台,并附加一个前缀"Received_ "。
* 然后,服务器将一个带有前缀"Echo_ "的相同消息发送回客户端。这是通过调用`websocket.send()`方法实现的。
**启动服务器**:我们使用`websockets.serve()`函数来启动WebSocket服务器。这个函数接受三个参数:处理函数(在这里是`echo`函数)、主机名(在这里是"localhost")和端口号(在这里是8765)。这个函数返回一个服务器对象,但我们在这里没有使用它。
**运行事件循环**:我们使用`asyncio.get_event_loop()`来获取当前的事件循环,并使用`run_until_complete()`方法来运行服务器直到它完成(但实际上,由于服务器是无限循环的,所以它永远不会完成)。然后,我们使用`run_forever()`方法来保持事件循环运行,以便服务器可以持续接受和处理连接。
### WebSocket客户端
接下来,我们可以编写一个简单的WebSocket客户端来与服务器进行交互:
```python
async def client()_
uri = "ws___localhost_8765"
async with websockets.connect(uri) as websocket_
await websocket.send("Hello, Server!")
response = await websocket.recv()
print(f"Received_ {response}")
asyncio.get_event_loop().run_until_complete(client())
1. **导入模块**:与服务器代码相同,我们导入了`asyncio``websockets`模块。
**定义client函数**:这是一个异步函数,它不接受任何参数。
* 我们首先定义了要连接的WebSocket URI(在这里是"ws___localhost_8765")。
* 使用`async with`语句和`websockets.connect()`函数,我们建立了一个到服务器的WebSocket连接。这个连接对象被赋值给`websocket`变量,并在`async with`块的范围内保持打开状态。
* 然后,我们使用`websocket.send()`方法向服务器发送一条消息("Hello, Server!")。
* 接下来,我们使用`websocket.recv()`方法从服务器接收一条消息,并将其赋值给`response`变量。注意,由于`recv()`方法也是异步的,所以我们使用`await`来等待它完成。
* 最后,我们打印出从服务器接收到的消息。
**运行事件循环**:与服务器代码相同,我们使用`asyncio.get_event_loop()`来获取当前的事件循环,并使用`run_until_complete()`方法来运行客户端函数。
### 运行和测试
现在,你可以分别运行服务器和客户端代码。首先运行服务器代码,然后在一个新的终端窗口中运行客户端代码。你应该会看到以下输出:
**服务器输出**:

```

扩展和深入

虽然这个示例很简单,但它展示了WebSocket的基本工作原理。在实际应用中,WebSocket可以用于实现

相关文章
|
自然语言处理 语音技术 开发者
开源上新|FunASR多语言离线文件转写软件包
开源上新|FunASR多语言离线文件转写软件包
|
存储 数据安全/隐私保护 C++
Qt 中文文档 Qt5.15 PDF Class (从官网Qt 5.15 翻译)
Qt 中文文档 Qt5.15 PDF Class (从官网Qt 5.15 翻译)
1794 0
|
计算机视觉 Python
Yolov5双目测距-双目相机计数及测距教程(附代码)
Yolov5双目测距-双目相机计数及测距教程(附代码)
|
Linux
Linux(5)WIFI/BT调试笔记
Linux(5)WIFI/BT调试笔记
2237 0
|
监控 API 计算机视觉
CompreFace:Star6.1k,Github上火爆的轻量化且强大的人脸识别库,api,sdk都支持
CompreFace 是一个在 GitHub 上拥有 6.1k Star 的轻量级人脸识别库,支持 API 和 SDK。它由 Exadel 公司开发,基于深度学习技术,提供高效、灵活的人脸识别解决方案。CompreFace 支持多种模型(如 VGG-Face、OpenFace 和 Facenet),具备多硬件支持、丰富的功能服务(如人脸检测、年龄性别识别等)和便捷的部署方式。适用于安防监控、商业领域和医疗美容等多个场景。
1927 4
|
10月前
|
Ubuntu 机器人 项目管理
Ubuntu系统更换软件源以及ROS包管理问题解析
以上是针对Ubuntu系统软件源的更换和ROS包管理的关键步骤。务必跟随官方指南,在安装或者配置过程中应答疑解惑,确保每一步操作的准确性。这些操作对于机器人研发人员和爱好者来说是日常任务的一部分,熟练掌握这些技能,能够在机器人编程和项目管理方面提供很大的帮助。
868 0
|
弹性计算 JavaScript Ubuntu
WebSocket协议相关的测试命令工具使用简介
本文介绍了针对WebSocket的测试工具wscat和websocat的基本使用方法,以及通过curl命令测试HTTP/HTTPS协议的方式。对于WebSocket,直接使用curl测试较为复杂,推荐使用wscat或websocat。文中详细说明了这两种工具的安装步骤、常用参数及连接示例,例如在ECS上开启8080端口监听并进行消息收发测试。此外,还提供了curl命令的手动设置头部信息以模拟WebSocket握手的示例,但指出curl仅能作为客户端测试工具,无法模拟服务器。
3797 5
|
机器学习/深度学习 并行计算 PyTorch
如何搭建深度学习的多 GPU 服务器
如何搭建深度学习的多 GPU 服务器
1152 5
如何搭建深度学习的多 GPU 服务器
|
机器学习/深度学习 前端开发 安全
【Gradio】Could not create share link
【Gradio】Could not create share link
7707 7

热门文章

最新文章