实时行情推送对网络延迟和连接稳定性要求极高。本文将演示如何在阿里云 ECS 上,用 Python 搭建一个生产级的 WebSocket 行情客户端,实现自动重连、心跳保活,并以股票、加密货币为例展示实时数据接收。
一、为什么要在云上搭建行情监控?
个人开发者本地运行行情客户端时,常遇到网络抖动、运营商限制、断电断网等问题。而云服务器具备稳定的公网带宽、7×24 小时运行能力,且可以就近选择与数据源节点同地域的实例,显著降低延迟。本文以阿里云为例,带您从零开始部署一个可靠的行情的监控系统。
二、准备工作
阿里云 ECS 实例
- 选择上海或杭州地域(靠近 TickDB 国内节点,延迟更低)
- 操作系统:Ubuntu 22.04 LTS(推荐)
- 配置:2核4GB 以上即可
安全组配置
- 放行出方向所有端口(默认已放行)
- 入方向建议仅开放 SSH(22)用于管理,无需开放额外端口(客户端主动连接外部 WebSocket)
获取行情数据源
本文以 TickDB 为例(提供免费的 WebSocket 实时行情接口),您也可以替换为其他支持 WebSocket 的数据源。
三、部署 Python 环境与依赖
通过 SSH 登录 ECS 后,执行以下命令安装 Python 3 及依赖库:
# 更新系统
sudo apt update && sudo apt upgrade -y
# 安装 Python3、pip 及虚拟环境
sudo apt install -y python3 python3-pip python3-venv
# 创建项目目录
mkdir ~/ticker_monitor && cd ~/ticker_monitor
# 创建虚拟环境
python3 -m venv venv
source venv/bin/activate
# 安装 WebSocket 客户端库
pip install websocket-client
四、编写生产级 WebSocket 客户端
以下代码实现了自动重连、心跳保活、消息队列处理等功能,可直接用于生产环境。
创建 monitor.py 文件:
import json
import time
import threading
import queue
import logging
import os
import websocket
# 配置日志
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# TickDB 配置(请替换为你的 API Key)
API_KEY = os.environ.get('TICKDB_API_KEY', 'your_api_key_here')
WS_URL = "wss://api.tickdb.ai/v1/realtime"
# 订阅的品种(可同时订阅多市场)
SYMBOLS = ["AAPL.US", "BTCUSDT", "600519.SH", "XAUUSD"]
# 消息队列(解耦接收与处理)
msg_queue = queue.Queue(maxsize=10000)
class WebSocketClient:
def __init__(self, url, api_key, symbols):
self.url = url
self.api_key = api_key
self.symbols = symbols
self.ws = None
self.running = False
self.reconnect_delay = 1
self.max_reconnect_delay = 60
def on_open(self, ws):
"""连接成功后发送订阅指令"""
sub_msg = {
"cmd": "subscribe",
"data": {
"channel": "ticker",
"symbols": self.symbols
}
}
ws.send(json.dumps(sub_msg))
logger.info(f"已订阅: {self.symbols}")
# 启动心跳线程
self._start_ping()
def on_message(self, ws, message):
"""收到消息后放入队列,不阻塞网络线程"""
try:
msg_queue.put_nowait(message)
except queue.Full:
logger.warning("消息队列已满,丢弃一条消息")
def on_error(self, ws, error):
logger.error(f"WebSocket 错误: {error}")
def on_close(self, ws, close_status_code, close_msg):
logger.warning(f"连接关闭: {close_status_code} {close_msg}")
if self.running:
self._reconnect()
def _start_ping(self):
"""心跳线程:每30秒发送一次ping"""
def ping_loop():
while self.running and self.ws and self.ws.sock and self.ws.sock.connected:
time.sleep(30)
try:
self.ws.send(json.dumps({
"op": "ping"}))
except:
break
threading.Thread(target=ping_loop, daemon=True).start()
def _reconnect(self):
"""指数退避重连"""
delay = min(self.reconnect_delay, self.max_reconnect_delay)
logger.info(f"将在 {delay} 秒后重连...")
time.sleep(delay)
self.reconnect_delay = min(self.reconnect_delay * 2, self.max_reconnect_delay)
self.connect()
def connect(self):
"""建立 WebSocket 连接"""
headers = {
"X-API-Key": self.api_key}
self.ws = websocket.WebSocketApp(
self.url,
header=headers,
on_open=self.on_open,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
self.ws.run_forever(ping_interval=0, ping_timeout=None)
def start(self):
self.running = True
self.connect()
def stop(self):
self.running = False
if self.ws:
self.ws.close()
def process_messages():
"""独立消费者线程:从队列取出消息并处理"""
while True:
try:
msg = msg_queue.get(timeout=1)
data = json.loads(msg)
if data.get('cmd') == 'ticker':
tick = data['data']
logger.info(f"{tick['symbol']}: {tick.get('price')} @ {tick['timestamp']}")
# 可在此扩展业务逻辑:存储到数据库、触发告警等
except queue.Empty:
continue
except Exception as e:
logger.error(f"消息处理异常: {e}")
if __name__ == "__main__":
if not API_KEY or API_KEY == 'your_api_key_here':
logger.error("请设置环境变量 TICKDB_API_KEY")
exit(1)
# 启动消费者线程
consumer = threading.Thread(target=process_messages, daemon=True)
consumer.start()
# 启动 WebSocket 客户端
client = WebSocketClient(WS_URL, API_KEY, SYMBOLS)
client.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
client.stop()
logger.info("程序已退出")
五、运行与测试
设置 API Key 环境变量(避免硬编码)
export TICKDB_API_KEY="你的真实Key"运行脚本
python monitor.py观察输出
你会看到类似日志:2025-03-27 10:00:01 - INFO - 已订阅: ['AAPL.US', 'BTCUSDT', '600519.SH', 'XAUUSD'] 2025-03-27 10:00:02 - INFO - AAPL.US: 173.80 @ 1711441800000 2025-03-27 10:00:02 - INFO - BTCUSDT: 68000.00 @ 1711441800000后台持久化运行
可使用nohup或screen让程序在后台常驻:nohup python monitor.py > monitor.log 2>&1 &
六、性能优化与监控
- 网络延迟:选择与数据源节点同地域的 ECS(如上海),实测 P95 延迟可控制在 50ms 以内。
- 自动重启:若进程意外退出,可使用 systemd 或阿里云 SLS 配合监控告警。
- 日志监控:将日志输出到文件,配合阿里云日志服务(SLS)进行分析与告警。
七、总结
通过以上步骤,你已经在阿里云上搭建了一套高可用的实时行情监控系统。关键技术点包括:
- WebSocket 长连接 + 心跳保活
- 指数退避重连机制
- 消息队列解耦接收与处理
- 多市场统一订阅
文中使用的数据源 TickDB 仅作为示例,你可以根据需要替换为其他提供 WebSocket 接口的行情服务。TickDB 为开发者提供免费 API Key,可到官网申请体验。
后续扩展:可以将接收到的数据存入云数据库(如阿里云 RDS for PostgreSQL),或结合 Grafana 制作实时看板,实现更丰富的监控功能。
注:本文所有代码仅供技术交流,实际使用请遵守数据源服务商的使用协议。