代码仓库
代码我已经上传到 GitHub,大家需要的可以顺手点个 Star!
https://github.com/turbo-duck/biquge_fiction_spider
背景介绍
上一节已经拿到了小说的详细内容和章节的列表。
接下来,使用脚本将章节的列表从 SQLite 数据库中取出,并把数据推送至 RabbitMQ 中。
最后,Scrapy消费MQ,手动ACK确认,将数据写入到SQLite。
使用技术
- RabbitMQ
- Scrapy
- SQLite
生产者代码
连接 SQLite + RabbitMQ,构造 URL 推送!
"""Producer: read chapter codes from SQLite and publish chapter URLs to RabbitMQ."""
import json
import os
import sqlite3

import pika
from dotenv import load_dotenv

load_dotenv()

# RabbitMQ settings come from the environment (.env), with local defaults.
rabbitmq_queue = os.getenv('RABBITMQ_QUEUE', 'default_queue')
rabbitmq_host = os.getenv('RABBITMQ_HOST', 'localhost')
# Environment values are strings; convert once so a bad port fails here.
rabbitmq_port = int(os.getenv('RABBITMQ_PORT', '5672'))
virtual_host = os.getenv('RABBITMQ_VHOST', '/')
username = os.getenv('RABBITMQ_USERNAME', 'guest')
password = os.getenv('RABBITMQ_PASSWORD', 'guest')

credentials = pika.PlainCredentials(username, password)
connection_params_result = {
    'host': rabbitmq_host,
    'port': rabbitmq_port,
    # Use the configured vhost; it was previously read but then ignored in
    # favour of a hard-coded '/'.
    'virtual_host': virtual_host,
    'credentials': credentials,
}

# Both statements are loop-invariant, so they are defined once up front.
fiction_sql = """ SELECT each_code FROM biquge_list WHERE page_info = ? """
chapter_sql = """ SELECT chapter_code FROM chapter_list WHERE fiction_code = ? """

sql_connection = sqlite3.connect('../db/biquge.db')
try:
    cursor = sql_connection.cursor()
    mq_connection = pika.BlockingConnection(
        pika.ConnectionParameters(**connection_params_result)
    )
    try:
        channel = mq_connection.channel()
        channel.queue_declare(queue=rabbitmq_queue, durable=True)

        # Query the novels one list page at a time (all at once is too many).
        page_info = '1/1391'
        cursor.execute(fiction_sql, (page_info,))
        results = cursor.fetchall()
        for each_fiction in results:
            fiction_code = each_fiction[0]
            print(f"fiction code: {fiction_code}")
            # Look up the chapter codes that belong to this novel code.
            cursor.execute(chapter_sql, (fiction_code,))
            chapter_results = cursor.fetchall()
            for each_chapter in chapter_results:
                chapter_code = each_chapter[0]
                chapter_url = f"https://www.xbiqugew.com/book/{fiction_code}/{chapter_code}.html"
                message = json.dumps({
                    'url': chapter_url,
                })
                channel.basic_publish(
                    exchange='',
                    routing_key=rabbitmq_queue,
                    body=message.encode('utf-8'),
                    # delivery_mode=2 marks the message as persistent.
                    properties=pika.BasicProperties(delivery_mode=2),
                )
                print(f"Send MQ: {message}")
    finally:
        # Closing an already-closed connection raises, which would mask the
        # original error, so only close while it is still open.
        if mq_connection.is_open:
            mq_connection.close()
finally:
    sql_connection.close()
消费者代码
Scrapy 连接 RabbitMQ 将数据写入 SQLite。
Spider.py
import json
import re
import sqlite3
import time
from urllib import parse

import pika
import scrapy

from biquge_chapter_detail_spider.items import BiqugeChapterDetailSpiderItem


