Django实时通信实战:WebSocket与ASGI全解析(下)

简介: 本文将使用 Django Channels 构建一个多用户实时聊天室,并详细介绍如何在生产环境中部署 WebSocket 应用。

一、实战:构建实时聊天室

环境准备

下面将使用 Django Channels 构建一个多用户实时聊天室。Django Channels的介绍、安装与配置,参考上篇。

实现 WebSocket 消费者

创建mysite\myapp_infra\websocket\consumers.py文件,这是处理 WebSocket 连接的核心。主要实现了如下方法:

  • connect:处理新连接,验证用户token,将用户通道写入缓存,并加入默认组
  • disconnect:处理连接断开,删除用户的缓存通道记录,将用户从所在的房间组中移除
  • receive:处理接收到的消息。首先进行心跳检测(ping/pong),然后验证用户身份,解析两层 JSON 结构的消息内容。根据消息类型 demo-message-send 处理文本消息:若指定接收用户则单发,否则群发广播。
  • broadcast_message:处理群发消息,从事件中提取payload数据并发送给所有连接的客户端
  • single_message:处理单发消息,从事件中提取payload数据并发送给指定的客户端
"""WebScoket 消费者"""

import json
import logging
from django.core.cache import cache
from django.conf import settings
from channels.generic.websocket import AsyncWebsocketConsumer
from rest_framework_simplejwt.tokens import RefreshToken
from rest_framework_simplejwt.exceptions import TokenError

logger = logging.getLogger(__name__)


class InfraConsumer(AsyncWebsocketConsumer):
    """基础设施-WebSocket 异步消费者"""

    async def connect(self):
        """当客户端发起 WebSocket 连接时调用"""
        query_params = self.scope["query_string"].decode()
        # 从查询参数获取token
        token_param = [
            param.split("=")
            for param in query_params.split("&")
            if param.startswith("token=")
        ]
        if not token_param or len(token_param[0]) != 2:
            logger.error("缺少token参数")
            await self.close(code=4001)  # 自定义错误码
            return

        token = token_param[0][1]
        try:
            # 验证Refresh Token有效性
            refresh = RefreshToken(token)
            user_id = refresh["user_id"]
            self.scope["user"] = {
   "id": user_id, "token_type": "refresh"}

            # 登录成功后,将用户通道写入缓存
            cache.set(f"user_{user_id}_channel", self.channel_name)
            # 加入默认组
            self.room_group_name = settings.DEFAULT_GROUP_NAME
            await self.channel_layer.group_add(self.room_group_name, self.channel_name)
            # 接受客户端的WebSocket连接请求
            await self.accept()
        except TokenError as e:
            logger.error("无效token")
            await self.close(code=4003)
        except Exception as e:
            logger.error(str(e))
            await self.close(code=4000)

    async def disconnect(self, close_code):
        """当客户端断开 WebSocket 连接时调用"""
        # 获取当前用户
        user = self.scope.get("user")
        if user:
            user_id = user.get("id")
            # 移除用户通道
            cache.delete(f"user_{user_id}_channel")
            # 将用户从组中移除
            room_group_name = getattr(self, "room_group_name", None)
            if room_group_name:
                await self.channel_layer.group_discard(
                    room_group_name, self.channel_name
                )

    async def receive(self, text_data=None, bytes_data=None):
        """当接收到客户端发送的消息时调用"""
        # 心跳检测
        if text_data == "ping":
            await self.send(text_data="pong")
            return

        user = self.scope.get("user")
        if not user:
            logger.warning(f"匿名用户访问拒绝:{text_data}")
            return
        logger.info(f"收到用户 {user.get('id')} 发送消息:{text_data}")

        # 因为消息采用了两次JSON封装,这里进行两次JSON解析
        try:
            outer_payload = json.loads(text_data)  # 外层JSON解释
            message_type = outer_payload.get("type")
            raw_content = outer_payload.get("content", "{}")
            inner_content = json.loads(raw_content)  # 内层JSON解释
        except json.JSONDecodeError:
            logger.error("内容解析失败")
            return

        # 处理业务消息
        if message_type == "demo-message-send":
            message_text = inner_content.get("text", "").strip()
            if not message_text:
                logger.warning("消息内容不能为空")
                return

            # 构建响应字典
            content = {
   
                "fromUserId": user.get("id"),
                "text": message_text,
                "single": False,  # 默认群发
            }

            # 判断接收对象
            target_user_id = inner_content.get("toUserId")
            if target_user_id not in [None, "", 0]:
                # 单发
                await self._send_single_message(target_user_id, content)
            else:
                # 群发广播
                await self._send_broadcast_message(content)

    def _build_response(self, content):
        """构建标准化的响应消息,进行两次JSON封装"""
        # 第一次封装:添加消息类型
        inner_message = {
   
            "type": "demo-message-receive",
            "content": json.dumps(content),
        }
        # 第二次封装:整体序列化
        return json.dumps(inner_message)

    async def _send_single_message(self, target_user_id, content):
        """向指定用户发送单条消息"""
        # 获取用户通道
        target_channel = cache.get(f"user_{target_user_id}_channel")
        if not target_channel:
            return False

        # 构建并发送消息
        message = self._build_response(content)
        await self.channel_layer.send(
            target_channel,
            {
   
                "type": "single.message",
                "payload": message,
            },
        )
        return True

    async def _send_broadcast_message(self, content):
        """向房间内所有用户广播消息"""
        message = self._build_response(content)
        await self.channel_layer.group_send(
            self.room_group_name,
            {
   
                "type": "broadcast.message",
                "payload": message,
            },
        )

    async def broadcast_message(self, event):
        """处理群发消息"""
        payload = event["payload"]
        await self.send(text_data=payload)

    async def single_message(self, event):
        """处理单发消息"""
        payload = event["payload"]
        await self.send(text_data=payload)

