Structured_Source_HDFS_Spark 代码 | 学习笔记

简介: 快速学习 Structured_Source_HDFS_Spark 代码

开发者学堂课程【大数据Spark2020版(知识精讲与实战演练)第五阶段:Structured_Source_HDFS_Spark 代码】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/692/detail/12150


Structured_Source_HDFS_Spark 代码

内容介绍:

一、流式计算统计 HDFS 上的小文件

二、总结

 

一、流式计算统计 HDFS 上的小文件

1.目标与步骤

(1)目标

通过本次学习,可以了解到如何使用 Structured Streaming 读取 HDFS 中的文件,并以  JSON 的形式解析

这些文件变成一些流,对这些流进行处理,文件中的内容是  JSON 的形式。

(2)步骤

①创建文件

②编写代码

2.代码

val spark = Sparksession.builder()

.appName("hdfs_source")

master("local[6]")

.getorcreate()

spark.sparkContext.setLogLevel("WARN"")

val userschema = new structType()

.add("name", "string")

.add( "age", "integer")

val source = spark

.readStream

.schema(userschema)

.json("hdfs://node01:8020/dataset/dataset" )

val result = source.distinct()

result.writestream

.outputMode(outputMode.update( )).format("console")

.start()

.awaitTermination()

创建新的文件,New——Scala Class,命名 HDFSSource ,关掉其他文件,将 class 改为 object ,编写 main 。

object HDFSSource {

def main(args: Array[string]): unit = {

def main(args: Array[String]: Unit={

//1.创建 SparkSession

val spark = SparkSession.builder()

.appName(name="hdfs_source"")

.master( master = "loca1[6]"")

.getorcreate()

//2.数据读取,目录只能是文件夹,不能是某一个文件

使用流的方式读取文件,假定的场景为不断生成新的,只读取一个是错误的

(数据读取,静态的使用read 读取数据集,动态使用 readstream )

val schema = new StructType()

.add( name = "name" ,dataType = "string") //添加列 name,string 类型

.add( name = "age",dataType = "integer")

val source = spark.readstream

.schema(schema)

.json( path = "hdfs:// nodee1:8020/dataset/dataset")

//3.输出结果

source.writeStream

.outputMode(outputlode.Append( ))// outputMode,如何输出结果,Append 不进行中间状态的处理

.format( source = "console")

.start()

. awaitTermination(

}

运行之前,删掉目录,在处理之前到浏览器查看是否删掉目录

结果重复要进行去重

运行过程中出现一下错误

Exception in thread "main" java.io.IOException: (null) entry in command string: null chmod,.0644

说明 hadoop 失配问题

需要

object HDFSSource i

def main(args: Array[string]): Unit = i

System.setProperty("hadoop.home .ir",C:\\winuti1")

加入System.setProperty("hadoop.home .ir",C: \\winuti1")

读取成功,会显示结果,可以针对结果进行去重

随着文件的产生,不断接收新的数据,流式计算就是不断处理数据的过程

 

二、总结

以流的形式读取基个 HDFS 目录的代码为

val source = spark

.readstream

.schema(userschema)

.json( "hdfs : / / node01 :8020/dataset/dataset")

①指明读取的是一个流式的 Dataset

②指定读取到的数据的 Schema

③指定目录位置,以及数据格式

相关文章
|
22天前
|
分布式计算 API Spark
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
74 11
|
9月前
|
存储 分布式计算 负载均衡
Hadoop学习笔记(二)之HDFS
Hadoop学习笔记(二)之HDFS
|
22天前
|
分布式计算 大数据 Linux
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
129 0
|
22天前
|
存储 SQL 分布式计算
Hadoop(HDFS+MapReduce+Hive+数仓基础概念)学习笔记(自用)
Hadoop(HDFS+MapReduce+Hive+数仓基础概念)学习笔记(自用)
314 0
|
22天前
|
存储 机器学习/深度学习 分布式计算
Hadoop学习笔记(HDP)-Part.12 安装HDFS
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
77 0
Hadoop学习笔记(HDP)-Part.12 安装HDFS
|
SQL 分布式计算 Java
Spark入门以及wordcount案例代码
Spark入门以及wordcount案例代码
|
存储 缓存 分布式计算
HDFS(二)|学习笔记
快速学习 HDFS(二)
137 0
HDFS(二)|学习笔记
|
SQL JSON 负载均衡
离线同步 mysql 数据到 HDFS2 | 学习笔记
快速学习离线同步 mysql 数据到 HDFS2
169 0
离线同步 mysql 数据到 HDFS2  |  学习笔记
|
分布式计算 大数据 Spark
高级特性_闭包_Spark 闭包分发 | 学习笔记
快速学习 高级特性_闭包_Spark 闭包分发
62 0
高级特性_闭包_Spark 闭包分发 | 学习笔记
|
分布式计算 大数据 Spark
Spark 原理_总体介绍_物理执行图 | 学习笔记
快速学习 Spark 原理_总体介绍_物理执行图
89 0
Spark 原理_总体介绍_物理执行图 | 学习笔记