---从单机验证到集群化落地
为什么要做这个项目(背景与动机)
在公司做数据产品时,我们常常遇到三个痛点:脚本跑不稳、页面渲染抓不到数据、以及规模化后调度和重试逻辑变得难以维护。最开始通常是一两个 Python 脚本在开发机上跑通,但到了生产环境,问题就会接踵而来:单机负载、被目标站点限速、以及不同页面结构带来的脆弱性。
本项目的目标是把一个可验证的单机 Playwright 爬虫,逐步演进成能在多台机器上并行工作的集群化方案,并把关键实现、思路与示例代码记录下来,方便求职/展示或直接在企业内复用。
我们要拿到什么数据(数据目标)
本文以招聘网站为示例,目标字段很常见也很实用,便于后续做分析:
- 职位名称(title)
- 公司名(company)
- 城市 / 工作地点(city)
- 薪资(salary)
- 发布时间(publish_time)
这些字段既能做趋势分析,也能用于构建岗位搜索引擎或行业报告。
技术栈与选型理由(为什么选 Playwright)
- Playwright:相比传统的 requests,它能原生处理 JS 渲染、等待节点、拦截资源等,稳定性和可控性更好。
- 代理 IP(示例:亿牛云):高频访问需要代理来分摊流量并降低被限制的概率。
- Redis(任务队列) + 多进程/多机器:用于实现任务分发、去重与伸缩。
- MongoDB / Elasticsearch:分别用于原始数据落盘和后续的检索/分析/可视化。
这种组合能在保证可维护性的同时兼顾扩展与性能。
核心实现(从单机到分布式的演进)
1)单机版:先把逻辑跑通
这是最直接的版本,用来确认页面结构、JS 渲染及提取逻辑是否稳健。代码示例(异步 Playwright):
# single_playwright.py
import asyncio
from playwright.async_api import async_playwright
# ====== 代理配置(示例:亿牛云) ======
proxy_host = "proxy.16yun.cn"
proxy_port = "3100"
proxy_user = "16YUN"
proxy_pass = "16IP"
async def crawl_page(keyword: str, page_num: int = 1):
"""
单页抓取示例函数:访问并解析职位卡片,返回字典列表
"""
async with async_playwright() as pw:
# 启动浏览器(无头/有头可根据调试需要切换)
browser = await pw.chromium.launch(headless=True,
proxy={
"server": f"http://{proxy_host}:{proxy_port}",
"username": proxy_user,
"password": proxy_pass
})
page = await browser.new_page()
url = f"https://example-job-site.com/search?kw={keyword}&p={page_num}"
await page.goto(url, timeout=30000)
# 等待职位列表出现(根据实际页面调整选择器)
await page.wait_for_selector('.job-card', timeout=15000)
cards = await page.query_selector_all('.job-card')
results = []
for c in cards:
# 下面的选择器只是示例,实际按目标站点调整
title = await c.query_selector_eval('.job-title', 'el => el.textContent.trim()')
company = await c.query_selector_eval('.company', 'el => el.textContent.trim()')
salary = await c.query_selector_eval('.salary', 'el => el.textContent.trim()')
city = await c.query_selector_eval('.city', 'el => el.textContent.trim()')
results.append({
"title": title,
"company": company,
"salary": salary,
"city": city
})
await browser.close()
return results
if __name__ == "__main__":
rows = asyncio.run(crawl_page("Python 爬虫", page_num=1))
for r in rows:
print(r)
说明:先把单页、单关键词跑通,再处理翻页、异常与重试逻辑。单机阶段以稳定为先,别急着并发。
2)改造为可并发的单机版本(充分利用单机资源)
当单页逻辑稳定后,我们会把单页函数包装成任务池,利用 asyncio 并发多个标签页(或多个浏览器实例)来提高吞吐。示例要点:
- 限制并发并设置合理的超时
- 处理网络异常与元素不存在的情况(捕获并重试)
- 记录日志与关键指标(成功率、耗时)
(此处略去并发包装的完整代码,实际项目中可用 asyncio.Semaphore
控制并发数。)
3)走向分布式:Redis 作任务队列(生产者 / 消费者模型)
把任务放到 Redis 列表或流(stream)里,多台机器从队列里取任务并执行。示例消费端伪代码:
# worker.py (伪代码,需加异常处理与日志)
import asyncio
import aioredis
from single_playwright import crawl_page # 引入前面实现的单页函数
REDIS_URL = "redis://127.0.0.1:6379/0"
TASK_QUEUE = "job_tasks"
async def worker_loop():
redis = await aioredis.from_url(REDIS_URL)
while True:
task = await redis.lpop(TASK_QUEUE)
if not task:
await asyncio.sleep(1)
continue
# 假设任务是 JSON 字符串,包含 keyword 和 page_num
task = task.decode("utf-8")
# 这里简单演示,生产环境请解析 JSON 并做失败重试、幂等处理
keyword = task
print("取到任务:", keyword)
results = await crawl_page(keyword)
# 结果入库/发送到下游(例如 MongoDB/Elasticsearch)
# save_results(results)
关键点:
- 任务应包含去重标识(比如 URL 哈希或唯一 ID)以避免重复抓取
- 设计任务超时与重试策略(比如 N 次重试后写入失败队列由人工检查)
- 监控队列深度、消费速率以判断是否需要扩容
运营与工程化要点(实用经验)
- 代理策略:不要把所有流量都打到一个代理 IP,建议轮换或使用多个出口;对静态资源设置拦截,减少不必要带宽消耗。
- 资源拦截:Playwright 支持
route.abort()
拦截图片、视频、广告等资源,显著减轻网络压力。 - 健康检查:为每个爬虫进程加上心跳上报(Prometheus / Pushgateway)便于发现挂死或慢速任务。
- 错误收集:记录页面快照(在非敏感场景),异常堆栈与请求/响应,便于定位问题。
- 容器化:把爬虫打包为 Docker 镜像,用 Kubernetes 编排能显著简化部署与弹性伸缩。
- 数据质量控制:入库前做字段校验、去重与时间窗口过滤,避免脏数据污染分析维度。
数据落盘与可视化(展示成就)
落库示例(MongoDB 文档结构):
{
"title": "Python 爬虫工程师",
"company": "某某科技",
"salary": "20k-35k",
"city": "上海",
"publish_time": "2025-09-01",
"source": "example-job-site",
"crawl_at": "2025-09-23T10:00:00"
}
落库后可以做的展示(列举思路):
- 城市分布热力图(Kibana / Superset)
- 薪资区间柱状图(按岗位/城市分组)
- 岗位关键词云(用于挖掘热门技能)
- 时间序列分析(岗位量随时间变化)
这些可视化既能作为产品端仪表盘,也可以作为简历/作品集中突出的数据展示部分。