Using PySpark to Read and Analyze CSV Data and Import the Results into a Recruitment Data Table
1. Setting Up the PySpark Environment:
PySpark needs a SparkSession to interact with Spark functionality. We also configure Spark with Hive support and include the MySQL connector JAR so it can connect to the database.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .enableHiveSupport() \
    .config("spark.driver.extraClassPath", "lib/mysql-connector-java-8.0.30.jar") \
    .getOrCreate()
Here we need to include a MySQL JDBC driver JAR as a dependency.
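If you prefer not to manage the JAR file by hand, Spark can also resolve the connector from Maven at startup through spark.jars.packages. The sketch below is an alternative setup, not what the original script uses, and assumes network access to a Maven repository; the coordinate matches the 8.0.30 connector version used above.

# Alternative sketch: let Spark download the MySQL connector from Maven instead of a local JAR.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .enableHiveSupport() \
    .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.30") \
    .getOrCreate()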
2. Reading Data from CSV:
Our recruitment data is stored in a CSV file. We define a function that reads this data into a Spark DataFrame, specifying a schema to ensure that each column has the correct data type.
from pyspark.sql.types import StringType, StructType

def read_data_from_csv(path):
    schema = StructType() \
        .add("recruitment_positions", StringType(), True) \
        .add("recruitment_city", StringType(), True) \
        .add("recruitment_salary", StringType(), True) \
        .add("recruitment_experience", StringType(), True) \
        .add("recruitment_skills", StringType(), True) \
        .add("recruitment_company", StringType(), True) \
        .add("recruitment_industry", StringType(), True) \
        .add("recruitment_scale", StringType(), True)
    df = spark.read \
        .option("header", True) \
        .schema(schema) \
        .csv(path)
    return df

# Example usage:
df = read_data_from_csv("../data_prepare/data.csv")
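After loading, it is worth a quick sanity check that the header row was parsed and the schema was applied as expected. The following lines are illustrative only and are not part of the original pipeline:

# Illustrative sanity checks on the DataFrame returned above.
df.printSchema()             # all eight columns should appear as string
print(df.count())            # total number of job postings read from the CSV
df.show(5, truncate=False)   # confirm the header row became column names, not data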
3. Data Analysis:
Next, we perform some basic analysis on the recruitment data, selecting the relevant columns and applying transformations where needed.
def data_analysis(df):
    df.createTempView("job")
    df = spark.sql("""
        SELECT recruitment_positions,
               recruitment_salary,
               recruitment_skills AS recruitment_requirements,
               recruitment_experience,
               '本科' AS recruiting_educational_qualifications,
               recruitment_company,
               recruitment_scale AS company_stage,
               recruitment_industry,
               recruitment_skills,
               recruitment_city,
               recruitment_city AS recruitment_area,
               recruitment_city AS recruitment_address,
               recruitment_scale
        FROM job
        LIMIT 10
    """)
    df.show()
    return df

# Example usage:
df = data_analysis(df)
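The query above mainly reshapes the columns to match the target table layout. If you want aggregate insights as well, the same temporary view can be queried further. The snippet below is a hypothetical extension, assuming the "job" view created in data_analysis already exists:

# Hypothetical extra analysis: count postings per recruitment city.
city_counts = spark.sql("""
    SELECT recruitment_city, COUNT(*) AS position_count
    FROM job
    GROUP BY recruitment_city
    ORDER BY position_count DESC
""")
city_counts.show()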
4. Writing the Data to MySQL:
Once the data has been analyzed, we may want to store it in a MySQL database for further processing or reporting. We define a function that writes the DataFrame to MySQL; before importing the data, the MySQL table must be created.
CREATE TABLE `recruitment_data` (
  `recruitment_data_id` int NOT NULL AUTO_INCREMENT COMMENT '招聘数据ID',
  `recruitment_positions` varchar(500) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘职位',
  `recruitment_salary` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘薪资',
  `recruitment_requirements` varchar(500) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘职位',
  `recruitment_experience` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘经历',
  `recruiting_educational_qualifications` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘学历',
  `recruitment_company` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘公司',
  `company_stage` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '公司阶段',
  `recruitment_industry` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘行业',
  `recruitment_skills` varchar(500) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘技能',
  `recruitment_city` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘城市',
  `recruitment_area` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘区域',
  `recruitment_address` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘地址',
  `recruitment_scale` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '招聘规模',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`recruitment_data_id`)
) ENGINE=InnoDB AUTO_INCREMENT=26698 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='招聘数据';
PySpark then appends the rows to MySQL:
def write_data_to_mysql(df):
    df.write.format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/project05928") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("dbtable", "recruitment_data") \
        .option("user", "root") \
        .option("password", "12345678") \
        .mode("append") \
        .save()

# Example usage:
write_data_to_mysql(df)
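To confirm that the append succeeded, the table can be read back over the same JDBC connection. This is a minimal verification sketch that reuses the connection options shown above:

# Read the MySQL table back to verify the appended rows (same connection settings as the write).
check_df = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/project05928") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("dbtable", "recruitment_data") \
    .option("user", "root") \
    .option("password", "12345678") \
    .load()
print(check_df.count())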
Conclusion:
In this post, we explored how to analyze recruitment data with PySpark. We started by setting up the PySpark environment, then read the data from a CSV file, performed the analysis, and finally loaded the results into a MySQL database. PySpark's scalability and performance make it well suited to processing large volumes of recruitment data. By taking advantage of PySpark's capabilities, organizations can gain valuable insights, optimize their hiring process, and make data-driven decisions.
Complete Code
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType


def read_data_from_csv(path):
    schema = StructType() \
        .add("recruitment_positions", StringType(), True) \
        .add("recruitment_city", StringType(), True) \
        .add("recruitment_salary", StringType(), True) \
        .add("recruitment_experience", StringType(), True) \
        .add("recruitment_skills", StringType(), True) \
        .add("recruitment_company", StringType(), True) \
        .add("recruitment_industry", StringType(), True) \
        .add("recruitment_scale", StringType(), True)
    df = spark.read \
        .option("header", True) \
        .schema(schema) \
        .csv(path)
    return df


def data_ana(df):
    df.createTempView("job")
    df = spark.sql("""
        select recruitment_positions,
               recruitment_salary,
               recruitment_skills as recruitment_requirements,
               recruitment_experience,
               '本科' as recruiting_educational_qualifications,
               recruitment_company,
               recruitment_scale as company_stage,
               recruitment_industry,
               recruitment_skills,
               recruitment_city,
               recruitment_city as recruitment_area,
               recruitment_city as recruitment_address,
               recruitment_scale
        from job
        limit 10
    """)
    df.show()
    return df


def write_data2mysql(df):
    df.write.format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/project05928") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("dbtable", "recruitment_data") \
        .option("user", "root") \
        .option("password", "12345678") \
        .mode("append") \
        .save()


if __name__ == '__main__':
    spark = SparkSession.builder \
        .enableHiveSupport() \
        .config("spark.driver.extraClassPath", "lib/mysql-connector-java-8.0.30.jar") \
        .getOrCreate()
    df = read_data_from_csv("../data_prepare/data.csv")
    df = data_ana(df)
    write_data2mysql(df)
    spark.stop()