小说爬虫-02 爬取小说详细内容和章节列表 推送至RabbitMQ 消费ACK确认 Scrapy爬取 SQLite

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
云解析 DNS,旗舰版 1个月
全局流量管理 GTM,标准版 1个月
简介: 小说爬虫-02 爬取小说详细内容和章节列表 推送至RabbitMQ 消费ACK确认 Scrapy爬取 SQLite

代码仓库

代码我已经上传到 Github,大家需要的可以顺手点个 Star!

https://github.com/turbo-duck/biquge_fiction_spider

背景介绍

上一节已经拿到了每个小说的编码:fiction_code,并且写入了数据库表。

接下来,我们写一个小工具,将数据表中的数据,都推送到 RabbitMQ 中。

为了保证我们不丢数据,在消费的时候,我们将手动进行 ACK 确认。

目前,现在库工具和RabbitMQ的配合比较差,不知道为什么, 所以就手搓了一部分代码, 来实现 RabbitMQ 和 Scrapy 的结合。

使用技术

  • RabbitMQ
  • Scrapy
  • SQLite

生产者代码

先写一个生产者,从数据库中拿到数据,然后将URL推送到RabbitMQ中。后续将用Scrapy对该队列进行消费。

完整代码如下

import pika
import json
import sqlite3
import os
from dotenv import load_dotenv
load_dotenv()

sql_connection = sqlite3.connect('../db/biquge.db')
cursor = sql_connection.cursor()

rabbitmq_queue = os.getenv('RABBITMQ_QUEUE', 'default_queue')
rabbitmq_host = os.getenv('RABBITMQ_HOST', 'localhost')
rabbitmq_port = os.getenv('RABBITMQ_PORT', '5672')
virtual_host = os.getenv('RABBITMQ_VHOST', '/')
username = os.getenv('RABBITMQ_USERNAME', 'guest')
password = os.getenv('RABBITMQ_PASSWORD', 'guest')

credentials = pika.PlainCredentials(
    username,
    password
)

connection_params_result = {
    'host': rabbitmq_host,
    'port': rabbitmq_port,
    'virtual_host': '/',
    'credentials': credentials,
}
mq_connection = pika.BlockingConnection(pika.ConnectionParameters(**connection_params_result))
channel = mq_connection.channel()
channel.queue_declare(queue=rabbitmq_queue, durable=True)


sql = """
SELECT each_href FROM biquge_list
"""
cursor.execute(sql)
results = cursor.fetchall()
for row in results:
    each_href = row[0]
    print(each_href)
    message = json.dumps({
        'url': each_href,
    })
    channel.basic_publish(
        exchange='',
        routing_key=rabbitmq_queue,
        body=message.encode('utf-8'),
        properties=pika.BasicProperties(delivery_mode=2)
    )
    print(f"Send MQ: {message}")

mq_connection.close()
sql_connection.close()

消费者代码

由于市面上的包不太我符合我的需求,所以这里是手搓的,比较长!

这里有一些主要的逻辑判断:


查询数据是否存在,存在则直接ACK确认

消费MQ如果失败将会重连

由于重连后确认的tag将会失效,所以会有一个version_id的机制来判断 比较提交错误

这个Spider中有两个主要的部分:


爬取小说的详细介绍

爬取小说的章节列表

这是两个不同的Item

Spider.py

这里做一些介绍:

初始化方法中定义了一些实例变量

def __init__(self, **kwargs):
    super().__init__(**kwargs)
    self.queue_name = None
    self.channel = None
    self.db_params = None
    self.conn = None
    self.cursor = None
    self.tcp_uuid = 0

建立 RabbitMQ 连接

def establish_connection(self):
    try:
        connection_params = self.settings.get('RABBITMQ_PARAMS', None)
        self.queue_name = connection_params['queue']
        credentials = pika.PlainCredentials(
            connection_params['username'],
            connection_params['password']
        )
        connection_params_result = {
            'host': connection_params['host'],
            'port': connection_params['port'],
            'virtual_host': connection_params['virtual_host'],
            'credentials': credentials,
            'heartbeat': 3600,
            'connection_attempts': 5,
        }
        connection = pika.BlockingConnection(pika.ConnectionParameters(**connection_params_result))
        self.channel = connection.channel()
        self.channel.basic_qos(prefetch_count=1)
        self.tcp_uuid = int(self.tcp_uuid) + 1
    except Exception as e:
        print(f"连接MQ失败: {str(e)}")
        print("等待5秒后重试...")
        time.sleep(5)
        self.establish_connection()