class SpiderSpider(scrapy.Spider):
    """Spider that pulls chapter URLs from RabbitMQ and scrapes chapter text.

    Messages are fetched with ``basic_get`` and acknowledged manually once the
    page has been parsed. Chapters already present in the SQLite table
    ``chapter_detail_list`` are acknowledged without being requested again.
    """

    name = "spider"
    # allowed_domains = ["spider.com"]
    start_urls = []

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.queue_name = None
        self.channel = None
        self.db_params = None
        self.conn = None
        self.cursor = None
        # Incremented on every successful (re)connection; lets parse_list
        # detect delivery tags that were issued on an older channel.
        self.tcp_uuid = 0

    def establish_connection(self):
        """Open a RabbitMQ channel, retrying every 5 seconds until it works.

        Reads the ``RABBITMQ_PARAMS`` setting, stores the queue name and the
        channel on the spider and bumps ``tcp_uuid``. The retry is a loop
        rather than a recursive call so a long outage cannot exhaust the
        call stack.
        """
        while True:
            try:
                connection_params = self.settings.get('RABBITMQ_PARAMS', None)
                self.queue_name = connection_params['queue']
                credentials = pika.PlainCredentials(
                    connection_params['username'],
                    connection_params['password']
                )
                connection_params_result = {
                    'host': connection_params['host'],
                    'port': connection_params['port'],
                    'virtual_host': connection_params['virtual_host'],
                    'credentials': credentials,
                    'heartbeat': 3600,
                    'connection_attempts': 5,
                }
                connection = pika.BlockingConnection(
                    pika.ConnectionParameters(**connection_params_result)
                )
                self.channel = connection.channel()
                self.channel.basic_qos(prefetch_count=1)
                self.tcp_uuid = int(self.tcp_uuid) + 1
                return
            except Exception as e:
                print(f"连接MQ失败: {str(e)}")
                print("等待5秒后重试...")
                time.sleep(5)

    def connect_db(self):
        """Open the SQLite database, retrying every 5 seconds until it works."""
        while True:
            try:
                self.conn = sqlite3.connect("../db/biquge.db")
                self.cursor = self.conn.cursor()
                return
            except Exception as e:
                print("Error connecting to DB: ", e)
                print("等待5秒后重试...")
                time.sleep(5)

    def extract_last_number_html(self, text):
        """Return the digits of the last ``/<digits>.html`` segment in *text*.

        Returns an empty string when *text* contains no such segment.
        """
        # The dot is escaped so that only a literal ".html" suffix matches.
        numbers = re.findall(r'.*?/(\d+)\.html', text)
        if numbers:
            return str(numbers[-1])
        return ""

    def start_requests(self):
        """Yield one request per queued chapter URL that is not stored yet.

        NOTE(review): this generator polls and sleeps synchronously;
        presumably that stalls Scrapy's event loop while the queue is
        empty — confirm whether that is acceptable for this crawler.
        """
        self.establish_connection()
        self.connect_db()
        while True:
            try:
                method, _header, body = self.channel.basic_get(self.queue_name)
            except Exception as e:
                print("--- ---")
                print(e)
                print("--- establish_connection ---")
                self.establish_connection()
                time.sleep(1)
                continue
            if not method:
                # Queue is empty: back off instead of spinning at full speed.
                time.sleep(1)
                continue
            delivery_tag = method.delivery_tag

            try:
                body = parse.unquote(body.decode())
                json_data = json.loads(body)
            except ValueError as e:
                # A message that cannot be decoded will never succeed, so it
                # is acknowledged and dropped rather than stopping the spider.
                print(f"消息解析失败, 已丢弃: {e}, ACK: {delivery_tag}")
                self.ack(delivery_tag)
                continue
            print(body)
            # Treat a missing 'url' key or a non-object payload as an empty URL.
            url = json_data.get('url') if isinstance(json_data, dict) else None
            if url is None or url == "":
                self.ack(delivery_tag)
                continue

            chapter_code = self.extract_last_number_html(url)
            # Skip chapters that are already stored in the database.
            sql = "SELECT COUNT(id) AS count FROM chapter_detail_list WHERE chapter_code = ?"
            try:
                self.cursor.execute(sql, (chapter_code,))
                result = self.cursor.fetchone()
                count = result[0]
                if count > 0:
                    print(f"SQL SELECT chapter_code: {chapter_code}, COUNT: {count}, ACK: {delivery_tag} 已跳过")
                    self.ack(delivery_tag)
                    continue
            except Exception as e:
                print(e)
                print(sql)
                print("--- reconnect_db ---")
                # Put the message back on the queue before reconnecting.
                self.no_ack(delivery_tag)
                self.connect_db()
                time.sleep(1)
                continue
            print(f"准备请求: {url}, ACK: {delivery_tag}")
            yield self.callback(
                url=url,
                delivery_tag=delivery_tag,
                chapter_code=chapter_code,
            )

    def callback(self, url, delivery_tag, chapter_code):
        """Build the request for *url*, carrying the ACK bookkeeping in meta."""
        meta = {
            "url": url,
            "chapter_code": chapter_code,
            "delivery_tag": delivery_tag,
            "tcp_uuid": int(self.tcp_uuid),
        }
        print(url)
        return scrapy.Request(
            url=url,
            meta=meta,
            callback=self.parse_list,
        )

    def ack(self, delivery_tag):
        """Acknowledge the message identified by *delivery_tag*."""
        self.channel.basic_ack(delivery_tag=delivery_tag)
        print(f"提交ACK确认: {delivery_tag}")

    def no_ack(self, delivery_tag):
        """Reject the message identified by *delivery_tag* and requeue it."""
        self.channel.basic_reject(delivery_tag=delivery_tag, requeue=True)

    def parse_list(self, response):
        """Extract the chapter text, yield it as an item, then ACK the message."""
        meta = response.meta
        chapter_code = meta['chapter_code']
        chapter_list = response.xpath(".//div[@id='content']/text()").extract()
        chapter_json_list = [
            re.sub(r' |\xa0', "", str(each_chapter))
            for each_chapter in chapter_list
        ]
        chapter_content = json.dumps(chapter_json_list, ensure_ascii=False)
        item = BiqugeChapterDetailSpiderItem()
        item['chapter_code'] = str(chapter_code)
        item['chapter_content'] = str(chapter_content)
        print(f"抓取 chapter_code: {chapter_code}")
        yield item

        delivery_tag = meta['delivery_tag']
        tcp_uuid = meta['tcp_uuid']
        # Only ACK when the request was created under the current connection
        # counter; otherwise the tag belongs to an earlier channel.
        if int(tcp_uuid) == self.tcp_uuid:
            self.ack(delivery_tag)
        else:
            print(f"ACK 跳过: tcp_uuid: {tcp_uuid}, self.tcp_uuid: {self.tcp_uuid}, delivery_tag: {delivery_tag}")
