抓取任务队列精简化:延迟队列、优先级队列与回退策略设计

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
实时计算 Flink 版,1000CU*H 3个月
简介: 描述了作者在处理抓取任务队列时遇到的挑战,包括任务堆积、线程阻塞和超时重试问题。通过引入延迟队列、优先级队列和回退策略,作者成功优化了任务调度策略,提高了系统的稳定性和资源利用率。核心代码示例展示了如何使用Redis实现延迟和优先级队列,以及如何执行任务和处理失败重试。最终,系统变得更加智能和高效,实现了更好的调度和资源管理。

一、时间线:从“任务卡死”到“队列瘦身”

那天凌晨三点,我盯着终端上那行熟悉的报错信息,整个人是懵的。
任务堆积、线程阻塞、超时重试——看起来像是平常的小毛病,但这次不一样,整个抓取进程几乎陷入瘫痪。

我用的是一个典型的任务分发架构:

  • 主任务列表里存放要爬取的URL;
  • 多线程消费者从队列中拉取任务执行;
  • 遇到失败的任务会自动重试。

听起来很正常对吧?但随着关键词搜索的热点量级飙升,比如“中美关系”“AI新政策”“新能源车”,队列瞬间爆炸了。重试任务在队列里越堆越多,优先级机制形同虚设。

我意识到问题的关键不是“任务多”,而是“调度策略太蠢”。于是,开始了一次“抓取任务队列瘦身”的改造行动。

二、方案分析:三种策略的较量

1. 延迟队列:给失败任务“冷静期”

以前的逻辑是,任务失败就立刻重试。结果导致网络抖动时同一个URL被疯狂重试十几次。
改成延迟队列后,我们给每个失败任务一个“冷静时间”,比如第一次失败延迟30秒、第二次失败延迟2分钟,以此类推。

这让系统变得温柔多了。抓取不再像无头苍蝇,而是更像个“耐心的猎人”。

2. 优先级队列:让热点任务排在前面

关键词搜索任务其实并不一样,“AI政策”类的实时性要求更高,而“电影解说”这种可以晚点再爬。
所以我们加了一个优先级字段,根据关键词热度动态调整抓取顺序。

这样一来,高优先级任务先执行,低优先级的在资源紧张时自动让路。

3. 回退策略:从“重试”到“放弃”

有些URL在多次尝试后仍然返回404,再爬也没意义。我们为每个任务设置了最大重试次数和退避算法(Exponential Backoff),超过阈值就直接丢弃。

这一步的收益意外地大——CPU占用下降了20%,内存占用下降了40%。

三、架构改进方案:让队列更聪明

最终的方案,是这三种机制的融合体。
我们引入一个智能任务调度器,用Redis实现延迟与优先级队列,同时在逻辑层面加入回退策略:

  • 任务生产者:根据关键词生成搜索任务。
  • 延迟队列(Redis Sorted Set):对失败任务按延迟时间排序。
  • 优先级队列(Redis Priority Queue):对任务按权重进行优先调度。
  • 任务消费者:定期扫描两类队列,从高优先级、到过期延迟任务依次取出执行。
  • 失败回退模块:记录失败次数、执行退避、必要时丢弃。

四、核心代码示例

下面是一段精简版实现,使用了 requests + Redis + 爬虫代理,以今日头条为目标,抓取关键词“人工智能”的热点新闻。

import requests
import redis
import json
import time
from datetime import datetime
from random import randint

# Redis连接
r = redis.StrictRedis(host='localhost', port=6379, db=0)

# 代理配置 参考亿牛云爬虫加强版
proxy_host = "proxy.16yun.cn"
proxy_port = "3100"
proxy_user = "16YUN"
proxy_pass = "16IP"

proxies = {
   
    "http": f"http://{proxy_user}:{proxy_pass}@{proxy_host}:{proxy_port}",
    "https": f"http://{proxy_user}:{proxy_pass}@{proxy_host}:{proxy_port}",
}

# 添加任务到优先级队列
def add_task(keyword, priority=1):
    task = json.dumps({
   "keyword": keyword, "retries": 0})
    r.zadd("priority_queue", {
   task: priority})

# 模拟延迟队列插入
def add_delay_task(task_data, delay_seconds):
    timestamp = time.time() + delay_seconds
    r.zadd("delay_queue", {
   task_data: timestamp})

# 从队列中获取任务
def fetch_task():
    # 优先取优先级队列
    task = r.zrevrange("priority_queue", 0, 0)
    if task:
        r.zrem("priority_queue", task[0])
        return json.loads(task[0])
    # 再取延迟队列中已到期的任务
    now = time.time()
    delay_task = r.zrangebyscore("delay_queue", 0, now, start=0, num=1)
    if delay_task:
        r.zrem("delay_queue", delay_task[0])
        return json.loads(delay_task[0])
    return None

# 执行任务
def run_task(task):
    keyword = task["keyword"]
    retries = task["retries"]
    url = f"https://www.toutiao.com/search/?keyword={keyword}"

    try:
        resp = requests.get(url, proxies=proxies, timeout=5)
        if resp.status_code == 200:
            print(f"[{datetime.now()}] 抓取成功:{keyword}")
        else:
            raise Exception("非200响应")
    except Exception as e:
        retries += 1
        print(f"[{datetime.now()}] 抓取失败({retries}):{keyword},错误:{e}")
        if retries < 3:
            delay = retries * 30 + randint(5, 10)
            add_delay_task(json.dumps({
   "keyword": keyword, "retries": retries}), delay)
        else:
            print(f"[{datetime.now()}] 放弃任务:{keyword}")

