Spark案例读取不同格式文件以及读取输入数据存入Mysql

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: Spark案例读取不同格式文件以及读取输入数据存入Mysql

文章目录

1.spark读取文件

1.txt文件

代码

def main(args: Array[String]): Unit = {

   val spark = SparkSession.builder().appName("ReadTextFile")

     .master("local")

     .config("spark.sql.shuffle.partitions", 1)

     //设置内存不然启动提示堆内存不足

     .config("spark.testing.memory", "512000000")

     .getOrCreate()

   //设置日志级别

   spark.sparkContext.setLogLevel("Error")

   //读取文件

   import  spark.implicits._

   //3.监控目录数据并将Dataset转换成DataFrame

   val ds:Dataset[String] = spark.readStream.textFile("./data/textfile/")

   val dataFrame = ds.map(line => {

     val arr = line.split(",")

     (arr(0).toInt, arr(1), arr(2))

   }).toDF("id","name","address")

   //4.打印结果

   val query: StreamingQuery = dataFrame.writeStream

     .format("console")

     .start()

   query.awaitTermination()

测试

da3183265c21465dbf3cf462eec8db71.png

2.csv文件

代码

csv需要创建schema以及指定分割符

def main(args: Array[String]): Unit = {

   val spark = SparkSession.builder().appName("ReadTextFile")

     .master("local")

     .config("spark.sql.shuffle.partitions", 1)

     //设置内存不然启动提示堆内存不足

     .config("spark.testing.memory", "512000000")

     .getOrCreate()

   //设置日志级别

   spark.sparkContext.setLogLevel("Error")

   //读取文件

   import spark.implicits._

   //创建csv schema

   val personSchema = new StructType().add("id", "integer")

     .add("name", "string")

     .add("address", "string")

   //监控目录

   val df= spark.readStream.option("sep", ",")

     .schema(personSchema).csv("./data/csvfile/")

   //输出结果

   val query: StreamingQuery = df.writeStream

     .format("console")

     .start()

   query.awaitTermination()

测试

0c46ec25e2b34994a8b1492942144b5f.png

3.读取json文件

源码

/**

* 读取Json文件

*/

object ReadJsonFile {

 def main(args: Array[String]): Unit = {

   val spark = SparkSession.builder().appName("ReadJsonFile")

     .master("local")

     .config("spark.sql.shuffle.partitions", 1)

     //设置内存不然启动提示堆内存不足

     .config("spark.testing.memory", "512000000")

     .getOrCreate()

   //设置日志级别

   spark.sparkContext.setLogLevel("Error")

   //读取文件

   //创建csv schema

   val personSchema = new StructType().add("id", "integer")

     .add("name", "string")

     .add("address", "string")

   //监控目录

   val df= spark.readStream.option("sep", ",")

     .schema(personSchema).json("./data/jsonfile/")

   //输出结果

   val query: StreamingQuery = df.writeStream

     .format("console")

     .start()

   query.awaitTermination()

 }

测试

885450763c254611817d00a3e0b4847f.png

2.读取输入的数据存入mysql

实现从控制台监控读取文件写入到mysql数据库。

mysql创建一张表

create table psn(id int,name varchar(20),address varchar(50));

代码

object OutPutToMysql {

 def main(args: Array[String]): Unit = {

   val spark = SparkSession.builder().appName("outputToMysql")

     .master("local")

     .config("spark.sql.shuffle.partitions", 1)

     //设置内存不然启动提示堆内存不足

     .config("spark.testing.memory", "512000000")

     .getOrCreate()

   //创建控制台输入

   val df = spark.readStream.format("socket")

     .option("host","node01")

     .option("port",8888)

     .load()

   import spark.implicits._

   val result = df.as[String].map(line => {

     val arr = line.split(",")

     (arr(0).toInt, arr(1), arr(2))

   }).toDF("id", "name", "address")

   //结果输出到mysql

   result.writeStream.foreachBatch((batchDf:DataFrame,batchId:Long)=>{

     batchDf.write.mode(SaveMode.Overwrite).format("jdbc")

       .option("url","jdbc:mysql://node01:3306/test")

       .option("user","root")

       .option("password","123456")

       .option("dbtable","psn")

       .save()

   }).start().awaitTermination();

   }

测试

0e2e5e5ef20d43479b6b1af5741d8ade.png


相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
14天前
|
存储 关系型数据库 MySQL
mysql怎么查询longblob类型数据的大小
通过本文的介绍,希望您能深入理解如何查询MySQL中 `LONG BLOB`类型数据的大小,并结合优化技术提升查询性能,以满足实际业务需求。
51 6
|
19天前
|
关系型数据库 MySQL 数据库
数据库数据恢复—MYSQL数据库文件损坏的数据恢复案例
mysql数据库文件ibdata1、MYI、MYD损坏。 故障表现:1、数据库无法进行查询等操作;2、使用mysqlcheck和myisamchk无法修复数据库。
|
20天前
|
安全 关系型数据库 MySQL
解决MySQL删除/var/lib/mysql下的所有文件后无法启动的问题
删除 `/var/lib/mysql` 下的所有文件后,需要重新初始化数据目录,确保正确的权限设置,并重新启动 MySQL 服务。通过按照上述步骤操作,可以解决 MySQL 无法启动的问题,并恢复数据库的正常运行。初始化数据目录后,别忘了配置安全设置,并根据需要恢复备份数据。这些步骤不仅能够恢复 MySQL 的正常运行,还能确保数据库的安全性和完整性。
32 2
|
23天前
|
SQL 关系型数据库 MySQL
MySQL导入.sql文件后数据库乱码问题
本文分析了导入.sql文件后数据库备注出现乱码的原因,包括字符集不匹配、备注内容编码问题及MySQL版本或配置问题,并提供了详细的解决步骤,如检查和统一字符集设置、修改客户端连接方式、检查MySQL配置等,确保导入过程顺利。
|
26天前
|
SQL 关系型数据库 MySQL
mysql分页读取数据重复问题
在服务端开发中,与MySQL数据库进行数据交互时,常因数据量大、网络延迟等因素需分页读取数据。文章介绍了使用`limit`和`offset`参数实现分页的方法,并针对分页过程中可能出现的数据重复问题进行了详细分析,提出了利用时间戳或确保排序规则绝对性等解决方案。
|
1月前
|
关系型数据库 MySQL 数据库
GBase 数据库如何像MYSQL一样存放多行数据
GBase 数据库如何像MYSQL一样存放多行数据
|
1月前
|
缓存 NoSQL 关系型数据库
Redis和Mysql如何保证数据⼀致?
在项目中,为了解决Redis与Mysql的数据一致性问题,我们采用了多种策略:对于低一致性要求的数据,不做特别处理;时效性数据通过设置缓存过期时间来减少不一致风险;高一致性但时效性要求不高的数据,利用MQ异步同步确保最终一致性;而对一致性和时效性都有高要求的数据,则采用分布式事务(如Seata TCC模式)来保障。
68 14
|
1月前
|
关系型数据库 MySQL 数据库
【赵渝强老师】MySQL的参数文件
MySQL启动时会读取配置文件my.cnf来确定数据库文件位置及初始化参数。该文件分为Server和Client两部分,包含动态与静态参数。动态参数可在运行中通过命令修改,而静态参数需修改my.cnf并重启服务生效。文中还提供了相关代码示例和视频教程。
|
1月前
|
SQL 关系型数据库 MySQL
【赵渝强老师】MySQL的全量日志文件
MySQL全量日志记录所有操作的SQL语句,默认禁用。启用后,可通过`show variables like %general_log%检查状态,使用`set global general_log=ON`临时开启,执行查询并查看日志文件以追踪SQL执行详情。
|
5天前
|
存储 Oracle 关系型数据库
数据库传奇:MySQL创世之父的两千金My、Maria
《数据库传奇:MySQL创世之父的两千金My、Maria》介绍了MySQL的发展历程及其分支MariaDB。MySQL由Michael Widenius等人于1994年创建,现归Oracle所有,广泛应用于阿里巴巴、腾讯等企业。2009年,Widenius因担心Oracle收购影响MySQL的开源性,创建了MariaDB,提供额外功能和改进。维基百科、Google等已逐步替换为MariaDB,以确保更好的性能和社区支持。掌握MariaDB作为备用方案,对未来发展至关重要。
19 3