小说爬虫-03 爬取章节的详细内容并保存 将章节URL推送至RabbitMQ Scrapy消费MQ 对数据进行爬取后写入SQLite

简介: 小说爬虫-03 爬取章节的详细内容并保存 将章节URL推送至RabbitMQ Scrapy消费MQ 对数据进行爬取后写入SQLite

代码仓库

代码我已经上传到 Github,大家需要的可以顺手点个 Star!

https://github.com/turbo-duck/biquge_fiction_spider

背景介绍上一节已经拿到了 小说的详细内容 和 章节的列表

接下来,将章节的列表使用脚本从SQLite数据库中取出,使用脚本把数据推送至 RabbitMQ 中。

最后,Scrapy消费MQ,手动ACK确认,将数据写入到SQLite。

使用技术

  • RabbitMQ
  • Scrapy
  • SQLite

生产者代码

连接 SQLite + RabbitMQ,构造 URL 推送!

import pika
import json
import sqlite3
import os
from dotenv import load_dotenv
load_dotenv()


sql_connection = sqlite3.connect('../db/biquge.db')
cursor = sql_connection.cursor()

rabbitmq_queue = os.getenv('RABBITMQ_QUEUE', 'default_queue')
rabbitmq_host = os.getenv('RABBITMQ_HOST', 'localhost')
rabbitmq_port = os.getenv('RABBITMQ_PORT', '5672')
virtual_host = os.getenv('RABBITMQ_VHOST', '/')
username = os.getenv('RABBITMQ_USERNAME', 'guest')
password = os.getenv('RABBITMQ_PASSWORD', 'guest')

credentials = pika.PlainCredentials(
    username,
    password
)

connection_params_result = {
    'host': rabbitmq_host,
    'port': rabbitmq_port,
    'virtual_host': '/',
    'credentials': credentials,
}
mq_connection = pika.BlockingConnection(pika.ConnectionParameters(**connection_params_result))
channel = mq_connection.channel()
channel.queue_declare(queue=rabbitmq_queue, durable=True)

# 按照页查询小说 (不然一次性太多了)
page_info = '1/1391'
sql = """
SELECT each_code FROM biquge_list WHERE page_info = ?
"""
cursor.execute(sql, (page_info, ))
results = cursor.fetchall()
for each_fiction in results:
    fiction_code = each_fiction[0]
    print(f"fiction code: {fiction_code}")
    # 根据 小说编码 查询 小说章节编码
    sql = """
    SELECT chapter_code FROM chapter_list WHERE fiction_code = ?
    """
    cursor.execute(sql, (fiction_code,))
    chapter_results = cursor.fetchall()
    for each_chapter in chapter_results:
        chapter_code = each_chapter[0]
        chapter_url = f"https://www.xbiqugew.com/book/{fiction_code}/{chapter_code}.html"
        message = json.dumps({
            'url': chapter_url,
        })
        channel.basic_publish(
            exchange='',
            routing_key=rabbitmq_queue,
            body=message.encode('utf-8'),
            properties=pika.BasicProperties(delivery_mode=2)
        )
        print(f"Send MQ: {message}")
    # i = input("======")
mq_connection.close()
sql_connection.close()

消费者代码

Scrapy 连接 RabbitMQ 将数据写入 SQLite。

Spider.py

import scrapy
import time
import pika
import sqlite3
import re
import json
from urllib import parse
from biquge_chapter_detail_spider.items import BiqugeChapterDetailSpiderItem


