文章目录
- spark概念
Apache Spark是用于大规模数据处理的统一分析引擎。它提供了Java、Scala、Python和R语言的高级api,以及一个支持通用执行图的优化引擎。它还支持一组丰富的高级工具,包括用于SQL和结构化数据处理的Spark SQL,用于pandas工作负载的Spark pandas API,用于机器学习的MLlib,用于图形处理的GraphX,以及用于增量计算和流处理的结构化流。
概念:
the Resilient Distributed Dataset(RDD):Spark基础- RDD(核心但旧的API),累加器和广播变量
Spark SQL、数据集(dataset)和数据框架(dataFrame):用关系查询处理结构化数据(比rdd更新的API)
Structured Streaming:用关系查询处理结构化数据流(使用Datasets和dataframe,比DStreams更新的API)
Spark Streaming:使用DStreams(旧API)处理数据流
1.spark安装配置
1.1 解压环境配置
spark-3.4.0-bin-hadoop3-scala2.13.tgz
tar xvf spark-3.4.0-bin-hadoop3-scala2.13.tgz
mv spark-3.4.0-bin-hadoop3-scala2.13 spark-3.4.0
#配置文件
vi /etc/profile
export SPARK_HOME=/opt/bigdata/spark-3.4.0
export PATH=$SPARK_HOME/bin:$PATH
source /etc/profile
1.2 设置spark运行环境和配置参数
在终端窗口中,执行如下的命令,进入到Spark的conf目录下:
cd /opt/bigdata/spark-3.4.0/conf
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
#在配置文件下载加入以下内容
export JAVA_HOME=/usr/java/default
#暂时不配置
export HADOOP_CONF_DIR=
export SPARK_DIST_CLASSPATH=
1.3 简单的测试
启动
新建立一个readme.md文件放在conf路径下进行读取
scala统计wordscount
scala> val textFile = spark.read.textFile("readme.md")
val textFile: org.apache.spark.sql.Dataset[String] = [value: string]
scala> textFile.flatMap(line => line.split(" ")).groupByKey(identity).count().collect()
val res3: Array[(String, Long)] = Array((hello,3), (spark,1), (world,1), (elite,1))
2.快速开始
2.1 引入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.4.0</version>
</dependency>
2.2 实现wordcount
def main(args: Array[String]): Unit = {
//1.创建SparkSession对象
val spark: SparkSession = SparkSession.builder()
.master("local")
//设置应用程序名字
.appName("WordCount")
//设置并行度
.config("spark.sql.shuffle.partitions",1)
//设置内存不然启动提示堆内存不足
.config("spark.testing.memory", "512000000")
.getOrCreate()
import spark.implicits._
//设置日志级别
spark.sparkContext.setLogLevel("Error")
//2.读取Socket中的每行数据,生成DataFrame默认列名为"value"
val lines: DataFrame = spark.readStream
.format("socket")
.option("host", "node01")
.option("port", 8888)
.load()
//3.将每行数据切分成单词,首先通过as[String]转换成Dataset操作
val words: Dataset[String] = lines.as[String].flatMap(line=>{line.split(" ")})
//4.按照单词分组,统计个数,自动多一个列count
val wordCounts: DataFrame = words.groupBy("value").count()
//5.启动流并向控制台打印结果
val query: StreamingQuery = wordCounts.writeStream
//更新模式设置为complete
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
2.3 测试