使用 PySpark 读取csv数据进行分析,将结果数据导入招聘数据

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用版 2核4GB 50GB
简介: 使用 PySpark 读取csv数据进行分析,将结果数据导入招聘数据

使用 PySpark 读取csv数据进行分析,将结果数据导入招聘数据

1. 设置 PySpark 环境:

设置 PySpark 环境,PySpark 需要一个 SparkSession 来与 Spark 功能进行交互。我们还需要配置 Spark 支持 Hive,并包含 MySQL 连接器 JAR 以实现数据库连接。

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .enableHiveSupport() \
    .config("spark.driver.extraClassPath", "lib/mysql-connector-java-8.0.30.jar") \
    .getOrCreate()
    

这里我们需要导入一个mysql jdbc的依赖jar。

2. 从 CSV 读取数据:

我们的招聘数据存储在一个 CSV 文件中。我们将定义一个函数将这些数据读入 Spark DataFrame 中。我们将指定一个模式以确保每列的数据类型正确。

from pyspark.sql.types import StringType, StructType

def read_data_from_csv(path):
    schema = StructType() \
        .add("recruitment_positions", StringType(), True) \
        .add("recruitment_city", StringType(), True) \
        .add("recruitment_salary", StringType(), True) \
        .add("recruitment_experience", StringType(), True) \
        .add("recruitment_skills", StringType(), True) \
        .add("recruitment_company", StringType(), True) \
        .add("recruitment_industry", StringType(), True) \
        .add("recruitment_scale", StringType(), True)

    df = spark.read \
        .option("header", True) \
        .schema(schema) \
        .csv(path)

    return df

# 示例用法:
df = read_data_from_csv("../data_prepare/data.csv")

3. 数据分析:

接下来,我们将对招聘数据进行一些基本的数据分析。我们将选择相关列并根据需要应用转换。

def data_analysis(df):
    df.createTempView("job")

    df = spark.sql("""
    SELECT 
        recruitment_positions,
        recruitment_salary,
        recruitment_skills AS recruitment_requirements,
        recruitment_experience,
        '本科' AS recruiting_educational_qualifications,
        recruitment_company,
        recruitment_scale AS company_stage,
        recruitment_industry,
        recruitment_skills,
        recruitment_city,
        recruitment_city AS recruitment_area,
        recruitment_city AS recruitment_address,
        recruitment_scale
    FROM job
    LIMIT 10
    """)

    df.show()
    return df

# 示例用法:
df = data_analysis(df)

4. 将数据写入 MySQL:

一旦我们分析了数据,可能希望将其存储在 MySQL 数据库中以进行进一步处理或报告。我们将定义一个函数将 DataFrame 写入 MySQL,导入数据之前需要创建mysql表。

CREATE TABLE `recruitment_data` (
  `recruitment_data_id` int NOT NULL AUTO_INCREMENT COMMENT '招聘数据ID',
  `recruitment_positions` varchar(500) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘职位',
  `recruitment_salary` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘薪资',
  `recruitment_requirements` varchar(500) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘职位',
  `recruitment_experience` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘经历',
  `recruiting_educational_qualifications` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘学历',
  `recruitment_company` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘公司',
  `company_stage` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '公司阶段',
  `recruitment_industry` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘行业',
  `recruitment_skills` varchar(500) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘技能',
  `recruitment_city` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘城市',
  `recruitment_area` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘区域',
  `recruitment_address` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘地址',
  `recruitment_scale` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘规模',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`recruitment_data_id`)
) ENGINE=InnoDB AUTO_INCREMENT=26698 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='招聘数据'

pyspark追加写入mysql

def write_data_to_mysql(df):
    df.write.format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/project05928") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("dbtable", "recruitment_data") \
        .option("user", "root") \
        .option("password", "12345678") \
        .mode("append") \
        .save()

# 示例用法:
write_data_to_mysql(df)

结论:

在本博客文章中,我们探讨了如何使用 PySpark 分析招聘数据。我们从设置 PySpark 环境开始,然后读取 CSV 文件中的数据,进行数据分析,最后将分析后的数据导入到 MySQL 数据库中。PySpark

