一、项目架构设计与技术选型
在开始编码前,我们需要规划完整的解决方案架构。一个健壮的定时爬虫系统应包含以下核心模块:
- 网页抓取模块:负责发送HTTP请求并获取页面内容
- 数据解析模块:从HTML中提取结构化短评数据
- 数据存储模块:将抓取结果持久化保存
- 定时任务模块:按照预定计划触发爬取任务
- 反爬应对模块:确保爬虫的稳定运行
技术栈选择:
● Requests:用于发送HTTP请求
● BeautifulSoup:用于HTML解析
● APScheduler:轻量级定时任务框架
● Pandas:数据处理与存储
● SQLite:轻量级数据库存储
二、核心代码实现 - 网页抓取与反爬策略
```import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import random
import sqlite3
from datetime import datetime, timedelta
from apscheduler.schedulers.blocking import BlockingScheduler
import logging
class DoubanCommentCrawler:
def init(self):
self.session = requests.Session()
# 设置通用的请求头,模拟真实浏览器
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Connection': 'keep-alive',
}
self.session.headers.update(self.headers)
# 代理配置
self.proxyHost = "www.16yun.cn"
self.proxyPort = "5445"
self.proxyUser = "16QMSOML"
self.proxyPass = "280651"
def get_proxies(self):
"""
构建代理配置
"""
proxyMeta = f"http://{self.proxyUser}:{self.proxyPass}@{self.proxyHost}:{self.proxyPort}"
proxies = {
"http": proxyMeta,
"https": proxyMeta,
}
return proxies
def get_movie_comments(self, movie_id, max_pages=3, use_proxy=True):
"""
获取指定电影的短评
:param movie_id: 豆瓣电影ID
:param max_pages: 最大爬取页数
:param use_proxy: 是否使用代理
:return: 短评列表
"""
comments = []
for page in range(max_pages):
try:
# 构建URL,按时间排序获取最新评论
url = f'https://movie.douban.com/subject/{movie_id}/comments'
params = {
'start': page * 20,
'limit': 20,
'sort': 'time', # 按时间排序
'status': 'P'
}
# 随机延迟,避免请求过于频繁
time.sleep(random.uniform(1, 3))
# 根据参数决定是否使用代理
if use_proxy:
proxies = self.get_proxies()
response = self.session.get(url, params=params, timeout=10, proxies=proxies)
else:
response = self.session.get(url, params=params, timeout=10)
response.raise_for_status()
# 解析HTML内容
page_comments = self.parse_comments(response.text)
comments.extend(page_comments)
logging.info(f'成功抓取电影{movie_id}第{page + 1}页,获取{len(page_comments)}条短评')
# 如果当前页评论不足20条,说明已到最后一页
if len(page_comments) < 20:
break
except requests.RequestException as e:
logging.error(f'抓取第{page + 1}页时发生错误: {e}')
continue
return comments
def parse_comments(self, html_content):
"""
解析HTML,提取短评信息
"""
soup = BeautifulSoup(html_content, 'html.parser')
comments = []
# 查找所有短评条目
comment_items = soup.find_all('div', class_='comment-item')
for item in comment_items:
try:
comment = {}
# 提取用户信息
user_info = item.find('span', class_='comment-info')
if user_info:
comment['user'] = user_info.find('a').text.strip()
comment['rating'] = self.extract_rating(user_info)
# 提取评论内容和时间
comment_content = item.find('span', class_='short')
if comment_content:
comment['content'] = comment_content.text.strip()
comment_time = item.find('span', class_='comment-time')
if comment_time:
comment['time'] = comment_time.text.strip()
# 只收集当天的评论
if self.is_today_comment(comment['time']):
comment['crawl_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
comments.append(comment)
except Exception as e:
logging.warning(f'解析单条评论时发生错误: {e}')
continue
return comments
def extract_rating(self, user_info):
"""
提取用户评分
"""
rating_tag = user_info.find('span', class_=lambda x: x and x.startswith('allstar'))
if rating_tag:
rating_class = rating_tag.get('class', [])
for cls in rating_class:
if cls.startswith('allstar'):
return cls.replace('allstar', '').replace('0', '')
return '暂无评分'
def is_today_comment(self, comment_time):
"""
判断评论是否为今天发布
"""
today = datetime.now().date()
# 处理豆瓣时间格式
if '今天' in comment_time:
return True
# 可以添加更多时间格式判断
return True # 暂时返回True,实际使用时需要完善
使用示例
def test_with_proxy():
"""测试带代理的爬虫"""
crawler = DoubanCommentCrawler()
# 测试电影ID:阿甘正传
movie_id = '1292722'
print("开始使用代理爬取豆瓣短评...")
try:
comments = crawler.get_movie_comments(movie_id, max_pages=2, use_proxy=True)
print(f"成功爬取 {len(comments)} 条评论")
# 显示前几条评论
for i, comment in enumerate(comments[:3]):
print(f"评论 {i+1}: {comment['user']} - {comment['content'][:50]}...")
except Exception as e:
print(f"爬取过程中发生错误: {e}")
# 可以尝试不使用代理
print("尝试不使用代理...")
try:
comments = crawler.get_movie_comments(movie_id, max_pages=1, use_proxy=False)
print(f"不使用代理成功爬取 {len(comments)} 条评论")
except Exception as e2:
print(f"不使用代理也失败了: {e2}")
if name == "main":
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
test_with_proxy()
2. 数据存储模块
```class DataStorage:
def __init__(self, db_path='douban_comments.db'):
self.db_path = db_path
self.init_database()
def init_database(self):
"""初始化数据库表结构"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
create_table_sql = '''
CREATE TABLE IF NOT EXISTS movie_comments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
movie_id TEXT NOT NULL,
user_name TEXT NOT NULL,
rating TEXT,
content TEXT NOT NULL,
comment_time TEXT,
crawl_time TEXT NOT NULL,
created_date TEXT NOT NULL
)
'''
cursor.execute(create_table_sql)
conn.commit()
conn.close()
def save_comments(self, movie_id, comments):
"""保存评论到数据库"""
if not comments:
return
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
insert_sql = '''
INSERT INTO movie_comments
(movie_id, user_name, rating, content, comment_time, crawl_time, created_date)
VALUES (?, ?, ?, ?, ?, ?, ?)
'''
today = datetime.now().strftime('%Y-%m-%d')
data_to_insert = []
for comment in comments:
data_to_insert.append((
movie_id,
comment.get('user', ''),
comment.get('rating', ''),
comment.get('content', ''),
comment.get('time', ''),
comment.get('crawl_time', ''),
today
))
cursor.executemany(insert_sql, data_to_insert)
conn.commit()
conn.close()
logging.info(f'成功保存{len(comments)}条评论到数据库')
def export_to_excel(self, date=None):
"""导出指定日期的数据到Excel"""
if date is None:
date = datetime.now().strftime('%Y-%m-%d')
conn = sqlite3.connect(self.db_path)
query = "SELECT * FROM movie_comments WHERE created_date = ?"
df = pd.read_sql_query(query, conn, params=[date])
conn.close()
if not df.empty:
filename = f'douban_comments_{date}.xlsx'
df.to_excel(filename, index=False)
logging.info(f'数据已导出到 {filename}')
return filename
else:
logging.warning(f'没有找到{date}的数据')
return None
定时任务调度器
```class CommentScheduler:
def init(self):self.crawler = DoubanCommentCrawler() self.storage = DataStorage() self.scheduler = BlockingScheduler() # 配置监控的电影ID列表 self.movie_ids = [ '1292722', # 阿甘正传 '1291546', # 霸王别姬 '1292720', # 泰坦尼克号 '3541415', # 盗梦空间 ]def daily_crawl_job(self):
"""每日定时爬取任务""" logging.info(f'开始执行每日爬取任务,时间: {datetime.now()}') all_comments = [] for movie_id in self.movie_ids: try: logging.info(f'开始爬取电影 {movie_id} 的短评') comments = self.crawler.get_movie_comments(movie_id, max_pages=2) self.storage.save_comments(movie_id, comments) all_comments.extend(comments) # 请求间隔,避免被封IP time.sleep(random.uniform(2, 5)) except Exception as e: logging.error(f'爬取电影 {movie_id} 时发生错误: {e}') continue # 导出当日数据 self.storage.export_to_excel() logging.info(f'每日爬取任务完成,共获取{len(all_comments)}条短评')def start_scheduler(self):
"""启动定时调度器""" # 每天上午10点执行爬取任务 self.scheduler.add_job( self.daily_crawl_job, 'cron', hour=10, minute=0, id='daily_douban_crawl' ) # 添加测试任务(每分钟执行一次,用于测试) self.scheduler.add_job( self.daily_crawl_job, 'interval', minutes=1, id='test_crawl' ) logging.info('定时爬虫调度器已启动...') try: self.scheduler.start() except KeyboardInterrupt: logging.info('程序被用户中断') except Exception as e: logging.error(f'调度器发生错误: {e}')
配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('douban_crawler.log', encoding='utf-8'),
logging.StreamHandler()
]
)
三、系统部署与运行
1. 环境配置与依赖安装
首先创建requirements.txt文件:
```requests==2.31.0
beautifulsoup4==4.12.2
pandas==2.0.3
apscheduler==3.10.1
openpyxl==3.1.2
启动系统
创建主程序文件main.py:
```if name == "main":测试单次爬取
def test_single_crawl():
crawler = DoubanCommentCrawler() storage = DataStorage() test_movie_id = '1292722' # 阿甘正传 comments = crawler.get_movie_comments(test_movie_id, max_pages=1) storage.save_comments(test_movie_id, comments) print(f"测试爬取完成,获取{len(comments)}条评论")选择运行模式
print("请选择运行模式:")
print("1. 测试单次爬取")
print("2. 启动定时任务")choice = input("请输入选择 (1 或 2): ")
if choice == '1':
test_single_crawl()elif choice == '2':
scheduler = CommentScheduler() scheduler.start_scheduler()else:
print("无效选择")```
四、技术要点与最佳实践- 反爬虫策略应对
● User-Agent轮换:模拟真实浏览器访问
● 请求频率控制:随机延迟避免规律请求
● IP代理池:应对IP封禁(本文未实现,生产环境建议添加)
● 会话保持:使用Session维持cookie - 错误处理与日志记录
● 完善的异常捕获机制
● 详细的日志记录,便于问题排查
● graceful degradation(优雅降级) - 数据质量保障
● 数据去重机制
● 数据完整性校验
● 定时清理历史数据 - 可扩展性设计
● 模块化设计,便于功能扩展
● 支持多电影同时监控
● 灵活的存储后端支持
五、总结
本文详细介绍了构建一个完整的豆瓣短评定时爬虫系统的全过程。通过合理的架构设计和代码实现,我们创建了一个稳定、可扩展的自动化数据采集系统。这个系统不仅能够定时抓取最新的短评数据,还具备了完善的反爬应对机制和数据管理功能。
在实际应用中,建议进一步优化以下方面: - 增加IP代理支持:应对更严格的反爬措施
- 添加验证码识别:处理可能的验证码挑战
- 实现分布式架构:提升爬取效率
- 添加数据分析和可视化:充分发挥数据价值