思路
源代码
窗口函数
代码实现
/**
 * Spark Streaming job that filters a socket stream of comma-separated click-log lines
 * against a static blacklist of user names.
 *
 * Each input line is keyed by its second comma-separated field. Lines whose key is in
 * the blacklist are dropped, and the remaining original lines are printed once per
 * 5-second batch.
 *
 * Uses an explicit `main` instead of `extends App`: Spark's documentation warns that
 * subclasses of `scala.App` may not work correctly.
 */
object Black {
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Build the blacklist as (name, true) pairs so it can be joined against keyed lines.
    val blacks = List("zs", "ls")
    val blacksRDD = ssc.sparkContext.parallelize(blacks).map(name => (name, true))

    val lines = ssc.socketTextStream("hadoop2", 9999)

    val clicklog = lines
      // Key each line by its second field. A line without one is dropped instead of
      // throwing ArrayIndexOutOfBoundsException and failing the whole batch.
      .flatMap(line => line.split(",").lift(1).map(user => (user, line)))
      .transform { rdd =>
        rdd.leftOuterJoin(blacksRDD)
          // The right side is Some(true) for blacklisted keys and None otherwise.
          .filter { case (_, (_, blacklisted)) => !blacklisted.getOrElse(false) }
          .map { case (_, (line, _)) => line }
      }

    clicklog.print()
    ssc.start()
    ssc.awaitTermination()
  }
}