# 示例运行
if __name__ == "__main__":
    add_task("人工智能", priority=10)
    add_task("新能源车", priority=8)
    add_task("影视娱乐", priority=3)

    while True:
        task = fetch_task()
        if not task:
            print("当前无可执行任务,休息10秒")
            time.sleep(10)
            continue
        run_task(task)

这段代码在实践中已经帮我避免了“雪崩式重试”的坑。延迟机制让系统喘口气,优先级机制让资源更聚焦,回退策略防止浪费。

五、总结

如果说之前的抓取系统像一群抢活干的工人,现在的版本更像一个有节奏的流水线:谁急谁先上,谁失败谁先冷静。

延迟队列让系统“稳”,优先级队列让调度“聪明”,回退策略让失败“有底线”。
抓取的稳定性、资源利用率、响应速度都上了一个台阶。

——有时候,优化不是让它“更快”,而是让它“更会等”。

相关文章
|
3月前
|
数据采集 存储 缓存
LLM + 抓取:让学术文献检索更聪明
结合爬虫与大模型,打造懂语义的学术检索助手:自动抓取最新NLP+爬虫论文,经清洗、向量化与RAG增强,由LLM提炼贡献,告别关键词匹配,实现精准智能问答。
298 0
LLM + 抓取:让学术文献检索更聪明
|
1月前
|
人工智能 自然语言处理 物联网
从“通用AI”到“懂我AI”:企业微调专属智能助手实战指南
从“通用AI”到“懂我AI”:企业微调专属智能助手实战指南
168 9
|
1月前
|
数据采集 NoSQL Redis
百万级并发下的去重挑战:Bloom Filter 与 Redis 的组合方案
本文探讨了高并发数据采集中避免重复URL抓取的问题,提出了结合Bloom Filter、Redis HyperLogLog和持久化备份的解决方案,实现了快速查重、准确统计和数据恢复。
123 2
|
1月前
|
数据采集 消息中间件 机器学习/深度学习
静态规则解析与动态行为分析结合的混合抽取框架
本文深入探讨现代网页数据抓取的挑战与突破,揭示网页“行为语言”的三大隐藏层。通过结合静态解析与动态模拟的混合抽取框架,实现对复杂网页的精准抓取,展现从规则驱动到行为理解的技术演进,倡导以共生思维重构数据采集的本质。
125 1
|
1月前
|
人工智能 自然语言处理 架构师
跳槽加分项:掌握Dify工作流,我薪资涨了40%
一年前我还是月薪25K的全栈工程师,如今凭借掌握Dify工作流,成功转型为AI应用架构师,拿下35K offer,薪资涨幅40%。通过实战项目积累、简历优化与面试话术升级,我将Dify技能转化为职场竞争力,实现职业跃迁。Dify不仅降低了AI开发门槛,更成为我涨薪的“密码”。你也可以!
|
2月前
|
存储 API 数据库
按图搜索1688商品的API接口
本文介绍如何利用阿里云ImageSearch服务实现1688商品的按图搜索功能。通过提取图像特征向量并计算相似度,结合Flask搭建API接口,可快速构建基于图片的商品检索系统,提升电商用户体验。
288 0
|
4月前
|
数据采集 NoSQL 数据挖掘
简单URL队列与复杂任务流转的边界实践 —— 速查小抄
本文对比了爬虫项目中“招聘市场监测”与“金融数据采集”两类场景下的任务调度策略,介绍了何时使用简单队列、何时采用复杂流转,并提供 Python 示例代码及代理配置建议,助你高效构建爬虫系统。
149 1
简单URL队列与复杂任务流转的边界实践 —— 速查小抄
|
5月前
|
SQL 关系型数据库 MySQL
MySQL表设计经验
本文介绍了数据库表设计的15个实用技巧,涵盖命名规范、字段类型选择、主键设计、索引优化等方面,帮助后端程序员提升数据库设计能力,避免常见错误,提高系统性能与可维护性。
235 0
|
5月前
|
人工智能 安全 算法
HTTPS 的「秘钥交换 + 证书校验」全流程
HTTPS 通过“证书如身份证、密钥交换如临时暗号”的握手流程,实现身份认证与数据加密双重保障,确保通信安全可靠。
535 0
|
7月前
|
人工智能 自然语言处理 应用服务中间件
Bolt.diy 创意建站方案测评 | 不懂代码,你也可以快速建站
本文详细介绍了一款名为Bolt.diy的创意建站工具的使用流程与功能体验。Bolt.diy是阿里云推出的一款基于自然语言交互的Web开发工具,用户可通过简单描述需求快速生成个性化网站。文章从开通服务、配置API-Key到实际创建网站进行了详细步骤解析,并展示了如何通过本地nginx部署生成的代码。此外,还尝试了优化初级会计考试招生宣传页面的过程,发现目前工具在图片资源处理和一键发布功能上存在局限性。整体来看,Bolt.diy操作便捷、成本可控,适合个人及企业低成本验证创意需求。