免费编程软件「python+pycharm」
链接:https://pan.quark.cn/s/48a86be2fdc0
一、为什么选择PySpark处理爬虫数据?
当你的爬虫每天抓取千万级网页时,传统单机工具(如Pandas、Excel)很快会遇到内存不足和性能瓶颈。PySpark作为Apache Spark的Python接口,通过分布式计算框架能轻松处理TB级数据,且具有以下优势:
弹性扩展:从单台笔记本到千节点集群无缝切换
内存优化:通过RDD/DataFrame分片存储,避免OOM错误
统一处理:同时处理结构化(数据库)和非结构化(HTML/JSON)数据
生态完善:与HDFS、S3等存储系统天然集成,支持SQL、MLlib等扩展
我们以某电商网站1亿条商品数据为例,展示从数据清洗到分析的全流程。
二、环境准备与数据加载
- 集群配置建议
开发环境:本地模式(4核16G内存可处理百万级数据)
生产环境:
3台m5.xlarge EC2实例(16G内存)
每个Worker分配6G执行内存
使用EMR或Dataproc快速部署
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("CrawlerDataProcessing") \
.config("spark.executor.memory", "6g") \
.config("spark.driver.memory", "4g") \
.getOrCreate()
- 数据源接入
支持多种格式直接读取:
从JSON文件加载(支持gzip压缩)
df = spark.read.json("s3a://crawler-data/2023-10/*.json.gz")
从MySQL直接读取(需JDBC驱动)
jdbc_url = "jdbc:mysql://db-host:3306/crawler_db"
df_mysql = spark.read.format("jdbc").options(
url=jdbc_url,
dbtable="products",
user="user",
password="pass"
).load()
从Parquet加载(推荐存储格式)
df_parquet = spark.read.parquet("hdfs://namenode:8020/data/products.parquet")
三、核心处理流程
- 数据清洗四步法
(1)空值处理
删除全为空的列
df = df.dropna(how="all")
填充特定列的空值
from pyspark.sql.functions import col, when
df = df.withColumn("price",
when(col("price").isNull(), 0).otherwise(col("price")))
(2)异常值过滤
价格异常值(假设合理范围是0-10000)
df = df.filter((col("price") >= 0) & (col("price") <= 10000))
销量异常值(使用分位数过滤)
stats = df.approxQuantile("sales", [0.01, 0.99], 0.1)
min_sales, max_sales = stats[0], stats[1]
df = df.filter((col("sales") >= min_sales) & (col("sales") <= max_sales))
(3)重复数据合并
按URL去重,保留最新记录
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
window_spec = Window.partitionBy("url").orderBy(col("crawl_time").desc())
df = df.withColumn("row_num", row_number().over(window_spec)) \
.filter(col("row_num") == 1) \
.drop("row_num")
(4)数据标准化
from pyspark.sql.functions import lower, trim, regexp_replace
统一品牌名称
brand_map = {"apple": "Apple", "samsung": "Samsung"}
df = df.replace(brand_map, subset=["brand"])
清理HTML标签
from pyspark.sql.functions import udf
from bs4 import BeautifulSoup
def clean_html(html):
return BeautifulSoup(html, "html.parser").get_text() if html else None
clean_html_udf = udf(clean_html)
df = df.withColumn("clean_desc", clean_html_udf(col("description")))
- 高效转换技巧
(1)JSON字段解析
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType
定义schema(比推断更高效)
schema = StructType().add("sku", StringType()).add("stock", DoubleType())
df = df.withColumn("inventory",
from_json(col("inventory_json"), schema)) \
.select("", "inventory.")
(2)日期处理优化
from pyspark.sql.functions import to_timestamp, date_format
转换ISO格式时间戳
df = df.withColumn("crawl_date",
to_timestamp(col("crawl_time"), "yyyy-MM-dd HH:mm:ss")) \
.withColumn("month", date_format(col("crawl_date"), "yyyyMM"))
(3)字符串批量处理
from pyspark.sql.functions import concat_ws, split
拆分多值字段
df = df.withColumn("tags_array", split(col("tags"), ",")) \
.withColumn("first_tag", col("tags_array")[0])
合并字段
df = df.withColumn("full_name",
concat_ws(" ", col("brand"), col("model")))
- 聚合分析实战
(1)基础统计
商品价格分布
price_stats = df.groupBy("category").agg(
{"price": ["min", "max", "avg", "stddev"]}
)
销量TOP100
top_products = df.orderBy(col("sales").desc()).limit(100)
(2)时间序列分析
from pyspark.sql.functions import window, count
按小时统计抓取量
hourly_count = df.groupBy(
window(col("crawl_time"), "1 hour")
).agg(count("*").alias("count"))
转换为Pandas DataFrame便于可视化
hourly_pd = hourly_count.toPandas()
(3)关联分析
商品共现分析(出现在同一页面的商品对)
from pyspark.sql.functions import collect_list, explode
先按页面分组收集商品ID
page_products = df.groupBy("page_url").agg(
collect_list("product_id").alias("products")
)
生成所有商品对
from itertools import combinations
def generate_pairs(product_list):
return [",".join(pair) for pair in combinations(sorted(product_list), 2)]
generate_pairs_udf = udf(generate_pairs)
pairs = page_products.withColumn("product_pairs",
generate_pairs_udf(col("products"))) \
.selectExpr("explode(product_pairs) as pair")
统计共现次数
co_occurrence = pairs.groupBy("pair").count().orderBy(col("count").desc())
四、性能优化秘籍
内存管理
调整分区数:df.repartition(200)(通常设为核心数的2-3倍)
缓存策略:多次使用的DataFrame建议缓存
df.cache() # 内存不足时自动转为磁盘存储
或指定存储级别
from pyspark.storagelevel import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK_SER)执行计划优化
查看执行计划(调试用)
df.explain(True)
强制广播小表(Join时)
from pyspark.sql.functions import broadcast
small_df = spark.createDataFrame([...])
result = df.join(broadcast(small_df), "category_id")
- 资源监控
Spark UI:通过http://:4040查看任务详情
关键指标:
Stage完成时间
GC时间占比
Shuffle读写量
五、结果输出与部署 - 输出格式选择
输出到MySQL
df.write.format("jdbc").options(
url="jdbc:mysql://db-host:3306/result_db",
dbtable="processed_products",
user="user",
password="pass",批量写入优化
batchsize="10000"
).mode("overwrite").save()
输出到Parquet(推荐)
df.write.parquet("s3a://output-bucket/processed_data/")
输出到CSV(注意分区)
df.repartition(1).write.option("header", "true").csv("hdfs://.../output.csv")
- 定时任务集成
使用Airflow调度
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
with DAG("daily_crawler_processing", schedule_interval="@daily") as dag:
process_task = SparkSubmitOperator(
task_id="process_data",
application="/path/to/process.py",
conn_id="spark_default",
application_args=["--date", "{
{ds}}"]
)
六、常见问题Q&A
Q1:被网站封IP怎么办?
A:立即启用备用代理池,建议使用住宅代理(如站大爷IP代理),配合每请求更换IP策略。更高级方案:
实施IP轮换策略(每5-10个请求切换)
结合User-Agent池模拟不同浏览器
对敏感网站降低抓取频率(如从1秒/请求改为10秒/请求)
Q2:处理过程中出现OOM错误如何解决?
A:按以下步骤排查:
检查Spark UI中的Executor内存使用情况
适当减少spark.executor.memory( paradoxically,过大内存会导致GC停顿)
增加分区数(df.repartition(500))
检查是否有数据倾斜(某些key数据量过大)
对大表使用sample()先测试小数据集
Q3:如何处理数据倾斜问题?
A:常见解决方案:
方法1:对倾斜键加盐后聚合
from pyspark.sql.functions import rand
salted_df = df.withColumn("salted_key",
concat(col("skewedkey"), lit(""), (rand() * 10).cast("int")))
先按盐值聚合,再按原键聚合
result = salted_df.groupBy("salted_key").agg(...) \
.groupBy(regexp_extract(col("saltedkey"), "(.*)\d+", 1).alias("original_key")) \
.agg(...)
方法2:单独处理倾斜键
skewed_keys = [...] # 已知倾斜的key列表
normal_df = df.filter(~col("key").isin(skewed_keys))
skewed_df = df.filter(col("key").isin(skewed_keys))
分别处理后合并
Q4:PySpark和Pandas如何选择?
A:决策矩阵:
场景 PySpark推荐 Pandas推荐
数据量 > 1GB ✓ ✗
需要分布式处理 ✓ ✗
复杂ETL流程 ✓ ✗
快速原型开发 ✗ ✓
数据可视化 ✗ ✓
机器学习(小数据集) ✗ ✓
Q5:如何监控PySpark作业性能?
A:关键监控点:
Stage Duration:单个Stage耗时(长说明有瓶颈)
GC Time:垃圾回收时间占比(>10%需优化)
Shuffle Read/Write:网络传输量(过大考虑广播)
Input Size/Records:实际处理数据量
Scheduler Delay:任务排队时间(集群资源不足)
建议配置:
spark.conf.set("spark.extraListeners", "com.example.CustomMetricListener")
或通过log4j输出详细日志
spark.sparkContext.setLogLevel("INFO")
通过以上方法,你可以高效处理亿级爬虫数据,同时保持代码的可维护性和系统的稳定性。实际项目中,建议先在小数据集(1%样本)上验证逻辑,再逐步扩展到全量数据。