点击查看完整代码

配置 WebSocket 路由

创建mysite\myapp_infra\websocket\routing.py文件,定义 WebSocket 的 URL 路由

"""WebSocket 路由配置"""

from django.urls import re_path, path
from .consumers import InfraConsumer

websocket_urlpatterns = [
    # 调用as_asgi()类方法来获取一个 ASGI 应用程序
    re_path(r"^infra/ws/?$", InfraConsumer.as_asgi()),
]

然后在项目的mysite\mysite\asgi.py配置中添加 ASGI 路由

import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack

# 设置环境变量并初始化Django应用
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
django_application = get_asgi_application()


def get_websocket_application():
    """延迟导入WebSocket路由(避免循环导入)"""
    from myapp_infra.websocket.routing import websocket_urlpatterns

    return AuthMiddlewareStack(URLRouter(websocket_urlpatterns))


# 协议路由:区分HTTP和WebSocket请求
application = ProtocolTypeRouter(
    {
   
        "http": django_application,
        "websocket": get_websocket_application(),
    }
)

点击查看完整代码

创建聊天界面模板

以Vue3界面为例,代码实现了一个 WebSocket 客户端界面,功能包括:

  • 连接控制:显示连接状态、开启/关闭 WebSocket 连接。
  • 消息发送:输入消息内容并选择接收人(单发或群发),点击发送按钮通过 WebSocket 发送消息。
  • 消息接收与展示:监听 WebSocket 返回的数据,解析不同类型的消息(如单发、群发、系统通知)并在右侧列表中倒序展示。
  • 用户列表获取:页面加载时获取用户列表用于选择消息接收人。

image-20250727152815069.png

点击查看完整代码

实现效果

使用 ASGI 运行项目

# 开发环境启动
uvicorn mysite.asgi:application --reload

使用不同的浏览器,分别登录不同的用户,实现实时互发消息。

image-20250727091154265.png

二、生产环境部署

Nginx 配置

使用 Nginx 作为反向代理,添加以下配置来处理 WebSocket 连接。这段配置告诉 Nginx 如何正确处理 WebSocket 升级请求。

location /ws/ {
   
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
}

使用 Gunicorn+Uvicorn 部署

Gunicorn(全称 Green Unicorn)是一个用于 UNIX 的 高性能 Python WSGI HTTP 服务器。Gunicorn只能用于 Linux 系统,Windows上无法使用。

安装

pip install gunicorn

# 查看版本
gunicorn -v
gunicorn -h

Gunicorn 作为进程管理器,配合 Uvicorn 工作进程处理 ASGI 应用

  • -w 工作进程的数量。默认启动 1 个 Worker 进程
  • -k 要运行的工作进程类型。
  • -b 指定要绑定的服务器套接字。
# 基本启动命令
gunicorn -b 0.0.0.0:8000 -k uvicorn.workers.UvicornWorker mysite.asgi:application

优化 Worker 数量

Gunicorn Worker 数量的设置对性能影响很大,推荐的公式是:(CPU核心数 × 2) + 1。我们可以编写一个启动脚本来自动计算最佳 Worker 数量

#!/bin/bash
# run_gunicorn.sh

# 计算最佳 Worker 数
CORES=$(nproc)
WORKERS=$((CORES * 2 + 1))

