流处理中,有个状态(state)的概念:
无状态的:当前批次处理完之后,数据只与当前批次有关
有状态的:前后批次的数据处理完之后,之间是有关系的
updateStateByKey解读
updateStateByKey:返回的是一个新的并且带有状态的DStream,会根据每一个key进行更新,更新的规则是根据自己定义的function来确定的。
updateStateByKey操作允许您在使用新信息不断更新时保持任意状态。要使用它,您必须执行两个步骤:
定义状态:状态可以是任意数据类型。
定义状态更新功能:使用函数指定如何使用先前状态和输入流中的新值更新状态。
对于每个batch,Spark都会为每个之前已经存在的key去应用一次state更新函数,无论这个key在 batch中是否有新的数据。
如果state更新函数返回none,那么key对应的state就会被删除。 当然,对于每个新出现的key,也会执行state更新函数。
注意,updateStateByKey操作,要求必须开启Checkpoint机制。
案例:基于缓存的实时wordcount程序(在实际业务场景中,这个是非常有用的)
定义state;以wordcount为例,value是作为state来处理的,是根据key来更新我们的value的
定义一个state update function:将旧state的值与新state的值根据所定义的规则给相互作用在一起
Java语言实现:
package com.kfk.spark.common; import org.apache.spark.SparkConf; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaStreamingContext; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/14 * @time : 8:23 下午 */ public class CommStreamingContext { public static JavaStreamingContext getJssc(){ SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CommStreamingContext"); return new JavaStreamingContext(conf, Durations.seconds(2)); } }
package com.kfk.spark.stream; import com.kfk.spark.common.CommStreamingContext; import org.apache.spark.api.java.Optional; import org.apache.spark.api.java.function.Function2; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; import java.util.Arrays; import java.util.List; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/15 * @time : 10:08 下午 */ public class StreamingUpdateStateByKeyJava { public static void main(String[] args) throws InterruptedException { JavaStreamingContext jssc = CommStreamingContext.getJssc(); // 要使用UpdateStateByKey算子就必须设置一个Checkpoint目录,开启Checkpoint机制 // 以便于内存数据丢失时,可以从Checkpoint中恢复数据 jssc.checkpoint("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/sparkCheckpoint"); JavaReceiverInputDStream<String> lines = jssc.socketTextStream("bigdata-pro-m04",9999); // flatmap JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator()); // map JavaPairDStream<String,Integer> pair = words.mapToPair(word -> new Tuple2<>(word,1)); // 通过spark来维护一份每个单词的全局统计次数 JavaPairDStream<String,Integer> wordcount = pair.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception { Integer newValues = 0; if (state.isPresent()){ newValues = state.get(); } for (Integer value : values){ newValues += value; } return Optional.of(newValues); } }); // lambda表达式写法 // JavaPairDStream<String,Integer> wordcount = pair.updateStateByKey((values, state) -> { // Integer newValues = 0; // if (state.isPresent()){ // newValues = state.get(); // } // // for (Integer value : values){ // newValues += value; // } // // return Optional.of(newValues); // // }); wordcount.print(); jssc.start(); jssc.awaitTermination(); } }
Scala语言实现:
package com.kfk.spark.stream import com.kfk.spark.common.CommStreamingContextScala /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/12/16 * @time : 8:04 下午 */ object StreamingUpdateStateByKeyScala { def main(args: Array[String]): Unit = { val jssc = CommStreamingContextScala.getJssc; val lines = jssc.socketTextStream("bigdata-pro-m04", 9999) // 要使用UpdateStateByKey算子就必须设置一个Checkpoint目录,开启Checkpoint机制 // 以便于内存数据丢失时,可以从Checkpoint中恢复数据 jssc.checkpoint("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/sparkCheckpoint") // flatmap val words = lines.flatMap(word => word.split(" ")) // map val pair = words.map(x => (x,1)) // updateStateByKey val wordcount = pair.updateStateByKey((values : Seq[Int], state : Option[Int]) => { var newValue = state.getOrElse(0) for (value <- values){ newValue += value } Option(newValue) }) wordcount.print() jssc.start() jssc.awaitTermination() } }
运行结果:
------------------------------------------- Time: 1608175348000 ms ------------------------------------------- (hive,2) (php,1) (python,1) (java,6) ------------------------------------------- Time: 1608175354000 ms ------------------------------------------- (hive,2) (php,1) (python,2) (java,7) (hadoop,1)vb