背景
- 在实际项目中,可能会通过前端框架使用 WebSocket 和后端进行通信
- 这里就来详细讲解下 FastAPI 是如何操作 WebSocket 的
模拟 WebSocket 客户端
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Minimal FastAPI WebSocket demo: serves a chat page and echoes every message.

# author: 小菠萝测试笔记
# blog: https://www.cnblogs.com/poloyy/
# time: 2021/10/5 5:26 PM
# file: 46_websocket.py
"""
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse

app = FastAPI()

# Page served at "/"; its script opens the WebSocket connection to "/ws".
html = """
<!DOCTYPE html>
<html>
    <head>
        <title>小菠萝聊天室</title>
    </head>
    <body>
        <h1>小菠萝聊天室</h1>
        <form action="" onsubmit="sendMessage(event)">
            <input type="text" id="messageText" autocomplete="off"/>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
            // 加载页面,自动创建一个 WebSocket 连接
            var ws = new WebSocket("ws://localhost:8080/ws");

            // 收到消息
            ws.onmessage = function(event) {
                // 获取输入框的值
                var messages = document.getElementById('messages')
                // 创建一个 li 元素
                var message = document.createElement('li')
                // 接收 event 的 data
                var content = document.createTextNode(event.data)
                message.appendChild(content)
                messages.appendChild(message)
            };

            // 发送消息方法
            function sendMessage(event) {
                var input = document.getElementById("messageText")
                ws.send(input.value)
                input.value = ''
                event.preventDefault()
            }
        </script>
    </body>
</html>
"""


@app.get("/")
async def get():
    """Return the chat page HTML to the browser."""
    return HTMLResponse(html)


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept a WebSocket connection and echo each text message back.

    The loop runs until the client disconnects; the resulting
    ``WebSocketDisconnect`` is treated as the normal end of the session
    instead of being left to propagate as an unhandled error.
    """
    # 1. Complete the WebSocket handshake.
    await websocket.accept()
    try:
        while True:
            # 2. Wait for the next text frame sent by the client.
            data = await websocket.receive_text()
            # 3. Send the reply from the server.
            await websocket.send_text(f"小菠萝收到的消息是: {data}")
    except WebSocketDisconnect:
        # The client closed the connection (e.g. the tab was closed).
        return


if __name__ == '__main__':
    uvicorn.run(app="46_websocket:app", reload=True, host="127.0.0.1", port=8080)
启动 uvicorn 服务器,访问 127.0.0.1:8080/
客户端、服务端建立 WebSocket 连接成功
发送聊天信息
每发一条消息,均会显示在列表中
在 WebSocket 端点中,同样可以使用以下 FastAPI 功能
- Depends
- Security
- Cookie
- Header
- Path
- Query
在依赖项中使用 WebSocket
from typing import Optional

import uvicorn
from fastapi import (
    Cookie,
    Depends,
    FastAPI,
    Query,
    WebSocket,
    WebSocketDisconnect,
    status,
)
from fastapi.responses import HTMLResponse

app = FastAPI()

# Page served at "/"; the Connect button opens a WebSocket to
# "/items/{item_id}/ws" with the token passed as a query parameter.
html = """
<!DOCTYPE html>
<html>
    <head>
        <title>Chat</title>
    </head>
    <body>
        <h1>小菠萝聊天室</h1>
        <form action="" onsubmit="sendMessage(event)">
            <label>Item ID: <input type="text" id="itemId" autocomplete="off" value="foo"/></label>
            <label>Token: <input type="text" id="token" autocomplete="off" value="some-key-token"/></label>
            <button onclick="connect(event)">Connect</button>
            <hr>
            <label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
            var ws = null;
            function connect(event) {
                var itemId = document.getElementById("itemId")
                var token = document.getElementById("token")
                ws = new WebSocket("ws://localhost:8080/items/" + itemId.value + "/ws?token=" + token.value);
                ws.onmessage = function(event) {
                    var messages = document.getElementById('messages')
                    var message = document.createElement('li')
                    var content = document.createTextNode(event.data)
                    message.appendChild(content)
                    messages.appendChild(message)
                };
                event.preventDefault()
            }
            function sendMessage(event) {
                var input = document.getElementById("messageText")
                ws.send(input.value)
                input.value = ''
                event.preventDefault()
            }
        </script>
    </body>
</html>
"""


@app.get("/")
async def get():
    """Return the chat page HTML to the browser."""
    return HTMLResponse(html)


async def get_cookie_or_token(
    websocket: WebSocket,
    session: Optional[str] = Cookie(None),
    token: Optional[str] = Query(None),
) -> Optional[str]:
    """Return the ``session`` cookie or ``token`` query value for a connection.

    Returns:
        ``session`` if it is non-empty, otherwise ``token`` if it is
        non-empty. When both are missing or empty, the WebSocket is closed
        with a policy-violation code and ``None`` is returned so the caller
        can tell that the connection was rejected.
    """
    if session or token:
        return session or token
    # Simulated auth check: neither credential was supplied, so reject.
    await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
    return None


@app.websocket("/items/{item_id}/ws")
async def websocket_depends(
    websocket: WebSocket,
    item_id: str,
    q: Optional[str] = None,
    # Resolved by the dependency; None means the socket was already closed.
    cookie_or_token: Optional[str] = Depends(get_cookie_or_token),
):
    """Echo messages for one item, reporting the credential and query values.

    For every text message received, the server sends the credential value,
    then the ``q`` query parameter if one was supplied, then the message
    text together with ``item_id``.
    """
    if cookie_or_token is None:
        # The dependency closed the connection; it must not be accepted.
        return

    # 1. Complete the WebSocket handshake.
    await websocket.accept()
    try:
        while True:
            # 2. Wait for the next text frame sent by the client.
            data = await websocket.receive_text()
            # 3. Report which credential value was resolved.
            await websocket.send_text(f"cookie or token value is:{cookie_or_token}")
            if q:
                # 4. If the query parameter q was passed, send one more message.
                await websocket.send_text(f"query param value is:{q}")
            # 5. Finally, send the received text and the item ID.
            await websocket.send_text(f"Message text was: {data}, for item ID: {item_id}")
    except WebSocketDisconnect:
        # The client closed the connection; end the handler quietly.
        return


if __name__ == '__main__':
    uvicorn.run(app="46_websocket:app", reload=True, host="127.0.0.1", port=8080)