Python 秒杀系统实战:库存预扣 + 防超卖 极致优化实现

简介: 小张的二手球鞋秒杀系统曾因高并发导致超卖(库存变负)。本文详解三大优化方案:①数据库行锁(简单但性能低);②Redis+Lua预扣库存(原子性防超卖);③消息队列异步落库+令牌桶限流。最终实现万级并发下零超卖、精准库存与系统稳定。(239字)


小张是个独立开发者,自己搭了个二手球鞋交易平台。每次限量款发售,他都得提前喝两杯咖啡盯着后台。因为一秒内涌入上千人抢十双鞋,数据库瞬间卡死,卖出了15双——库存成了负数。他下定决心要重写秒杀系统。
代理 IP 使用小技巧 让你的数据抓取效率翻倍 (47).png

场景还原:库存为什么变成负数
先看小张原来的代码:

def create_order(user_id, product_id):
product = Product.query.get(product_id)
if product.stock > 0:
product.stock -= 1
db.session.commit()
create_order_record(user_id, product_id)
return "成功"
return "已售罄"

看起来没问题,但高并发下藏着巨大的坑。当两个请求同时读到product.stock = 1,都判断stock > 0成立,然后各自减1提交,库存就从1变成了-1。

问题的根源在于“读-判断-写”不是原子操作。多个请求交错执行,互相看不见对方正要做的修改。

方案一:数据库行锁(最直接的防线)
用数据库的行锁机制,让更新操作变成串行执行。MySQL的InnoDB引擎在更新时会自动锁住这一行。

def create_order_with_lock(user_id, product_id):
from sqlalchemy import text

# 用原生SQL加FOR UPDATE,锁住这一行
sql = text("SELECT stock FROM products WHERE id = :pid FOR UPDATE")
product = db.session.execute(sql, {"pid": product_id}).fetchone()

if product.stock > 0:
    update_sql = text("UPDATE products SET stock = stock - 1 WHERE id = :pid")
    db.session.execute(update_sql, {"pid": product_id})
    db.session.commit()
    create_order_record(user_id, product_id)
    return "成功"
db.session.rollback()
return "已售罄"

FOR UPDATE让第一个拿到锁的事务执行完之前,其他所有请求都在排队等待。这样库存肯定减不超,但性能直线下降——每个请求都得等前一个提交才能继续。

适合并发量不大(每秒几十到一百)的场景。小张的平台高峰时上千人抢,这么搞数据库连接池会瞬间爆满。

方案二:Redis预扣库存(性能飞跃)
把库存从数据库搬到内存里。Redis单线程处理命令,天然就没有并发问题。

import redis
import json

r = redis.Redis(host='localhost', port=6379, db=0)

def init_seckill(product_id, stock):
"""秒杀活动开始前,把库存存入Redis"""
r.set(f"stock:{product_id}", stock)

# 记录已下单的用户,防止重复抢
r.delete(f"users:{product_id}")

def seckill(user_id, product_id):

# 用Lua脚本保证原子性
lua_script = """
local stock_key = KEYS[1]
local users_key = KEYS[2]
local user_id = ARGV[1]

-- 检查是否已经抢过
if redis.call('sismember', users_key, user_id) == 1 then
    return -1
end

-- 扣库存
local stock = redis.call('decr', stock_key)
if stock >= 0 then
    redis.call('sadd', users_key, user_id)
    return stock
else
    -- 库存不足,回滚(实际上decr负数后没法回滚,所以先检查)
    return -2
end
"""

# 改进版:先检查再扣减
lua_fixed = """
local stock_key = KEYS[1]
local users_key = KEYS[2]
local user_id = ARGV[1]

if redis.call('sismember', users_key, user_id) == 1 then
    return -1
end

local stock = redis.call('get', stock_key)
if not stock or tonumber(stock) <= 0 then
    return -2
end

redis.call('decr', stock_key)
redis.call('sadd', users_key, user_id)
return tonumber(stock) - 1
"""

