1. Basic Data Sources
1.1 File Streams
Run the following in the Spark shell:
[lyh@hadoop102 spark-yarn-3.2.4]$ spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-09-08 08:56:21,875 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://hadoop102:4040
Spark context available as 'sc' (master = local[*], app id = local-1662598583370).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.4
      /_/

Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_241)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> val ssc = new StreamingContext(sc,Seconds(20))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@379899f4

scala> val lines = ssc.textFileStream("file:///home/lyh/streaming/logfile")
lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@531245fe

scala> val kv = lines.map((_,1)).reduceByKey(_+_)
kv: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@c207c10

scala> kv.print()

scala> ssc.start()
-------------------------------------------
Time: 1662598860000 ms
-------------------------------------------

-------------------------------------------
Time: 1662598880000 ms
-------------------------------------------

-------------------------------------------
Time: 1662598900000 ms
-------------------------------------------
(c#,1)
(hh,1)
(h,1)
(javafx,1)
(spark,1)
(hadoop,1)
(js,1)
(java,1)
(s,1)
(c,1)
As soon as the streaming job has started, open another terminal, create a new file under the /home/lyh/streaming/logfile directory, and write some data into it (textFileStream only picks up files that appear in the directory after the stream has started).
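For reference, a minimal sketch of the same file-stream word count packaged as a standalone application instead of a spark-shell session (the object name is illustrative; the path and batch interval match the session above):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative standalone version of the spark-shell session above.
object FileStreamWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("file stream word count")
    val ssc = new StreamingContext(conf, Seconds(20))

    // Only files created in (or moved into) this directory after the job starts are processed.
    val lines = ssc.textFileStream("file:///home/lyh/streaming/logfile")
    lines.map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}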
1.2 Socket Streams
// TODO: create the environment objects
val conf = new SparkConf()
conf.setAppName("word count").setMaster("local[*]")
// Note: the receiver occupies one core, so the master needs at least two (local[*] or local[n] with n >= 2)
val ssc = new StreamingContext(conf, Seconds(3))

// TODO: processing logic
// Read data from the socket port
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val words: DStream[String] = lines.flatMap(_.split(" "))
val word: DStream[(String, Int)] = words.map((_, 1))
val wordCount: DStream[(String, Int)] = word.reduceByKey(_ + _)
wordCount.print()

// TODO: shutting down the environment
// The Spark Streaming receiver runs continuously, so the context cannot simply be stopped here;
// besides, returning from the main method would also shut the receiver down.
ssc.start()
// Block until the receiver is terminated
ssc.awaitTermination()
Start NetCat:
> nc -lp 9999
> hello world
> hello spark
> ...
Run result:
1.3 Custom Socket Data Source
Implement the data source as a custom socket server that continuously produces data.
import java.io.PrintWriter
import java.net.{ServerSocket, Socket}

import scala.io.Source

/**
 * A custom socket server that continuously sends data to connected clients.
 */
object MySocketReceiver {

  def index(length: Int): Int = {
    val rdm = new java.util.Random()
    rdm.nextInt(length)
  }

  def main(args: Array[String]): Unit = {
    val fileName = "input/1.txt"
    val lines: List[String] = Source.fromFile(fileName).getLines().toList
    val listener: ServerSocket = new ServerSocket(9999)
    while (true) {
      val socket: Socket = listener.accept()
      new Thread() {
        override def run(): Unit = {
          val out: PrintWriter = new PrintWriter(socket.getOutputStream, true)
          while (true) {
            Thread.sleep(1000)
            // Endlessly send a randomly chosen line of the list, one per second
            val content = lines(index(lines.length))
            println(content)
            out.write(content + '\n')
            out.flush()
          }
          socket.close() // unreachable while the loop above runs forever
        }
      }.start()
    }
  }
}
Define a processor that receives the data sent from the custom data source's port.
def main(args: Array[String]): Unit = {
  // TODO: create the environment objects
  val conf = new SparkConf()
  conf.setAppName("word count").setMaster("local[*]")
  val ssc = new StreamingContext(conf, Seconds(3))

  // TODO: processing logic
  // Read data from the socket port
  val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
  val words: DStream[String] = lines.flatMap(_.split(" "))
  val word: DStream[(String, Int)] = words.map((_, 1))
  val wordCount: DStream[(String, Int)] = word.reduceByKey(_ + _)
  wordCount.print()

  // TODO: shutting down the environment
  // The Spark Streaming receiver runs continuously, so the context cannot simply be stopped here
  ssc.start()
  // Block until the receiver is terminated
  ssc.awaitTermination()
}
Run the data source first, then run the processor:
Processor:
1.4 RDD Queue Stream
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object SparkStreaming02_RDDStream {
  def main(args: Array[String]): Unit = {
    // 1. Initialize the configuration
    val conf = new SparkConf()
    conf.setAppName("rdd Stream").setMaster("local[*]")
    // 2. Initialize the StreamingContext
    val ssc = new StreamingContext(conf, Seconds(4))
    // 3. Create the RDD queue
    val rddQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()
    // 4. Create the queue input stream
    //    oneAtATime = true (default): consume one RDD from the queue per batch
    //    oneAtATime = false: consume all RDDs currently in the queue at each batch interval
    val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue, oneAtATime = false)
    // 5. Process the RDDs from the queue
    val sumStream: DStream[Int] = inputStream.reduce(_ + _)
    // 6. Print the result
    sumStream.print()
    // 7. Start the job
    ssc.start()
    // 8. Push RDDs into the queue
    for (i <- 1 to 5) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 5)
      Thread.sleep(2000)
    }
    // 9. Block until the streaming context is stopped
    ssc.awaitTermination()
  }
}
2. Advanced Data Sources
2.1 Kafka Data Source
2.1.1 Consumer Program for Processing the Stream
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming03_Kafka {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka source")
    val ssc = new StreamingContext(conf, Seconds(3))

    // Kafka parameters: broker addresses, consumer group id, key deserializer, value deserializer
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "lyh",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // Create a DStream by reading from Kafka
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,                                      // location strategy
      ConsumerStrategies.Subscribe[String, String](Set("testTopic"), kafkaPara) // consumer strategy: (topics to subscribe to, Kafka parameters)
    )

    // Extract the value from each record
    val valueDStream: DStream[String] = kafkaDStream.map(_.value())

    // Compute the word count
    valueDStream.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    // Start the job
    ssc.start()
    ssc.awaitTermination()
  }
}
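Note that KafkaUtils and the classes under org.apache.spark.streaming.kafka010 come from the separate spark-streaming-kafka-0-10 module, which has to be added to the project. A sketch of the dependency, assuming an sbt build and the Spark 3.2.4 version used in this article:

// build.sbt (the version is assumed to match the Spark version in use)
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.2.4"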
2.1.2 Producing Data
(1) Produce data from the Kafka console
Start the Kafka cluster.
Create a topic (with one partition and three replicas):
kafka-topics.sh --bootstrap-server hadoop102:9092 --topic <topic-name> --create --partitions 1 --replication-factor 3
Check that the topic was created:
kafka-topics.sh --bootstrap-server hadoop102:9092 --list
Produce data with the console producer:
> kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic <topic-name>
> hello world
> hello spark
> ...
(2) Write a producer program
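A minimal sketch of a standalone producer built on the plain Kafka Java client (the object name, sample lines, and one-message-per-second rate are illustrative; the brokers and the testTopic topic match the consumer in 2.1.1):

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// Illustrative producer: sends a random line to the topic consumed in 2.1.1 once per second.
object SparkStreaming03_KafkaProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    val lines = Array("hello world", "hello spark", "hello kafka")
    try {
      while (true) {
        val msg = lines(scala.util.Random.nextInt(lines.length))
        producer.send(new ProducerRecord[String, String]("testTopic", msg))
        println(msg)
        Thread.sleep(1000)
      }
    } finally {
      producer.close()
    }
  }
}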
3. Transformation Operations
3.1 Stateless Transformations
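Stateless transformations such as map, flatMap, filter, repartition, and reduceByKey are applied to each batch RDD on its own; nothing is carried over from one batch to the next. A minimal sketch (the object name is illustrative; the socket input mirrors the earlier examples):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StatelessWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("stateless demo")
    val ssc = new StreamingContext(conf, Seconds(3))

    val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)

    // Each 3s batch is processed independently; counts never accumulate across batches.
    val perBatchCounts: DStream[(String, Int)] = lines
      .flatMap(_.split(" "))   // stateless: applied only to the RDD of the current batch
      .map((_, 1))
      .reduceByKey(_ + _)      // reduces within the batch, not across batches

    perBatchCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Typing the same word during two different batch intervals therefore prints a count of 1 both times; accumulating counts across batches requires the stateful operations described next.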
3.2 Stateful Transformations
3.2.1 Sliding Window Transformations
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming05_Window {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming window")
    val ssc = new StreamingContext(conf, Seconds(3))

    val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)
    val word_kv = lines.map((_, 1))

    /**
     * Batch interval (RDDs collected into the DStream): 3s; window length: 12s; slide interval: 6s.
     * 1. windowLength: the length of the sliding window, i.e. the time span of the data it covers (a Duration).
     * 2. slideInterval: how often the window slides forward (also a Duration).
     * Returns a new DStream.
     */
    val wordToOneByWindow: DStream[(String, Int)] = word_kv.window(Seconds(12), Seconds(6))

    // Every time the window slides (every 6s), aggregate the data currently inside the window.
    val res: DStream[(String, Int)] = wordToOneByWindow.reduceByKey(_ + _)
    res.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
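As a side note, the window-then-reduce pair above can be collapsed into a single call with reduceByKeyAndWindow; a minimal sketch of the replacement lines, reusing the word_kv DStream from the program above (same 12 s window and 6 s slide):

// Reduce by key over a 12s window that slides every 6s (replaces the window + reduceByKey steps above).
val resByWindow: DStream[(String, Int)] =
  word_kv.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(12), Seconds(6))
resByWindow.print()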
3.2.2 updateStateByKey
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Stateful DStream transformation: updateStateByKey(func)
 */
object SparkStreaming04_State {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka state")
    val ssc = new StreamingContext(conf, Seconds(3))

    /**
     * The checkpoint directory makes the streaming application fault tolerant and recoverable.
     * While running, the application processes the received data batch by batch.
     * With checkpointing enabled, Spark Streaming periodically saves the current processing state,
     * the offsets of the received data, and so on to reliable storage such as a distributed file system
     * (e.g. HDFS) or a cloud store (e.g. Amazon S3).
     * If the application fails or crashes, it can recover its state from the latest checkpoint and continue
     * from where it left off, which preserves data integrity and consistency.
     */
    // A local checkpoint path must be prefixed with file://; a path starting with / is treated as an HDFS path.
    ssc.checkpoint("file:///D://IdeaProject/SparkStudy/data/") // checkpointing provides fault tolerance

    val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)
    val word_kv = lines.map((_, 1))

    val stateDStream: DStream[(String, Int)] = word_kv.updateStateByKey(
      /**
       * The argument is a function:
       * 1. Seq[Int]: all values for the current key in this batch (Int here because our value type is Int).
       * 2. Option[Int]: the previous state of the key; for word count this is the running total computed
       *    from the earlier DStream batches for this key.
       *    Option is Scala's container for a value that may or may not exist, a pattern for avoiding null references.
       *    Option[Int] has two possible forms:
       *    (1) Some(value: Int): an Option holding an Int value.
       *    (2) None: an empty Option holding no value.
       */
      (values: Seq[Int], state: Option[Int]) => {
        val currentCount = values.foldLeft(0)(_ + _)
        val previousCount = state.getOrElse(0)
        Option(currentCount + previousCount)
      }
    )

    stateDStream.print()
    stateDStream.saveAsTextFiles("./out") // also save the results to text files

    ssc.start()
    ssc.awaitTermination()
  }
}
4. Output Operations
4.1 Writing to Text Files
Section 3.2.2 above already saved the DStream output to the local file system:
stateDStream.saveAsTextFiles("./out")
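saveAsTextFiles takes a prefix and an optional suffix, and each batch interval is written to its own directory named prefix-TIME_IN_MS[.suffix]. A small sketch reusing the stateDStream above:

// Each batch produces a directory such as ./out-1662598860000.txt
stateDStream.saveAsTextFiles("./out", "txt")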
4.2 Writing to a MySQL Database
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.{Connection, PreparedStatement}

object NetWorkWordCountStateMySQL {
  def main(args: Array[String]): Unit = {
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val conf = new SparkConf().setMaster("local[*]").setAppName("state mysql")
    val ssc = new StreamingContext(conf, Seconds(5))

    // file:\\ refers to the local file system; a path like /user/... is treated as HDFS and requires a running Hadoop cluster
    ssc.checkpoint("file:\\D:\\IdeaProjects\\SparkStudy\\data\\state")

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val word_kv: DStream[(String, Int)] = lines.flatMap(_.split(" ").map((_, 1))).reduceByKey(_ + _)
    val stateDStream: DStream[(String, Int)] = word_kv.updateStateByKey[Int](updateFunc)
    stateDStream.print()

    stateDStream.foreachRDD(rdd => {
      def func(records: Iterator[(String, Int)]): Unit = {
        var conn: Connection = null
        var stmt: PreparedStatement = null
        try {
          conn = DBUtils.getConnection("jdbc:mysql://127.0.0.1:3306/spark", "root", "Yan1029.")
          records.foreach(p => {
            val sql = "insert into wordcount values (?,?)"
            stmt = conn.prepareStatement(sql)
            stmt.setString(1, p._1.trim)
            stmt.setInt(2, p._2)
            stmt.executeUpdate() // nothing is written to the database without calling executeUpdate
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) stmt.close()
          if (conn != null) conn.close()
        }
      }

      val repartitionedRDD: RDD[(String, Int)] = rdd.repartition(3) // use repartition to increase the number of partitions
      repartitionedRDD.foreachPartition(func)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
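The DBUtils helper called above is not shown in this article; a hypothetical minimal version based on plain JDBC might look like the following (the driver class assumes the MySQL 8.x JDBC driver is on the classpath):

import java.sql.{Connection, DriverManager}

// Hypothetical stand-in for the DBUtils helper used above.
// A production version would normally hand out connections from a pool instead of opening one per partition.
object DBUtils {
  def getConnection(url: String, user: String, password: String): Connection = {
    Class.forName("com.mysql.cj.jdbc.Driver") // assumed MySQL 8.x driver class
    DriverManager.getConnection(url, user, password)
  }
}

The insert statement also assumes a two-column table in the spark database, e.g. wordcount(word VARCHAR, count INT).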
Run result:
5. Graceful Shutdown and Data Recovery
5.1 Shutting Down Spark Streaming
Streaming jobs typically run 24/7, but sometimes the program has to be stopped deliberately, for example to upgrade the code. Since this is a distributed program, you cannot simply kill its processes one by one, so configuring a graceful shutdown is essential.
Shutdown approach: we usually use an external file system to signal the running program to shut itself down.
package com.lyh

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

import java.net.URI

object SparkStreaming06_Close {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming close")
    val ssc = new StreamingContext(conf, Seconds(3))

    val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)
    val word_kv = lines.map((_, 1))
    word_kv.print()

    ssc.start()

    // Start another thread that is responsible for shutting the context down
    new Thread(new MonitorStop(ssc)).start()

    ssc.awaitTermination() // block the current main thread
  }
}

class MonitorStop(ssc: StreamingContext) extends Runnable {
  override def run(): Unit = {
    while (true) {          // keep polling
      Thread.sleep(5000)    // check every 5 seconds
      val fs: FileSystem = FileSystem.get(new URI("hdfs://hadoop102:9000"), new Configuration(), "lyh")
      val exists: Boolean = fs.exists(new Path("hdfs://hadoop102:9000/stopSpark"))
      if (exists) {
        // Shut down when some external condition holds (e.g. a row appears in MySQL,
        // a ZooKeeper node changes, an HDFS directory exists, ...)
        val state: StreamingContextState = ssc.getState()
        if (state == StreamingContextState.ACTIVE) {
          // Graceful shutdown: finish processing the data already received before stopping.
          // The executors stop accepting new data, process what they already have, then shut down.
          ssc.stop(stopSparkContext = true, stopGracefully = true)
          System.exit(0)
        }
      }
    }
  }
}
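To trigger the shutdown, create the marker path that the monitor thread polls for, e.g. with hdfs dfs -touchz /stopSpark from the command line. A minimal sketch doing the same thing through the Hadoop FileSystem API (the object name is illustrative):

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// One-off helper that creates the marker file watched by MonitorStop, triggering the graceful shutdown.
object TriggerStop {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), new Configuration(), "lyh")
    fs.create(new Path("hdfs://hadoop102:9000/stopSpark")).close()
    fs.close()
  }
}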
5.2 Recovering Checkpoint Data
Use getActiveOrCreate to recover the data of a previously failed Spark job (recovery is performed from the checkpoint).
Method description:
If the application is starting for the first time, a new StreamingContext instance is created; if the application is restarting after a failure, the StreamingContext is re-created from the data in the checkpoint directory.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming07_Resume {
  def main(args: Array[String]): Unit = {
    // Benefit: on a first start, the creating function below builds a new StreamingContext;
    // on a restart after a failure, the StreamingContext is re-created from the checkpoint directory.
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("file:\\D:\\IdeaProjects\\SparkStudy\\data\\state", () => {
      val conf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming resume")
      val ssc = new StreamingContext(conf, Seconds(3))
      val lines: DStream[String] = ssc.socketTextStream("localhost", 9999)
      val word_kv = lines.map((_, 1))
      word_kv.print()
      ssc
    })

    // Still set the checkpoint directory so data is not lost if the application fails again.
    // (Typically the checkpoint directory is also set inside the creating function so that the DStream graph itself is checkpointed.)
    ssc.checkpoint("file:\\D:\\IdeaProjects\\SparkStudy\\data\\state")

    ssc.start()
    ssc.awaitTermination() // block the current main thread
  }
}