Spark Streaming 快速入门(实操)

简介: Spark Streaming 快速入门(实操)

0x00 教程内容


  1. Spark Streaming 编程
  2. 校验结果并删除测试数据

本案例是在官方文档上做了一定的修改,实现的过程非常简单:开启一个 Spark Streaming 应用,实时接受 TCP Socket 传过来的数据,并进行统计。


0x01 Spark Streaming 编程


Spark Streaming 上构建应用与 Spark 相似,都要先创建 Context 对象,并对抽象数据对象进行操作,Streaming 中处理的数据对象是 DStream。

1. 启动Spark Shell
spark-shell


2. 创建 StreamingContext 对象
// 引入Spark Streaming中的StreamingContext模块
 import org.apache.spark._
// 或者直接执行下面这行
 import org.apache.spark.streaming._
// 注:下面这一项在Spark 1.3及其之后的版本中不是必需的
// import org.apache.spark.streaming.StreamingContext._
// Spark Shell 会默认创建好 Spark Context 对象,所以下面这句我注释掉了,如果是在编辑器中编写代码,需根据实际情况加上并修改。
// 创建本地的SparkContext对象,包含2个执行线程,APP名字命名为StreamingWordCount
// val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCount")
// 创建的 Spark Context 对象为 sc ,我们需要创建本地的StreamingContext对象,第二个参数为处理的时间片间隔时间,设置为1秒
val ssc = new StreamingContext(sc, Seconds(1))


此处需要注意的是,我们创建 StreamingContext 对象,需要用到 SparkContext 对象(sc)和处理数据所需要设置的时间间隔。


回顾 RDD 的创建过程,我们是直接调用sc 相应的办法就行了。


val textFileRDD = sc.textFile("/home/hadoop-sny/datas/word.txt")


3. 创建 DStream 对象

因为我们是接受 TCP 的数据,所以我们要确认是哪台机器发过来的,而且需要指定端口号。我们使用的是 StreamingContext 对象socketTextStream()函数:


// 创建DStream,指明数据源为socket:来自localhost本地主机的9999端口
val lines = ssc.socketTextStream("localhost", 9999)


image.png


数据源可以有很多,原理都类似,我们可以调用不同的创建函数去连接 KafkaFlumeHDFS/S3KinesisTwitter 等数据源。

4. 对 DStream 进行操作

我们需要对接收到的数据进行处理,处理的方式与我们前面教程是一样的。


// 使用flatMap和Split对这1秒内所收到的字符串进行分割
val words = lines.flatMap(_.split(" "))


image.png


切割后会得到一系列的单词,然后再进行下面的操作:

// map操作将独立的单词映射成(word,1)元组
val pairs = words.map(word => (word, 1))
// 对统计结果进行相加,得到(单词,词频)元组
val wordCounts = pairs.reduceByKey(_ + _)


image.png


5. 输出数据

我们可以将结果输出到一个文件中:

// 输出文件夹的前缀,Spark Streaming会自动使用当前时间戳来生成不同的文件夹名称
val outputFile = "/tmp/test-streaming"
// 将结果输出
wordCounts.saveAsTextFiles(outputFile)


image.png


除了输出成文件,其实还有很多种方式,上一篇已经提到过。


实现了流数据的处理逻辑后,我们还需要让 Spark Streaming 运行起来,所以需要执行 Spark Streaming 启动的流程,调用的是StreamingContext 对象的start() 函数和awaitTermination() 函数,分别表示启动信号和等待处理结束的信号。


// 启动Spark Streaming应用
ssc.start()
// 等待计算终止
ssc.awaitTermination()


TODO:经过测试,不执行 ssc.awaitTermination() 也不影响,但是编程的时候是必须的。

执行 ssc.start() 步骤的过程中会报错,显示拒绝连接,无法进行后续实验 :


image.png


其实,当输入ssc.start()以后,Spark Streaming程序便启动起来,但是没有启动数据来源,所以显示连接不上,此时需要启动 NetCat 来向特定端口发送数据。

打开一个新的终端2,执行:

nc -l -p 9999


说明:

-l 参数表示创建一个监听端口

-p 参数表示当前连接结束后仍然保持监听,必须与 -l 参数同时使用。

发现还没安装:

image.png



所以可以通过YUM方式安装一下:

yum install -y nc

image.png


安装好后,再执行下面指令:

nc -l -p 9999


启动之后会处于待输入状态(阻塞状态):

image.png


此时返回终端1,发现不再报错,而是处于计算状态:

image.png


其实,此时已经在/tmp路径生成了很多文件了,并且会不断生成:

image.png


在终端2输入下面文本,按回车:

hello shao
hello shao
i love naiyi
teacher shao
hello naiyi


image.png


返回终端1查看:

image.png


此时就已经在计算了,终端会每隔一秒钟统计一次。如果需要更加准确地观察,可以调大时间间隔,方便自己操作。


0x02 校验结果并删除测试数据


1. 查看统计结果

按Ctrl+Z 停掉终端1的应用后,输入下面的命令看输出的文件:

ls /tmp

发现生成了很多文件:


image.png


cat /tmp/test-streaming*/*


发现生成了很多文件:

image.png


其实生成的文件夹中很多都是没有数据的,使用下面的命令可以过滤出有内容的文件夹,没有数据输出的文件夹的结果为 12 K:

sudo du -sh /tmp/test-str*


image.png


所以可以过滤掉12k大小的:

sudo du -sh /tmp/test-str* | grep -v '12K'


image.png


查看有数据的文件夹,则可以查看到 1 秒中获取到的数据流中所统计的词频结果(需改成自己的文件名):

cat /tmp/test-streaming-1598929305000/*
cat /tmp/test-streaming-1598929355000/*


image.png


2. 删除测试数据
rm -rf /tmp/test-streaming*


0xFF 总结


  1. 本章节为 Spark Streaming 的实操教程,理论教程请参考链接:Spark Streaming 快速入门(理论)
  2. 关注本博客,了解更多大数据知识。点赞、评论、关注,谢谢!
相关文章
|
2月前
|
分布式计算 API Spark
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
85 11
|
19天前
|
分布式计算 Java Scala
如何处理 Spark Streaming 的异常情况?
【6月更文挑战第16天】如何处理 Spark Streaming 的异常情况?
106 56
|
2月前
|
分布式计算 关系型数据库 MySQL
Spark编程实验四:Spark Streaming编程
Spark编程实验四:Spark Streaming编程
66 2
|
2月前
|
存储 消息中间件 分布式计算
Spark Streaming
Spark Streaming
38 1
|
2月前
|
分布式计算 大数据 数据处理
【Flink】Flink跟Spark Streaming的区别?
【4月更文挑战第17天】【Flink】Flink跟Spark Streaming的区别?
|
2月前
|
存储 分布式计算 Spark
实战|使用Spark Streaming写入Hudi
实战|使用Spark Streaming写入Hudi
147 0
|
2月前
|
分布式计算 监控 数据处理
Spark Streaming的容错性与高可用性
Spark Streaming的容错性与高可用性
|
2月前
|
分布式计算 数据处理 Apache
Spark Streaming与数据源连接:Kinesis、Flume等
Spark Streaming与数据源连接:Kinesis、Flume等
|
2月前
|
消息中间件 分布式计算 Kafka
使用Kafka与Spark Streaming进行流数据集成
使用Kafka与Spark Streaming进行流数据集成
|
2月前
|
分布式计算 监控 数据处理
Spark Streaming的DStream与窗口操作
Spark Streaming的DStream与窗口操作