Spark入门以及wordcount案例代码

简介: Spark入门以及wordcount案例代码

文章目录

  • spark概念

Apache Spark是用于大规模数据处理的统一分析引擎。它提供了Java、Scala、Python和R语言的高级api,以及一个支持通用执行图的优化引擎。它还支持一组丰富的高级工具,包括用于SQL和结构化数据处理的Spark SQL,用于pandas工作负载的Spark pandas API,用于机器学习的MLlib,用于图形处理的GraphX,以及用于增量计算和流处理的结构化流。

概念:

the Resilient Distributed Dataset(RDD):Spark基础- RDD(核心但旧的API),累加器和广播变量

Spark SQL、数据集(dataset)和数据框架(dataFrame):用关系查询处理结构化数据(比rdd更新的API)

Structured Streaming:用关系查询处理结构化数据流(使用Datasets和dataframe,比DStreams更新的API)

Spark Streaming:使用DStreams(旧API)处理数据流

1.spark安装配置

1.1 解压环境配置

spark-3.4.0-bin-hadoop3-scala2.13.tgz

tar xvf spark-3.4.0-bin-hadoop3-scala2.13.tgz  

mv spark-3.4.0-bin-hadoop3-scala2.13 spark-3.4.0

#配置文件

vi /etc/profile

export SPARK_HOME=/opt/bigdata/spark-3.4.0

export PATH=$SPARK_HOME/bin:$PATH

source /etc/profile

1.2 设置spark运行环境和配置参数

在终端窗口中,执行如下的命令,进入到Spark的conf目录下:

cd /opt/bigdata/spark-3.4.0/conf

cp spark-env.sh.template spark-env.sh

vim spark-env.sh

#在配置文件下载加入以下内容

export JAVA_HOME=/usr/java/default

#暂时不配置

export HADOOP_CONF_DIR=

export SPARK_DIST_CLASSPATH=

1.3 简单的测试

启动

75018cca60424710b62df92606d01003.png

新建立一个readme.md文件放在conf路径下进行读取

d5d7f16b69c54219b32f4b90f62df0d2.png

scala统计wordscount

scala> val textFile = spark.read.textFile("readme.md")

val textFile: org.apache.spark.sql.Dataset[String] = [value: string]

scala> textFile.flatMap(line => line.split(" ")).groupByKey(identity).count().collect()

val res3: Array[(String, Long)] = Array((hello,3), (spark,1), (world,1), (elite,1))

2.快速开始

2.1 引入依赖

   <dependency>

     <groupId>org.apache.spark</groupId>

     <artifactId>spark-core_2.12</artifactId>

     <version>3.4.0</version>

   </dependency>

2.2 实现wordcount

def main(args: Array[String]): Unit = {

   //1.创建SparkSession对象

   val spark: SparkSession = SparkSession.builder()

     .master("local")

     //设置应用程序名字

     .appName("WordCount")

     //设置并行度

     .config("spark.sql.shuffle.partitions",1)

     //设置内存不然启动提示堆内存不足

     .config("spark.testing.memory", "512000000")

     .getOrCreate()

   import spark.implicits._

   //设置日志级别

   spark.sparkContext.setLogLevel("Error")

   //2.读取Socket中的每行数据,生成DataFrame默认列名为"value"

   val lines: DataFrame = spark.readStream

     .format("socket")

     .option("host", "node01")

     .option("port", 8888)

     .load()

   //3.将每行数据切分成单词,首先通过as[String]转换成Dataset操作

   val words: Dataset[String] = lines.as[String].flatMap(line=>{line.split(" ")})

   //4.按照单词分组,统计个数,自动多一个列count

   val wordCounts: DataFrame = words.groupBy("value").count()

   //5.启动流并向控制台打印结果

   val query: StreamingQuery = wordCounts.writeStream

     //更新模式设置为complete

     .outputMode("complete")

     .format("console")

     .start()

   query.awaitTermination()

2.3 测试

5b65fbd9cd204bcd899e51321f3a45f5.png


相关文章
|
1月前
|
分布式计算 API Spark
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
Spark学习--day05、SparkCore电商网站实操、SparkCore-工程代码
66 11
|
4月前
|
SQL 分布式计算 数据可视化
Spark SQL案例【电商购买数据分析】
Spark SQL案例【电商购买数据分析】
|
5月前
|
SQL 分布式计算 调度
Spark入门(一篇就够了)(三)
Spark入门(一篇就够了)(三)
121 0
|
2天前
|
分布式计算 Hadoop Scala
Spark【环境搭建 01】spark-3.0.0-without 单机版(安装+配置+测试案例)
【4月更文挑战第13天】Spark【环境搭建 01】spark-3.0.0-without 单机版(安装+配置+测试案例)
5 0
|
4月前
|
SQL 分布式计算 数据挖掘
Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
Spark_Day07:Spark SQL(DataFrame是什么和数据分析(案例讲解))
74 0
|
5月前
|
存储 缓存 分布式计算
Spark入门(一篇就够了)(一)
Spark入门(一篇就够了)(一)
132 0
|
2月前
|
分布式计算 资源调度 监控
Spark学习--1、Spark入门(Spark概述、Spark部署、Local模式、Standalone模式、Yarn模式)(一)
Spark学习--1、Spark入门(Spark概述、Spark部署、Local模式、Standalone模式、Yarn模式)(一)
92 1
|
2月前
|
数据采集 分布式计算 Linux
Spark实时(数据采集)项目小知识点--sed -i命令详解及入门攻略
Spark实时(数据采集)项目小知识点--sed -i命令详解及入门攻略
110 0
|
4月前
|
SQL 分布式计算 Java
Spark 基础教程:wordcount+Spark SQL
Spark 基础教程:wordcount+Spark SQL
34 0
|
4月前
|
分布式计算 大数据 Linux
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
Python大数据之PySpark(三)使用Python语言开发Spark程序代码
105 0