stock_key = f"stock:{product_id}"
users_key = f"users:{product_id}"

result = r.eval(lua_fixed, 2, stock_key, users_key, user_id)

if result == -1:
    return "您已经抢过了"
elif result == -2:
    return "已售罄"
else:
    # 异步写入数据库
    async_save_order(user_id, product_id)
    return f"抢到了,剩余{result}件"

Lua脚本在Redis里是原子执行的,整个过程不会被其他命令打断。decr之前先get检查库存,彻底杜绝超卖。

方案三:消息队列削峰填谷
Redis扛住了抢购请求,但每个成功用户都要写数据库创建订单。上万请求同时写数据库,照样会崩。

用消息队列把写操作变成异步的。用户点击抢购后立刻返回“排队中”,后台慢慢处理。

import pika
import threading
from queue import Queue

简单的内存队列(适合单机演示)

order_queue = Queue(maxsize=10000)

def async_save_order(user_id, product_id):
"""生产者:把订单放入队列"""
order_queue.put({
"user_id": user_id,
"product_id": product_id,
"timestamp": time.time()
})

def order_worker():
"""消费者:后台线程慢慢写数据库"""
while True:
order_data = order_queue.get()
try:

        # 这里写数据库
        create_order_record(order_data["user_id"], order_data["product_id"])
        print(f"订单已保存: {order_data}")
    except Exception as e:
        print(f"保存失败: {e}")
        # 失败重试逻辑
        time.sleep(1)
        order_queue.put(order_data)
    finally:
        order_queue.task_done()

启动4个后台线程并发消费

for _ in range(4):
t = threading.Thread(target=order_worker, daemon=True)
t.start()

实际生产环境会用RabbitMQ或Kafka。消费者按数据库能承受的速度慢慢处理,秒杀瞬间的流量洪峰就被削平了。

极致优化:令牌桶限流
即便Redis性能再好,也不可能无限扩展。加上限流,保护系统不被恶意刷单击垮。

import time

class TokenBucket:
def init(self, rate, capacity):
self.rate = rate # 每秒补充的令牌数
self.capacity = capacity # 桶的容量
self.tokens = capacity
self.last_refill = time.time()

def acquire(self):
    now = time.time()
    # 补充令牌
    elapsed = now - self.last_refill
    self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
    self.last_refill = now

    if self.tokens >= 1:
        self.tokens -= 1
        return True
    return False

每个商品独立限流器,每秒只放行100个请求

limiter = TokenBucket(rate=100, capacity=100)

def seckill_with_limit(user_id, product_id):
if not limiter.acquire():
return "系统繁忙,请稍后再试"
return seckill(user_id, product_id)

令牌桶比计数器算法更平滑。漏桶强行让请求匀速通过,令牌桶允许短时间突发流量——比如前0.5秒用掉100个令牌,后0.5秒就只能等令牌慢慢补充。

完整实战:从开始到结束
把上面所有组件拼起来,形成一个完整的秒杀流程:

from flask import Flask, request, jsonify
import redis
import threading
import time

app = Flask(name)
r = redis.Redis(decode_responses=True)

初始化秒杀活动

def init_seckill(product_id, stock, total_limit=1000):
r.set(f"stock:{product_id}", stock)
r.set(f"total_limit:{product_id}", total_limit)
r.delete(f"users:{product_id}")

优化的Lua脚本(检查+扣减+去重)

SECILL_LUA = """
local stock = redis.call('get', KEYS[1])
if not stock or tonumber(stock) <= 0 then
return 0
end
local user_exists = redis.call('sismember', KEYS[2], ARGV[1])
if user_exists == 1 then
return -1
end
redis.call('decr', KEYS[1])
redis.call('sadd', KEYS[2], ARGV[1])
return 1
"""

@app.route('/seckill/')
def seckill_api(product_id):
user_id = request.args.get('user_id')
if not user_id:
return jsonify({"code": 400, "msg": "缺少user_id"})

