强一致性时代,Kafka、Redis、Celery 谁才是那块短板

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS AI 助手,专业版
简介: 这篇文章讨论了一个金融级实时Tick数据项目的失败与修复。项目最初使用Celery、Kafka和Redis,但因缺乏重试、幂等和安全策略导致问题。文章提出了四个关键改进方向,修复后的系统满足了金融级要求,总结了五点教训。

——金融级实时分析的一次反面教材拆解

要是在普通场景里做抓取,Redis、Kafka、Celery 各用各的,互不干涉也能“跑起来”。
但一旦换成 金融级的实时 Tick 数据抓取(纳斯达克逐笔数据),并且还要求 强一致性 + 零丢失,事情立刻变得严肃很多——任何抖动、丢包、重复写库都可能导致分析错误甚至交易事故。

我自己就踩过一次非常典型的坑:
“看似能跑,但实际上迟早会炸” 的反面架构。
今天就借这次经历,把原来的错误实现剖开给大家看看,然后再展示怎么一步步修回来。

一、先看反面教材(错误代码案例)

当时我们的思路很朴素:

  • Celery 调度任务
  • 每次任务抓一下 Tick 数据
  • 抓到就直接塞进 Kafka
  • Kafka Consumer 负责写入数据库
    看上去一气呵成,甚至能跑得挺顺。

下面是当时的“错误实现”(强烈不推荐这样写):

# bad_fast_path.py (错误示例:极易丢数据)
from celery import Celery
import requests
from kafka import KafkaProducer
import psycopg2

PROXY = "http://user:pass@proxy.16yun.cn:12345"
KAFKA_BOOTSTRAP = "kafka:9092"
CELERY_BROKER = "redis://redis:6379/0"
DB_DSN = "dbname=ticks user=pg password=pg host=postgres"

app = Celery('tasks', broker=CELERY_BROKER)
producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP)

@app.task
def fetch_and_send(ticker):
    # 完全没重试、没幂等、没安全策略
    r = requests.get(
        f"https://api.example.com/tick/{ticker}",
        proxies={
   "http": PROXY, "https": PROXY},
        timeout=5
    )
    producer.send("ticks", r.content)
    producer.flush()

消费者端也一样简单粗暴:

from kafka import KafkaConsumer
import psycopg2

consumer = KafkaConsumer("ticks", bootstrap_servers="kafka:9092",
                         auto_offset_reset='earliest', enable_auto_commit=True)
conn = psycopg2.connect(DB_DSN)

for msg in consumer:
    # 写库失败会直接丢数据(因为位移已经提前提交)
    conn.cursor().execute("INSERT INTO tick_raw (payload) VALUES (%s)", (msg.value,))
    conn.commit()

当时觉得“挺干净的呀”。
结果没过多久,问题一个接一个冒出来。

二、为什么这种写法必炸(问题分析)

总结一下,这个反面实现的核心槽点有:

1)写库失败必丢数据

Consumer 开了 enable_auto_commit=True,意味着:
消息还没成功写入数据库,位移已经被 Kafka 自动提交了。
一旦 Postgres 抽风、Deadlock、网络闪断,数据直接没了。

2)Celery 任务根本不是幂等的

同一条 Tick 可能被抓两次、重试不知道写了多少轮,最终落到 DB 里就是一堆重复数据。

3)网络 + 代理 抖一下就 GG

金融 Tick 秒级、毫秒级都有可能激增,而我们完全没加 backoff、没超时控制、没失败保护。

4)Redis 只是一个“摆设”

没有锁、没有任务去重、没有序列号控制,Celery 想发几次就发几次。

5)没有任何“可追踪能力”

没有 message_id,没有 trace id,出了问题你根本不知道是哪条 Tick 丢了。

结果就是:
系统表面上看不出问题,但数据质量已经偷偷烂掉。

三、怎么修?我们把系统一点点改对

