Structured_Source_HDFS_Spark 代码 | 学习笔记

简介: 快速学习 Structured_Source_HDFS_Spark 代码

开发者学堂课程【大数据Spark2020版(知识精讲与实战演练)第五阶段:Structured_Source_HDFS_Spark 代码】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/692/detail/12150


Structured_Source_HDFS_Spark 代码

内容介绍:

一、流式计算统计 HDFS 上的小文件

二、总结

 

一、流式计算统计 HDFS 上的小文件

1.目标与步骤

(1)目标

通过本次学习,可以了解到如何使用 Structured Streaming 读取 HDFS 中的文件,并以  JSON 的形式解析

这些文件变成一些流,对这些流进行处理,文件中的内容是  JSON 的形式。

(2)步骤

①创建文件

②编写代码

2.代码

val spark = Sparksession.builder()

.appName("hdfs_source")

master("local[6]")

.getorcreate()

spark.sparkContext.setLogLevel("WARN"")

val userschema = new structType()

.add("name", "string")

.add( "age", "integer")

val source = spark

.readStream

.schema(userschema)

.json("hdfs://node01:8020/dataset/dataset" )

val result = source.distinct()

result.writestream

.outputMode(outputMode.update( )).format("console")

.start()

.awaitTermination()

创建新的文件,New——Scala Class,命名 HDFSSource ,关掉其他文件,将 class 改为 object ,编写 main 。

object HDFSSource {

def main(args: Array[string]): unit = {

def main(args: Array[String]: Unit={

//1.创建 SparkSession

val spark = SparkSession.builder()

.appName(name="hdfs_source"")

.master( master = "loca1[6]"")

.getorcreate()

//2.数据读取,目录只能是文件夹,不能是某一个文件

使用流的方式读取文件,假定的场景为不断生成新的,只读取一个是错误的

(数据读取,静态的使用read 读取数据集,动态使用 readstream )

val schema = new StructType()

.add( name = "name" ,dataType = "string") //添加列 name,string 类型

.add( name = "age",dataType = "integer")

val source = spark.readstream

.schema(schema)

.json( path = "hdfs:// nodee1:8020/dataset/dataset")

//3.输出结果

source.writeStream

.outputMode(outputlode.Append( ))// outputMode,如何输出结果,Append 不进行中间状态的处理

.format( source = "console")

.start()

. awaitTermination(

}

运行之前,删掉目录,在处理之前到浏览器查看是否删掉目录

结果重复要进行去重

运行过程中出现一下错误

Exception in thread "main" java.io.IOException: (null) entry in command string: null chmod,.0644

说明 hadoop 失配问题

需要

object HDFSSource i

def main(args: Array[string]): Unit = i

System.setProperty("hadoop.home .ir",C:\\winuti1")

加入System.setProperty("hadoop.home .ir",C: \\winuti1")

读取成功,会显示结果,可以针对结果进行去重

随着文件的产生,不断接收新的数据,流式计算就是不断处理数据的过程

 

二、总结

以流的形式读取基个 HDFS 目录的代码为

val source = spark

.readstream

.schema(userschema)

.json( "hdfs : / / node01 :8020/dataset/dataset")

①指明读取的是一个流式的 Dataset

②指定读取到的数据的 Schema

③指定目录位置,以及数据格式

相关文章
|
6月前
|
分布式计算 API Spark
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
122 11
|
存储 分布式计算 负载均衡
Hadoop学习笔记(二)之HDFS
Hadoop学习笔记(二)之HDFS
|
19天前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
1月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
42 4
|
1月前
|
分布式计算 资源调度 Hadoop
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
Hadoop-10-HDFS集群 Java实现MapReduce WordCount计算 Hadoop序列化 编写Mapper和Reducer和Driver 附带POM 详细代码 图文等内容
92 3
|
1月前
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(二)
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(二)
41 3
|
1月前
|
分布式计算 Java Hadoop
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(一)
Hadoop-09-HDFS集群 JavaClient 代码上手实战!详细附代码 安装依赖 上传下载文件 扫描列表 PUT GET 进度条显示(一)
40 2
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
55 0
|
5月前
|
机器学习/深度学习 分布式计算 API
技术好文:Spark机器学习笔记一
技术好文:Spark机器学习笔记一
40 0
|
6月前
|
存储 机器学习/深度学习 分布式计算
Hadoop学习笔记(HDP)-Part.12 安装HDFS
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
180 0
Hadoop学习笔记(HDP)-Part.12 安装HDFS