sparkstreaming mapWithState状态保存问题,我们该如何保证sparkstreaming在任务重启后可以保证我们的可以从checkpoint中读取到之前的状态,我发现我做不到。期望得到帮助。下面是代码
package com.nuc.online
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming._
object WordCount3 {
def main(args: Array[String]): Unit = {
// 本地模式运行,便于测试
val spark = SparkSession.builder()
.appName(this.getClass.getName)
.master("local[2]")
.getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(3))
ssc.checkpoint("./check")
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
val lines = ssc.socketTextStream("hadoop102", 9999)
lines.checkpoint(Duration(6000))
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))
// 定义一个接收三个参数的匿名函数
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0) // 当前值加上上一个批次的该状态的值
val output = (word, sum) // 输出单词和该单词出现的次数
state.update(sum) // 更新当前的状态
output // 该匿名函数的返回值为输出结果
}
// 使用 mapWithState 更新状态
val stateDstream = wordDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))
stateDstream.print()
ssc.start()
ssc.awaitTermination()
}
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。