为了把这个系统从“能跑”修成“靠谱”,我们做了 4 个关键的改造方向:

  • Celery 负责调度,不负责可靠性
    抓到的数据必须进入 Kafka 这种“专业的持久化日志系统”。
  • Kafka Producer 必须幂等、可重试、确认到位(acks=all)
  • Kafka Consumer 手动提交位移 + DB 幂等写入
    (这点是防丢数据的最核心部分)
  • Redis 负责锁 + 去重 + 短期缓存
    保证并发抓取不会乱套。

下面给出完整的修复后代码(更工程化),并保留代理IP接入示例。

完整修复方案代码示例

config.py(公共配置)

# 爬虫代理配置(16YUN示例)
# config.py
PROXY_HOST = "proxy.16yun.cn"
PROXY_PORT = 12345
PROXY_USER = "your_user"
PROXY_PASS = "your_pass"

PROXY_URL = f"http://{PROXY_USER}:{PROXY_PASS}@{PROXY_HOST}:{PROXY_PORT}"

KAFKA_BOOTSTRAP = "kafka:9092"
KAFKA_TOPIC = "nasdaq_ticks"

CELERY_BROKER_URL = "redis://redis:6379/0"
CELERY_BACKEND = "redis://redis:6379/1"

REDIS_URL = "redis://redis:6379/2"

POSTGRES_DSN = "dbname=ticks user=pg password=pg host=postgres"

Celery 任务:抓取 + 写入 Kafka(更可靠版本)

# tasks.py
from celery import Celery
import requests
import time
import uuid
import json
import redis
from confluent_kafka import Producer
from config import *

app = Celery("tick_tasks", broker=CELERY_BROKER_URL, backend=CELERY_BACKEND)
rds = redis.Redis.from_url(REDIS_URL)

producer = Producer({
   
    "bootstrap.servers": KAFKA_BOOTSTRAP,
    "enable.idempotence": True,   # 幂等生产
    "acks": "all",                # Leader + ISR 全确认
    "retries": 5,
})

def http_get_with_proxy(url):
    proxies = {
   "http": PROXY_URL, "https": PROXY_URL}
    r = requests.get(url, proxies=proxies, timeout=6)
    r.raise_for_status()
    return r.text

@app.task(bind=True, max_retries=5, default_retry_delay=2)
def fetch_and_publish(self, ticker):
    lock_key = f"lock:{ticker}"
    if not rds.set(lock_key, "1", nx=True, ex=5):
        return  # 已经有 worker 抓这个 ticker 了

    try:
        url = f"https://api.example.com/market/nasdaq/tick/{ticker}"
        raw = http_get_with_proxy(url)

        message = {
   
            "message_id": str(uuid.uuid4()),
            "ticker": ticker,
            "time": int(time.time() * 1000),
            "payload": raw,
        }

        def callback(err, msg):
            if err:
                raise Exception(f"Kafka delivery failed: {err}")

        producer.produce(KAFKA_TOPIC, json.dumps(message).encode(), callback=callback)
        producer.flush(10)

    except Exception as e:
        raise self.retry(exc=e)

    finally:
        rds.delete(lock_key)

四、这一轮修完之后,我们到底得到了什么?

修复后的版本终于能满足金融级场景的核心要求:

强一致性:

  • 消费者在 DB 写成功后才提交 offset
  • 强制幂等写库(不重复、不漏写)

零丢失(工程意义上的 zero-loss):

  • Kafka acks=all
  • producer 幂等
  • Celery 自动重试
  • 消费端手动提交位移

抗抖动能力强:

  • 任务重试
  • 网络抖动不再导致“写库失败 → 消费者自动 commit → 数据永久消失”

并发抓取不再乱序:

  • Redis 锁保证不会有两个 worker 抓同一条 Tick