class SpiderSpider(scrapy.Spider):
    name = "spider"
    # allowed_domains = ["spider.com"]
    start_urls = []

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.queue_name = None
        self.channel = None
        self.db_params = None
        self.conn = None
        self.cursor = None
        self.tcp_uuid = 0

    def establish_connection(self):
        try:
            connection_params = self.settings.get('RABBITMQ_PARAMS', None)
            self.queue_name = connection_params['queue']
            credentials = pika.PlainCredentials(
                connection_params['username'],
                connection_params['password']
            )
            connection_params_result = {
                'host': connection_params['host'],
                'port': connection_params['port'],
                'virtual_host': connection_params['virtual_host'],
                'credentials': credentials,
                'heartbeat': 3600,
                'connection_attempts': 5,
            }
            connection = pika.BlockingConnection(pika.ConnectionParameters(**connection_params_result))
            self.channel = connection.channel()
            self.channel.basic_qos(prefetch_count=1)
            self.tcp_uuid = int(self.tcp_uuid) + 1
        except Exception as e:
            print(f"连接MQ失败: {str(e)}")
            print("等待5秒后重试...")
            time.sleep(5)
            self.establish_connection()

    def connect_db(self):
        try:
            self.conn = sqlite3.connect("../db/biquge.db")
            self.cursor = self.conn.cursor()
        except Exception as e:
            print("Error connecting to DB: ", e)
            print("等待5秒后重试...")
            time.sleep(5)
            self.connect_db()

    def extract_last_number_html(self, text):
        # 使用正则表达式查找所有的数字
        numbers = re.findall(r'.*?/(\d+).html', text)
        # print(numbers)
        if numbers:
            # 返回最后一个数字
            return str(numbers[-1])
        else:
            return ""

    def start_requests(self):
        self.establish_connection()
        self.connect_db()
        while True:
            try:
                method, header, body = self.channel.basic_get(self.queue_name)
            except Exception as e:
                print("--- ---")
                print(e)
                print("--- establish_connection ---")
                self.establish_connection()
                time.sleep(1)
                continue
            if not method:
                continue
            delivery_tag = method.delivery_tag
            body = body.decode()
            body = parse.unquote(body)
            json_data = json.loads(body)
            print(body)
            url = json_data['url']
            if url is None or url == "":
                self.ack(delivery_tag)
                continue
            chapter_code = self.extract_last_number_html(url)
            # print(chapter_code)
            # 检验数据库中是否有数据 有则跳过
            sql = "SELECT COUNT(id) AS count FROM chapter_detail_list WHERE chapter_code = ?"
            try:
                self.cursor.execute(sql, (chapter_code,))
                result = self.cursor.fetchone()
                count = result[0]
                if count > 0:
                    print(f"SQL SELECT chapter_code: {chapter_code}, COUNT: {count}, ACK: {delivery_tag} 已跳过")
                    self.ack(delivery_tag)
                    continue
            except Exception as e:
                print(e)
                print(sql)
                print("--- reconnect_db ---")
                self.no_ack(delivery_tag)
                self.connect_db()
                time.sleep(1)
                continue
            print(f"准备请求: {url}, ACK: {delivery_tag}")
            yield self.callback(
                url=url,
                delivery_tag=delivery_tag,
                chapter_code=chapter_code,
            )

    def callback(self, url, delivery_tag, chapter_code):
        meta = {
            "url": url,
            "chapter_code": chapter_code,
            "delivery_tag": delivery_tag,
            "tcp_uuid": int(self.tcp_uuid),
        }
        print(url)
        return scrapy.Request(
            url=url,
            meta=meta,
            callback=self.parse_list,
        )

    def ack(self, delivery_tag):
        self.channel.basic_ack(delivery_tag=delivery_tag)
        print(f"提交ACK确认: {delivery_tag}")

    def no_ack(self, delivery_tag):
        self.channel.basic_reject(delivery_tag=delivery_tag, requeue=True)

    def parse_list(self, response):
        meta = response.meta
        chapter_code = meta['chapter_code']
        chapter_list = response.xpath(".//div[@id='content']/text()").extract()
        chapter_json_list = []
        for each_chapter in chapter_list:
            each_chapter = re.sub(r' |\xa0', "", str(each_chapter))
            chapter_json_list.append(each_chapter)
        chapter_content = json.dumps(chapter_json_list, ensure_ascii=False)

        item = BiqugeChapterDetailSpiderItem()
        item['chapter_code'] = str(chapter_code)
        item['chapter_content'] = str(chapter_content)
        print(f"抓取 chapter_code: {chapter_code}")
        yield item

        # ack
        delivery_tag = meta['delivery_tag']
        tcp_uuid = meta['tcp_uuid']
        if int(tcp_uuid) == self.tcp_uuid:
            self.ack(delivery_tag)
        else:
            print(f"ACK 跳过: tcp_uuid: {tcp_uuid}, self.tcp_uuid: {self.tcp_uuid}, delivery_tag: {delivery_tag}")

pipline.py

写入 SQLite

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html


# useful for handling different item types with a single interface
from itemadapter import ItemAdapter
import sqlite3


