1. Reading files with Spark
1. txt files
Code
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

object ReadTextFile {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ReadTextFile")
      .master("local")
      .config("spark.sql.shuffle.partitions", 1)
      //raise the test memory, otherwise startup fails with an insufficient heap memory error
      .config("spark.testing.memory", "512000000")
      .getOrCreate()
    //set the log level
    spark.sparkContext.setLogLevel("Error")
    import spark.implicits._
    //monitor the directory and convert the Dataset into a DataFrame
    val ds: Dataset[String] = spark.readStream.textFile("./data/textfile/")
    val dataFrame = ds.map(line => {
      val arr = line.split(",")
      (arr(0).toInt, arr(1), arr(2))
    }).toDF("id", "name", "address")
    //print the result to the console
    val query: StreamingQuery = dataFrame.writeStream
      .format("console")
      .start()
    query.awaitTermination()
  }
}
Test
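To exercise the stream, drop a comma-separated file into the monitored directory while the job is running; each new file shows up as a micro-batch on the console sink. A minimal sketch (the file name and rows are hypothetical):

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

object WriteTextTestData {
  def main(args: Array[String]): Unit = {
    //hypothetical rows in the id,name,address layout the job splits on
    val lines = Seq("1,zhangsan,beijing", "2,lisi,shanghai")
    Files.createDirectories(Paths.get("./data/textfile"))
    Files.write(
      Paths.get("./data/textfile/people.txt"),
      lines.mkString("\n").getBytes(StandardCharsets.UTF_8))
  }
}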
2. csv files
Code
A csv source needs a schema, and the field separator has to be specified.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType

object ReadCsvFile {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ReadCsvFile")
      .master("local")
      .config("spark.sql.shuffle.partitions", 1)
      //raise the test memory, otherwise startup fails with an insufficient heap memory error
      .config("spark.testing.memory", "512000000")
      .getOrCreate()
    //set the log level
    spark.sparkContext.setLogLevel("Error")
    //create the csv schema
    val personSchema = new StructType().add("id", "integer")
      .add("name", "string")
      .add("address", "string")
    //monitor the directory
    val df = spark.readStream.option("sep", ",")
      .schema(personSchema).csv("./data/csvfile/")
    //output the result
    val query: StreamingQuery = df.writeStream
      .format("console")
      .start()
    query.awaitTermination()
  }
}
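If the csv files in the directory start with a header row, the reader can be told to skip it. A sketch reusing spark and personSchema from the example above; note that in streaming mode an explicit schema is still required even with a header, unless spark.sql.streaming.schemaInference is enabled:

//same directory as above, but each file begins with a header line that is skipped
val dfWithHeader = spark.readStream
  .option("sep", ",")
  .option("header", "true")
  .schema(personSchema)
  .csv("./data/csvfile/")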
Test
3. Reading json files
Source code
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType

/**
 * Read JSON files
 */
object ReadJsonFile {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ReadJsonFile")
      .master("local")
      .config("spark.sql.shuffle.partitions", 1)
      //raise the test memory, otherwise startup fails with an insufficient heap memory error
      .config("spark.testing.memory", "512000000")
      .getOrCreate()
    //set the log level
    spark.sparkContext.setLogLevel("Error")
    //create the json schema
    val personSchema = new StructType().add("id", "integer")
      .add("name", "string")
      .add("address", "string")
    //monitor the directory; json does not need a separator option
    val df = spark.readStream
      .schema(personSchema).json("./data/jsonfile/")
    //output the result
    val query: StreamingQuery = df.writeStream
      .format("console")
      .start()
    query.awaitTermination()
  }
}
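By default the json source reads line-delimited JSON, so every line of a file in the monitored directory should be one complete object whose fields match personSchema. The records below are hypothetical examples of that shape; the snippet reuses spark and personSchema from the code above, and the multiLine option is only needed if a single object is pretty-printed across several lines:

//expected shape of each line under ./data/jsonfile/ (values are made up):
//  {"id":1,"name":"zhangsan","address":"beijing"}
//  {"id":2,"name":"lisi","address":"shanghai"}
//variant for pretty-printed files where one object spans several lines
val multiLineDf = spark.readStream
  .option("multiLine", "true")
  .schema(personSchema)
  .json("./data/jsonfile/")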
Test
2. Reading input data and writing it to mysql
Read lines typed into a monitored socket (console input) and write them into a mysql database.
First create a table in mysql:
create table psn(id int,name varchar(20),address varchar(50));
Code
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object OutPutToMysql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("outputToMysql")
      .master("local")
      .config("spark.sql.shuffle.partitions", 1)
      //raise the test memory, otherwise startup fails with an insufficient heap memory error
      .config("spark.testing.memory", "512000000")
      .getOrCreate()
    //read from a socket source (console input fed through the socket)
    val df = spark.readStream.format("socket")
      .option("host", "node01")
      .option("port", 8888)
      .load()
    import spark.implicits._
    val result = df.as[String].map(line => {
      val arr = line.split(",")
      (arr(0).toInt, arr(1), arr(2))
    }).toDF("id", "name", "address")
    //write each micro-batch to mysql
    result.writeStream.foreachBatch((batchDf: DataFrame, batchId: Long) => {
      batchDf.write.mode(SaveMode.Overwrite).format("jdbc")
        .option("url", "jdbc:mysql://node01:3306/test")
        .option("user", "root")
        .option("password", "123456")
        .option("dbtable", "psn")
        .save()
    }).start().awaitTermination()
  }
}
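Two things worth noting about the jdbc sink: the MySQL JDBC driver has to be on the classpath for format("jdbc") to work, and SaveMode.Overwrite replaces the whole psn table on every micro-batch. If rows should accumulate across batches instead, the batch writer can be switched to Append; a sketch with the same (assumed) connection options as above:

//append each micro-batch to psn instead of overwriting the table
result.writeStream.foreachBatch((batchDf: DataFrame, batchId: Long) => {
  batchDf.write.mode(SaveMode.Append).format("jdbc")
    .option("url", "jdbc:mysql://node01:3306/test")
    .option("user", "root")
    .option("password", "123456")
    .option("dbtable", "psn")
    .save()
}).start().awaitTermination()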
Test
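The job expects a socket server on node01:8888 (for example netcat). A minimal Scala feeder is sketched below; it must run on the host configured in the job, or the job's host option can be pointed at localhost, and the rows are hypothetical samples in the id,name,address layout:

import java.io.PrintWriter
import java.net.ServerSocket

object SocketTestFeeder {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(8888)
    val client = server.accept()                //blocks until the streaming job connects
    val out = new PrintWriter(client.getOutputStream, true)
    //hypothetical sample rows in the id,name,address layout the job splits on
    Seq("1,zhangsan,beijing", "2,lisi,shanghai").foreach(line => out.println(line))
    Thread.sleep(10000)                         //give the micro-batch time to run
    out.close(); client.close(); server.close()
  }
}

After a batch has run, select * from psn should contain the rows that were sent.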