一个简单的PyQt WebSocket示例:
from PyQt5.QtNetwork import QAbstractSocket, QTcpSocket, QHostAddress
from PyQt5.QtWebSockets import QWebSocket, QWebSocketServer
from PyQt5.QtCore import QUrl, QObject


class WebSocket(QObject):
    """Non-secure WebSocket server that relays binary messages between clients.

    Every binary message received from one client is forwarded to all other
    connected clients. Only ``binaryMessageReceived`` is connected, so text
    frames are not relayed.
    """

    def __init__(self, parent=None):
        """Create the underlying server and start tracking new connections.

        Args:
            parent: Optional Qt parent object, passed to both this object
                and the underlying ``QWebSocketServer``.
        """
        super().__init__(parent)
        self.server = QWebSocketServer(
            "WebSocket Server", QWebSocketServer.NonSecureMode, parent
        )
        self.server.newConnection.connect(self.handle_new_connection)
        # Client sockets that are currently connected.
        self.clients = []

    def listen(self, port):
        """Start listening on the local host address at ``port``.

        Args:
            port: TCP port number to listen on.

        Returns:
            True if the server is now listening, False otherwise.
        """
        address = QHostAddress.LocalHost
        if not self.server.listen(address, port):
            print("Failed to listen on port", port)
            return False
        print("WebSocket server listening on port", port)
        return True

    def handle_new_connection(self):
        """Accept the next pending client and wire up its signals."""
        client = self.server.nextPendingConnection()
        if client is None:
            # No pending connection is available; nothing to register.
            return
        client.binaryMessageReceived.connect(self.handle_message)
        client.disconnected.connect(self.handle_disconnect)
        self.clients.append(client)
        print("Client connected:", client.peerAddress().toString())

    def handle_message(self, message):
        """Forward a binary message to every client except its sender.

        Args:
            message: Payload delivered by ``binaryMessageReceived``.
        """
        sender = self.sender()
        print(
            "Message received from",
            sender.peerAddress().toString(),
            ":",
            message,
        )
        for client in self.clients:
            if client != sender:
                client.sendBinaryMessage(message)

    def handle_disconnect(self):
        """Stop tracking a disconnected client and release its socket."""
        sender = self.sender()
        if sender is None:
            # Not invoked through a signal; there is no socket to clean up.
            return
        print("Client disconnected:", sender.peerAddress().toString())
        # Guard the removal so an untracked socket does not raise ValueError.
        if sender in self.clients:
            self.clients.remove(sender)
        # Per the Qt docs the socket is parented to the server, so without an
        # explicit delete it would stay allocated until the server is destroyed.
        sender.deleteLater()


if __name__ == "__main__":
    import sys

    # A core (non-GUI) application is enough to run the event loop and does
    # not require a display.
    from PyQt5.QtCore import QCoreApplication

    app = QCoreApplication(sys.argv)
    server = WebSocket()
    if not server.listen(8000):
        # Do not enter an event loop that has nothing to serve.
        sys.exit(1)
    sys.exit(app.exec_())
这里我们使用了 `QWebSocketServer` 和 `QWebSocket` 类来创建一个WebSocket服务器,并实现了基本的广播消息功能。当有客户端连接时,我们将客户端添加到 `clients` 列表中,并为每个客户端连接的 `binaryMessageReceived` 信号和 `disconnected` 信号连接对应的槽函数,以便在收到消息或客户端断开连接时执行相应的功能。在 `handle_message` 函数中,我们将收到的消息发送给除发送者以外的所有客户端。