1 数据源与格式
在SparkSQL模块,提供一套完成API接口,用于方便读写外部数据源的的数据(从Spark 1.4版本提供),框架本身内置外部数据源:
在Spark 2.4版本中添加支持Image Source(图像数据源)和Avro Source。
数据分析处理中,数据可以分为结构化数据、非结构化数据及半结构化数据。
1)、结构化数据(Structured)
结构化数据源可提供有效的存储和性能。例如,Parquet和ORC等柱状格式使从列的子集中提取值变得更加容易。
基于行的存储格式(如Avro)可有效地序列化和存储提供存储优势的数据。然而,这些优点通常以灵活性为代价。如因结构的固定性,格式转变可能相对困难。
2)、非结构化数据(UnStructured)
相比之下,非结构化数据源通常是自由格式文本或二进制对象,其不包含标记或元数据以定义数据的结构。
报纸文章,医疗记录,图像,应用程序日志通常被视为非结构化数据。这些类型的源通常
要求数据周围的上下文是可解析的。
3)、半结构化数据(Semi-Structured)
半结构化数据源是按记录构建的,但不一定具有跨越所有记录的明确定义的全局模式。每个数据记录都使用其结构信息进行扩充。
半结构化数据格式的好处是,它们在表达数据时提供了最大的灵活性,因为每条记录都是自我描述的。但这些格式的主要缺点是它们会产生额外的解析开销,并且不是特别为ad-hoc(特定)查询而构建的。
2 加载/保存数据
SparkSQL提供一套通用外部数据源接口,方便用户从数据源加载和保存数据,例如从MySQL表中既可以加载读取数据:load/read,又可以保存写入数据:save/write。
由于SparkSQL没有内置支持从HBase表中加载和保存数据,但是只要实现外部数据源接口,也能像上面方式一样读取加载数据。
2.1 Load 加载数据
2.1.1 获取SparkSession对象
/** * 获取SparkSession对象 * * @return */ static void createSpark() { properties = getProperties(); spark = SparkSession.builder() .config("spark.local.dir", "/tmp/spark-tmp") .config("spark.driver.maxResultSize", "20g") .config("spark.sql.parquet.writeLegacyFormat", "true") .config("hbase.zookeeper.quorum", properties.getProperty("hbase.zookeeper.quorum")) .config("hbase.zookeeper.property.clientPort", properties.getProperty("hbase.zookeeper.property.clientPort")) .config("zookeeper.znode.parent", properties.getProperty("zookeeper.znode.parent")) .config("spark.redis.host", properties.getProperty("spark.redis.host")) .config("spark.redis.port", properties.getProperty("spark.redis.port")) .config("spark.redis.auth", properties.getProperty("spark.redis.auth")) .config("spark.redis.db", properties.getProperty("spark.redis.db")) .master("local[*]") .appName("toplion.offline") .enableHiveSupport() .getOrCreate(); spark.sparkContext().setLogLevel(Level.ERROR.toString()); spark.udf().register("removeEmoji", (UDF1<String, String>) col -> { return EmojiParser.removeAllEmojis(col); } , DataTypes.StringType); spark.udf().register("addStr", new UDF3<Object, String, String, String>() { @Override public String call(Object o, String head, String tail) throws Exception { return head + o.toString() + tail; } } , DataTypes.StringType); spark.udf().register("addUUID", new UDF0() { @Override public String call() throws Exception { return UUID.randomUUID().toString(); } } , DataTypes.StringType); SparkConf sparkConf = spark.sparkContext().getConf(); redisConfig = RedisConfig.fromSparkConf(sparkConf); readWriteConfig =ReadWriteConfig.fromSparkConf(sparkConf); jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); redisContext = new RedisContext(spark.sparkContext()); }
在SparkSQL中读取数据使用SparkSession读取,并且封装到数据结构Dataset/DataFrame中。
DataFrameReader专门用于加载load读取外部数据源的数据,基本格式如下:
SparkSQL模块本身自带支持读取外部数据源的数据:
总结起来三种类型数据,也是实际开发中常用的:
- 第一类:文件格式数据
- 文本文件text、csv文件和json文件
- 第二类:列式存储数据
- Parquet格式、ORC格式
- 第三类:数据库表
- 关系型数据库RDBMS:MySQL、DB2、Oracle和MSSQL
- Hive仓库表
官方文档:http://spark.apache.org/docs/2.4.5/sql-data-sources-load-save-functions.html
此外加载文件数据时,可以直接使用SQL语句,指定文件存储格式和路径:
2.2 Save 保存数据
SparkSQL模块中可以从某个外部数据源读取数据,就能向某个外部数据源保存数据,提供相
应接口,通过DataFrameWrite类将数据进行保存。
与DataFrameReader类似,提供一套规则,将数据Dataset保存,基本格式如下:
SparkSQL模块内部支持保存数据源如下:
所以使用SpakrSQL分析数据时,从数据读取,到数据分析及数据保存,链式操作,更多就是
ETL操作。当将结果数据DataFrame/Dataset保存至Hive表中时,可以设置分区partition和分桶
bucket,形式如下:
2.2.1 写出至redis
需要的依赖:
spark-redis-2.4.1-jar-with-dependencies.jar
spark使用parallelize方法创建RDD
JavaRDD和RDD的互相转换
protected static RedisContext redisContext; protected void toRedisSET(RDD<String> vs, String setName) { redisContext.toRedisSET(vs, setName, 60 * 60 * 48, redisConfig, readWriteConfig); } protected void toRedisHASH(RDD<Tuple2<String, String>> kvs, String setName) { redisContext.toRedisHASH(kvs, setName, 60 * 60 * 48, redisConfig, readWriteConfig); }
@Override protected void writeRedis(LocalDate startDate, LocalDate endDate, String group, String dateTag) { if ("all".equals(group)) { Dataset<Row> deviceConnection = table("iot.device_connection"); Dataset<Row> device = table("iot.device"); Dataset<Row> ds = deviceConnection.join(device, deviceConnection.col("device_id").equalTo(device.col("id"))) .where(col("created_time").lt(endDate.toString())) .select( //deviceConnection.col("device_id"), device.col("project_id").as("projectId"), device.col("identity").as("deviceIdentity"), when(deviceConnection.col("type").equalTo(0),"ONLINE").otherwise("OFFLINE").as("currentType"), deviceConnection.col("created_time").cast(DataTypes.LongType).as("lastUpdateDate") ).withColumn("lastUpdateDate",col("lastUpdateDate").multiply(1000)) .orderBy(col("lastUpdateDate").desc()); Dataset<Row> dsData = ds.groupBy(col("deviceIdentity")) .agg( first(col("lastUpdateDate")).as("lastUpdateDate"), first(col("projectId")).as("projectId"), first(col("currentType")).as("currentType") ) //.withColumn("uuid", functions.callUDF("addStr", col("project_id"), lit(""), lit(""))) .withColumn("_class", lit("com.toplion.iot.device.domain.DeviceConnection")) .withColumn("uuid", functions.callUDF("uuid")) .withColumn("id", col("uuid")); List<String> list = dsData.toJSON().collectAsList(); list.forEach(s -> { JSONObject r = JSONObject.parseObject(s); String uuidStr = r.getString("uuid"); String projectIdStr = r.getString("projectId"); String deviceIdentityStr = r.getString("deviceIdentity"); String lastUpdateDateStr = r.getString("lastUpdateDate"); String currentTypeStr = r.getString("currentType"); JavaRDD<String> uuid = jsc.parallelize(Arrays.asList(new String[]{uuidStr})); toRedisSET(uuid.rdd(), "DeviceConnection:projectId:" + projectIdStr); toRedisSET(uuid.rdd(), "DeviceConnection:deviceIdentity:" + deviceIdentityStr); toRedisSET(uuid.rdd(), "DeviceConnection:lastUpdateDate:" + lastUpdateDateStr); toRedisSET(uuid.rdd(), "DeviceConnection:currentType:" + currentTypeStr); JavaRDD<String> idx = jsc.parallelize(Arrays.asList("DeviceConnection:lastUpdateDate:" + lastUpdateDateStr, "DeviceConnection:projectId:" + projectIdStr, "DeviceConnection:currentType:" + currentTypeStr, "DeviceConnection:deviceIdentity:" + deviceIdentityStr) ); toRedisSET(idx.rdd(), "DeviceConnection:" + uuidStr + ":idx"); }); dsData.write() .format("org.apache.spark.sql.redis") .option("table", "DeviceConnection") .option("key.column", "uuid") .option("ttl", 60 * 60 * 48) .mode(SaveMode.Append) .save(); //todo 上面是根据设备连接记录,下面是设备表 device = device.where(col("status").equalTo("0")); ds = device .where(col("last_modified_date").lt(endDate.toString())) .select( device.col("id").as("device_id"), device.col("project_id"), device.col("identity"), device.col("last_modified_date").as("created_time"), device.col("network_status"), device.col("working_status") ) .orderBy(col("created_time").desc()); dsData = ds.groupBy(col("project_id").as("projectId")) .agg( countDistinct(col("device_id")).as("count"), countDistinct(when(col("network_status").equalTo("1"), col("device_id"))).as("online"), countDistinct(when(col("working_status").equalTo("1"), col("device_id"))).as("working") ) .withColumn("offline", col("count").minus(col("online"))) .withColumn("date", lit(dateTag)) //.withColumn("uuid", functions.callUDF("addStr", col("project_id"), lit(""), lit(""))) .withColumn("_class", lit("com.toplion.iot.device.domain.DeviceS")) .withColumn("uuid", functions.callUDF("uuid")) .withColumn("id", col("uuid")) .withColumn("type", lit("0")); list = dsData.toJSON().collectAsList(); list.forEach(s -> { JSONObject r = JSONObject.parseObject(s); String uuidStr=r.getString("uuid"); String typeStr=r.getString("type"); String dateStr=r.getString("date"); String projectIdStr=r.getString("projectId"); JavaRDD<String> uuid = jsc.parallelize(Arrays.asList(new String[]{uuidStr})); toRedisSET(uuid.rdd(), "DeviceS:type:" +typeStr); toRedisSET(uuid.rdd(), "DeviceS:date:" + dateStr); toRedisSET(uuid.rdd(), "DeviceS:projectId:" + projectIdStr); JavaRDD<String> idx = jsc.parallelize(Arrays.asList("DeviceS:date:" + dateStr, "DeviceS:projectId:" + projectIdStr, "DeviceS:type:" + typeStr)); toRedisSET(idx.rdd(), "DeviceS:" + uuidStr + ":idx"); }); dsData.write() .format("org.apache.spark.sql.redis") .option("table", "DeviceS") .option("key.column", "uuid") .option("ttl", 60 * 60 * 48) .mode(SaveMode.Append) .save(); } }
2.3 案例演示
加载json格式数据,提取name和age字段值,保存至Parquet列式存储文件。
// 加载json数据 val peopleDF = spark.read.format("json").load("/datas/resources/people.json") val resultDF = peopleDF.select("name", "age") // 保存数据至parquet resultDF.write.format("parquet").save("/datas/people-parquet")
在spark-shell上执行上述语句,截图结果如下:
查看HDFS文件系统目录,数据已保存值parquet文件,并且使用snappy压缩。
2.4 保存模式(SaveMode)
将Dataset/DataFrame数据保存到外部存储系统中,考虑是否存在,存在的情况下的下如何
进行保存,DataFrameWriter中有一个mode方法指定模式:
通过源码发现SaveMode时枚举类,使用Java语言编写,如下四种保存模式:
第一种:Append 追加模式,当数据存在时,继续追加;
第二种:Overwrite 覆写模式,当数据存在时,覆写以前数据,存储当前最新数据;
第三种:ErrorIfExists 存在及报错;
第四种:Ignore 忽略,数据存在时不做任何操作;
实际项目依据具体业务情况选择保存模式,通常选择Append和Overwrite模式。
3 parquet 数据
SparkSQL模块中默认读取数据文件格式就是parquet列式存储数据,通过参数【spark.sql.sources.default】设置,默认值为【parquet】。
范例演示代码:直接load加载parquet数据和指定parquet格式加载数据。
import org.apache.spark.sql.{DataFrame, SparkSession} /** * SparkSQL读取Parquet列式存储数据 */ object SparkSQLParquet { def main(args: Array[String]): Unit = { // 构建SparkSession实例对象,通过建造者模式创建 val spark: SparkSession = SparkSession .builder() .appName(this.getClass.getSimpleName.stripSuffix("$")) .master("local[3]") .getOrCreate() import spark.implicits._ // TODO: 从LocalFS上读取parquet格式数据 val usersDF: DataFrame = spark.read.parquet("datas/resources/users.parquet") usersDF.printSchema() usersDF.show(10, truncate = false) println("==================================================") // SparkSQL默认读取文件格式为parquet val df = spark.read.load("datas/resources/users.parquet") df.printSchema() df.show(10, truncate = false) // 应用结束,关闭资源 spark.stop() } }
运行程序结果:
4 text 数据
SparkSession加载文本文件数据,提供两种方法,返回值分别为DataFrame和Dataset,前面【入门案例:词频统计WordCount】中已经使用,下面看一下方法声明:
可以看出textFile方法底层还是调用text方法,先加载数据封装到DataFrame中,再使用as[String]方法将DataFrame转换为Dataset,实际项目中推荐使用textFile方法,从Spark 2.0开始提供。无论是text方法还是textFile方法读取文本数据时,一行一行的加载数据,每行数据使用UTF-8编码的字符串,列名称为【value】。
范例演示:分别使用text和textFile方法加载数据。
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} /** * SparkSQL加载文本文件数据,方法text和textFile */ object SparkSQLText { def main(args: Array[String]): Unit = { // 构建SparkSession实例对象,通过建造者模式创建 val spark: SparkSession = SparkSession .builder() .appName(this.getClass.getSimpleName.stripSuffix("$")) .master("local[3]") .getOrCreate() // 底层实现:单例模式,创建SparkContext对象 import spark.implicits._ // TODO: text方法加载数据,封装至DataFrame中 val dataframe: DataFrame = spark.read.text("datas/resources/people.txt") dataframe.printSchema() dataframe.show(10, truncate = false) println("=================================================") val dataset: Dataset[String] = spark.read.textFile("datas/resources/people.txt") dataset.printSchema() dataset.show(10, truncate = false) spark.stop() // 应用结束,关闭资源 } }
5 json 数据
实际项目中,有时处理数据以JSON格式存储的,尤其后续结构化流式模块:StructuredStreaming,从Kafka Topic消费数据很多时间是JSON个数据,封装到DataFrame中,需要解析提取字段的值。以读取github操作日志JSON数据为例,数据结构如下:
- 1)、操作日志数据使用GZ压缩:2015-03-01-11.json.gz,先使用json方法读取。
// 构建SparkSession实例对象,通过建造者模式创建 val spark: SparkSession = SparkSession .builder() .appName(this.getClass.getSimpleName.stripSuffix("$")) .master("local[3]") // 底层实现:单例模式,创建SparkContext对象 .getOrCreate() import spark.implicits._ // TODO: 从LocalFS上读取json格式数据(压缩) val jsonDF: DataFrame = spark.read.json("datas/json/2015-03-01-11.json.gz") jsonDF.printSchema() jsonDF.show(10, truncate = true)
- 2)、使用textFile加载数据,对每条JSON格式字符串数据,使用SparkSQL函数库functions中自
带get_json_obejct函数提取字段:id、type、public和created_at的值。
- 函数:get_json_obejct使用说明
- 核心代码
val githubDS: Dataset[String] = spark.read.textFile("datas/json/2015-03-01-11.json.gz") githubDS.printSchema() // value 字段名称,类型就是String githubDS.show(1) // TODO:使用SparkSQL自带函数,针对JSON格式数据解析的函数 import org.apache.spark.sql.functions._ // 获取如下四个字段的值:id、type、public和created_at val gitDF: DataFrame = githubDS.select( get_json_object($"value", "$.id").as("id"), get_json_object($"value", "$.type").as("type"), get_json_object($"value", "$.public").as("public"), get_json_object($"value", "$.created_at").as("created_at") ) gitDF.printSchema() gitDF.show(10, truncate = false)
运行结果:
范例演示完整代码:
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} /** * SparkSQL读取JSON格式文本数据 */ object SparkSQLJson { def main(args: Array[String]): Unit = { // 构建SparkSession实例对象,通过建造者模式创建 val spark: SparkSession = SparkSession .builder() .appName(this.getClass.getSimpleName.stripSuffix("$")) .master("local[3]") // 底层实现:单例模式,创建SparkContext对象 .getOrCreate() import spark.implicits._ // TODO: 从LocalFS上读取json格式数据(压缩) val jsonDF: DataFrame = spark.read.json("datas/json/2015-03-01-11.json.gz") jsonDF.printSchema() jsonDF.show(10, truncate = true) println("===================================================") val githubDS: Dataset[String] = spark.read.textFile("datas/json/2015-03-01-11.json.gz") githubDS.printSchema() // value 字段名称,类型就是String githubDS.show(1) // TODO:使用SparkSQL自带函数,针对JSON格式数据解析的函数 import org.apache.spark.sql.functions._ // 获取如下四个字段的值:id、type、public和created_at val gitDF: DataFrame = githubDS.select( get_json_object($"value", "$.id").as("id"), get_json_object($"value", "$.type").as("type"), get_json_object($"value", "$.public").as("public"), get_json_object($"value", "$.created_at").as("created_at") ) gitDF.printSchema() gitDF.show(10, truncate = false) // 应用结束,关闭资源 spark.stop() } }
6 csv 数据
在机器学习中,常常使用的数据存储在csv/tsv文件格式中,所以SparkSQL中也支持直接读取格式数据,从2.0版本开始内置数据源。关于CSV/TSV格式数据说明:
SparkSQL中读取CSV格式数据,可以设置一些选项,重点选项:
- 1)、分隔符:sep
- 默认值为逗号,必须单个字符
- 2)、数据文件首行是否是列名称:header
- 默认值为false,如果数据文件首行是列名称,设置为true
- 3)、是否自动推断每个列的数据类型:inferSchema
- 默认值为false,可以设置为true
官方提供案例:
当读取CSV/TSV格式数据文件首行是否是列名称,读取数据方式(参数设置)不一样的 。
- 第一点:首行是列的名称,如下方式读取数据文件
// TODO: 读取TSV格式数据 val ratingsDF: DataFrame = spark.read // 设置每行数据各个字段之间的分隔符, 默认值为 逗号 .option("sep", "\t") // 设置数据文件首行为列名称,默认值为 false .option("header", "true") // 自动推荐数据类型,默认值为false .option("inferSchema", "true") // 指定文件的路径 .csv("datas/ml-100k/u.dat") ratingsDF.printSchema() ratingsDF.show(10, truncate = false)
- 第二点:首行不是列的名称,如下方式读取数据(设置Schema信息)
// 定义Schema信息 val schema = StructType( StructField("user_id", IntegerType, nullable = true) :: StructField("movie_id", IntegerType, nullable = true) :: StructField("rating", DoubleType, nullable = true) :: StructField("timestamp", StringType, nullable = true) :: Nil ) // TODO: 读取TSV格式数据 val mlRatingsDF: DataFrame = spark.read // 设置每行数据各个字段之间的分隔符, 默认值为 逗号 .option("sep", "\t") // 指定Schema信息 .schema(schema) // 指定文件的路径 .csv("datas/ml-100k/u.data") mlRatingsDF.printSchema() mlRatingsDF.show(5, truncate = false)
将DataFrame数据保存至CSV格式文件,演示代码如下:
/** * 将电影评分数据保存为CSV格式数据 */ mlRatingsDF // 降低分区数,此处设置为1,将所有数据保存到一个文件中 .coalesce(1) .write // 设置保存模式,依据实际业务场景选择,此处为覆写 .mode(SaveMode.Overwrite) .option("sep", ",") // TODO: 建议设置首行为列名 .option("header", "true") .csv("datas/ml-csv-" + System.nanoTime())
范例演示完整代码SparkSQLCsv.scala如下:
import org.apache.spark.SparkContext import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} /** * SparkSQL 读取CSV/TSV格式数据: * i). 指定Schema信息 * ii). 是否有header设置 */ object SparkSQLCsv { def main(args: Array[String]): Unit = { // 构建SparkSession实例对象 val spark: SparkSession = SparkSession.builder() .appName(SparkSQLCsv.getClass.getSimpleName) .master("local[2]") .getOrCreate() import spark.implicits._ // 获取SparkContext实例对象 val sc: SparkContext = spark.sparkContext /** * 实际企业数据分析中 * csv\tsv格式数据,每个文件的第一行(head, 首行),字段的名称(列名) */ // TODO: 读取TSV格式数据 val ratingsDF: DataFrame = spark.read // 设置每行数据各个字段之间的分隔符, 默认值为 逗号 .option("sep", "\t") // 设置数据文件首行为列名称,默认值为 false .option("header", "true") // 自动推荐数据类型,默认值为false .option("inferSchema", "true") // 指定文件的路径 .csv("datas/ml-100k/u.dat") ratingsDF.printSchema() ratingsDF.show(10, truncate = false) println("=======================================================") // 定义Schema信息 val schema = StructType( StructField("user_id", IntegerType, nullable = true) :: StructField("movie_id", IntegerType, nullable = true) :: StructField("rating", DoubleType, nullable = true) :: StructField("timestamp", StringType, nullable = true) :: Nil ) // TODO: 读取TSV格式数据 val mlRatingsDF: DataFrame = spark.read // 设置每行数据各个字段之间的分隔符, 默认值为 逗号 .option("sep", "\t") // 指定Schema信息 .schema(schema) // 指定文件的路径 .csv("datas/ml-100k/u.data") mlRatingsDF.printSchema() mlRatingsDF.show(5, truncate = false) println("=======================================================") /** * 将电影评分数据保存为CSV格式数据 */ mlRatingsDF // 降低分区数,此处设置为1,将所有数据保存到一个文件中 .coalesce(1) .write // 设置保存模式,依据实际业务场景选择,此处为覆写 .mode(SaveMode.Overwrite) .option("sep", ",") // TODO: 建议设置首行为列名 .option("header", "true") .csv("datas/ml-csv-" + System.nanoTime()) // 关闭资源 spark.stop() } }