stock_key = f"stock:{product_id}"
users_key = f"users:{product_id}"

result = r.eval(SECILL_LUA, 2, stock_key, users_key, user_id)

if result == 0:
    return jsonify({"code": 200, "msg": "已售罄"})
elif result == -1:
    return jsonify({"code": 200, "msg": "每人限购一件"})
else:
    # 异步落库
    order_queue.put({"user_id": user_id, "product_id": product_id})
    return jsonify({"code": 200, "msg": "抢购成功,正在处理"})

if name == 'main':
init_seckill(1, 10) # 商品1号,库存10件
app.run(debug=False, threaded=True)

用wrk或ab压测一下:

模拟200个并发,总共1000个请求

wrk -t4 -c200 -d10s --timeout=2s "http://localhost:5000/seckill/1?user_id=123"

Redis轻松扛住几千并发,库存始终没超卖,数据库订单表也因为有队列保护而稳如泰山。

踩坑经验分享
Redis挂了怎么办? 秒杀开始前做一次库存全量备份到数据库。Redis宕机时快速降级到数据库行锁方案,虽然慢但不会丢失订单。

用户重复点击怎么办? 前端按钮置灰只做一半工作。真正防重靠Lua脚本里的sismember检查。更彻底的办法是用SETNX给每个用户加一个短时锁:

def prevent_double_click(user_id, product_id):
lock_key = f"click_lock:{user_id}:{product_id}"
if r.setnx(lock_key, 1):
r.expire(lock_key, 2) # 2秒过期
return True
return False

库存热key问题:几千人抢同一个商品,Redis单节点网卡可能被打满。可以用本地缓存分摊压力:

from cachetools import TTLCache

local_cache = TTLCache(maxsize=100, ttl=1) # 1秒过期

def get_stock_with_cache(product_id):
if product_id in local_cache:
return local_cache[product_id]
stock = r.get(f"stock:{product_id}")
local_cache[product_id] = stock
return stock

每个服务器节点缓存1秒,把Redis的查询压力降低几十倍。

最终架构图
用户请求 → Nginx限流 → Flask应用 → 令牌桶限流 → Redis预扣库存 → 消息队列 → 数据库落库

每一层都是漏斗结构,流量从外到内逐渐收敛。最外层的Nginx挡住恶意刷量,Redis只处理真正的库存操作,数据库最终只写入成功订单。

小张按照这套架构重写了秒杀系统。下一款限量球鞋发售时,后台监控面板一片绿色,10双鞋在0.3秒内被抢光,库存精准归零。他终于可以不用喝咖啡盯后台了。

