Structured_案例_代码编写 | 学习笔记

简介: 快速学习 Structured_案例_代码编写

开发者学堂课程【大数据Spark2020版(知识精讲与实战演练)第五阶段:Structured_案例_代码编写】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/692/detail/12144


Structured_案例_代码编写

内容介绍

一. 目标和过程

二. 代码编写

 

一. 目标和过程

目标:

实现 Structured Streaming 部分的代码编写

步骤:

1.创建文件

2.创建 SparkSession

3.读取 Socket 数据生成 DataFrame

4.将 DataFrame 转为 Dataset, 使用有类型的 API 处理词频统计

5.生成结果集并写入控制台

第二步创建 SparkSession 和第三步读取 Socket 数据生成 DataFrame 都是为了数据的读取。第四步为了进行数据的处理,第四步来进行数据的落地。

这三大步也是以往在编写 structured streaming 的三大步骤。

第一步读取读数据,然后第二步处理,第三步落地。

 

二.代码编写

先进入到的 idea 当中,去创建一个新的包,区别于 Spark streaming 的包。然后包名叫做 cn.itcast.structured 。

然后创建文件,文件就命名为叫做 socket word count 。

然后接下来类创建出来以后把 class 改为 object

改为 object ,大家一定能猜到其实是要写 main 方法的把 main 方法给它表示出来。

image.png

代码编写步骤:

第一步肯定还是要去创建 Spark session 创建 Spark position

第二步要进行数据的读取,数据集的生成,生成数据读取。

第三步要去进行数据的处理。

第四步生成结果集的生成和输出。这就是的四大步骤。

但是在这之前,工程当中其实还没有导入 structure streaming 相关的 API ,要进行一个简单的导入,就把 pom 打开。

structure streaming 不需要导太多的东西,只需要导一个依赖,就是 dependency

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.2.0</version> 

接下来就已经导入完成了,它会进行相应的加载。等它加载完以后,就回到 socket word count,

创建 Spark session,代码如下所示