建立数据库的链接

def connect_db(self):
    try:
        self.conn = sqlite3.connect("../db/biquge.db")
        self.cursor = self.conn.cursor()
    except Exception as e:
        print("Error connecting to DB: ", e)
        print("等待5秒后重试...")
        time.sleep(5)
        self.connect_db()

处理需要抓取的请求

def callback(self, url, delivery_tag, fiction_code):
    meta = {
        "url": url,
        "fiction_code": fiction_code,
        "delivery_tag": delivery_tag,
        "tcp_uuid": int(self.tcp_uuid),
    }
    print(url)
    return scrapy.Request(
        url=url,
        meta=meta,
        callback=self.parse_list,
    )

确认消费和拒绝消费

def ack(self, delivery_tag):
    self.channel.basic_ack(delivery_tag=delivery_tag)
    print(f"提交ACK确认: {delivery_tag}")

def no_ack(self, delivery_tag):
    self.channel.basic_reject(delivery_tag=delivery_tag, requeue=True)

对内容进行解析处理

def parse_list(self, response):
    meta = response.meta

    # ==== 解析 小说基本信息 ====
    fiction_code = meta['fiction_code']
    fiction_name = response.xpath(".//div[@id='info']/h1/text()").extract_first()
    fiction_info = response.xpath(".//p[contains(text(), '更新时间:')]/text()").extract_first()
    fiction_introduce = response.xpath(".//div[@id='intro']/text()").extract()
    fiction_author = response.xpath(".//p[contains(text(), '作者:')]/a/text()").extract_first()

    fiction_type = response.xpath(".//div[@class='con_top']/text()").extract_first()
    fiction_type = re.sub(" ", "", str(fiction_type))
    fiction_type = re.sub(re.escape(fiction_name), "", str(fiction_type))
    fiction_type = re.sub(">", "", str(fiction_type))

    fiction_image_url = response.xpath(".//div[@id='fmimg']/img/@src").extract_first()
    fiction_count = response.xpath(".//p[contains(text(), '更新时间:')]/text()").extract_first()
    fiction_count = re.sub("更新时间:", "", str(fiction_count))

    item = BiqugeChapterSpiderFictionItem()
    item['fiction_code'] = str(fiction_code)
    item['fiction_name'] = str(fiction_name)
    item['fiction_info'] = str(fiction_info)
    item['fiction_introduce'] = str(fiction_introduce)
    item['fiction_author'] = str(fiction_author)
    item['fiction_type'] = str(fiction_type)
    item['fiction_image_url'] = str(fiction_image_url)
    item['fiction_count'] = str(fiction_count)
    print(f"获取{item['fiction_name']}信息")
    yield item

    # ==== 解析 小说章节 ====
    chapter_list = response.xpath(".//div[@id='list']/dl/dd/a")
    chapter_set = set()
    chapter_only_one_list = list()
    for each_chapter in chapter_list:
        each_href = each_chapter.xpath("./@href").extract_first()
        each_code = re.sub(".html", "", str(each_href))
        if each_code in chapter_set:
            continue
        else:
            chapter_set.add(each_code)
        each_name = each_chapter.xpath("./text()").extract_first()
        set_item = {
            "each_code": str(each_code),
            "each_name": str(each_name),
       

完整代码如下

import scrapy
import re
import pika
import json
import time
import scrapy
from urllib import parse
import logging
import sqlite3
from biquge_chapter_spider.items import BiqugeChapterSpiderFictionItem, BiqugeChapterSpiderChapterItem


logger = logging.getLogger(__name__)


class SpiderSpider(scrapy.Spider):
    name = "spider"
    # allowed_domains = ["spider.com"]
    start_urls = []

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.queue_name = None
        self.channel = None
        self.db_params = None
        self.conn = None
        self.cursor = None
        self.tcp_uuid = 0

    def establish_connection(self):
        try:
            connection_params = self.settings.get('RABBITMQ_PARAMS', None)
            self.queue_name = connection_params['queue']
            credentials = pika.PlainCredentials(
                connection_params['username'],
                connection_params['password']
            )
            connection_params_result = {
                'host': connection_params['host'],
                'port': connection_params['port'],
                'virtual_host': connection_params['virtual_host'],
                'credentials': credentials,
                'heartbeat': 3600,
                'connection_attempts': 5,
            }
            connection = pika.BlockingConnection(pika.ConnectionParameters(**connection_params_result))
            self.channel = connection.channel()
            self.channel.basic_qos(prefetch_count=1)
            self.tcp_uuid = int(self.tcp_uuid) + 1
        except Exception as e:
            print(f"连接MQ失败: {str(e)}")
            print("等待5秒后重试...")
            time.sleep(5)
            self.establish_connection()

    def connect_db(self):
        try:
            self.conn = sqlite3.connect("../db/biquge.db")
            self.cursor = self.conn.cursor()
        except Exception as e:
            print("Error connecting to DB: ", e)
            print("等待5秒后重试...")
            time.sleep(5)
            self.connect_db()

    def extract_last_number(self, text):
        # 使用正则表达式查找所有的数字
        numbers = re.findall(r'.*?/(\d+)/', text)
        # print(numbers)
        if numbers:
            # 返回最后一个数字
            return str(numbers[-1])
        else:
            return ""

    def start_requests(self):
        self.establish_connection()
        self.connect_db()
        while True:
            try:
                method, header, body = self.channel.basic_get(self.queue_name)
            except Exception as e:
                print("--- ---")
                print(e)
                print("--- establish_connection ---")
                self.establish_connection()
                time.sleep(1)
                continue
            if not method:
                continue
            delivery_tag = method.delivery_tag
            body = body.decode()
            body = parse.unquote(body)
            json_data = json.loads(body)
            print(body)
            url = json_data['url']
            if url is None or url == "":
                self.ack(delivery_tag)
                continue
            fiction_code = self.extract_last_number(url)
            # 检验数据库中是否有数据 有则跳过
            sql = "SELECT COUNT(id) AS count FROM fiction_list WHERE fiction_code = ?"
            try:
                self.cursor.execute(sql, (fiction_code,))
                result = self.cursor.fetchone()
                count = result[0]
                if count > 0:
                    print(f"SQL SELECT fiction_code: {fiction_code}, COUNT: {count}, ACK: {delivery_tag} 已跳过")
                    self.ack(delivery_tag)
                    continue
            except Exception as e:
                print(e)
                print(sql)
                print("--- reconnect_db ---")
                self.no_ack(delivery_tag)
                self.connect_db()
                time.sleep(1)
                continue
            print(f"准备请求: {url}, ACK: {delivery_tag}")
            yield self.callback(
                url=url,
                delivery_tag=delivery_tag,
                fiction_code=fiction_code,
            )

    def callback(self, url, delivery_tag, fiction_code):
        meta = {
            "url": url,
            "fiction_code": fiction_code,
            "delivery_tag": delivery_tag,
            "tcp_uuid": int(self.tcp_uuid),
        }
        print(url)
        return scrapy.Request(
            url=url,
            meta=meta,
            callback=self.parse_list,
        )

    def ack(self, delivery_tag):
        self.channel.basic_ack(delivery_tag=delivery_tag)
        print(f"提交ACK确认: {delivery_tag}")

    def no_ack(self, delivery_tag):
        self.channel.basic_reject(delivery_tag=delivery_tag, requeue=True)

    def parse_list(self, response):
        meta = response.meta

        # ==== 解析 小说基本信息 ====
        fiction_code = meta['fiction_code']
        fiction_name = response.xpath(".//div[@id='info']/h1/text()").extract_first()
        fiction_info = response.xpath(".//p[contains(text(), '更新时间:')]/text()").extract_first()
        fiction_introduce = response.xpath(".//div[@id='intro']/text()").extract()
        fiction_author = response.xpath(".//p[contains(text(), '作者:')]/a/text()").extract_first()

        #  > 都市小说 > 汴京小医娘
        fiction_type = response.xpath(".//div[@class='con_top']/text()").extract_first()
        fiction_type = re.sub(" ", "", str(fiction_type))
        fiction_type = re.sub(re.escape(fiction_name), "", str(fiction_type))
        fiction_type = re.sub(">", "", str(fiction_type))

        fiction_image_url = response.xpath(".//div[@id='fmimg']/img/@src").extract_first()
        fiction_count = response.xpath(".//p[contains(text(), '更新时间:')]/text()").extract_first()
        fiction_count = re.sub("更新时间:", "", str(fiction_count))

        item = BiqugeChapterSpiderFictionItem()
        item['fiction_code'] = str(fiction_code)
        item['fiction_name'] = str(fiction_name)
        item['fiction_info'] = str(fiction_info)
        item['fiction_introduce'] = str(fiction_introduce)
        item['fiction_author'] = str(fiction_author)
        item['fiction_type'] = str(fiction_type)
        item['fiction_image_url'] = str(fiction_image_url)
        item['fiction_count'] = str(fiction_count)
        print(f"获取{item['fiction_name']}信息")
        yield item

        # ==== 解析 小说章节 ====
        chapter_list = response.xpath(".//div[@id='list']/dl/dd/a")
        # 用来去重的 页面上有不少重复内容
        chapter_set = set()
        chapter_only_one_list = list()
        for each_chapter in chapter_list:
            # 40726662.html
            each_href = each_chapter.xpath("./@href").extract_first()
            # 40726662
            each_code = re.sub(".html", "", str(each_href))
            if each_code in chapter_set:
                continue
            else:
                chapter_set.add(each_code)
            each_name = each_chapter.xpath("./text()").extract_first()
            set_item = {
                "each_code": str(each_code),
                "each_name": str(each_name),
            }
            # print(f"set_item: {set_item}")
            chapter_only_one_list.append(set_item)

        # 去重后的
        for each_chapter in chapter_only_one_list:
            chapter_code = each_chapter.get('each_code')
            chapter_name = each_chapter.get('each_name')
            # 通过code进行排序
            chapter_order = int(chapter_code)

            item = BiqugeChapterSpiderChapterItem()
            item['fiction_code'] = str(fiction_code)
            item['chapter_code'] = str(chapter_code)
            item['chapter_name'] = str(chapter_name)
            item['chapter_order'] = int(chapter_order)
            # print(f"获取 {fiction_name} 章节信息: {chapter_name}")
            yield item

        # ack
        delivery_tag = meta['delivery_tag']
        tcp_uuid = meta['tcp_uuid']
        if int(tcp_uuid) == self.tcp_uuid:
            self.ack(delivery_tag)
        else:
            print(f"ACK 跳过: tcp_uuid: {tcp_uuid}, self.tcp_uuid: {self.tcp_uuid}, delivery_tag: {delivery_tag}")

piplines.py

建立对数据库的链接

def open_spider(self, spider):
    self.connection = sqlite3.connect("../db/biquge.db")
    self.cursor = self.connection.cursor()

对不同的 Item 进行处理,通过ItemAdapter,判断属于哪个,来走不同的SQL

def process_item(self, item, spider):
    adapter = ItemAdapter(item)
    if isinstance(item, BiqugeChapterSpiderFictionItem):
        self.process_fiction_item(adapter, spider)
    elif isinstance(item, BiqugeChapterSpiderChapterItem):
        self.process_chapter_item(adapter, spider)
    return item

完整代码如下

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html


# useful for handling different item types with a single interface
from itemadapter import ItemAdapter
import sqlite3
from .items import BiqugeChapterSpiderFictionItem, BiqugeChapterSpiderChapterItem


class BiqugeChapterSpiderPipeline:
    def process_item(self, item, spider):
        return item


class SQLitePipeline:

    def __init__(self):
        self.cursor = None
        self.connection = None

    def open_spider(self, spider):
        # 连接到 SQLite 数据库
        self.connection = sqlite3.connect("../db/biquge.db")
        self.cursor = self.connection.cursor()

    def close_spider(self, spider):
        # 关闭数据库连接
        self.connection.close()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        if isinstance(item, BiqugeChapterSpiderFictionItem):
            self.process_fiction_item(adapter, spider)
        elif isinstance(item, BiqugeChapterSpiderChapterItem):
            self.process_chapter_item(adapter, spider)
        return item

    def process_fiction_item(self, adapter, spider):
        self.cursor.execute('''
            INSERT INTO
            fiction_list(
            fiction_code, fiction_name, fiction_info, 
            fiction_introduce, fiction_author, fiction_type, 
            fiction_image_url, fiction_count, 
            create_time, update_time) 
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
            ''', (
            adapter.get('fiction_code'),
            adapter.get('fiction_name'),
            adapter.get('fiction_info'),
            adapter.get('fiction_introduce'),
            adapter.get('fiction_author'),
            adapter.get('fiction_type'),
            adapter.get('fiction_image_url'),
            adapter.get('fiction_count')
        ))
        self.connection.commit()
        print(f"数据库入库: fiction_list {adapter.get('fiction_name')}")
        return adapter

    def process_chapter_item(self, adapter, spider):
        self.cursor.execute('''
            INSERT INTO
            chapter_list(
            fiction_code, chapter_code, chapter_name, 
            chapter_order, create_time, update_time)
            VALUES(?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
            ''', (
            adapter.get('fiction_code'),
            adapter.get('chapter_code'),
            adapter.get('chapter_name'),
            adapter.get('chapter_order')
        ))
        self.connection.commit()
        # print(f"数据库入库: chapter_list {adapter.get('chapter_name')}")
        return adapter

settings.py

RabbitMQ 的连接配置在这里

# Scrapy settings for biquge_chapter_spider project
#
# For simplicity, this file contains only settings considered important or
# commonly used. You can find more settings consulting the documentation:
#
#     https://docs.scrapy.org/en/latest/topics/settings.html
#     https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
#     https://docs.scrapy.org/en/latest/topics/spider-middleware.html
import os
from dotenv import load_dotenv
load_dotenv()


BOT_NAME = "biquge_chapter_spider"

SPIDER_MODULES = ["biquge_chapter_spider.spiders"]
NEWSPIDER_MODULE = "biquge_chapter_spider.spiders"
LOG_LEVEL = "ERROR"

# Crawl responsibly by identifying yourself (and your website) on the user-agent
#USER_AGENT = "biquge_chapter_spider (+http://www.yourdomain.com)"

# Obey robots.txt rules
ROBOTSTXT_OBEY = False

# Configure maximum concurrent requests performed by Scrapy (default: 16)
#CONCURRENT_REQUESTS = 32

# Configure a delay for requests for the same website (default: 0)
# See https://docs.scrapy.org/en/latest/topics/settings.html#download-delay
# See also autothrottle settings and docs
DOWNLOAD_DELAY = 0.2
# The download delay setting will honor only one of:
#CONCURRENT_REQUESTS_PER_DOMAIN = 16
#CONCURRENT_REQUESTS_PER_IP = 16

# Disable cookies (enabled by default)
#COOKIES_ENABLED = False

# Disable Telnet Console (enabled by default)
#TELNETCONSOLE_ENABLED = False

# Override the default request headers:
DEFAULT_REQUEST_HEADERS = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en",
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
}

