PySpark实战:亿级爬虫数据的高效处理指南

简介: PySpark助力高效处理亿级爬虫数据,支持分布式清洗、转换与分析。具备弹性扩展、内存优化、多格式兼容等优势,结合Spark生态实现TB级数据全流程处理,提升大规模数据处理效率与系统稳定性。

免费编程软件「python+pycharm」
链接:https://pan.quark.cn/s/48a86be2fdc0

一、为什么选择PySpark处理爬虫数据?
当你的爬虫每天抓取千万级网页时,传统单机工具(如Pandas、Excel)很快会遇到内存不足和性能瓶颈。PySpark作为Apache Spark的Python接口,通过分布式计算框架能轻松处理TB级数据,且具有以下优势:
探秘代理IP并发连接数限制的那点事 - 2025-11-11T144156.324.png

弹性扩展:从单台笔记本到千节点集群无缝切换
内存优化:通过RDD/DataFrame分片存储,避免OOM错误
统一处理:同时处理结构化(数据库)和非结构化(HTML/JSON)数据
生态完善:与HDFS、S3等存储系统天然集成,支持SQL、MLlib等扩展
我们以某电商网站1亿条商品数据为例,展示从数据清洗到分析的全流程。

二、环境准备与数据加载

  1. 集群配置建议
    开发环境:本地模式(4核16G内存可处理百万级数据)
    生产环境:
    3台m5.xlarge EC2实例(16G内存)
    每个Worker分配6G执行内存
    使用EMR或Dataproc快速部署
    from pyspark.sql import SparkSession

spark = SparkSession.builder \
.appName("CrawlerDataProcessing") \
.config("spark.executor.memory", "6g") \
.config("spark.driver.memory", "4g") \
.getOrCreate()

  1. 数据源接入
    支持多种格式直接读取:

从JSON文件加载(支持gzip压缩)

df = spark.read.json("s3a://crawler-data/2023-10/*.json.gz")

从MySQL直接读取(需JDBC驱动)

jdbc_url = "jdbc:mysql://db-host:3306/crawler_db"
df_mysql = spark.read.format("jdbc").options(
url=jdbc_url,
dbtable="products",
user="user",
password="pass"
).load()

从Parquet加载(推荐存储格式)

df_parquet = spark.read.parquet("hdfs://namenode:8020/data/products.parquet")

三、核心处理流程

  1. 数据清洗四步法
    (1)空值处理

删除全为空的列

df = df.dropna(how="all")

填充特定列的空值

from pyspark.sql.functions import col, when
df = df.withColumn("price",
when(col("price").isNull(), 0).otherwise(col("price")))

(2)异常值过滤

价格异常值(假设合理范围是0-10000)

df = df.filter((col("price") >= 0) & (col("price") <= 10000))

销量异常值(使用分位数过滤)

stats = df.approxQuantile("sales", [0.01, 0.99], 0.1)
min_sales, max_sales = stats[0], stats[1]
df = df.filter((col("sales") >= min_sales) & (col("sales") <= max_sales))

(3)重复数据合并

按URL去重,保留最新记录

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = Window.partitionBy("url").orderBy(col("crawl_time").desc())
df = df.withColumn("row_num", row_number().over(window_spec)) \
.filter(col("row_num") == 1) \
.drop("row_num")

(4)数据标准化

from pyspark.sql.functions import lower, trim, regexp_replace

统一品牌名称

brand_map = {"apple": "Apple", "samsung": "Samsung"}
df = df.replace(brand_map, subset=["brand"])

清理HTML标签

from pyspark.sql.functions import udf
from bs4 import BeautifulSoup

def clean_html(html):
return BeautifulSoup(html, "html.parser").get_text() if html else None

clean_html_udf = udf(clean_html)
df = df.withColumn("clean_desc", clean_html_udf(col("description")))

  1. 高效转换技巧
    (1)JSON字段解析

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType

定义schema(比推断更高效)

schema = StructType().add("sku", StringType()).add("stock", DoubleType())

df = df.withColumn("inventory",
from_json(col("inventory_json"), schema)) \
.select("", "inventory.")

(2)日期处理优化

from pyspark.sql.functions import to_timestamp, date_format

转换ISO格式时间戳

df = df.withColumn("crawl_date",
to_timestamp(col("crawl_time"), "yyyy-MM-dd HH:mm:ss")) \
.withColumn("month", date_format(col("crawl_date"), "yyyyMM"))

(3)字符串批量处理

from pyspark.sql.functions import concat_ws, split

拆分多值字段

df = df.withColumn("tags_array", split(col("tags"), ",")) \
.withColumn("first_tag", col("tags_array")[0])

合并字段

df = df.withColumn("full_name",
concat_ws(" ", col("brand"), col("model")))

  1. 聚合分析实战
    (1)基础统计