pipelines.py
写入 SQLite
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html


# useful for handling different item types with a single interface
from itemadapter import ItemAdapter
import sqlite3


class SQLitePipeline:
    """Item pipeline that stores each scraped chapter in SQLite."""

    def __init__(self):
        # Both handles are created in open_spider.
        self.connection = None
        self.cursor = None

    def open_spider(self, spider):
        """Open the SQLite database and create a cursor for the inserts."""
        db = sqlite3.connect("../db/biquge.db")
        self.connection = db
        self.cursor = db.cursor()

    def close_spider(self, spider):
        """Close the database connection."""
        self.connection.close()

    def process_item(self, item, spider):
        """Insert the item's chapter code and content, commit, return the item."""
        insert_stmt = """ INSERT INTO chapter_detail_list(chapter_code, chapter_content) VALUES(?, ?) """
        row = (item['chapter_code'], item['chapter_content'])
        self.cursor.execute(insert_stmt, row)
        # Commit after every row.
        self.connection.commit()
        return item
settings.py
相关的配置情况
# Scrapy settings for biquge_chapter_detail_spider project
#
# For simplicity, this file contains only settings considered important or
# commonly used. You can find more settings consulting the documentation:
#
#     https://docs.scrapy.org/en/latest/topics/settings.html
#     https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
#     https://docs.scrapy.org/en/latest/topics/spider-middleware.html
import os

from dotenv import load_dotenv

load_dotenv()


