package cn.itcast.spark.source

import java.util.Properties

import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object _03SparkSQLSourceTest {

  def main(args: Array[String]): Unit = {
    // Build the SparkSession instance
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()
    import spark.implicits._

    // TODO: 1. CSV text files -> how they are read depends on whether the first line holds the column names
    /*
      CSV data: the fields of each row are separated by commas;
      more loosely, the fields of each row are separated by any single delimiter
     */
    // Option 1: the first line holds the column names, data file u.dat
    val dataframe: DataFrame = spark.read
      .format("csv")
      .option("sep", "\\t")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("datas/ml-100k/u.dat")
    dataframe.printSchema()
    dataframe.show(10, truncate = false)

    // Option 2: the first line is not a header, so the schema must be defined explicitly, data file u.data
    // Custom schema
    val schema: StructType = new StructType()
      .add("user_id", IntegerType, nullable = true)
      .add("item_id", IntegerType, nullable = true)
      .add("rating", DoubleType, nullable = true)
      .add("timestamp", LongType, nullable = true)
    val df: DataFrame = spark.read
      .format("csv")
      .schema(schema)
      .option("sep", "\\t")
      .load("datas/ml-100k/u.data")
    df.printSchema()
    df.show(10, truncate = false)

    /* ============================================================================== */

    // TODO: 2. Read data from a MySQL table
    // First: the concise form
    /*
      def jdbc(url: String, table: String, properties: Properties): DataFrame
     */
    val props = new Properties()
    props.put("user", "root")
    props.put("password", "123456")
    props.put("driver", "com.mysql.cj.jdbc.Driver")
    val empDF: DataFrame = spark.read.jdbc(
      "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
      "db_test.emp",
      props
    )
    println(s"Partition Number = ${empDF.rdd.getNumPartitions}")
    empDF.printSchema()
    empDF.show(10, truncate = false)

    // Second: the standard (option-based) form
    /*
      WITH tmp AS (
        select * from emp e join dept d on e.deptno = d.deptno
      )
     */
    val table: String =
      "(select ename,deptname,sal from db_test.emp e join db_test.dept d on e.deptno = d.deptno) AS tmp"
    val joinDF: DataFrame = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", table)
      .load()
    joinDF.printSchema()
    joinDF.show(10, truncate = false)

    // Application finished; release resources
    spark.stop()
  }

}
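
/*
 * A minimal companion sketch (not part of the original lesson): the concise jdbc() call above
 * reads the whole table through a single partition, while the overload below
 *   jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, properties)
 * splits the read into several parallel JDBC range scans. The partition column "empno" and the
 * bounds 7000/8000 are assumptions for illustration; replace them with a real numeric column
 * and its actual value range in db_test.emp.
 */
object _03SparkSQLJdbcPartitionSketch {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .getOrCreate()

    val props = new Properties()
    props.put("user", "root")
    props.put("password", "123456")
    props.put("driver", "com.mysql.cj.jdbc.Driver")

    // Each of the numPartitions tasks reads one slice of the assumed partition column range,
    // so the DataFrame ends up with numPartitions partitions instead of one.
    val empDF: DataFrame = spark.read.jdbc(
      "jdbc:mysql://node1.itcast.cn:3306/?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
      "db_test.emp",
      "empno", // assumed numeric partition column
      7000L,   // assumed lower bound of empno
      8000L,   // assumed upper bound of empno
      4,       // number of partitions / parallel JDBC connections
      props
    )
    println(s"Partition Number = ${empDF.rdd.getNumPartitions}")
    empDF.show(10, truncate = false)

    spark.stop()
  }

}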