五、踩坑之后的 5 条总结

  1. “能跑”不等于可靠。
    特别是金融场景,一定要把失败路径想全。
  2. Kafka auto-commit 简直是初学者陷阱。
    凡是需要强一致性,auto-commit 都应该直接禁用。
  3. Celery 永远不应该被当成“可靠消息系统”。
    它负责调度,不负责持久化。
  4. Redis 用得好,能救命;用成缓存,啥都救不了。
  5. 幂等是实时数据系统的底线。
    没幂等,你迟早会遇到重复消息把表写爆的那天。
相关文章
|
2月前
|
Java Nacos Sentinel
SpringCloud 微服务解决方案:企业级架构实战
全面介绍 SpringCloud 微服务解决方案,涵盖服务注册发现、网关路由、熔断限流、分布式事务等企业级实践
|
人工智能 Java Spring
Spring Boot循环依赖的症状和解决方案
Spring Boot循环依赖的症状和解决方案
|
消息中间件
RabbitMQ中的消息优先级是如何实现的?
RabbitMQ中的消息优先级是如何实现的?
901 0
|
3月前
|
消息中间件 存储 关系型数据库
消息队列四大核心消息类型深度解析:普通、顺序、事务、定时消息原理与实战
本文深入剖析了分布式系统中消息队列的四大核心消息类型。普通消息作为基础模型实现异步通信;顺序消息通过分区有序机制保证关键业务流程的顺序性;事务消息基于两阶段提交解决分布式事务问题;定时消息则支持延迟任务执行。文章从原理、实现到应用场景,结合RocketMQ实例代码(包括事务消息与MySQL的整合)进行了全面讲解,并提供了选型对比建议。这四种消息类型各具特点,开发者应根据业务需求在解耦、顺序保证、事务一致性和延迟执行等维度进行合理选择,以构建高性能、高可用的分布式系统。
313 1
|
7月前
|
NoSQL Linux Apache
2025年10大主流开源协议全解析与开源战略的商业价值-优雅草卓伊凡
2025年10大主流开源协议全解析与开源战略的商业价值-优雅草卓伊凡
1358 8
|
10月前
|
存储 SQL 缓存
Apache Doris & SelectDB 技术能力全面解析
本文将对 Doris & SelectDB 适合的分析场景和技术能力进行概述解析
1572 1
Apache Doris & SelectDB 技术能力全面解析
|
存储 运维 监控
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
中信银行信用卡中心每日新增日志数据 140 亿条(80TB),全量归档日志量超 40PB,早期基于 Elasticsearch 构建的日志云平台,面临存储成本高、实时写入性能差、文本检索慢以及日志分析能力不足等问题。因此使用 Apache Doris 替换 Elasticsearch,实现资源投入降低 50%、查询速度提升 2~4 倍,同时显著提高了运维效率。
777 3
金融场景 PB 级大规模日志平台:中信银行信用卡中心从 Elasticsearch 到 Apache Doris 的先进实践
|
存储 缓存 Java
JAVA并发编程系列(11)线程池底层原理架构剖析
本文详细解析了Java线程池的核心参数及其意义,包括核心线程数量(corePoolSize)、最大线程数量(maximumPoolSize)、线程空闲时间(keepAliveTime)、任务存储队列(workQueue)、线程工厂(threadFactory)及拒绝策略(handler)。此外,还介绍了四种常见的线程池:可缓存线程池(newCachedThreadPool)、定时调度线程池(newScheduledThreadPool)、单线程池(newSingleThreadExecutor)及固定长度线程池(newFixedThreadPool)。
|
监控 前端开发 数据可视化
产研项目中的时间管理技巧:PERT 图全解与工具推荐
在项目开发中,任务复杂、依赖关系多、时间预估模糊常常让团队陷入混乱。PERT 图(计划评估与审查技术)能有效梳理任务、明确关键路径、科学预估时间,帮助产研团队从混乱走向有序。
464 5
产研项目中的时间管理技巧:PERT 图全解与工具推荐
|
12月前
|
并行计算
vllm部署模型要点
vllm部署模型要点