使用IntelliJ Idea开发Spark Streaming流应用程序

简介: 使用IntelliJ Idea开发Spark Streaming流应用程序

未经许可,禁止以任何形式转载,若要引用,请标注链接地址

全文共计2178字,阅读大概需要3分钟

一、实验目的

掌握IntelliJ Idea创建Spark Streaming流应用程序的过程。

 熟悉在spark上提交运行Spark Streaming作业的方式。


二、实验内容

1、使用IntelliJ Idea创建Spark Streaming流应用程序。

 2、打包Spark Streaming流应用程序并提交执行。


三、实验原理

Spark Streaming内部的基本工作原理如下:接收实时输入数据流,然后将数据拆分成多个batch,比如每收集1秒的数据封装为一个batch,然后将每个batch交给Spark的计算引擎进行处理,最后会生产出一个结果数据流,其中的数据,也是由一个一个的batch所组成的

bfd6a75a00ef4ef38094a70fcf1e72e3.png


四、实验环境

硬件:x86_64 ubuntu 16.04服务器

 软件:JDK1.8,Scala-2.11.8,Spark-2.3.2,Hadoop-2.7.3,IntelliJ Idea


五、实验步骤

5.1 启动IntelliJ Idea并创建spark项目

1、启动IntelliJ Idea。在终端窗口下,执行以下命令:

1.  $ cd /opt/idea-IC-191.7479.19/bin
2.  $ ./idea.sh

2、在idea中创建scala项目,并命名为”sparkstreaming”,其它都默认即可,然后点击”Finish”按钮。如下图所示:


b1272d4ce0844672a55144d9543adcd3.png

3、点击【File】菜单,选择【Project structure】选项,进入项目结构界面。如下图所示:

7e300a10338541b49ba0954e1e57366b.png


4、按图中所示依次选择,导入spark的jar包到项目中。如下图所示:


7dd698998ad742748b195fbe0c0371f6.png

5、jar包所在目录为”/opt/spark/jars/“,之后一直点击【OK】按钮即可导包成功。如下图所示:

0cebf7a59d3e4892a6bcf4e45a623b7b.png


6、查看成功导入的部分jar包。如下图所示:


5.2 编写spark代码

1、选中spark_project1项目的src目录右键依次选择【New】-【Scala Class】,创建WordCount类。如下图所示:

beb752d5b5cf430ba51771f7b34fa126.png


2、与上述方法一样,在弹出的对话框中命名为”WordCount”,并选择”Object”类型。如下图所示:

91e2e0e5048945a8b04a85589bdd29a6.png


图片10

 3、编写流程序代码,读取指定端口中的数据,对来自端口的数据进行单词统计。代码如下所示:


1.  import org.apache.spark.SparkConf
2.  import org.apache.spark.streaming.Seconds
3.  import org.apache.spark.streaming.StreamingContext
4.       
5.  object WordCount {
6.       
7.    def main(args: Array[String]): Unit = {
8.      val conf = new SparkConf()
9.          .setMaster("local[*]")
10.         .setAppName("WordCount")
11.      
12.     val ssc = new StreamingContext(conf, Seconds(20))
13.      
14.     val lines = ssc.socketTextStream("localhost", 9999)
15.     val words = lines.flatMap { _.split(" ") }   
16.     val pairs = words.map { word => (word, 1) }  
17.     val wordCounts = pairs.reduceByKey(_ + _)  
18.      
19.     wordCounts.print()  
20.      
21.     ssc.start()
22.     ssc.awaitTermination()
23.   }
24.      
25. }

5.3 打包程序

1、点击【File】菜单,选择【Project structure】选项,进入项目结构界面。如下图所示:

a26ed3bb16864bd7a7f080f2b6e6c4d4.png


2、按图中选择依次点击进行打包。如下图所示:

771fae7babc449209f87566e3763d803.png


3、弹出对话框,Main Class框中选择 WordCount,其它保持默认即可,点击【OK】。如下图所示:

20e77a0b69614763afab54868c0c473e.png


4、返回到项目结构界面,把项目依赖的所有jar包都删除,只导出类文件,点击【OK】按钮。如下图所示:

442953958d34440c8a9ae4d7a9b6fa02.png


5、点击【Build】菜单下的Build Artifacts选项进行编译。如下图所示:


bf43a8d6325d4fea95f7521ad184c031.png

6、按图中所示选择即开始编译。如下图所示:

65665aa7d3c5405d8401638995199422.png


5.4 集群运行jar包

1、启动Spark集群。在终端窗口下,执行以下命令:

1.  $ cd /opt/spark
2.  $ ./sbin/start-all.sh

2、启动nc服务器。在终端窗口下,执行以下命令:

1.  $ nc -lp 9999

3、另打开一个终端窗口,执行如下命令,提交jar包到spark中运行程序:

1.  $ spark-submit --class WordCount /root/IdeaProjects/sparkstreaming/out/artifacts/sparkstreaming_jar/sparkstreaming.jar

执行过程如下图所示:

b3e2ac37afba4882a2283ceef1d2b6d1.png


4、切换到nc服务器所在终端窗口,输入以下内容:

1.  hello sparkstreaming
2.  hello scala

5、切换到Spark流程序提交窗口。在终端中可以看出单词统计的输出结果。如下图所示:

c22a4bf4f4ad41fe8e68cf5c31f71a31.png


— END —


相关文章
|
30天前
|
SQL 分布式计算 监控
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
本文演示了使用 EMR Serverless Spark 产品搭建一个日志分析应用的全流程,包括数据开发和生产调度以及交互式查询等场景。
56441 7
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
|
11天前
|
分布式计算 Java Scala
如何处理 Spark Streaming 的异常情况?
【6月更文挑战第16天】如何处理 Spark Streaming 的异常情况?
22 4
|
14天前
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
44 6
|
20天前
|
IDE Java 项目管理
Java入门——Intellij IDEA简介、使用IDEA开发程序、IDEA常用快捷键、IDEA其他操作
Java入门——Intellij IDEA简介、使用IDEA开发程序、IDEA常用快捷键、IDEA其他操作
20 3
|
25天前
|
分布式计算 Spark 大数据
深入探究Apache Spark在大数据处理中的实践应用
【6月更文挑战第2天】Apache Spark是流行的开源大数据处理框架,以其内存计算速度和低延迟脱颖而出。本文涵盖Spark概述、核心组件(包括Spark Core、SQL、Streaming和MLlib)及其在数据预处理、批处理分析、交互式查询、实时处理和机器学习中的应用。通过理解Spark内部机制和实践应用,可提升大数据处理效率,发挥其在各行业的潜力。
|
6天前
|
分布式计算 资源调度 Java
Scala+Spark+Hadoop+IDEA实现WordCount单词计数,上传并执行任务(简单实例-下)
Scala+Spark+Hadoop+IDEA实现WordCount单词计数,上传并执行任务(简单实例-下)
14 0
|
6天前
|
分布式计算 Hadoop Scala
Scala +Spark+Hadoop+Zookeeper+IDEA实现WordCount单词计数(简单实例-上)
Scala +Spark+Hadoop+Zookeeper+IDEA实现WordCount单词计数(简单实例-上)
9 0
|
6天前
|
Java Scala Maven
Intellij IDEA+Maven+Scala第一个程序
Intellij IDEA+Maven+Scala第一个程序
14 0
|
1月前
|
机器学习/深度学习 分布式计算 数据处理
在Python中应用Spark框架
在Python中应用Spark框架
25 1
|
1月前
|
流计算
实时计算 Flink版操作报错之程序在idea跑没问题,打包在服务器跑就一直报错,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。