使用PySpark构建和评估逻辑回归模型预测质量是否合格
随着数据量的不断增长,传统的数据处理工具已经难以满足需求。PySpark作为大数据处理框架Apache Spark的Python API,为大规模数据处理和机器学习提供了强有力的支持。本文将详细介绍如何使用PySpark进行机器学习模型的构建和评估。
1. 环境配置与数据导入
首先,我们需要进行必要的环境配置,并导入所需的库。
from pyspark.sql import SparkSession # 创建 SparkSession spark = SparkSession\ .builder\ .appName("model_train")\ .master("local[*]")\ .getOrCreate() # 读取 CSV 文件 file_path = "data.csv" df = spark.read.csv(file_path, header=True, inferSchema=True, encoding='gbk') columns = ["TreeSpecies", "SampleID", "SamplingDate", "SamplingLocation", "SampledUnit", "SampledUnitAddress", "IsQualified", "InspectionType"] df = df.toDF(*columns) # 查看数据 df.show() df.printSchema()
在上述代码中,我们创建了一个SparkSession并从CSV文件中加载数据。
2. 数据可视化
在进行机器学习模型训练前,数据预处理是一个关键步骤。我们需要对数据可视化分析,来明确接下来的操作。
import matplotlib.pyplot as plt import seaborn as sns source_df=df.toPandas() # 设置 Seaborn 风格 sns.set(style="whitegrid") plt.rcParams['font.family'] = 'Arial Unicode MS' # 创建一个 2x2 的图表布局 fig, axs = plt.subplots(2, 2, figsize=(14, 10)) sns.histplot(source_df['TreeSpecies'], kde=True, ax=axs[0, 0], color='blue') axs[0, 0].set_title('Tree Species Distribution') sns.histplot(source_df['InspectionType'], kde=True, ax=axs[0, 1], color='green') axs[0, 1].set_title('Inspection Type Distribution') sns.histplot(source_df['SampleID'], kde=True, ax=axs[1, 0], color='red') axs[1, 0].set_title('SampleID Distribution') sns.histplot(source_df['IsQualified'], kde=True, ax=axs[1, 1], color='purple') axs[1, 1].set_title('Is Qualified Distribution') plt.tight_layout() # 显示图表 plt.show()
3. 数据预处理
在进行机器学习模型训练前,数据预处理是一个关键步骤。我们需要对数据进行清洗和特征工程。
from pyspark.sql.functions import col, when from pyspark.ml.feature import StringIndexer, OneHotEncoder, StandardScaler, VectorAssembler from pyspark.ml import Pipeline df = df.fillna({"IsQualified": "不合格"}) # 将目标列(IsQualified)转换为数值类型 indexer = StringIndexer(inputCol="IsQualified", outputCol="label") # 对类别特征进行one-hot编码 string_columns = ["TreeSpecies", "SamplingLocation", "SampledUnit"] indexers = [StringIndexer(inputCol=col, outputCol=col+"_index") for col in string_columns] encoders = [OneHotEncoder(inputCol=col+"_index", outputCol=col+"_vec") for col in string_columns] # 特征向量化 assembler = VectorAssembler(inputCols=[col+"_vec" for col in string_columns], outputCol="features") # 标准化 scaler = StandardScaler(inputCol="features", outputCol="scaled_features") # 构建处理管道 pipeline = Pipeline(stages=indexers + encoders + [assembler, scaler, indexer]) model = pipeline.fit(df) processed_df = model.transform(df) # 查看处理后的数据 processed_df.select("features", "scaled_features", "label").show()
3. 模型训练与评估
接下来,我们将数据集划分为训练集和测试集,并使用逻辑回归模型进行训练和评估。
from pyspark.ml.classification import LogisticRegression from pyspark.ml.evaluation import BinaryClassificationEvaluator from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # 划分训练集和测试集 train_df, test_df = processed_df.randomSplit([0.8, 0.2], seed=12345) # 逻辑回归模型 lr = LogisticRegression(featuresCol="scaled_features", labelCol="label") # 超参数调优 param_grid = ParamGridBuilder() \ .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \ .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \ .build() # 交叉验证 crossval = CrossValidator(estimator=lr, estimatorParamMaps=param_grid, evaluator=BinaryClassificationEvaluator(), numFolds=5) # 训练模型 cv_model = crossval.fit(train_df) # 评估模型 predictions = cv_model.transform(test_df) evaluator = BinaryClassificationEvaluator() accuracy = evaluator.evaluate(predictions) print(f"Test Accuracy: {accuracy:.4f}")
结论
本文介绍了如何使用PySpark进行数据预处理、模型训练和评估。PySpark作为一个强大的大数据处理工具,能够有效地处理大规模数据,并应用机器学习算法进行建模和分析。希望通过这篇文章,能够帮助你更好地理解和使用PySpark进行机器学习项目。