Spark案例读取不同格式文件以及读取输入数据存入Mysql

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: Spark案例读取不同格式文件以及读取输入数据存入Mysql

文章目录

1.spark读取文件

1.txt文件

代码

def main(args: Array[String]): Unit = {

   val spark = SparkSession.builder().appName("ReadTextFile")

     .master("local")

     .config("spark.sql.shuffle.partitions", 1)

     //设置内存不然启动提示堆内存不足

     .config("spark.testing.memory", "512000000")

     .getOrCreate()

   //设置日志级别

   spark.sparkContext.setLogLevel("Error")

   //读取文件

   import  spark.implicits._

   //3.监控目录数据并将Dataset转换成DataFrame

   val ds:Dataset[String] = spark.readStream.textFile("./data/textfile/")

   val dataFrame = ds.map(line => {

     val arr = line.split(",")

     (arr(0).toInt, arr(1), arr(2))

   }).toDF("id","name","address")

   //4.打印结果

   val query: StreamingQuery = dataFrame.writeStream

     .format("console")

     .start()

   query.awaitTermination()

测试

da3183265c21465dbf3cf462eec8db71.png

2.csv文件

代码

csv需要创建schema以及指定分割符

def main(args: Array[String]): Unit = {

   val spark = SparkSession.builder().appName("ReadTextFile")

     .master("local")

     .config("spark.sql.shuffle.partitions", 1)

     //设置内存不然启动提示堆内存不足

     .config("spark.testing.memory", "512000000")

     .getOrCreate()

   //设置日志级别

   spark.sparkContext.setLogLevel("Error")

   //读取文件

   import spark.implicits._

   //创建csv schema

   val personSchema = new StructType().add("id", "integer")

     .add("name", "string")

     .add("address", "string")

   //监控目录

   val df= spark.readStream.option("sep", ",")

     .schema(personSchema).csv("./data/csvfile/")

   //输出结果

   val query: StreamingQuery = df.writeStream

     .format("console")

     .start()

   query.awaitTermination()

测试

0c46ec25e2b34994a8b1492942144b5f.png

3.读取json文件

源码

/**

* 读取Json文件

*/

object ReadJsonFile {

 def main(args: Array[String]): Unit = {

   val spark = SparkSession.builder().appName("ReadJsonFile")

     .master("local")

     .config("spark.sql.shuffle.partitions", 1)

     //设置内存不然启动提示堆内存不足

     .config("spark.testing.memory", "512000000")

     .getOrCreate()

   //设置日志级别

   spark.sparkContext.setLogLevel("Error")

   //读取文件

   //创建csv schema

   val personSchema = new StructType().add("id", "integer")

     .add("name", "string")

     .add("address", "string")

   //监控目录

   val df= spark.readStream.option("sep", ",")

     .schema(personSchema).json("./data/jsonfile/")

   //输出结果

   val query: StreamingQuery = df.writeStream

     .format("console")

     .start()

   query.awaitTermination()

 }

测试

885450763c254611817d00a3e0b4847f.png

2.读取输入的数据存入mysql

实现从控制台监控读取文件写入到mysql数据库。

mysql创建一张表

create table psn(id int,name varchar(20),address varchar(50));

代码

object OutPutToMysql {

 def main(args: Array[String]): Unit = {

   val spark = SparkSession.builder().appName("outputToMysql")

     .master("local")

     .config("spark.sql.shuffle.partitions", 1)

     //设置内存不然启动提示堆内存不足

     .config("spark.testing.memory", "512000000")

     .getOrCreate()

   //创建控制台输入

   val df = spark.readStream.format("socket")

     .option("host","node01")

     .option("port",8888)

     .load()

   import spark.implicits._

   val result = df.as[String].map(line => {

     val arr = line.split(",")

     (arr(0).toInt, arr(1), arr(2))

   }).toDF("id", "name", "address")

   //结果输出到mysql

   result.writeStream.foreachBatch((batchDf:DataFrame,batchId:Long)=>{

     batchDf.write.mode(SaveMode.Overwrite).format("jdbc")

       .option("url","jdbc:mysql://node01:3306/test")

       .option("user","root")

       .option("password","123456")

       .option("dbtable","psn")

       .save()

   }).start().awaitTermination();

   }

测试

0e2e5e5ef20d43479b6b1af5741d8ade.png


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
49 3
|
1月前
|
SQL 关系型数据库 MySQL
案例剖析:MySQL唯一索引并发插入导致死锁!
案例剖析:MySQL唯一索引并发插入导致死锁!
案例剖析:MySQL唯一索引并发插入导致死锁!
|
1月前
|
SQL 关系型数据库 MySQL
案例剖析,MySQL共享锁引发的死锁问题!
案例剖析,MySQL共享锁引发的死锁问题!
|
1月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
135 0
|
9天前
|
存储 Oracle 关系型数据库
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
本文介绍了MySQL InnoDB存储引擎中的数据文件和重做日志文件。数据文件包括`.ibd`和`ibdata`文件,用于存放InnoDB数据和索引。重做日志文件(redo log)确保数据的可靠性和事务的持久性,其大小和路径可由相关参数配置。文章还提供了视频讲解和示例代码。
115 11
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
|
24天前
|
关系型数据库 MySQL 数据库
一个 MySQL 数据库死锁的案例和解决方案
本文介绍了一个 MySQL 数据库死锁的案例和解决方案。
40 3
|
1月前
|
SQL 关系型数据库 MySQL
|
26天前
|
存储 关系型数据库 MySQL
基于案例分析 MySQL 权限认证中的具体优先原则
【10月更文挑战第26天】本文通过具体案例分析了MySQL权限认证中的优先原则,包括全局权限、数据库级别权限和表级别权限的设置与优先级。全局权限优先于数据库级别权限,后者又优先于表级别权限。在权限冲突时,更严格的权限将被优先执行,确保数据库的安全性与资源合理分配。
|
1月前
|
存储 关系型数据库 MySQL
PACS系统 中 dicom 文件在mysql 8.0 数据库中的 存储和读取(pydicom 库使用)
PACS系统 中 dicom 文件在mysql 8.0 数据库中的 存储和读取(pydicom 库使用)
31 2
|
1月前
|
SQL 存储 关系型数据库
SQL文件导入MySQL数据库的详细指南
数据库中的数据转移是一项常规任务,无论是在数据迁移过程中,还是在数据备份、还原场景中,导入导出SQL文件显得尤为重要。特别是在使用MySQL数据库时,如何将SQL文件导入数据库是一项基本技能。本文将详细介绍如何将SQL文件导入MySQL数据库,并提供一个清晰、完整的步骤指南。这篇文章的内容字数大约在
148 1