目录
相关文章
|
4月前
|
存储 搜索推荐 开发者
RAG 文本分块:七种主流策略的原理与适用场景
分块是RAG系统的基石,直接影响检索质量与LLM推理效果。行业共识:“分块决定RAG质量的70%”。从固定大小、句子/段落级,到语义、递归、滑动窗口及层次化分块,策略需匹配文档类型与任务需求。劣质分块导致上下文断裂、噪声激增、幻觉频发——燃料不行,再强的引擎也徒劳。
546 2
RAG 文本分块:七种主流策略的原理与适用场景
|
1月前
|
人工智能 API
90%的提示词方法正在失效:GPT-5.5发布后的真相
GPT-5.5发布后,传统提示词工程正快速失效:过度细化步骤反降效,OpenAI关停微调API,Karpathy宣告“提示词工程已死”。新范式转向Context与Harness Engineering——用Agent架构(Model+Harness)替代手写提示,聚焦目标定义、上下文编排与错误拦截机制。
217 1
|
1月前
|
SQL 人工智能 安全
为什么你的AI Agent总输出垃圾?因为你没装“技能插件”
本文揭示AI Agent“做事乱”的根源:并非模型能力不足,而是缺乏可执行的技能插件(Skill)。文章指出,大模型缺的不是推理力,而是“怎么做”的上下文——如读文件、查数据库、调API等实操能力。通过MCP协议+工具函数,Skill将业务知识封装为即插即用的数字资产,让Agent从“纸上谈兵的参谋”升级为“自带工具箱的施工队”。
|
2月前
|
存储 人工智能 JavaScript
Prompt、Context、Harness:AI Agent 工程的三层架构解析
2023年重“Prompt”(如何说),2025年重“Context”(看到什么),2026年跃升至“Harness”(系统级约束与验证)。三者非替代而是分层:Prompt优化表达,Context管理信息环境,Harness构建可信执行系统——模型是马,Harness才是缰绳、马鞍与路。
995 10
Prompt、Context、Harness:AI Agent 工程的三层架构解析
|
2月前
|
人工智能 自然语言处理 测试技术
DeepSeek V4:百万上下文,万亿参数,以及重新泛起涟漪的开源池塘
DeepSeek V4发布Pro(1.6T参数/49B激活)与Flash(284B/13B)双模型,均支持1M上下文、thinking模式及Agent能力。全栈开源(权重+技术报告+API+定价),采用混合注意力架构显著降本,中文长文本与推理能力突出,是当前少有的万亿级开源系统级发布
2043 4
DeepSeek V4:百万上下文,万亿参数,以及重新泛起涟漪的开源池塘
|
6月前
|
缓存 安全 搜索推荐
网页模板源码-网站源码建设方式
本文聚焦免费开源企业网站源码模板,解析其低成本、高灵活等优势,推荐 PageAdmin CMS、Joomla、帝国 CMS 等主流模板并说明适配场景,阐述选择方法与二次开发优化要点,为企业低成本高效搭建网站提供实用指引。
455 3
|
9月前
|
数据采集 人工智能 文字识别
《法务RAG开发不踩坑:Kiln+LlamaIndex+Helicone的协同方法指南》
本文记录企业级法务知识库RAG系统的多AI协同开发实战:面对2万份格式混杂、含15%模糊扫描件的法律文档,14天交付需3秒响应精准查询的系统,构建Kiln AI、LlamaIndex、Helicone协同矩阵。Kiln AI完成数据清洗(有效信息密度提至85%)、合成训练样本及模型微调,使专业术语识别准确率达92%;LlamaIndex搭建三层检索架构,融合语义与关键词检索,匹配错误率降至5%,响应时间缩至2.1秒;Helicone优化提示词与推理监控,输出规范率达97%。
596 3
|
NoSQL Java Redis
推荐一款好用的开源免费Java CMS内容管理站群系统
Java开源内容管理系统(JProcms),基于SpringCloud、SpringBoot、MyBatisPlus、Vue3等技术构建,采用Apache-2.0协议,支持免费商用。系统具备自定义字段存储与可视化设计、API制作网站群页面等功能,强调简单灵活的设计理念,降低二次开发成本。支持多种数据库、消息队列和认证方式,提供SaaS多租户、动态权限菜单、工作流配置等强大功能,同时集成阿里云、腾讯云服务,适用于高效建站与内容管理。
1984 4
|
机器学习/深度学习 Oracle 关系型数据库
Oracle 19c单机一键安装脚本分享
Oracle 19c单机一键安装脚本分享
1063 2
|
消息中间件 Java 中间件
MQ四兄弟:如何保证消息可靠性
本文介绍了RabbitMQ、RocketMQ、Kafka和Pulsar四种消息中间件的可靠性机制。这些中间件通过以下几种方式确保消息的可靠传输:1. 消息持久化,确保消息在重启后不会丢失;2. 确认机制,保证消息从生产者到消费者都被成功处理;3. 重试机制,处理失败后的重试;4. 死信队列,处理无法消费的消息。每种中间件的具体实现略有不同,但核心思想相似,都是从生产者、中间件本身和消费者三个角度来保障消息的可靠性。
870 0

热门文章

最新文章