的可伸缩性和性能使其成为处理大量招聘数据的理想选择。通过利用 PySpark 的功能,组织可以获得有价值的见解,优化他们的招聘流程并做出数据驱动的决策。

完整代码

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType


def read_data_from_csv(path):

    schema = StructType() \
          .add("recruitment_positions",StringType(),True) \
          .add("recruitment_city",StringType(),True) \
          .add("recruitment_salary",StringType(),True) \
          .add("recruitment_experience",StringType(),True) \
          .add("recruitment_skills",StringType(),True) \
          .add("recruitment_company",StringType(),True) \
          .add("recruitment_industry",StringType(),True) \
          .add("recruitment_scale",StringType(),True)

    df = spark.read\
          .option("header", True)\
          .schema(schema)\
          .csv(path)

    return df

def data_ana(df):

    df.createTempView("job")

    df = spark.sql("""
    select 
    recruitment_positions,
    recruitment_salary,
    recruitment_skills as recruitment_requirements,
    recruitment_experience,
    '本科' as recruiting_educational_qualifications,
    recruitment_company,
    recruitment_scale as company_stage,
    recruitment_industry,
    recruitment_skills,
    recruitment_city,
    recruitment_city as recruitment_area,
    recruitment_city as recruitment_address,
    recruitment_scale
    from job
    limit 10
    """)

    df.show()
    return df

def write_data2mysql(df):

    df.write.format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/project05928") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("dbtable", "recruitment_data") \
        .option("user", "root") \
        .option("password", "12345678") \
        .mode("append") \
        .save()


if __name__ == '__main__':
    spark = SparkSession.builder \
          .enableHiveSupport()\
          .config("spark.driver.extraClassPath","lib/mysql-connector-java-8.0.30.jar")\
          .getOrCreate()
    df = read_data_from_csv("../data_prepare/data.csv")
    df = data_ana(df)

    write_data2mysql(df)

    spark.stop()
相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1月前
|
SQL 消息中间件 数据处理
DataX读取Hive Orc格式表丢失数据处理记录
DataX读取Hive Orc格式表丢失数据处理记录
159 0
|
1月前
|
SQL 分布式计算 大数据
【大数据技术Hadoop+Spark】Spark SQL、DataFrame、Dataset的讲解及操作演示(图文解释)
【大数据技术Hadoop+Spark】Spark SQL、DataFrame、Dataset的讲解及操作演示(图文解释)
109 0
|
SQL 分布式计算 数据挖掘
PySpark数据分析基础:PySpark Pandas创建、转换、查询、转置、排序操作详解
PySpark数据分析基础:PySpark Pandas创建、转换、查询、转置、排序操作详解
580 0
PySpark数据分析基础:PySpark Pandas创建、转换、查询、转置、排序操作详解
|
11天前
|
存储 数据采集 JSON
PySpark如何处理结构化数据?
【6月更文挑战第15天】PySpark如何处理结构化数据?
19 11
|
11天前
|
存储 机器学习/深度学习 缓存
如何使用PySpark进行离线数据分析?
【6月更文挑战第15天】如何使用PySpark进行离线数据分析?
24 10
|
11天前
|
机器学习/深度学习 分布式计算 算法
PySpark如何处理非结构化数据?
【6月更文挑战第15天】PySpark如何处理非结构化数据?
14 5
|
28天前
|
SQL 分布式计算 JavaScript
利用SparkSQL读写Excel数据
利用SparkSQL读写Excel数据
25 0
|
1月前
|
SQL 分布式计算 数据挖掘
Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
116 0
|
1月前
|
SQL 分布式计算 HIVE
Spark读取变更Hudi数据集Schema实现分析
Spark读取变更Hudi数据集Schema实现分析
79 0
|
1月前
|
分布式计算 DataWorks MaxCompute
这个错误可能是由于读取CSV文件到ODPS的过程中出现了一些问题
这个错误可能是由于读取CSV文件到ODPS的过程中出现了一些问题【1月更文挑战第10天】【1月更文挑战第50篇】
42 3