# Enable or disable spider middlewares
# See https://docs.scrapy.org/en/latest/topics/spider-middleware.html
#SPIDER_MIDDLEWARES = {
#    "biquge_chapter_spider.middlewares.BiqugeChapterSpiderSpiderMiddleware": 543,
#}

# Enable or disable downloader middlewares
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
#DOWNLOADER_MIDDLEWARES = {
#    "biquge_chapter_spider.middlewares.BiqugeChapterSpiderDownloaderMiddleware": 543,
#}

# Enable or disable extensions
# See https://docs.scrapy.org/en/latest/topics/extensions.html
#EXTENSIONS = {
#    "scrapy.extensions.telnet.TelnetConsole": None,
#}

# Configure item pipelines
# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
    "biquge_chapter_spider.pipelines.SQLitePipeline": 300,
}

# Enable and configure the AutoThrottle extension (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/autothrottle.html
#AUTOTHROTTLE_ENABLED = True
# The initial download delay
#AUTOTHROTTLE_START_DELAY = 5
# The maximum download delay to be set in case of high latencies
#AUTOTHROTTLE_MAX_DELAY = 60
# The average number of requests Scrapy should be sending in parallel to
# each remote server
#AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
#AUTOTHROTTLE_DEBUG = False

# Enable and configure HTTP caching (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings
#HTTPCACHE_ENABLED = True
#HTTPCACHE_EXPIRATION_SECS = 0
#HTTPCACHE_DIR = "httpcache"
#HTTPCACHE_IGNORE_HTTP_CODES = []
#HTTPCACHE_STORAGE = "scrapy.extensions.httpcache.FilesystemCacheStorage"

