Spark入门以及wordcount案例代码

简介: Spark入门以及wordcount案例代码

文章目录

  • spark概念

Apache Spark是用于大规模数据处理的统一分析引擎。它提供了Java、Scala、Python和R语言的高级api,以及一个支持通用执行图的优化引擎。它还支持一组丰富的高级工具,包括用于SQL和结构化数据处理的Spark SQL,用于pandas工作负载的Spark pandas API,用于机器学习的MLlib,用于图形处理的GraphX,以及用于增量计算和流处理的结构化流。

概念:

the Resilient Distributed Dataset(RDD):Spark基础- RDD(核心但旧的API),累加器和广播变量

Spark SQL、数据集(dataset)和数据框架(dataFrame):用关系查询处理结构化数据(比rdd更新的API)

Structured Streaming:用关系查询处理结构化数据流(使用Datasets和dataframe,比DStreams更新的API)

Spark Streaming:使用DStreams(旧API)处理数据流

1.spark安装配置

1.1 解压环境配置

spark-3.4.0-bin-hadoop3-scala2.13.tgz

tar xvf spark-3.4.0-bin-hadoop3-scala2.13.tgz  

mv spark-3.4.0-bin-hadoop3-scala2.13 spark-3.4.0

#配置文件

vi /etc/profile

export SPARK_HOME=/opt/bigdata/spark-3.4.0

export PATH=$SPARK_HOME/bin:$PATH

source /etc/profile

1.2 设置spark运行环境和配置参数

在终端窗口中,执行如下的命令,进入到Spark的conf目录下:

cd /opt/bigdata/spark-3.4.0/conf

cp spark-env.sh.template spark-env.sh

vim spark-env.sh

#在配置文件下载加入以下内容

export JAVA_HOME=/usr/java/default

#暂时不配置

export HADOOP_CONF_DIR=

export SPARK_DIST_CLASSPATH=

1.3 简单的测试

启动

75018cca60424710b62df92606d01003.png

新建立一个readme.md文件放在conf路径下进行读取

d5d7f16b69c54219b32f4b90f62df0d2.png

scala统计wordscount

scala> val textFile = spark.read.textFile("readme.md")

val textFile: org.apache.spark.sql.Dataset[String] = [value: string]

scala> textFile.flatMap(line => line.split(" ")).groupByKey(identity).count().collect()

val res3: Array[(String, Long)] = Array((hello,3), (spark,1), (world,1), (elite,1))

2.快速开始

2.1 引入依赖

   <dependency>

     <groupId>org.apache.spark</groupId>

     <artifactId>spark-core_2.12</artifactId>

     <version>3.4.0</version>

   </dependency>

2.2 实现wordcount

def main(args: Array[String]): Unit = {

   //1.创建SparkSession对象

   val spark: SparkSession = SparkSession.builder()

     .master("local")

     //设置应用程序名字

     .appName("WordCount")

     //设置并行度

     .config("spark.sql.shuffle.partitions",1)

     //设置内存不然启动提示堆内存不足

     .config("spark.testing.memory", "512000000")

     .getOrCreate()

   import spark.implicits._

   //设置日志级别

   spark.sparkContext.setLogLevel("Error")

   //2.读取Socket中的每行数据,生成DataFrame默认列名为"value"

   val lines: DataFrame = spark.readStream

     .format("socket")

     .option("host", "node01")

     .option("port", 8888)

     .load()

   //3.将每行数据切分成单词,首先通过as[String]转换成Dataset操作

   val words: Dataset[String] = lines.as[String].flatMap(line=>{line.split(" ")})

   //4.按照单词分组,统计个数,自动多一个列count

   val wordCounts: DataFrame = words.groupBy("value").count()

   //5.启动流并向控制台打印结果

   val query: StreamingQuery = wordCounts.writeStream

     //更新模式设置为complete

     .outputMode("complete")

     .format("console")

     .start()

   query.awaitTermination()

2.3 测试

5b65fbd9cd204bcd899e51321f3a45f5.png


相关文章
|
5月前
|
分布式计算 API Spark
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
109 11
|
4月前
|
分布式计算 资源调度 Java
Scala+Spark+Hadoop+IDEA实现WordCount单词计数,上传并执行任务(简单实例-下)
Scala+Spark+Hadoop+IDEA实现WordCount单词计数,上传并执行任务(简单实例-下)
45 0
|
4月前
|
分布式计算 Hadoop Scala
Scala +Spark+Hadoop+Zookeeper+IDEA实现WordCount单词计数(简单实例-上)
Scala +Spark+Hadoop+Zookeeper+IDEA实现WordCount单词计数(简单实例-上)
39 0
|
5月前
|
SQL 分布式计算 大数据
Spark开发实用技巧-从入门到爱不释手
Spark开发实用技巧-从入门到爱不释手
42 0
|
5月前
|
分布式计算 大数据 数据处理
[AIGC大数据基础] Spark 入门
[AIGC大数据基础] Spark 入门
289 0
|
5月前
|
分布式计算 Hadoop Scala
Spark【环境搭建 01】spark-3.0.0-without 单机版(安装+配置+测试案例)
【4月更文挑战第13天】Spark【环境搭建 01】spark-3.0.0-without 单机版(安装+配置+测试案例)
205 0
|
5月前
|
分布式计算 资源调度 监控
Spark学习--1、Spark入门(Spark概述、Spark部署、Local模式、Standalone模式、Yarn模式)(一)
Spark学习--1、Spark入门(Spark概述、Spark部署、Local模式、Standalone模式、Yarn模式)(一)
185 1
|
5月前
|
数据采集 分布式计算 Linux
Spark实时(数据采集)项目小知识点--sed -i命令详解及入门攻略
Spark实时(数据采集)项目小知识点--sed -i命令详解及入门攻略
175 0
|
5月前
|
存储 分布式计算 Apache
Spark编程范例:Word Count示例解析
Spark编程范例:Word Count示例解析
|
5月前
|
SQL 分布式计算 Java
Spark 基础教程:wordcount+Spark SQL
Spark 基础教程:wordcount+Spark SQL
55 0