class SQLitePipeline:
    def __init__(self):
        self.cursor = None
        self.connection = None

    def open_spider(self, spider):
        # 连接到 SQLite 数据库
        self.connection = sqlite3.connect("../db/biquge.db")
        self.cursor = self.connection.cursor()

    def close_spider(self, spider):
        # 关闭数据库连接
        self.connection.close()

    def process_item(self, item, spider):
        sql = """
        INSERT INTO
        chapter_detail_list(chapter_code, chapter_content)
        VALUES(?, ?)
        """
        # print(sql)
        self.cursor.execute(
            sql, (item['chapter_code'], item['chapter_content'])
        )
        self.connection.commit()
        return item

settings.py

相关的配置情况

# Scrapy settings for biquge_chapter_detail_spider project
#
# For simplicity, this file contains only settings considered important or
# commonly used. You can find more settings consulting the documentation:
#
#     https://docs.scrapy.org/en/latest/topics/settings.html
#     https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
#     https://docs.scrapy.org/en/latest/topics/spider-middleware.html
import os
from dotenv import load_dotenv

load_dotenv()

BOT_NAME = "biquge_chapter_detail_spider"

SPIDER_MODULES = ["biquge_chapter_detail_spider.spiders"]
NEWSPIDER_MODULE = "biquge_chapter_detail_spider.spiders"
LOG_LEVEL = "ERROR"

# Crawl responsibly by identifying yourself (and your website) on the user-agent
#USER_AGENT = "biquge_chapter_detail_spider (+http://www.yourdomain.com)"

# Obey robots.txt rules
ROBOTSTXT_OBEY = False

# Configure maximum concurrent requests performed by Scrapy (default: 16)
# CONCURRENT_REQUESTS = 1

# Configure a delay for requests for the same website (default: 0)
# See https://docs.scrapy.org/en/latest/topics/settings.html#download-delay
# See also autothrottle settings and docs
DOWNLOAD_DELAY = 0.2
# The download delay setting will honor only one of:
#CONCURRENT_REQUESTS_PER_DOMAIN = 16
#CONCURRENT_REQUESTS_PER_IP = 16

# Disable cookies (enabled by default)
#COOKIES_ENABLED = False

# Disable Telnet Console (enabled by default)
#TELNETCONSOLE_ENABLED = False

# Override the default request headers:
DEFAULT_REQUEST_HEADERS = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en",
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
}

# Enable or disable spider middlewares
# See https://docs.scrapy.org/en/latest/topics/spider-middleware.html
#SPIDER_MIDDLEWARES = {
#    "biquge_chapter_detail_spider.middlewares.BiqugeChapterDetailSpiderSpiderMiddleware": 543,
#}

# Enable or disable downloader middlewares
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
#DOWNLOADER_MIDDLEWARES = {
#    "biquge_chapter_detail_spider.middlewares.BiqugeChapterDetailSpiderDownloaderMiddleware": 543,
#}

# Enable or disable extensions
# See https://docs.scrapy.org/en/latest/topics/extensions.html
#EXTENSIONS = {
#    "scrapy.extensions.telnet.TelnetConsole": None,
#}

# Configure item pipelines
# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
    "biquge_chapter_detail_spider.pipelines.SQLitePipeline": 300,
    # "biquge_chapter_detail_spider.pipelines.BiqugeChapterDetailSpiderPipeline": 300,
}

# Enable and configure the AutoThrottle extension (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/autothrottle.html
#AUTOTHROTTLE_ENABLED = True
# The initial download delay
#AUTOTHROTTLE_START_DELAY = 5
# The maximum download delay to be set in case of high latencies
#AUTOTHROTTLE_MAX_DELAY = 60
# The average number of requests Scrapy should be sending in parallel to
# each remote server
#AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
#AUTOTHROTTLE_DEBUG = False

# Enable and configure HTTP caching (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings
#HTTPCACHE_ENABLED = True
#HTTPCACHE_EXPIRATION_SECS = 0
#HTTPCACHE_DIR = "httpcache"
#HTTPCACHE_IGNORE_HTTP_CODES = []
#HTTPCACHE_STORAGE = "scrapy.extensions.httpcache.FilesystemCacheStorage"

# Set settings whose default value is deprecated to a future-proof value
REQUEST_FINGERPRINTER_IMPLEMENTATION = "2.7"
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"
FEED_EXPORT_ENCODING = "utf-8"