def _env_flag(name, default=False):
    """Return the boolean value of environment variable *name*.

    Environment values are strings, so "False" or "0" would be truthy if used
    directly. Only "1", "true", "yes" and "on" (case-insensitive) count as
    true; an unset variable yields *default*.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() in ("1", "true", "yes", "on")


BOT_NAME = "biquge_chapter_detail_spider"

SPIDER_MODULES = ["biquge_chapter_detail_spider.spiders"]
NEWSPIDER_MODULE = "biquge_chapter_detail_spider.spiders"
LOG_LEVEL = "ERROR"

# Crawl responsibly by identifying yourself (and your website) on the user-agent
#USER_AGENT = "biquge_chapter_detail_spider (+http://www.yourdomain.com)"

# Obey robots.txt rules
ROBOTSTXT_OBEY = False

# Configure maximum concurrent requests performed by Scrapy (default: 16)
# CONCURRENT_REQUESTS = 1

# Configure a delay for requests for the same website (default: 0)
# See https://docs.scrapy.org/en/latest/topics/settings.html#download-delay
# See also autothrottle settings and docs
DOWNLOAD_DELAY = 0.2
# The download delay setting will honor only one of:
#CONCURRENT_REQUESTS_PER_DOMAIN = 16
#CONCURRENT_REQUESTS_PER_IP = 16

# Disable cookies (enabled by default)
#COOKIES_ENABLED = False

# Disable Telnet Console (enabled by default)
#TELNETCONSOLE_ENABLED = False

# Override the default request headers:
DEFAULT_REQUEST_HEADERS = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en",
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
}

# Enable or disable spider middlewares
# See https://docs.scrapy.org/en/latest/topics/spider-middleware.html
#SPIDER_MIDDLEWARES = {
#    "biquge_chapter_detail_spider.middlewares.BiqugeChapterDetailSpiderSpiderMiddleware": 543,
#}

# Enable or disable downloader middlewares
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
#DOWNLOADER_MIDDLEWARES = {
#    "biquge_chapter_detail_spider.middlewares.BiqugeChapterDetailSpiderDownloaderMiddleware": 543,
#}

# Enable or disable extensions
# See https://docs.scrapy.org/en/latest/topics/extensions.html
#EXTENSIONS = {
#    "scrapy.extensions.telnet.TelnetConsole": None,
#}

# Configure item pipelines
# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
    "biquge_chapter_detail_spider.pipelines.SQLitePipeline": 300,
    # "biquge_chapter_detail_spider.pipelines.BiqugeChapterDetailSpiderPipeline": 300,
}

# Enable and configure the AutoThrottle extension (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/autothrottle.html
#AUTOTHROTTLE_ENABLED = True
# The initial download delay
#AUTOTHROTTLE_START_DELAY = 5
# The maximum download delay to be set in case of high latencies
#AUTOTHROTTLE_MAX_DELAY = 60
# The average number of requests Scrapy should be sending in parallel to
# each remote server
#AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
#AUTOTHROTTLE_DEBUG = False

# Enable and configure HTTP caching (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings
#HTTPCACHE_ENABLED = True
#HTTPCACHE_EXPIRATION_SECS = 0
#HTTPCACHE_DIR = "httpcache"
#HTTPCACHE_IGNORE_HTTP_CODES = []
#HTTPCACHE_STORAGE = "scrapy.extensions.httpcache.FilesystemCacheStorage"

# Set settings whose default value is deprecated to a future-proof value
REQUEST_FINGERPRINTER_IMPLEMENTATION = "2.7"
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"
FEED_EXPORT_ENCODING = "utf-8"

# RabbitMQ connection settings, read by the spider; values come from the
# environment (.env) with local defaults.
RABBITMQ_PARAMS = {
    'queue': os.getenv('RABBITMQ_QUEUE', 'default_queue'),
    'host': os.getenv('RABBITMQ_HOST', 'localhost'),
    # Convert once here so an invalid port fails at start-up.
    'port': int(os.getenv('RABBITMQ_PORT', '5672')),
    'virtual_host': os.getenv('RABBITMQ_VHOST', '/'),
    'username': os.getenv('RABBITMQ_USERNAME', 'guest'),
    'password': os.getenv('RABBITMQ_PASSWORD', 'guest'),
    # Parsed as a real boolean; the raw string "False" would be truthy.
    'auto_ack': _env_flag('RABBITMQ_AUTO_ACK', False),
}
运行代码
scrapy crawl spider