Spark案例读取不同格式文件以及读取输入数据存入Mysql

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS AI 助手,专业版
简介: Spark案例读取不同格式文件以及读取输入数据存入Mysql

文章目录

1.spark读取文件

1.txt文件

代码

def main(args: Array[String]): Unit = {

   val spark = SparkSession.builder().appName("ReadTextFile")

     .master("local")

     .config("spark.sql.shuffle.partitions", 1)

     //设置内存不然启动提示堆内存不足

     .config("spark.testing.memory", "512000000")

     .getOrCreate()

   //设置日志级别

   spark.sparkContext.setLogLevel("Error")

   //读取文件

   import  spark.implicits._

   //3.监控目录数据并将Dataset转换成DataFrame

   val ds:Dataset[String] = spark.readStream.textFile("./data/textfile/")

   val dataFrame = ds.map(line => {

     val arr = line.split(",")

     (arr(0).toInt, arr(1), arr(2))

   }).toDF("id","name","address")

   //4.打印结果

   val query: StreamingQuery = dataFrame.writeStream

     .format("console")

     .start()

   query.awaitTermination()

测试

da3183265c21465dbf3cf462eec8db71.png

2.csv文件

代码

csv需要创建schema以及指定分割符

def main(args: Array[String]): Unit = {

   val spark = SparkSession.builder().appName("ReadTextFile")

     .master("local")

     .config("spark.sql.shuffle.partitions", 1)

     //设置内存不然启动提示堆内存不足

     .config("spark.testing.memory", "512000000")

     .getOrCreate()

   //设置日志级别

   spark.sparkContext.setLogLevel("Error")

   //读取文件

   import spark.implicits._

   //创建csv schema

   val personSchema = new StructType().add("id", "integer")

     .add("name", "string")

     .add("address", "string")

   //监控目录

   val df= spark.readStream.option("sep", ",")

     .schema(personSchema).csv("./data/csvfile/")

   //输出结果

   val query: StreamingQuery = df.writeStream

     .format("console")

     .start()

   query.awaitTermination()

测试

0c46ec25e2b34994a8b1492942144b5f.png

3.读取json文件

源码

/**

* 读取Json文件

*/

object ReadJsonFile {

 def main(args: Array[String]): Unit = {

   val spark = SparkSession.builder().appName("ReadJsonFile")

     .master("local")

     .config("spark.sql.shuffle.partitions", 1)

     //设置内存不然启动提示堆内存不足

     .config("spark.testing.memory", "512000000")

     .getOrCreate()

   //设置日志级别

   spark.sparkContext.setLogLevel("Error")

   //读取文件

   //创建csv schema

   val personSchema = new StructType().add("id", "integer")

     .add("name", "string")

     .add("address", "string")

   //监控目录

   val df= spark.readStream.option("sep", ",")

     .schema(personSchema).json("./data/jsonfile/")

   //输出结果

   val query: StreamingQuery = df.writeStream

     .format("console")

     .start()

   query.awaitTermination()

 }

测试

885450763c254611817d00a3e0b4847f.png

2.读取输入的数据存入mysql

实现从控制台监控读取文件写入到mysql数据库。

mysql创建一张表

create table psn(id int,name varchar(20),address varchar(50));

代码

object OutPutToMysql {

 def main(args: Array[String]): Unit = {

   val spark = SparkSession.builder().appName("outputToMysql")

     .master("local")

     .config("spark.sql.shuffle.partitions", 1)

     //设置内存不然启动提示堆内存不足

     .config("spark.testing.memory", "512000000")

     .getOrCreate()

   //创建控制台输入

   val df = spark.readStream.format("socket")

     .option("host","node01")

     .option("port",8888)

     .load()

   import spark.implicits._

   val result = df.as[String].map(line => {

     val arr = line.split(",")

     (arr(0).toInt, arr(1), arr(2))

   }).toDF("id", "name", "address")

   //结果输出到mysql

   result.writeStream.foreachBatch((batchDf:DataFrame,batchId:Long)=>{

     batchDf.write.mode(SaveMode.Overwrite).format("jdbc")

       .option("url","jdbc:mysql://node01:3306/test")

       .option("user","root")

       .option("password","123456")

       .option("dbtable","psn")

       .save()

   }).start().awaitTermination();

   }

测试

0e2e5e5ef20d43479b6b1af5741d8ade.png


相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://www.aliyun.com/product/rds/mysql 
相关文章
|
8月前
|
缓存 NoSQL 关系型数据库
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
|
6月前
|
SQL 人工智能 关系型数据库
如何实现MySQL百万级数据的查询?
本文探讨了在MySQL中对百万级数据进行排序分页查询的优化策略。面对五百万条数据,传统的浅分页和深分页查询效率较低,尤其深分页因偏移量大导致性能显著下降。通过为排序字段添加索引、使用联合索引、手动回表等方法,有效提升了查询速度。最终建议根据业务需求选择合适方案:浅分页可加单列索引,深分页推荐联合索引或子查询优化,同时结合前端传递最后一条数据ID的方式实现高效翻页。
378 0
|
5月前
|
存储 关系型数据库 MySQL
在CentOS 8.x上安装Percona Xtrabackup工具备份MySQL数据步骤。
以上就是在CentOS8.x上通过Perconaxtabbackup工具对Mysql进行高效率、高可靠性、无锁定影响地实现在线快速全量及增加式数据库资料保存与恢复流程。通过以上流程可以有效地将Mysql相关资料按需求完成定期或不定期地保存与灾难恢复需求。
497 10
|
6月前
|
SQL 存储 缓存
MySQL 如何高效可靠处理持久化数据
本文详细解析了 MySQL 的 SQL 执行流程、crash-safe 机制及性能优化策略。内容涵盖连接器、分析器、优化器、执行器与存储引擎的工作原理,深入探讨 redolog 与 binlog 的两阶段提交机制,并分析日志策略、组提交、脏页刷盘等关键性能优化手段,帮助提升数据库稳定性与执行效率。
185 0
|
7月前
|
SQL 关系型数据库 MySQL
MySQL 5.6/5.7 DDL 失败残留文件清理指南
通过本文的指南,您可以更安全地处理 MySQL 5.6 和 5.7 版本中 DDL 失败后的残留文件,有效避免数据丢失和数据库不一致的问题。
|
7月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
408 0
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
1027 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
10月前
|
存储 分布式计算 Hadoop
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
549 79
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
288 0
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
284 0

推荐镜像

更多