Python爬虫定时任务:自动化抓取豆瓣每日最新短评

简介: Python爬虫定时任务:自动化抓取豆瓣每日最新短评

一、项目架构设计与技术选型
在开始编码前,我们需要规划完整的解决方案架构。一个健壮的定时爬虫系统应包含以下核心模块:

  1. 网页抓取模块:负责发送HTTP请求并获取页面内容
  2. 数据解析模块:从HTML中提取结构化短评数据
  3. 数据存储模块:将抓取结果持久化保存
  4. 定时任务模块:按照预定计划触发爬取任务
  5. 反爬应对模块:确保爬虫的稳定运行
    技术栈选择:
    ● Requests:用于发送HTTP请求
    ● BeautifulSoup:用于HTML解析
    ● APScheduler:轻量级定时任务框架
    ● Pandas:数据处理与存储
    ● SQLite:轻量级数据库存储
    二、核心代码实现
  6. 网页抓取与反爬策略
    ```import requests
    from bs4 import BeautifulSoup
    import pandas as pd
    import time
    import random
    import sqlite3
    from datetime import datetime, timedelta
    from apscheduler.schedulers.blocking import BlockingScheduler
    import logging

class DoubanCommentCrawler:
def init(self):
self.session = requests.Session()

    # 设置通用的请求头,模拟真实浏览器
    self.headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Connection': 'keep-alive',
    }
    self.session.headers.update(self.headers)

    # 代理配置
    self.proxyHost = "www.16yun.cn"
    self.proxyPort = "5445"
    self.proxyUser = "16QMSOML"
    self.proxyPass = "280651"

def get_proxies(self):
    """
    构建代理配置
    """
    proxyMeta = f"http://{self.proxyUser}:{self.proxyPass}@{self.proxyHost}:{self.proxyPort}"
    proxies = {
        "http": proxyMeta,
        "https": proxyMeta,
    }
    return proxies

def get_movie_comments(self, movie_id, max_pages=3, use_proxy=True):
    """
    获取指定电影的短评
    :param movie_id: 豆瓣电影ID
    :param max_pages: 最大爬取页数
    :param use_proxy: 是否使用代理
    :return: 短评列表
    """
    comments = []

    for page in range(max_pages):
        try:
            # 构建URL,按时间排序获取最新评论
            url = f'https://movie.douban.com/subject/{movie_id}/comments'
            params = {
                'start': page * 20,
                'limit': 20,
                'sort': 'time',  # 按时间排序
                'status': 'P'
            }

            # 随机延迟,避免请求过于频繁
            time.sleep(random.uniform(1, 3))

            # 根据参数决定是否使用代理
            if use_proxy:
                proxies = self.get_proxies()
                response = self.session.get(url, params=params, timeout=10, proxies=proxies)
            else:
                response = self.session.get(url, params=params, timeout=10)

            response.raise_for_status()

            # 解析HTML内容
            page_comments = self.parse_comments(response.text)
            comments.extend(page_comments)

            logging.info(f'成功抓取电影{movie_id}第{page + 1}页,获取{len(page_comments)}条短评')

            # 如果当前页评论不足20条,说明已到最后一页
            if len(page_comments) < 20:
                break

        except requests.RequestException as e:
            logging.error(f'抓取第{page + 1}页时发生错误: {e}')
            continue

    return comments

def parse_comments(self, html_content):
    """
    解析HTML,提取短评信息
    """
    soup = BeautifulSoup(html_content, 'html.parser')
    comments = []

    # 查找所有短评条目
    comment_items = soup.find_all('div', class_='comment-item')

    for item in comment_items:
        try:
            comment = {}

            # 提取用户信息
            user_info = item.find('span', class_='comment-info')
            if user_info:
                comment['user'] = user_info.find('a').text.strip()
                comment['rating'] = self.extract_rating(user_info)

            # 提取评论内容和时间
            comment_content = item.find('span', class_='short')
            if comment_content:
                comment['content'] = comment_content.text.strip()

            comment_time = item.find('span', class_='comment-time')
            if comment_time:
                comment['time'] = comment_time.text.strip()

            # 只收集当天的评论
            if self.is_today_comment(comment['time']):
                comment['crawl_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                comments.append(comment)

        except Exception as e:
            logging.warning(f'解析单条评论时发生错误: {e}')
            continue

    return comments

def extract_rating(self, user_info):
    """
    提取用户评分
    """
    rating_tag = user_info.find('span', class_=lambda x: x and x.startswith('allstar'))
    if rating_tag:
        rating_class = rating_tag.get('class', [])
        for cls in rating_class:
            if cls.startswith('allstar'):
                return cls.replace('allstar', '').replace('0', '')
    return '暂无评分'

def is_today_comment(self, comment_time):
    """
    判断评论是否为今天发布
    """
    today = datetime.now().date()
    # 处理豆瓣时间格式
    if '今天' in comment_time:
        return True
    # 可以添加更多时间格式判断
    return True  # 暂时返回True,实际使用时需要完善

使用示例

def test_with_proxy():
"""测试带代理的爬虫"""
crawler = DoubanCommentCrawler()

# 测试电影ID:阿甘正传
movie_id = '1292722'

print("开始使用代理爬取豆瓣短评...")
try:
    comments = crawler.get_movie_comments(movie_id, max_pages=2, use_proxy=True)
    print(f"成功爬取 {len(comments)} 条评论")

    # 显示前几条评论
    for i, comment in enumerate(comments[:3]):
        print(f"评论 {i+1}: {comment['user']} - {comment['content'][:50]}...")

except Exception as e:
    print(f"爬取过程中发生错误: {e}")
    # 可以尝试不使用代理
    print("尝试不使用代理...")
    try:
        comments = crawler.get_movie_comments(movie_id, max_pages=1, use_proxy=False)
        print(f"不使用代理成功爬取 {len(comments)} 条评论")
    except Exception as e2:
        print(f"不使用代理也失败了: {e2}")

if name == "main":

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

test_with_proxy()
2. 数据存储模块
```class DataStorage:
    def __init__(self, db_path='douban_comments.db'):
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """初始化数据库表结构"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        create_table_sql = '''
        CREATE TABLE IF NOT EXISTS movie_comments (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            movie_id TEXT NOT NULL,
            user_name TEXT NOT NULL,
            rating TEXT,
            content TEXT NOT NULL,
            comment_time TEXT,
            crawl_time TEXT NOT NULL,
            created_date TEXT NOT NULL
        )
        '''
        cursor.execute(create_table_sql)
        conn.commit()
        conn.close()

    def save_comments(self, movie_id, comments):
        """保存评论到数据库"""
        if not comments:
            return

        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        insert_sql = '''
        INSERT INTO movie_comments 
        (movie_id, user_name, rating, content, comment_time, crawl_time, created_date)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        '''

        today = datetime.now().strftime('%Y-%m-%d')
        data_to_insert = []

        for comment in comments:
            data_to_insert.append((
                movie_id,
                comment.get('user', ''),
                comment.get('rating', ''),
                comment.get('content', ''),
                comment.get('time', ''),
                comment.get('crawl_time', ''),
                today
            ))

        cursor.executemany(insert_sql, data_to_insert)
        conn.commit()
        conn.close()

        logging.info(f'成功保存{len(comments)}条评论到数据库')

    def export_to_excel(self, date=None):
        """导出指定日期的数据到Excel"""
        if date is None:
            date = datetime.now().strftime('%Y-%m-%d')

        conn = sqlite3.connect(self.db_path)
        query = "SELECT * FROM movie_comments WHERE created_date = ?"
        df = pd.read_sql_query(query, conn, params=[date])
        conn.close()

        if not df.empty:
            filename = f'douban_comments_{date}.xlsx'
            df.to_excel(filename, index=False)
            logging.info(f'数据已导出到 {filename}')
            return filename
        else:
            logging.warning(f'没有找到{date}的数据')
            return None
  1. 定时任务调度器
    ```class CommentScheduler:
    def init(self):

     self.crawler = DoubanCommentCrawler()
     self.storage = DataStorage()
     self.scheduler = BlockingScheduler()
    
     # 配置监控的电影ID列表
     self.movie_ids = [
         '1292722',  # 阿甘正传
         '1291546',  # 霸王别姬
         '1292720',  # 泰坦尼克号
         '3541415',  # 盗梦空间
     ]
    

    def daily_crawl_job(self):

     """每日定时爬取任务"""
     logging.info(f'开始执行每日爬取任务,时间: {datetime.now()}')
    
     all_comments = []
     for movie_id in self.movie_ids:
         try:
             logging.info(f'开始爬取电影 {movie_id} 的短评')
             comments = self.crawler.get_movie_comments(movie_id, max_pages=2)
             self.storage.save_comments(movie_id, comments)
             all_comments.extend(comments)
    
             # 请求间隔,避免被封IP
             time.sleep(random.uniform(2, 5))
    
         except Exception as e:
             logging.error(f'爬取电影 {movie_id} 时发生错误: {e}')
             continue
    
     # 导出当日数据
     self.storage.export_to_excel()
    
     logging.info(f'每日爬取任务完成,共获取{len(all_comments)}条短评')
    

    def start_scheduler(self):

     """启动定时调度器"""
     # 每天上午10点执行爬取任务
     self.scheduler.add_job(
         self.daily_crawl_job,
         'cron',
         hour=10,
         minute=0,
         id='daily_douban_crawl'
     )
    
     # 添加测试任务(每分钟执行一次,用于测试)
     self.scheduler.add_job(
         self.daily_crawl_job,
         'interval',
         minutes=1,
         id='test_crawl'
     )
    
     logging.info('定时爬虫调度器已启动...')
    
     try:
         self.scheduler.start()
     except KeyboardInterrupt:
         logging.info('程序被用户中断')
     except Exception as e:
         logging.error(f'调度器发生错误: {e}')
    

配置日志

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('douban_crawler.log', encoding='utf-8'),
logging.StreamHandler()
]
)

三、系统部署与运行
1. 环境配置与依赖安装
首先创建requirements.txt文件:
```requests==2.31.0
beautifulsoup4==4.12.2
pandas==2.0.3
apscheduler==3.10.1
openpyxl==3.1.2
  1. 启动系统
    创建主程序文件main.py:
    ```if name == "main":

    测试单次爬取

    def test_single_crawl():

     crawler = DoubanCommentCrawler()
     storage = DataStorage()
    
     test_movie_id = '1292722'  # 阿甘正传
     comments = crawler.get_movie_comments(test_movie_id, max_pages=1)
     storage.save_comments(test_movie_id, comments)
     print(f"测试爬取完成,获取{len(comments)}条评论")
    

    选择运行模式

    print("请选择运行模式:")
    print("1. 测试单次爬取")
    print("2. 启动定时任务")

    choice = input("请输入选择 (1 或 2): ")

    if choice == '1':

     test_single_crawl()
    

    elif choice == '2':

     scheduler = CommentScheduler()
     scheduler.start_scheduler()
    

    else:

     print("无效选择")
    

    ```
    四、技术要点与最佳实践

  2. 反爬虫策略应对
    ● User-Agent轮换:模拟真实浏览器访问
    ● 请求频率控制:随机延迟避免规律请求
    ● IP代理池:应对IP封禁(本文未实现,生产环境建议添加)
    ● 会话保持:使用Session维持cookie
  3. 错误处理与日志记录
    ● 完善的异常捕获机制
    ● 详细的日志记录,便于问题排查
    ● graceful degradation(优雅降级)
  4. 数据质量保障
    ● 数据去重机制
    ● 数据完整性校验
    ● 定时清理历史数据
  5. 可扩展性设计
    ● 模块化设计,便于功能扩展
    ● 支持多电影同时监控
    ● 灵活的存储后端支持
    五、总结
    本文详细介绍了构建一个完整的豆瓣短评定时爬虫系统的全过程。通过合理的架构设计和代码实现,我们创建了一个稳定、可扩展的自动化数据采集系统。这个系统不仅能够定时抓取最新的短评数据,还具备了完善的反爬应对机制和数据管理功能。
    在实际应用中,建议进一步优化以下方面:
  6. 增加IP代理支持:应对更严格的反爬措施
  7. 添加验证码识别:处理可能的验证码挑战
  8. 实现分布式架构:提升爬取效率
  9. 添加数据分析和可视化:充分发挥数据价值
相关文章
|
6月前
|
数据采集 Web App开发 自然语言处理
新闻热点一目了然:Python爬虫数据可视化
新闻热点一目了然:Python爬虫数据可视化
|
5月前
|
机器学习/深度学习 人工智能 API
破译AI“指纹”:如何识别机器生成内容?
破译AI“指纹”:如何识别机器生成内容?
758 117
|
3月前
|
数据采集 文字识别 JavaScript
基于文本检测的 Python 爬虫弹窗图片定位与拖动实现
基于文本检测的 Python 爬虫弹窗图片定位与拖动实现
|
5月前
|
数据采集 Web App开发 数据安全/隐私保护
实战:Python爬虫如何模拟登录与维持会话状态
实战:Python爬虫如何模拟登录与维持会话状态
|
6月前
|
数据采集 JavaScript 前端开发
“所见即所爬”:使用Pyppeteer无头浏览器抓取动态壁纸
“所见即所爬”:使用Pyppeteer无头浏览器抓取动态壁纸
|
4月前
|
JavaScript 数据挖掘 大数据
基于python大数据的房价数据分析系统
本研究基于Python构建大数据房价分析系统,整合多源数据,运用Django、Vue.js与MySQL技术,实现数据采集、处理、建模与可视化,助力政府调控、企业决策与购房者选择,提升房地产信息化水平。
|
3月前
|
数据采集 JavaScript 前端开发
拼多多数据抓取:Python 爬虫中的 JS 逆向基础案例分析
拼多多数据抓取:Python 爬虫中的 JS 逆向基础案例分析
|
5月前
|
数据采集 监控 数据库
Python异步编程实战:爬虫案例
🌟 蒋星熠Jaxonic,代码为舟的星际旅人。从回调地狱到async/await协程天堂,亲历Python异步编程演进。分享高性能爬虫、数据库异步操作、限流监控等实战经验,助你驾驭并发,在二进制星河中谱写极客诗篇。
Python异步编程实战:爬虫案例
|
8月前
|
数据采集 Web App开发 数据可视化
Python爬取闲鱼价格趋势并可视化分析
Python爬取闲鱼价格趋势并可视化分析
|
8月前
|
数据采集 存储 C++
Python异步爬虫(aiohttp)加速微信公众号图片下载
Python异步爬虫(aiohttp)加速微信公众号图片下载

热门文章

最新文章