# 限制最大Worker数(避免内存不足)
MAX_WORKERS=8
if [ $WORKERS -gt $MAX_WORKERS ]; then
  WORKERS=$MAX_WORKERS
fi

# 启动命令
gunicorn -b 0.0.0.0:8000 \
  --workers $WORKERS \
  --max-requests 1000 \         # 预防内存泄漏
  --timeout 120 \               # 超时控制
  --keep-alive 5 \              # Keep-Alive
  -k uvicorn.workers.UvicornWorker \
  mysite.asgi:application

添加执行权限并运行

chmod +x run_gunicorn.sh
./run_gunicorn.sh

您正在阅读的是《Django从入门到实战》专栏!关注不迷路~

相关文章
|
2月前
|
数据挖掘 数据库 Python
Django实战:基于Django和openpyxl实现Excel导入导出功能
`openpyxl` 是用于处理 Excel 文件的 Python 库。本文详解其在 Django 项目中的实战应用,涵盖 Excel 文件的生成、下载、上传与解析。
97 0
Django实战:基于Django和openpyxl实现Excel导入导出功能
|
2月前
|
监控 NoSQL 网络协议
Django 实时通信实战:WebSocket 与 ASGI 全解析(上)
WebSocket 是一种全双工通信协议,支持实时数据传输,适用于聊天、协作、监控等场景。ASGI 是异步 Web 标准,配合 Uvicorn 服务器和 Django Channels,可实现 Django 的 WebSocket 功能,提升实时应用性能。
129 0
|
27天前
|
缓存 监控 中间件
Django中间件自定义开发指南:从原理到实战的深度解析
Django中间件是Web应用的“交通警察”,在请求与响应过程中进行全局处理,适用于身份验证、日志记录、性能监控等功能。本文详解中间件的工作原理、开发步骤及实战案例,帮助开发者掌握自定义中间件的构建方法,提升Django应用的可维护性与扩展性。
128 0
|
2月前
|
人工智能 开发工具 数据库
Django实战:Python代码规范指南
PEP 8 是 Python 官方代码风格指南,提升代码可读性与团队协作效率。本文详解命名规范、注释写法、常用工具(如 Black、flake8)、编程实践与代码优化技巧,助力写出规范、易维护的 Python 代码。
130 7
|
1月前
|
缓存 NoSQL 数据库
Django缓存机制详解:从配置到实战应用
本文全面解析Django缓存技术,涵盖配置方法与六大缓存后端,结合实战场景演示四种典型应用方式,帮助开发者提升Web应用性能,应对高并发挑战。
43 0
|
1月前
|
存储 缓存 数据库
Django模型开发全解析:字段、元数据与继承的实战指南
Django模型是业务逻辑与数据库的核心桥梁,本文详解模型开发三大核心:字段类型选择、元数据配置与继承模式应用,涵盖实战技巧与常见问题解决方案,助你构建高效可维护的数据模型。
63 0
|
2月前
|
存储 数据库 Python
Django模型关系:从一对多到多对多全解析
本文详解Django模型关系:一对多(ForeignKey)及多对多(ManyToManyField)关系的定义、操作与优化技巧。同时探讨外键约束的使用场景与权衡策略。
104 0
|
2月前
|
缓存 NoSQL API
Django缓存机制详解:从配置到实战应用
本文介绍了 Django 缓存机制的基础知识与实战应用,涵盖缓存概念、Redis 安装配置、缓存策略及 API 使用,并通过 RBAC 权限系统演示缓存的读写与删除操作,助力提升 Web 应用性能。
81 0
|
3月前
|
Linux 数据库 数据安全/隐私保护
Python web Django快速入门手册全栈版,共2590字,短小精悍
本教程涵盖Django从安装到数据库模型创建的全流程。第一章介绍Windows、Linux及macOS下虚拟环境搭建与Django安装验证;第二章讲解项目创建、迁移与运行;第三章演示应用APP创建及项目汉化;第四章说明超级用户创建与后台登录;第五章深入数据库模型设计,包括类与表的对应关系及模型创建步骤。内容精炼实用,适合快速入门Django全栈开发。
106 1
|
10月前
|
设计模式 前端开发 数据库
Python Web开发:Django框架下的全栈开发实战
【10月更文挑战第27天】本文介绍了Django框架在Python Web开发中的应用,涵盖了Django与Flask等框架的比较、项目结构、模型、视图、模板和URL配置等内容,并展示了实际代码示例,帮助读者快速掌握Django全栈开发的核心技术。
599 45