商品价格分布

price_stats = df.groupBy("category").agg(
{"price": ["min", "max", "avg", "stddev"]}
)

销量TOP100

top_products = df.orderBy(col("sales").desc()).limit(100)

(2)时间序列分析

from pyspark.sql.functions import window, count

按小时统计抓取量

hourly_count = df.groupBy(
window(col("crawl_time"), "1 hour")
).agg(count("*").alias("count"))

转换为Pandas DataFrame便于可视化

hourly_pd = hourly_count.toPandas()

(3)关联分析

商品共现分析(出现在同一页面的商品对)

from pyspark.sql.functions import collect_list, explode

先按页面分组收集商品ID

page_products = df.groupBy("page_url").agg(
collect_list("product_id").alias("products")
)

生成所有商品对

from itertools import combinations
def generate_pairs(product_list):
return [",".join(pair) for pair in combinations(sorted(product_list), 2)]

generate_pairs_udf = udf(generate_pairs)

pairs = page_products.withColumn("product_pairs",
generate_pairs_udf(col("products"))) \
.selectExpr("explode(product_pairs) as pair")

统计共现次数

co_occurrence = pairs.groupBy("pair").count().orderBy(col("count").desc())

四、性能优化秘籍

  1. 内存管理
    调整分区数:df.repartition(200)(通常设为核心数的2-3倍)
    缓存策略:

    多次使用的DataFrame建议缓存

    df.cache() # 内存不足时自动转为磁盘存储

    或指定存储级别

    from pyspark.storagelevel import StorageLevel
    df.persist(StorageLevel.MEMORY_AND_DISK_SER)

  2. 执行计划优化

    查看执行计划(调试用)

    df.explain(True)

强制广播小表(Join时)

from pyspark.sql.functions import broadcast
small_df = spark.createDataFrame([...])
result = df.join(broadcast(small_df), "category_id")

  1. 资源监控
    Spark UI:通过http://:4040查看任务详情
    关键指标:
    Stage完成时间
    GC时间占比
    Shuffle读写量
    五、结果输出与部署
  2. 输出格式选择

    输出到MySQL

    df.write.format("jdbc").options(
    url="jdbc:mysql://db-host:3306/result_db",
    dbtable="processed_products",
    user="user",
    password="pass",

    批量写入优化

    batchsize="10000"
    ).mode("overwrite").save()

输出到Parquet(推荐)

df.write.parquet("s3a://output-bucket/processed_data/")

输出到CSV(注意分区)

df.repartition(1).write.option("header", "true").csv("hdfs://.../output.csv")

  1. 定时任务集成

    使用Airflow调度

    from airflow import DAG
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG("daily_crawler_processing", schedule_interval="@daily") as dag:
process_task = SparkSubmitOperator(
task_id="process_data",
application="/path/to/process.py",
conn_id="spark_default",
application_args=["--date", "{ {ds}}"]
)

六、常见问题Q&A
Q1:被网站封IP怎么办?
A:立即启用备用代理池,建议使用住宅代理(如站大爷IP代理),配合每请求更换IP策略。更高级方案:

实施IP轮换策略(每5-10个请求切换)
结合User-Agent池模拟不同浏览器
对敏感网站降低抓取频率(如从1秒/请求改为10秒/请求)
Q2:处理过程中出现OOM错误如何解决?
A:按以下步骤排查:

检查Spark UI中的Executor内存使用情况
适当减少spark.executor.memory( paradoxically,过大内存会导致GC停顿)
增加分区数(df.repartition(500))
检查是否有数据倾斜(某些key数据量过大)
对大表使用sample()先测试小数据集
Q3:如何处理数据倾斜问题?
A:常见解决方案:

方法1:对倾斜键加盐后聚合

from pyspark.sql.functions import rand

salted_df = df.withColumn("salted_key",
concat(col("skewedkey"), lit(""), (rand() * 10).cast("int")))

先按盐值聚合,再按原键聚合

result = salted_df.groupBy("salted_key").agg(...) \
.groupBy(regexp_extract(col("saltedkey"), "(.*)\d+", 1).alias("original_key")) \
.agg(...)

方法2:单独处理倾斜键

skewed_keys = [...] # 已知倾斜的key列表
normal_df = df.filter(~col("key").isin(skewed_keys))
skewed_df = df.filter(col("key").isin(skewed_keys))

分别处理后合并

Q4:PySpark和Pandas如何选择?
A:决策矩阵:

场景 PySpark推荐 Pandas推荐
数据量 > 1GB ✓ ✗
需要分布式处理 ✓ ✗
复杂ETL流程 ✓ ✗
快速原型开发 ✗ ✓
数据可视化 ✗ ✓
机器学习(小数据集) ✗ ✓
Q5:如何监控PySpark作业性能?
A:关键监控点:

