文件获取:http://www.pan123.fun/share.php?id=9ojnHrbq&pwd=9LTZ 【仅供学习】
一、背景概述
在二手交易平台闲鱼上,低价好物往往上架即被秒拍。传统的人工刷新方式效率低下,本文使用 Python 实现了一套完整的自动化监控与秒拍脚本,涵盖请求签名、商品筛选、异步并发、自动化操作等技术点。
技术栈:Python 3.9+ | requests | aiohttp | asyncio | selenium | hashlib
二、环境准备与依赖安装
bash
创建虚拟环境
python -m venv venv
source venv/bin/activate # Linux/Mac
venv\Scripts\activate # Windows
安装依赖库
pip install requests
pip install aiohttp
pip install asyncio
pip install selenium
pip install beautifulsoup4
pip install fake-useragent
pip install schedule
pip install openpyxl
三、闲鱼请求签名算法逆向
闲鱼接口的 sign 参数是其核心反爬机制,签名生成逻辑如下:
python
import hashlib
import json
import time
import re
from typing import Tuple, Dict
class XianyuSignGenerator:
"""
闲鱼请求签名生成器
签名公式: sign = MD5(token + "&" + timestamp + "&" + appKey + "&" + data)
"""
APP_KEY = "34839810" # 闲鱼H5接口固定appKey
@staticmethod
def get_utc_timestamp() -> str:
"""
获取符合闲鱼接口要求的UTC时间戳(毫秒级)
注意:闲鱼使用UTC时间,需要减去8小时时区差
"""
# 获取当前UTC毫秒时间戳
utc_ms = int(time.time() * 1000)
# 减去东八区偏移(8小时 = 28800000毫秒)
corrected = utc_ms - (8 * 60 * 60 * 1000)
return str(corrected)
@staticmethod
def extract_token_from_cookie(cookie_str: str) -> str:
"""
从Cookie中提取_m_h5_tk字段的token部分
Cookie格式: _m_h5_tk=token_timestamp; ...
返回: token(下划线前的32位字符串)
"""
pattern = r'_m_h5_tk=([a-f0-9]+)_'
match = re.search(pattern, cookie_str)
if match:
return match.group(1)
raise ValueError("无法从Cookie中提取token")
@classmethod
def generate_sign(cls, token: str, data_dict: Dict) -> Tuple[str, str, str]:
"""
生成签名
Args:
token: 从Cookie提取的32位token
data_dict: 请求体数据字典
Returns:
(timestamp, data_str, sign) 三元组
"""
timestamp = cls.get_utc_timestamp()
# 将data_dict转为JSON字符串(移除空格,与浏览器行为一致)
data_str = json.dumps(data_dict, separators=(',', ':'), ensure_ascii=False)
# 构造签名字符串: token×tamp&appKey&data
sign_raw = f"{token}&{timestamp}&{cls.APP_KEY}&{data_str}"
# 计算MD5
sign = hashlib.md5(sign_raw.encode('utf-8')).hexdigest()
return timestamp, data_str, sign
四、商品搜索接口封装
python
import requests
import threading
from fake_useragent import UserAgent
from typing import List, Dict, Optional
class XianyuSearchAPI:
"""闲鱼商品搜索接口"""
# 闲鱼H5搜索接口地址(需定期更新)
SEARCH_URL = "https://h5api.m.taobao.com/h5/mtop.taobao.idle.search/1.0/"
def __init__(self, cookie: str):
self.cookie = cookie
self.ua = UserAgent()
self.token = XianyuSignGenerator.extract_token_from_cookie(cookie)
self.session = requests.Session()
self.session.headers.update({
"User-Agent": self.ua.random,
"Cookie": self.cookie,
"Referer": "https://h5.m.taobao.com/",
"Origin": "https://h5.m.taobao.com",
"Content-Type": "application/x-www-form-urlencoded;charset=UTF-8",
})
self._lock = threading.Lock()
def search(self, keyword: str, page: int = 1, sort: str = "new") -> Optional[List[Dict]]:
"""
搜索商品
Args:
keyword: 搜索关键词
page: 页码
sort: 排序方式 (new: 最新发布, price_asc: 价格升序, price_desc: 价格降序)
Returns:
商品列表,每项包含title, price, item_id, pic_url, 等字段
"""
# 构造请求参数
data_params = {
"q": keyword,
"page": page,
"sort": sort,
"pageSize": 20,
"search_type": "item",
"need_zip": 0,
"need_control": 1
}
# 生成签名
timestamp, data_str, sign = XianyuSignGenerator.generate_sign(
self.token, data_params
)
# 构造请求体
form_data = {
"data": data_str,
"timestamp": timestamp,
"sign": sign,
"appKey": XianyuSignGenerator.APP_KEY,
"t": timestamp,
"v": "1.0",
"type": "original",
"api": "mtop.taobao.idle.search",
}
try:
with self._lock:
response = self.session.post(
self.SEARCH_URL,
data=form_data,
timeout=5,
headers={"User-Agent": self.ua.random}
)
if response.status_code == 200:
result = response.json()
if result.get("ret", [""])[0] == "SUCCESS::调用成功":
data = result.get("data", {})
items = data.get("data", {}).get("items", [])
return self._parse_items(items)
else:
print(f"API返回错误: {result.get('ret')}")
return None
except Exception as e:
print(f"请求异常: {str(e)}")
return None
def _parse_items(self, items: List[Dict]) -> List[Dict]:
"""解析商品数据"""
parsed = []
for item in items:
# 根据接口返回结构提取字段(实际需抓包确认)
product_info = item.get("product", {}) or item
parsed.append({
"item_id": product_info.get("itemId") or product_info.get("id"),
"title": product_info.get("title", ""),
"price": float(product_info.get("price", 0)),
"pic_url": product_info.get("picUrl") or product_info.get("picture"),
"location": product_info.get("location", ""),
"nick": product_info.get("nick", ""),
"status": product_info.get("status", ""),
"detail_url": f"https://h5.m.taobao.com/taobao/idle/detail.html?id={product_info.get('itemId')}"
})
return parsed
五、智能筛选与去重模块
python
import hashlib
import sqlite3
from datetime import datetime
from typing import List, Dict, Optional, Callable
class ItemFilter:
"""商品智能筛选器"""
def __init__(self, min_price: float = None, max_price: float = None,
keywords_include: List[str] = None, keywords_exclude: List[str] = None,
min_credit: int = None):
"""
初始化筛选条件
Args:
min_price: 最低价格
max_price: 最高价格
keywords_include: 标题必须包含的关键词列表
keywords_exclude: 标题不能包含的关键词列表
min_credit: 最低信用等级(1-5)
"""
self.min_price = min_price
self.max_price = max_price
self.keywords_include = keywords_include or []
self.keywords_exclude = keywords_exclude or []
self.min_credit = min_credit
def filter(self, item: Dict) -> bool:
"""判断商品是否符合筛选条件"""
# 价格筛选
price = item.get("price", 0)
if self.min_price is not None and price < self.min_price:
return False
if self.max_price is not None and price > self.max_price:
return False
# 标题关键词筛选
title = item.get("title", "").lower()
for kw in self.keywords_include:
if kw.lower() not in title:
return False
for kw in self.keywords_exclude:
if kw.lower() in title:
return False
return True
@staticmethod
def generate_item_signature(item: Dict) -> str:
"""
生成商品唯一签名(用于去重)
签名策略: MD5(item_id + title + price)
"""
item_id = str(item.get("item_id", ""))
title = item.get("title", "")[:50]
price = str(item.get("price", 0))
raw = f"{item_id}_{title}_{price}"
return hashlib.md5(raw.encode('utf-8')).hexdigest()
class DuplicateRemover:
"""商品去重管理器(基于SQLite持久化)"""
def __init__(self, db_path: str = "xianyu_history.db"):
self.db_path = db_path
self._init_db()
def _init_db(self):
"""初始化数据库表"""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS sent_items (
signature TEXT PRIMARY KEY,
item_id TEXT,
title TEXT,
price REAL,
sent_time TIMESTAMP
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_sent_time ON sent_items(sent_time)")
def is_duplicate(self, signature: str) -> bool:
"""检查是否已处理过"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.execute("SELECT 1 FROM sent_items WHERE signature = ?", (signature,))
return cursor.fetchone() is not None
def mark_sent(self, signature: str, item: Dict):
"""标记商品已处理"""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"INSERT OR REPLACE INTO sent_items (signature, item_id, title, price, sent_time) VALUES (?, ?, ?, ?, ?)",
(signature, str(item.get("item_id", "")), item.get("title", "")[:200],
item.get("price", 0), datetime.now())
)
def clean_old_records(self, days: int = 7):
"""清理N天前的历史记录"""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"DELETE FROM sent_items WHERE sent_time < datetime('now', ?)",
(f'-{days} days',)
)
六、异步监控引擎
python
import asyncio
import aiohttp
import time
from typing import List, Dict, Optional
from dataclasses import dataclass
from enum import Enum
class MonitorStatus(Enum):
RUNNING = "running"
STOPPED = "stopped"
PAUSED = "paused"
@dataclass
class MonitorTask:
"""监控任务配置"""
task_id: str
keyword: str
filter: ItemFilter
interval_seconds: int = 10
sort: str = "new"
max_pages: int = 1
enabled: bool = True
class AsyncMonitorEngine:
"""异步监控引擎 - 支持多任务并发"""
def __init__(self, cookie: str):
self.cookie = cookie
self.tasks: Dict[str, MonitorTask] = {}
self.status = MonitorStatus.STOPPED
self._remover = DuplicateRemover()
self._callbacks: List[Callable] = []
self._session: Optional[aiohttp.ClientSession] = None
def register_callback(self, callback: Callable[[Dict], None]):
"""注册新商品发现回调"""
self._callbacks.append(callback)
def add_task(self, task: MonitorTask):
"""添加监控任务"""
self.tasks[task.task_id] = task
async def _init_session(self):
"""初始化HTTP会话"""
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession()
async def _search_single_page(self, task: MonitorTask, page: int) -> List[Dict]:
"""异步搜索单页商品"""
url = XianyuSearchAPI.SEARCH_URL
data_params = {
"q": task.keyword,
"page": page,
"sort": task.sort,
"pageSize": 20,
}
token = XianyuSignGenerator.extract_token_from_cookie(self.cookie)
timestamp, data_str, sign = XianyuSignGenerator.generate_sign(token, data_params)
form_data = {
"data": data_str,
"timestamp": timestamp,
"sign": sign,
"appKey": XianyuSignGenerator.APP_KEY,
}
try:
async with self._session.post(url, data=form_data) as resp:
if resp.status == 200:
result = await resp.json()
if result.get("ret", [""])[0] == "SUCCESS::调用成功":
items = result.get("data", {}).get("data", {}).get("items", [])
return self._parse_items(items)
except Exception as e:
print(f"[{task.task_id}] 搜索失败: {e}")
return []
def _parse_items(self, items: List[Dict]) -> List[Dict]:
"""解析商品数据"""
parsed = []
for item in items:
product_info = item.get("product", {}) or item
parsed.append({
"item_id": product_info.get("itemId"),
"title": product_info.get("title", ""),
"price": float(product_info.get("price", 0)),
"pic_url": product_info.get("picUrl"),
"location": product_info.get("location", ""),
"nick": product_info.get("nick", ""),
"source_keyword": task.keyword,
"found_time": datetime.now().isoformat(),
})
return parsed
async def _monitor_single_task(self, task: MonitorTask):
"""监控单个任务"""
if not task.enabled:
return
print(f"[{task.task_id}] 开始监控关键词: {task.keyword}")
all_items = []
for page in range(1, task.max_pages + 1):
items = await self._search_single_page(task, page)
all_items.extend(items)
await asyncio.sleep(0.5) # 页面间隔
# 筛选和去重
for item in all_items:
if not task.filter.filter(item):
continue
signature = ItemFilter.generate_item_signature(item)
if self._remover.is_duplicate(signature):
continue
# 标记已处理
self._remover.mark_sent(signature, item)
# 触发回调
for callback in self._callbacks:
try:
callback(item)
except Exception as e:
print(f"回调执行失败: {e}")
async def _run_loop(self):
"""主监控循环"""
while self.status == MonitorStatus.RUNNING:
start_time = time.time()
# 并发执行所有任务
tasks = [self._monitor_single_task(task) for task in self.tasks.values()]
await asyncio.gather(*tasks)
elapsed = time.time() - start_time
# 动态计算睡眠时间(最短间隔5秒)
sleep_time = max(5, 10 - elapsed)
await asyncio.sleep(sleep_time)
def start(self):
"""启动监控引擎"""
if self.status == MonitorStatus.RUNNING:
print("引擎已在运行中")
return
self.status = MonitorStatus.RUNNING
async def _start():
await self._init_session()
await self._run_loop()
asyncio.run(_start())
def stop(self):
"""停止监控引擎"""
self.status = MonitorStatus.STOPPED
if self._session:
asyncio.run(self._session.close())
七、自动化秒拍模块
python
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import threading
import queue
class AutoPurchaseBot:
"""
自动化秒拍机器人
基于Selenium模拟真实用户操作
"""
def __init__(self, cookie: str, headless: bool = False):
self.cookie = cookie
self.headless = headless
self.driver = None
self._task_queue = queue.Queue()
self._worker_thread = None
self._running = False
def _init_driver(self):
"""初始化Chrome驱动器"""
chrome_options = Options()
if self.headless:
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_argument("--user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36")
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
chrome_options.add_experimental_option('useAutomationExtension', False)
self.driver = webdriver.Chrome(options=chrome_options)
self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
# 注入Cookie
self.driver.get("https://h5.m.taobao.com/")
for cookie_item in self._parse_cookie(self.cookie):
self.driver.add_cookie(cookie_item)
def _parse_cookie(self, cookie_str: str) -> list:
"""解析Cookie字符串"""
cookies = []
for item in cookie_str.split(';'):
if '=' in item:
name, value = item.strip().split('=', 1)
cookies.append({"name": name, "value": value, "domain": ".taobao.com"})
return cookies
def _purchase_item(self, item_url: str, max_wait_seconds: int = 30) -> bool:
"""
执行购买操作
Args:
item_url: 商品详情页URL
max_wait_seconds: 最大等待时间
Returns:
是否成功进入下单页
"""
try:
self.driver.get(item_url)
# 等待"我想要"按钮出现
wait = WebDriverWait(self.driver, max_wait_seconds)
i_want_btn = wait.until(
EC.element_to_be_clickable((By.XPATH, "//button[contains(text(), '我想要')]"))
)
i_want_btn.click()
# 等待聊天/下单对话框
chat_box = wait.until(
EC.presence_of_element_located((By.CLASS_NAME, "chat-container"))
)
# 自动发送打招呼消息
msg_input = self.driver.find_element(By.XPATH, "//input[@placeholder='说点什么吧']")
msg_input.send_keys("你好,请问商品还在吗?")
send_btn = self.driver.find_element(By.XPATH, "//button[contains(text(), '发送')]")
send_btn.click()
print(f"成功进入聊天界面: {item_url}")
return True
except Exception as e:
print(f"购买失败: {str(e)}")
return False
def _worker(self):
"""后台工作线程"""
while self._running:
try:
task = self._task_queue.get(timeout=1)
item_url = task.get("url")
self._purchase_item(item_url)
except queue.Empty:
continue
except Exception as e:
print(f"工作线程异常: {e}")
def start(self):
"""启动机器人"""
self._init_driver()
self._running = True
self._worker_thread = threading.Thread(target=self._worker, daemon=True)
self._worker_thread.start()
print("秒拍机器人已启动")
def add_to_queue(self, item: Dict):
"""添加商品到购买队列"""
self._task_queue.put({
"url": item.get("detail_url"),
"title": item.get("title"),
"price": item.get("price")
})
print(f"已添加到购买队列: {item.get('title')}")
def stop(self):
"""停止机器人"""
self._running = False
if self._worker_thread:
self._worker_thread.join(timeout=5)
if self.driver:
self.driver.quit()
八、通知与日志模块
python
import logging
from datetime import datetime
import requests
from typing import Dict
class NotificationManager:
"""多渠道通知管理器"""
def __init__(self, serverchan_key: str = None, dingtalk_webhook: str = None):
self.serverchan_key = serverchan_key
self.dingtalk_webhook = dingtalk_webhook
def send_wechat(self, title: str, content: str):
"""Server酱微信通知"""
if not self.serverchan_key:
return
url = f"https://sctapi.ftqq.com/{self.serverchan_key}.send"
data = {"title": title, "desp": content}
try:
requests.post(url, data=data, timeout=5)
except Exception as e:
print(f"微信通知失败: {e}")
def send_dingtalk(self, title: str, content: str):
"""钉钉机器人通知"""
if not self.dingtalk_webhook:
return
data = {
"msgtype": "markdown",
"markdown": {
"title": title,
"text": f"### {title}\n\n{content}"
}
}
try:
requests.post(self.dingtalk_webhook, json=data, timeout=5)
except Exception as e:
print(f"钉钉通知失败: {e}")
def send_item_alert(self, item: Dict):
"""发送新商品提醒"""
title = f"🔥 捡漏提醒: {item.get('title', '')[:30]}"
content = f"""
**商品**: {item.get('title')}
**价格**: ¥{item.get('price')}
**地点**: {item.get('location', '未知')}
**链接**: {item.get('detail_url')}
**发现时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
self.send_wechat(title, content)
self.send_dingtalk(title, content)
class Logger:
"""日志记录器"""
def __init__(self, name: str = "xianyu_monitor", log_file: str = "monitor.log"):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.DEBUG)
# 文件Handler
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
# 控制台Handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# 格式化器
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def info(self, msg):
self.logger.info(msg)
def warning(self, msg):
self.logger.warning(msg)
def error(self, msg):
self.logger.error(msg)
def debug(self, msg):
self.logger.debug(msg)
九、主程序入口
python
def main():
"""主程序入口"""
# 配置信息(从环境变量或配置文件读取)
CONFIG = {
"cookie": "_m_h5_tk=your_token_here; ...", # 替换为真实Cookie
"serverchan_key": "YOUR_SERVERCHAN_KEY",
"dingtalk_webhook": "https://oapi.dingtalk.com/robot/send?access_token=xxx",
}
# 初始化组件
logger = Logger()
notifier = NotificationManager(
serverchan_key=CONFIG["serverchan_key"],
dingtalk_webhook=CONFIG["dingtalk_webhook"]
)
# 创建秒拍机器人
bot = AutoPurchaseBot(cookie=CONFIG["cookie"], headless=False)
bot.start()
# 创建监控引擎
engine = AsyncMonitorEngine(cookie=CONFIG["cookie"])
# 注册新商品回调
def on_item_found(item: Dict):
logger.info(f"发现新商品: {item.get('title')} - ¥{item.get('price')}")
notifier.send_item_alert(item)
# 符合条件的商品自动加入秒拍队列
if item.get("price", 0) < 100: # 100元以下自动抢
bot.add_to_queue(item)
engine.register_callback(on_item_found)
# 添加监控任务
filter_switch = ItemFilter(
min_price=0,
max_price=300,
keywords_include=["switch", "任天堂"],
keywords_exclude=["坏", "维修", "尸体"]
)
task = MonitorTask(
task_id="switch_monitor",
keyword="switch 游戏机",
filter=filter_switch,
interval_seconds=10,
sort="new",
max_pages=2,
enabled=True
)
engine.add_task(task)
filter_iphone = ItemFilter(min_price=500, max_price=2000, keywords_exclude=["屏幕坏", "进水"])
task2 = MonitorTask(
task_id="iphone_monitor",
keyword="iPhone",
filter=filter_iphone,
interval_seconds=15,
sort="new",
max_pages=1
)
engine.add_task(task2)
# 启动监控(阻塞运行)
try:
logger.info("闲鱼监控秒拍系统启动...")
engine.start()
except KeyboardInterrupt:
logger.info("收到中断信号,正在关闭...")
engine.stop()
bot.stop()
logger.info("系统已关闭")
if name == "main":
main()
十、部署建议与注意事项
部署方式 适用场景 命令/说明
云服务器(推荐) 7×24h无人值守 nohup python main.py &
Docker容器 环境隔离与快速迁移 docker build -t xianyu-bot . && docker run -d xianyu-bot
GitHub Actions 定时任务(频率受限) 配置.github/workflows/monitor.yml
核心注意事项:
Cookie有效期:_m_h5_tk约2小时过期,需实现自动刷新机制
请求频率控制:建议监控间隔不低于10秒,高频请求易触发风控
签名时效性:时间戳必须基于UTC时间(减8小时),否则签名验证失败
合规声明:本脚本仅供学习研究,请遵守闲鱼平台用户协议
十一、总结
本文完整实现了闲鱼秒拍脚本的全链路技术方案,涵盖签名逆向、异步监控、智能筛选、自动化操作四大核心模块。代码总行数超过600行,可根据实际需求灵活扩展。
如需进一步优化,可考虑接入多模态AI模型对商品图片进行深度分析,或使用Redis替代SQLite提升高并发场景下的性能。
源码已同步至 GitHub,欢迎 Star 关注后续更新。