def main(args: Array[string]): Unit = {

//1.创建 Sparksession

val spark = Sparksession.builder()

.master(master = "local[6]")

.appName(name = "socketstructured")

·getOrCreate()

设置一个 master 类似于的 Spark streaming 地方,不能给 1,至少要大于 1。

进行数据集的生成。之前编写 Spark SQL 程序的时候是如何生成第一个 data frame ,使用 Spark 来进行 read ,但是读取批量的数据集的时候使用 read ,但是读取流的时候使用 read stream 这点需要注意。

然后要去 format  format 就是要去告诉他现在要读取的是 socket ,那读取到 socket 以后,去 option 指定的 host 是对应的还是要去看一下的 IP 的。

那就打开的 shell 窗口,输入 if config

image.png

然后来去看一下的 IP 是 192168169101,

编写代码如下:

val source: DataFrame = spark.readStream

.format(source = "socket")

.option("host",“192.168.169.101")

.option("port",9999)

.load()

时候大家能注意到是不是读取的就是一个 source 那它是什么样的一个数据类型非常简单,它就是一个 data frame 大家注意到了吧,也就是说除了和批处理不一样的地方就是批处理的时候使用 read ,而流式处理的时候,使用 read stream 就这点区别。要进行词频统计还是使用有类型的 API 会比较舒服。所以应该把 source 转为一个 data set 。

编写如下代码:

val sourceDs = source.as[string](...)

为其赋予一个类型就可以生成一个 source DS 大家能注意到变成了一个 data set string 如下:

val sourceDS: Dataset[string] = source.as[String](...)

但是后面它要求传入一个 encoder

但没有,要导入一个依赖,import Spark 点隐式转换点下划线,把所有的隐式转换导进来,这时拥有了 Encoder

代码如下:

import spark.implicits._

在数据处理之前,就顺手再做一件事情,要再去设置一下的日志级别,否则每一个批次打出来一堆日志,也会影响查看,

代码如下:

spark.sparkContext.setLogLevel(“WARN")

数据处理

编写代码如下:

sourceDS.flatMap(_.split( regex = "“))

.map((_,1))

·groupByKey(_·_1)

.count()

这里注意:处理的方式和 RDD 会有很显著的区别。 RDD 当中是使用 reduce by key 而这使用 group by key 并且不需要去指定怎么计算,直接通过一个方法就可以确定计算方式。

然后会生成一个新的一个 data site 命名为 words,

代码如下:

val words = sourceDsflatMap(_split(regex=““))

结果输出:

首先平常在进行批数据处理的时候,是不是使用  write  输出结果,但是是流,就要使用 words.writeStream 。

words.writeStream

.outputMode(OutputMode.Complete())

.format(source = “console")

.start()

.awaitTermination()

主体代码如下:

//1.创建 SparkSession

def main(args: Array[string]): Unit = {

//1.创建 Sparksession

val spark = Sparksession.builder()

.master(master = "local[6]")

.appName(name = "socketstructured")

·getOrCreate()

//2. 数据集的生成,数据读取

val source: DataFrame = spark.readStream

.format(source = "socket")

.option("host",“192.168.169.101")

.option("port",9999)

.load()

val sourceDS: Dataset[string] = source.as[String](...)

// 3. 数据的处理

sourceDS.flatMap(_.split( regex = "“))

.map((_,1))

·groupByKey(_·_1)

.count()

//4、结果集的生成和输出

words.writeStream

.outputMode(OutputMode.Complete())

.format(source = “console")

.start()

.awaitTermination()

相关文章
|
6月前
|
机器学习/深度学习 人工智能 JSON
Prompt进阶系列1:LangGPT(从编程语言反思LLM的结构化可复用提示设计框架)
Prompt进阶系列1:LangGPT(从编程语言反思LLM的结构化可复用提示设计框架)
Prompt进阶系列1:LangGPT(从编程语言反思LLM的结构化可复用提示设计框架)
|
6月前
|
SQL 数据库
数据库SQL语言实战(六)
本次实战的重点就在于对表格本身的一些处理,包括复制表格、修改表格结构、修改表格数据
|
6月前
|
SQL Oracle 关系型数据库
数据库SQL语言实战(三)
本篇文章重点在于SQL中的各种删除操作
|
6月前
|
SQL 数据库
数据库SQL语言实战(一)
数据库SQL语言实战(一)
|
6月前
|
SQL 数据库
数据库SQL语言实战(二)
数据库SQL语言实战(二)
|
数据采集 存储 SQL
ETL的基础知识,看完你就全明白了!
随着企业的发展,各业务线、产品线、部门都会承建各种信息化系统方便开展自己的业务。
1865 0
ETL的基础知识,看完你就全明白了!
|
SQL Web App开发 流计算
Flink入坑指南第五章 - 语法糖 view
Flink入坑指南系列文章,从实际例子入手,一步步引导用户零基础入门实时计算/Flink,并成长为使用Flink的高阶用户。本文属个人原创,仅做技术交流之用,笔者才疏学浅,如有错误,欢迎指正。 什么是view(视图):视图无非就是存储在数据库中并具有名字的 SQL 语句,或者说是以预定义的 SQL 查询的形式存在的数据表的成分。
3848 0
|
分布式计算 Hadoop 大数据
Structured_案例_运行和总结 | 学习笔记
快速学习 Structured_案例_运行和总结
Structured_案例_运行和总结 | 学习笔记
|
分布式计算 Hadoop 大数据
Structured_案例_介绍 | 学习笔记
快速学习 Structured_案例_介绍
Structured_案例_介绍 | 学习笔记
|
分布式计算 数据挖掘 大数据
Spark 入门_代码编写方式|学习笔记
快速学习 Spark 入门_代码编写方式
Spark 入门_代码编写方式|学习笔记