——金融级实时分析的一次反面教材拆解
要是在普通场景里做抓取,Redis、Kafka、Celery 各用各的,互不干涉也能“跑起来”。
但一旦换成 金融级的实时 Tick 数据抓取(纳斯达克逐笔数据),并且还要求 强一致性 + 零丢失,事情立刻变得严肃很多——任何抖动、丢包、重复写库都可能导致分析错误甚至交易事故。
我自己就踩过一次非常典型的坑:
“看似能跑,但实际上迟早会炸” 的反面架构。
今天就借这次经历,把原来的错误实现剖开给大家看看,然后再展示怎么一步步修回来。
一、先看反面教材(错误代码案例)
当时我们的思路很朴素:
- Celery 调度任务
- 每次任务抓一下 Tick 数据
- 抓到就直接塞进 Kafka
- Kafka Consumer 负责写入数据库
看上去一气呵成,甚至能跑得挺顺。
下面是当时的“错误实现”(强烈不推荐这样写):
# bad_fast_path.py (错误示例:极易丢数据)
from celery import Celery
import requests
from kafka import KafkaProducer
import psycopg2
PROXY = "http://user:pass@proxy.16yun.cn:12345"
KAFKA_BOOTSTRAP = "kafka:9092"
CELERY_BROKER = "redis://redis:6379/0"
DB_DSN = "dbname=ticks user=pg password=pg host=postgres"
app = Celery('tasks', broker=CELERY_BROKER)
producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP)
@app.task
def fetch_and_send(ticker):
# 完全没重试、没幂等、没安全策略
r = requests.get(
f"https://api.example.com/tick/{ticker}",
proxies={
"http": PROXY, "https": PROXY},
timeout=5
)
producer.send("ticks", r.content)
producer.flush()
消费者端也一样简单粗暴:
from kafka import KafkaConsumer
import psycopg2
consumer = KafkaConsumer("ticks", bootstrap_servers="kafka:9092",
auto_offset_reset='earliest', enable_auto_commit=True)
conn = psycopg2.connect(DB_DSN)
for msg in consumer:
# 写库失败会直接丢数据(因为位移已经提前提交)
conn.cursor().execute("INSERT INTO tick_raw (payload) VALUES (%s)", (msg.value,))
conn.commit()
当时觉得“挺干净的呀”。
结果没过多久,问题一个接一个冒出来。
二、为什么这种写法必炸(问题分析)
总结一下,这个反面实现的核心槽点有:
1)写库失败必丢数据
Consumer 开了 enable_auto_commit=True,意味着:
消息还没成功写入数据库,位移已经被 Kafka 自动提交了。
一旦 Postgres 抽风、Deadlock、网络闪断,数据直接没了。
2)Celery 任务根本不是幂等的
同一条 Tick 可能被抓两次、重试不知道写了多少轮,最终落到 DB 里就是一堆重复数据。
3)网络 + 代理 抖一下就 GG
金融 Tick 秒级、毫秒级都有可能激增,而我们完全没加 backoff、没超时控制、没失败保护。
4)Redis 只是一个“摆设”
没有锁、没有任务去重、没有序列号控制,Celery 想发几次就发几次。
5)没有任何“可追踪能力”
没有 message_id,没有 trace id,出了问题你根本不知道是哪条 Tick 丢了。
结果就是:
系统表面上看不出问题,但数据质量已经偷偷烂掉。
三、怎么修?我们把系统一点点改对
为了把这个系统从“能跑”修成“靠谱”,我们做了 4 个关键的改造方向:
- Celery 负责调度,不负责可靠性
抓到的数据必须进入 Kafka 这种“专业的持久化日志系统”。 - Kafka Producer 必须幂等、可重试、确认到位(acks=all)
- Kafka Consumer 手动提交位移 + DB 幂等写入
(这点是防丢数据的最核心部分) - Redis 负责锁 + 去重 + 短期缓存
保证并发抓取不会乱套。
下面给出完整的修复后代码(更工程化),并保留代理IP接入示例。
完整修复方案代码示例
config.py(公共配置)
# 爬虫代理配置(16YUN示例)
# config.py
PROXY_HOST = "proxy.16yun.cn"
PROXY_PORT = 12345
PROXY_USER = "your_user"
PROXY_PASS = "your_pass"
PROXY_URL = f"http://{PROXY_USER}:{PROXY_PASS}@{PROXY_HOST}:{PROXY_PORT}"
KAFKA_BOOTSTRAP = "kafka:9092"
KAFKA_TOPIC = "nasdaq_ticks"
CELERY_BROKER_URL = "redis://redis:6379/0"
CELERY_BACKEND = "redis://redis:6379/1"
REDIS_URL = "redis://redis:6379/2"
POSTGRES_DSN = "dbname=ticks user=pg password=pg host=postgres"
Celery 任务:抓取 + 写入 Kafka(更可靠版本)
# tasks.py
from celery import Celery
import requests
import time
import uuid
import json
import redis
from confluent_kafka import Producer
from config import *
app = Celery("tick_tasks", broker=CELERY_BROKER_URL, backend=CELERY_BACKEND)
rds = redis.Redis.from_url(REDIS_URL)
producer = Producer({
"bootstrap.servers": KAFKA_BOOTSTRAP,
"enable.idempotence": True, # 幂等生产
"acks": "all", # Leader + ISR 全确认
"retries": 5,
})
def http_get_with_proxy(url):
proxies = {
"http": PROXY_URL, "https": PROXY_URL}
r = requests.get(url, proxies=proxies, timeout=6)
r.raise_for_status()
return r.text
@app.task(bind=True, max_retries=5, default_retry_delay=2)
def fetch_and_publish(self, ticker):
lock_key = f"lock:{ticker}"
if not rds.set(lock_key, "1", nx=True, ex=5):
return # 已经有 worker 抓这个 ticker 了
try:
url = f"https://api.example.com/market/nasdaq/tick/{ticker}"
raw = http_get_with_proxy(url)
message = {
"message_id": str(uuid.uuid4()),
"ticker": ticker,
"time": int(time.time() * 1000),
"payload": raw,
}
def callback(err, msg):
if err:
raise Exception(f"Kafka delivery failed: {err}")
producer.produce(KAFKA_TOPIC, json.dumps(message).encode(), callback=callback)
producer.flush(10)
except Exception as e:
raise self.retry(exc=e)
finally:
rds.delete(lock_key)
四、这一轮修完之后,我们到底得到了什么?
修复后的版本终于能满足金融级场景的核心要求:
强一致性:
- 消费者在 DB 写成功后才提交 offset
- 强制幂等写库(不重复、不漏写)
零丢失(工程意义上的 zero-loss):
- Kafka acks=all
- producer 幂等
- Celery 自动重试
- 消费端手动提交位移
抗抖动能力强:
- 任务重试
- 网络抖动不再导致“写库失败 → 消费者自动 commit → 数据永久消失”
并发抓取不再乱序:
- Redis 锁保证不会有两个 worker 抓同一条 Tick
五、踩坑之后的 5 条总结
- “能跑”不等于可靠。
特别是金融场景,一定要把失败路径想全。 - Kafka auto-commit 简直是初学者陷阱。
凡是需要强一致性,auto-commit 都应该直接禁用。 - Celery 永远不应该被当成“可靠消息系统”。
它负责调度,不负责持久化。 - Redis 用得好,能救命;用成缓存,啥都救不了。
- 幂等是实时数据系统的底线。
没幂等,你迟早会遇到重复消息把表写爆的那天。