Stage Duration:单个Stage耗时(长说明有瓶颈)
GC Time:垃圾回收时间占比(>10%需优化)
Shuffle Read/Write:网络传输量(过大考虑广播)
Input Size/Records:实际处理数据量
Scheduler Delay:任务排队时间(集群资源不足)
建议配置:

spark.conf.set("spark.extraListeners", "com.example.CustomMetricListener")

或通过log4j输出详细日志

spark.sparkContext.setLogLevel("INFO")

通过以上方法,你可以高效处理亿级爬虫数据,同时保持代码的可维护性和系统的稳定性。实际项目中,建议先在小数据集(1%样本)上验证逻辑,再逐步扩展到全量数据。

目录
相关文章
|
2月前
|
数据采集 存储 NoSQL
Python爬虫实战:新闻数据抓取与MongoDB存储全流程
本文以腾讯新闻为例,详解Python爬虫抓取新闻数据并存入MongoDB的完整流程,涵盖反爬突破、数据清洗、存储优化及分布式架构,助你构建高效、稳定的海量数据采集系统。
303 0
|
27天前
|
前端开发 Java Python
Python高效实现Word转HTML:从基础到进阶的全流程方案
本文介绍如何利用Python实现Word文档(.docx)高效转换为HTML,解决企业数字化转型中文档格式迁移的痛点。通过对比python-docx、pandoc和Mammoth等工具,结合样式保留、图片处理、表格优化与批量转换方案,提供低成本、高灵活性的自动化流程。适用于产品手册、技术文档、课件等场景,提升转换效率达40倍,成本降低90%。
501 0
|
2月前
|
存储 算法 定位技术
Python计算经纬度坐标点距离:从原理到实战
本文详解Python实现地球两点间精确距离计算,涵盖Haversine与Vincenty公式、向量化优化及地理围栏等实战应用,助你掌握高精度球面距离算法。
238 0
|
2月前
|
JSON 缓存 API
Python采集淘宝商品详情数据,API接口系列json数据返回
根据开放平台文档和示例,以下是使用Python调用淘宝商品详情API获取JSON数据的完整实现方案:
|
23天前
|
SQL 人工智能 自然语言处理
Spring Boot + GPT:我做了一个能自己写 SQL 的后端系统
本文介绍如何基于Spring Boot与GPT(或国产大模型如通义千问、DeepSeek)构建智能后端系统,实现自然语言自动生成SQL。系统采用分层架构,集成AI语义理解、SQL安全验证与执行功能,提升开发效率并降低数据查询门槛,兼具安全性与可扩展性。
147 7
|
27天前
|
数据采集 监控 NoSQL
Airflow调度爬虫任务:从零搭建高效定时采集系统
Airflow以DAG实现爬虫任务依赖管理,支持分钟级调度与Web监控,解决crontab无依赖控制、Jenkins不灵活等问题。结合PythonOperator、动态参数传递与分布式架构,可构建高可用、易扩展的自动化采集系统,适用于电商价格监控等场景。
96 0
|
1月前
|
存储 缓存 监控
基于淘宝商品详情 API 的竞品监控系统搭建:价格 / 库存 / 促销实时追踪
淘宝商品详情 API 的竞品监控系统搭建:价格 / 库存 / 促销实时追踪
|
15天前
|
数据采集 存储 前端开发
医疗爬虫实战:手把手教你抓取丁香园药品信息库
本文以丁香园药品库为例,用Python实战讲解医疗数据爬取技术。涵盖Requests、Lxml、Pandas等工具应用,解析反爬策略、代理轮换、数据清洗与存储方案,助你高效获取结构化药品信息,兼顾合规与实用性。(238字)
62 0
|
2月前
|
SQL XML Java
Mybatis基础使用知识(注解)
mybatis 通过 xml 或注解的方式将要执行的各种 statement 配置起来,并通过java对象和statement中sql的动态参数进行映射生成最终执行的sql语句。 最后 mybatis 框架执行sql 并将结果映射为java对象并返回。采用ORM(对象关系映射)思想解决了实体和数据库映射问题,对jdbc进行了封装,屏蔽了jdbc api 底层访问细节,使我们不用与jdbc api 打交道,就可以完成对数据库的持久化操作。
361 0
|
2月前
|
数据采集 存储 缓存
爬虫数据去重:BloomFilter算法实现指南
布隆过滤器(BloomFilter)是爬虫去重中高效的空间节省方案,适用于亿级URL去重。相比HashSet,内存占用降低80%以上,支持O(1)插入与查询,虽有少量误判但无漏判。本文详解其原理、参数调优、分布式实现及爬虫集成,助你应对大规模数据挑战。(238字)
114 0

热门文章

最新文章