Spark入门以及wordcount案例代码

简介: Spark入门以及wordcount案例代码

文章目录

  • spark概念

Apache Spark是用于大规模数据处理的统一分析引擎。它提供了Java、Scala、Python和R语言的高级api,以及一个支持通用执行图的优化引擎。它还支持一组丰富的高级工具,包括用于SQL和结构化数据处理的Spark SQL,用于pandas工作负载的Spark pandas API,用于机器学习的MLlib,用于图形处理的GraphX,以及用于增量计算和流处理的结构化流。

概念:

the Resilient Distributed Dataset(RDD):Spark基础- RDD(核心但旧的API),累加器和广播变量

Spark SQL、数据集(dataset)和数据框架(dataFrame):用关系查询处理结构化数据(比rdd更新的API)

Structured Streaming:用关系查询处理结构化数据流(使用Datasets和dataframe,比DStreams更新的API)

Spark Streaming:使用DStreams(旧API)处理数据流

1.spark安装配置

1.1 解压环境配置

spark-3.4.0-bin-hadoop3-scala2.13.tgz

tar xvf spark-3.4.0-bin-hadoop3-scala2.13.tgz  

mv spark-3.4.0-bin-hadoop3-scala2.13 spark-3.4.0

#配置文件

vi /etc/profile

export SPARK_HOME=/opt/bigdata/spark-3.4.0

export PATH=$SPARK_HOME/bin:$PATH

source /etc/profile

1.2 设置spark运行环境和配置参数

在终端窗口中,执行如下的命令,进入到Spark的conf目录下:

cd /opt/bigdata/spark-3.4.0/conf

cp spark-env.sh.template spark-env.sh

vim spark-env.sh

#在配置文件下载加入以下内容

export JAVA_HOME=/usr/java/default

#暂时不配置

export HADOOP_CONF_DIR=

export SPARK_DIST_CLASSPATH=

1.3 简单的测试

启动

75018cca60424710b62df92606d01003.png

新建立一个readme.md文件放在conf路径下进行读取

d5d7f16b69c54219b32f4b90f62df0d2.png

scala统计wordscount

scala> val textFile = spark.read.textFile("readme.md")

val textFile: org.apache.spark.sql.Dataset[String] = [value: string]

scala> textFile.flatMap(line => line.split(" ")).groupByKey(identity).count().collect()

val res3: Array[(String, Long)] = Array((hello,3), (spark,1), (world,1), (elite,1))

2.快速开始

2.1 引入依赖

   <dependency>

     <groupId>org.apache.spark</groupId>

     <artifactId>spark-core_2.12</artifactId>

     <version>3.4.0</version>

   </dependency>

2.2 实现wordcount

def main(args: Array[String]): Unit = {

   //1.创建SparkSession对象

   val spark: SparkSession = SparkSession.builder()

     .master("local")

     //设置应用程序名字

     .appName("WordCount")

     //设置并行度

     .config("spark.sql.shuffle.partitions",1)

     //设置内存不然启动提示堆内存不足

     .config("spark.testing.memory", "512000000")

     .getOrCreate()

   import spark.implicits._

   //设置日志级别

   spark.sparkContext.setLogLevel("Error")

   //2.读取Socket中的每行数据,生成DataFrame默认列名为"value"

   val lines: DataFrame = spark.readStream

     .format("socket")

     .option("host", "node01")

     .option("port", 8888)

     .load()

   //3.将每行数据切分成单词,首先通过as[String]转换成Dataset操作

   val words: Dataset[String] = lines.as[String].flatMap(line=>{line.split(" ")})

   //4.按照单词分组,统计个数,自动多一个列count

   val wordCounts: DataFrame = words.groupBy("value").count()

   //5.启动流并向控制台打印结果

   val query: StreamingQuery = wordCounts.writeStream

     //更新模式设置为complete

     .outputMode("complete")

     .format("console")

     .start()

   query.awaitTermination()

2.3 测试

5b65fbd9cd204bcd899e51321f3a45f5.png


相关文章
|
2月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
75 5
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
56 3
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
73 0
|
1月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
2月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
49 4
|
2月前
|
分布式计算 大数据 Spark
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
大数据-95 Spark 集群 SparkSQL Action与Transformation操作 详细解释与测试案例(二)
49 1
|
2月前
|
设计模式 数据采集 分布式计算
企业spark案例 —出租车轨迹分析
企业spark案例 —出租车轨迹分析
111 0
|
2月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
61 0
|
2月前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
55 0
|
2月前
|
存储 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
57 0