Spark案例读取不同格式文件以及读取输入数据存入Mysql

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Spark案例读取不同格式文件以及读取输入数据存入Mysql

文章目录

1.spark读取文件

1.txt文件

代码

def main(args: Array[String]): Unit = {

   val spark = SparkSession.builder().appName("ReadTextFile")

     .master("local")

     .config("spark.sql.shuffle.partitions", 1)

     //设置内存不然启动提示堆内存不足

     .config("spark.testing.memory", "512000000")

     .getOrCreate()

   //设置日志级别

   spark.sparkContext.setLogLevel("Error")

   //读取文件

   import  spark.implicits._

   //3.监控目录数据并将Dataset转换成DataFrame

   val ds:Dataset[String] = spark.readStream.textFile("./data/textfile/")

   val dataFrame = ds.map(line => {

     val arr = line.split(",")

     (arr(0).toInt, arr(1), arr(2))

   }).toDF("id","name","address")

   //4.打印结果

   val query: StreamingQuery = dataFrame.writeStream

     .format("console")

     .start()

   query.awaitTermination()

测试

da3183265c21465dbf3cf462eec8db71.png

2.csv文件

代码

csv需要创建schema以及指定分割符

def main(args: Array[String]): Unit = {

   val spark = SparkSession.builder().appName("ReadTextFile")

     .master("local")

     .config("spark.sql.shuffle.partitions", 1)

     //设置内存不然启动提示堆内存不足

     .config("spark.testing.memory", "512000000")

     .getOrCreate()

   //设置日志级别

   spark.sparkContext.setLogLevel("Error")

   //读取文件

   import spark.implicits._

   //创建csv schema

   val personSchema = new StructType().add("id", "integer")

     .add("name", "string")

     .add("address", "string")

   //监控目录

   val df= spark.readStream.option("sep", ",")

     .schema(personSchema).csv("./data/csvfile/")

   //输出结果

   val query: StreamingQuery = df.writeStream

     .format("console")

     .start()

   query.awaitTermination()

测试

0c46ec25e2b34994a8b1492942144b5f.png

3.读取json文件

源码

/**

* 读取Json文件

*/

object ReadJsonFile {

 def main(args: Array[String]): Unit = {

   val spark = SparkSession.builder().appName("ReadJsonFile")

     .master("local")

     .config("spark.sql.shuffle.partitions", 1)

     //设置内存不然启动提示堆内存不足

     .config("spark.testing.memory", "512000000")

     .getOrCreate()

   //设置日志级别

   spark.sparkContext.setLogLevel("Error")

   //读取文件

   //创建csv schema

   val personSchema = new StructType().add("id", "integer")

     .add("name", "string")

     .add("address", "string")

   //监控目录

   val df= spark.readStream.option("sep", ",")

     .schema(personSchema).json("./data/jsonfile/")

   //输出结果

   val query: StreamingQuery = df.writeStream

     .format("console")

     .start()

   query.awaitTermination()

 }

测试

885450763c254611817d00a3e0b4847f.png

2.读取输入的数据存入mysql

实现从控制台监控读取文件写入到mysql数据库。

mysql创建一张表

create table psn(id int,name varchar(20),address varchar(50));

代码

object OutPutToMysql {

 def main(args: Array[String]): Unit = {

   val spark = SparkSession.builder().appName("outputToMysql")

     .master("local")

     .config("spark.sql.shuffle.partitions", 1)

     //设置内存不然启动提示堆内存不足

     .config("spark.testing.memory", "512000000")

     .getOrCreate()

   //创建控制台输入

   val df = spark.readStream.format("socket")

     .option("host","node01")

     .option("port",8888)

     .load()

   import spark.implicits._

   val result = df.as[String].map(line => {

     val arr = line.split(",")

     (arr(0).toInt, arr(1), arr(2))

   }).toDF("id", "name", "address")

   //结果输出到mysql

   result.writeStream.foreachBatch((batchDf:DataFrame,batchId:Long)=>{

     batchDf.write.mode(SaveMode.Overwrite).format("jdbc")

       .option("url","jdbc:mysql://node01:3306/test")

       .option("user","root")

       .option("password","123456")

       .option("dbtable","psn")

       .save()

   }).start().awaitTermination();

   }

测试

0e2e5e5ef20d43479b6b1af5741d8ade.png


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1月前
|
监控 关系型数据库 MySQL
zabbix agent集成percona监控MySQL的插件实战案例
这篇文章是关于如何使用Percona监控插件集成Zabbix agent来监控MySQL的实战案例。
32 2
zabbix agent集成percona监控MySQL的插件实战案例
|
2月前
|
存储 SQL 关系型数据库
mysql百分数转小数点格式
在MySQL中,将百分数转换为小数点格式是一个简单直接的操作,可以通过基本的数学表达式和函数实现。无论是处理以字符串形式存储的百分数值,还是直接以数值形式表示的百分比,都可以通过适当的转换查询轻松实现这一目标。通过理解和应用这些基本的转换方法,可以有效地处理和分析数据库中的百分比数据。
32 5
|
2月前
|
存储 关系型数据库 MySQL
MySQL bit类型增加索引后查询结果不正确案例浅析
【8月更文挑战第17天】在MySQL中,`BIT`类型字段在添加索引后可能出现查询结果异常。表现为查询结果与预期不符,如返回错误记录或遗漏部分数据。原因包括索引使用不当、数据存储及比较问题,以及索引创建时未充分考虑`BIT`特性。解决方法涉及正确运用索引、理解`BIT`的存储和比较机制,以及合理创建索引以覆盖各种查询条件。通过`EXPLAIN`分析执行计划可帮助诊断和优化查询。
|
2月前
|
存储 分布式计算 Java
|
3月前
|
缓存 监控 关系型数据库
MySQL PXC 集群死锁分析案例
前不久一个系统死锁导致部分业务受到影响,今次补上详细的节点日志分析过程。
64 1
|
4月前
|
SQL Java 数据库连接
2万字实操案例之在Springboot框架下基于注解用Mybatis开发实现基础操作MySQL之预编译SQL主键返回增删改查
2万字实操案例之在Springboot框架下基于注解用Mybatis开发实现基础操作MySQL之预编译SQL主键返回增删改查
58 2
|
4月前
|
关系型数据库 MySQL 数据库
关系型数据库MySQL开发要点之多表设计案例详解代码实现
关系型数据库MySQL开发要点之多表设计案例详解代码实现
55 2
|
4月前
|
关系型数据库 MySQL 数据库
MySQL数据库开发之多表查询数据准备及案例实操
MySQL数据库开发之多表查询数据准备及案例实操
41 1
|
3月前
|
关系型数据库 MySQL 数据挖掘
MySQL 聚合函数案例解析:深入实践与应用
MySQL 聚合函数案例解析:深入实践与应用
|
4月前
|
分布式计算 监控 大数据
spark实战:实现分区内求最大值,分区间求和以及获取日志文件固定日期的请求路径
spark实战:实现分区内求最大值,分区间求和以及获取日志文件固定日期的请求路径

推荐镜像

更多