以下是使用 Tornado 框架创建 WebSocket 服务器的简单示例:
import tornado.ioloop import tornado.web import tornado.websocket class WebSocketHandler(tornado.websocket.WebSocketHandler): def open(self): print("WebSocket connection opened") def on_message(self, message): print(f"Received message: {message}") self.write_message(f"You sent: {message}") def on_close(self): print("WebSocket connection closed") if __name__ == "__main__": app = tornado.web.Application([(r"/websocket", WebSocketHandler)]) app.listen(8888) tornado.ioloop.IOLoop.current().start()
但是一用客户端连就報403错误
如果在使用上述代码示例时遇到了 403 错误(Forbidden),这可能是由于默认的 WebSocket 握手验证导致的。Tornado 的默认设置要求进行 WebSocket 握手验证,以确保连接的安全性。
为了解决这个问题,您可以尝试以下方法:
添加 check_origin
方法:在 WebSocketHandler 类中添加 check_origin
方法,以允许来自任何源的 WebSocket 连接。请注意,这将跳过验证来源,潜在地降低了安全性,因此仅在开发和测试阶段使用时才应该采用此方
def check_origin(self, origin): return True
完整代码
import tornado.ioloop import tornado.web import tornado.websocket class WebSocketHandler(tornado.websocket.WebSocketHandler): def open(self): print("WebSocket connection opened") def on_message(self, message): print("Received message:"+message) self.write_message("You sent: {message}") def check_origin(self, origin): return True def on_close(self): print("WebSocket connection closed") if __name__ == "__main__": app = tornado.web.Application([(r"/websocket", WebSocketHandler)]) app.listen(8888) tornado.ioloop.IOLoop.current().start()
访问:ws://localhost:8888/websocket 即可
测试地址:WebSocket在线测试工具