RABBITMQ_PARAMS = {
    'queue': os.getenv('RABBITMQ_QUEUE', 'default_queue'),
    'host': os.getenv('RABBITMQ_HOST', 'localhost'),
    'port': os.getenv('RABBITMQ_PORT', '5672'),
    'virtual_host': os.getenv('RABBITMQ_VHOST', '/'),
    'username': os.getenv('RABBITMQ_USERNAME', 'guest'),
    'password': os.getenv('RABBITMQ_PASSWORD', 'guest'),
    'auto_ack': os.getenv('RABBITMQ_AUTO_ACK', False)
}

运行代码

scrapy crawl spider

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
1月前
|
数据采集 存储 JSON
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第27天】本文介绍了Python网络爬虫Scrapy框架的实战应用与技巧。首先讲解了如何创建Scrapy项目、定义爬虫、处理JSON响应、设置User-Agent和代理,以及存储爬取的数据。通过具体示例,帮助读者掌握Scrapy的核心功能和使用方法,提升数据采集效率。
105 6
|
2月前
|
数据采集 API 数据处理
Objective-C 音频爬虫:实时接收数据的 didReceiveData: 方法
Objective-C 音频爬虫:实时接收数据的 didReceiveData: 方法
|
5天前
|
数据采集 安全 定位技术
使用代理IP爬虫时数据不完整的原因探讨
在信息化时代,互联网成为生活的重要部分。使用HTTP代理爬取数据时,可能会遇到失败情况,如代理IP失效、速度慢、目标网站策略、请求频率过高、地理位置不当、网络连接问题、代理配置错误和目标网站内容变化等。解决方法包括更换代理IP、调整请求频率、检查配置及目标网站变化。
34 11
|
24天前
|
数据采集 JSON JavaScript
如何通过PHP爬虫模拟表单提交,抓取隐藏数据
本文介绍了如何使用PHP模拟表单提交并结合代理IP技术抓取京东商品的实时名称和价格,特别是在电商大促期间的数据采集需求。通过cURL发送POST请求,设置User-Agent和Cookie,使用代理IP绕过限制,解析返回数据,展示了完整代码示例。
如何通过PHP爬虫模拟表单提交,抓取隐藏数据
|
1月前
|
消息中间件 大数据 Kafka
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
本文深入探讨了消息队列的核心概念、应用场景及Kafka、RocketMQ、RabbitMQ的优劣势比较,大厂面试高频,必知必会,建议收藏。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:Kafka、RocketMQ、RabbitMQ 的优劣势比较
|
1月前
|
消息中间件 测试技术
通过轻量消息队列(原MNS)主题HTTP订阅+ARMS实现自定义数据多渠道告警
轻量消息队列(原MNS)以其简单队列模型、轻量化协议及按量后付费模式,成为阿里云产品间消息传输首选。本文通过创建主题、订阅、配置告警集成等步骤,展示了该产品在实际应用中的部分功能,确保消息的可靠传输。
44 2
|
1月前
|
数据采集 前端开发 中间件
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第26天】Python是一种强大的编程语言,在数据抓取和网络爬虫领域应用广泛。Scrapy作为高效灵活的爬虫框架,为开发者提供了强大的工具集。本文通过实战案例,详细解析Scrapy框架的应用与技巧,并附上示例代码。文章介绍了Scrapy的基本概念、创建项目、编写简单爬虫、高级特性和技巧等内容。
77 4
|
1月前
|
数据采集 中间件 API
在Scrapy爬虫中应用Crawlera进行反爬虫策略
在Scrapy爬虫中应用Crawlera进行反爬虫策略
|
1月前
|
消息中间件 存储 监控
ActiveMQ、RocketMQ、RabbitMQ、Kafka 的区别
【10月更文挑战第24天】ActiveMQ、RocketMQ、RabbitMQ 和 Kafka 都有各自的特点和优势,在不同的应用场景中发挥着重要作用。在选择消息队列时,需要根据具体的需求、性能要求、扩展性要求等因素进行综合考虑,选择最适合的消息队列技术。同时,随着技术的不断发展和演进,这些消息队列也在不断地更新和完善,以适应不断变化的应用需求。
103 1
|
1月前
|
数据采集 JavaScript 程序员
探索CSDN博客数据:使用Python爬虫技术
本文介绍了如何利用Python的requests和pyquery库爬取CSDN博客数据,包括环境准备、代码解析及注意事项,适合初学者学习。
76 0
下一篇
DataWorks