# Set settings whose default value is deprecated to a future-proof value
REQUEST_FINGERPRINTER_IMPLEMENTATION = "2.7"
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"
FEED_EXPORT_ENCODING = "utf-8"

# RabbitMQ settings
RABBITMQ_PARAMS = {
    'queue': os.getenv('RABBITMQ_QUEUE', 'default_queue'),
    'host': os.getenv('RABBITMQ_HOST', 'localhost'),
    'port': os.getenv('RABBITMQ_PORT', '5672'),
    'virtual_host': os.getenv('RABBITMQ_VHOST', '/'),
    'username': os.getenv('RABBITMQ_USERNAME', 'guest'),
    'password': os.getenv('RABBITMQ_PASSWORD', 'guest'),
    'auto_ack': os.getenv('RABBITMQ_AUTO_ACK', False)
}

运行代码

运行生产者

python producer.py

运行消费者

scrapy crawl spider


相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
目录
打赏
0
1
1
0
100
分享
相关文章
在Scrapy爬虫中应用Crawlera进行反爬虫策略
在Scrapy爬虫中应用Crawlera进行反爬虫策略
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第27天】本文介绍了Python网络爬虫Scrapy框架的实战应用与技巧。首先讲解了如何创建Scrapy项目、定义爬虫、处理JSON响应、设置User-Agent和代理,以及存储爬取的数据。通过具体示例,帮助读者掌握Scrapy的核心功能和使用方法,提升数据采集效率。
194 6
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第26天】Python是一种强大的编程语言,在数据抓取和网络爬虫领域应用广泛。Scrapy作为高效灵活的爬虫框架,为开发者提供了强大的工具集。本文通过实战案例,详细解析Scrapy框架的应用与技巧,并附上示例代码。文章介绍了Scrapy的基本概念、创建项目、编写简单爬虫、高级特性和技巧等内容。
155 4
小说爬虫-03 爬取章节的详细内容并保存 将章节URL推送至RabbitMQ Scrapy消费MQ 对数据进行爬取后写入SQLite
小说爬虫-03 爬取章节的详细内容并保存 将章节URL推送至RabbitMQ Scrapy消费MQ 对数据进行爬取后写入SQLite
64 1
小说爬虫-01爬取总排行榜 分页翻页 Scrapy SQLite SQL 简单上手!
小说爬虫-01爬取总排行榜 分页翻页 Scrapy SQLite SQL 简单上手!
119 0
Python进行网络爬虫:Scrapy框架的实践
【8月更文挑战第17天】网络爬虫是自动化程序,用于从互联网收集信息。Python凭借其丰富的库和框架成为构建爬虫的首选语言。Scrapy作为一款流行的开源框架,简化了爬虫开发过程。本文介绍如何使用Python和Scrapy构建简单爬虫:首先安装Scrapy,接着创建新项目并定义爬虫,指定起始URL和解析逻辑。运行爬虫可将数据保存为JSON文件或存储到数据库。此外,Scrapy支持高级功能如中间件定制、分布式爬取、动态页面渲染等。在实践中需遵循最佳规范,如尊重robots.txt协议、合理设置爬取速度等。通过本文,读者将掌握Scrapy基础并了解如何高效地进行网络数据采集。
310 6
Python爬虫开发:BeautifulSoup、Scrapy入门
在现代网络开发中,网络爬虫是一个非常重要的工具。它可以自动化地从网页中提取数据,并且可以用于各种用途,如数据收集、信息聚合和内容监控等。在Python中,有多个库可以用于爬虫开发,其中BeautifulSoup和Scrapy是两个非常流行的选择。本篇文章将详细介绍这两个库,并提供一个综合详细的例子,展示如何使用它们来进行网页数据爬取。
高级网页爬虫开发:Scrapy和BeautifulSoup的深度整合
高级网页爬虫开发:Scrapy和BeautifulSoup的深度整合
Scrapy爬虫框架-通过Cookies模拟自动登录
Scrapy爬虫框架-通过Cookies模拟自动登录
164 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等