开发者学堂课程【大数据Spark2020版(知识精讲与实战演练)第五阶段:Structured_Source_HDFS_Spark 代码】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/692/detail/12150
Structured_Source_HDFS_Spark 代码
内容介绍:
一、流式计算统计 HDFS 上的小文件
二、总结
一、流式计算统计 HDFS 上的小文件
1.目标与步骤
(1)目标
通过本次学习,可以了解到如何使用 Structured Streaming 读取 HDFS 中的文件,并以 JSON 的形式解析
这些文件变成一些流,对这些流进行处理,文件中的内容是 JSON 的形式。
(2)步骤
①创建文件
②编写代码
2.代码
val spark = Sparksession.builder()
.appName("hdfs_source")
master("local[6]")
.getorcreate()
spark.sparkContext.setLogLevel("WARN"")
val userschema = new structType()
.add("name", "string")
.add( "age", "integer")
val source = spark
.readStream
.schema(userschema)
.json("hdfs://node01:8020/dataset/dataset" )
val result = source.distinct()
result.writestream
.outputMode(outputMode.update( )).format("console")
.start()
.awaitTermination()
创建新的文件,New——Scala Class,命名 HDFSSource ,关掉其他文件,将 class 改为 object ,编写 main 。
object HDFSSource {
def main(args: Array[string]): unit = {
def main(args: Array[String]: Unit={
//1.创建 SparkSession
val spark = SparkSession.builder()
.appName(name="hdfs_source"")
.master( master = "loca1[6]"")
.getorcreate()
//2.数据读取,目录只能是文件夹,不能是某一个文件
使用流的方式读取文件,假定的场景为不断生成新的,只读取一个是错误的
(数据读取,静态的使用read 读取数据集,动态使用 readstream )
val schema = new StructType()
.add( name = "name" ,dataType = "string")
//添加列 name,string 类型
.add( name = "age",dataType = "integer")
val source = spark.readstream
.schema(schema)
.json( path = "hdfs:// nodee1:8020/dataset/dataset")
//3.输出结果
source.writeStream
.outputMode(outputlode.Append( ))
// outputMode,如何输出结果,Append 不进行中间状态的处理
.format( source = "console")
.start()
. awaitTermination(
}
运行之前,删掉目录,在处理之前到浏览器查看是否删掉目录
结果重复要进行去重
运行过程中出现一下错误
Exception in thread "main" java.io.IOException: (null) entry in command string: null chmod,.0644
说明 hadoop 失配问题
需要
object HDFSSource i
def main(args: Array[string]): Unit = i
System.setProperty("hadoop.home .ir",C:\\winuti1")
加入System.setProperty("hadoop.home .ir",C: \\winuti1")
读取成功,会显示结果,可以针对结果进行去重
随着文件的产生,不断接收新的数据,流式计算就是不断处理数据的过程
二、总结
以流的形式读取基个 HDFS 目录的代码为
val source = spark
.readstream
①
.schema(userschema)
②
.json( "hdfs : / / node01 :8020/dataset/dataset")
③
①指明读取的是一个流式的 Dataset
②指定读取到的数据的 Schema
③指定目录位置,以及数据格式