Continued from the previous article: https://developer.aliyun.com/article/1622638?spm=a2c6h.13148508.setting.21.27ab4f0ehhuqRu
Approach 1: Outer Join Implementation
package icu.wzk

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BlackListFilter1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("BlackListFilter1")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Blacklist: words flagged true should be filtered out
    val blackList = Array(("spark", true), ("scala", true))
    val blackListRDD = ssc.sparkContext.makeRDD(blackList)

    // Test data: "index word" lines, e.g. "0 spark", "1 java", ...
    val strArray: Array[String] = "spark java scala hadoop kafka hive hbase zookeeper"
      .split("\\s+")
      .zipWithIndex
      .map { case (word, index) => s"$index $word" }
    val rdd = ssc.sparkContext.makeRDD(strArray)
    val clickStream = new ConstantInputDStream(ssc, rdd)

    // Stream processing: key each line by its word, left-join against the
    // blacklist, and keep only lines whose flag is absent or false
    val clickStreamFormatted = clickStream
      .map(value => (value.split(" ")(1), value))
    clickStreamFormatted.transform(clickRDD => {
      val joinedBlackListRDD: RDD[(String, (String, Option[Boolean]))] =
        clickRDD.leftOuterJoin(blackListRDD)
      joinedBlackListRDD.filter {
        case (_, (_, flag)) => !flag.getOrElse(false)
      }.map {
        case (_, (streamingLine, _)) => streamingLine
      }
    }).print()

    // Start the streaming job
    ssc.start()
    ssc.awaitTermination()
  }
}
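The heart of this approach is the leftOuterJoin: every (word, line) pair from the stream is matched against the blacklist RDD, and a word with no blacklist entry comes back with flag = None, so !flag.getOrElse(false) keeps it. Below is a minimal standalone sketch of just that join-and-filter step on plain RDDs; the object and variable names are made up for illustration and are not part of the original program.

import org.apache.spark.{SparkConf, SparkContext}

object LeftOuterJoinSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("LeftOuterJoinSketch").setMaster("local[*]"))

    val clicks = sc.makeRDD(Seq(("spark", "0 spark"), ("java", "1 java")))
    val black  = sc.makeRDD(Seq(("spark", true)))

    // leftOuterJoin keeps every click; flag is Some(true) only for blacklisted words
    clicks.leftOuterJoin(black)                       // RDD[(String, (String, Option[Boolean]))]
      .filter { case (_, (_, flag)) => !flag.getOrElse(false) }
      .map { case (_, (line, _)) => line }
      .collect()
      .foreach(println)                               // prints: 1 java

    sc.stop()
  }
}

Note that leftOuterJoin shuffles the click data on every batch; approach 3 below avoids that shuffle entirely.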
Approach 1: Run Output
-------------------------------------------
Time: 1721618670000 ms
-------------------------------------------
5 hive
6 hbase
1 java
7 zookeeper
3 hadoop
4 kafka

... (the next batch follows)

Because the ConstantInputDStream re-emits the same RDD, an identical batch prints every 10 seconds; the blacklisted words spark and scala never appear, and line order within a batch varies with partitioning.
Approach 2: SQL Implementation
package icu.wzk

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BlackListFilter2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("BlackListFilter2")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("WARN")

    // Blacklist
    val blackList = Array(("spark", true), ("scala", true))
    val blackListRDD = ssc.sparkContext.makeRDD(blackList)

    // Build a test DStream of "index word" lines
    val strArray: Array[String] = "spark java scala hadoop kafka hive hbase zookeeper"
      .split("\\s+")
      .zipWithIndex
      .map { case (word, index) => s"$index $word" }
    val rdd = ssc.sparkContext.makeRDD(strArray)
    val clickStream = new ConstantInputDStream(ssc, rdd)

    // Stream processing: convert both sides to DataFrames, left-join on
    // "word", and keep rows whose flag is NULL (not blacklisted) or false
    val clickStreamFormatted = clickStream
      .map(value => (value.split(" ")(1), value))
    clickStreamFormatted.transform { clickRDD =>
      val spark = SparkSession
        .builder()
        .config(rdd.sparkContext.getConf)
        .getOrCreate()
      import spark.implicits._

      val clickDF: DataFrame = clickRDD.toDF("word", "line")
      val blackDF: DataFrame = blackListRDD.toDF("word", "flag")
      clickDF.join(blackDF, Seq("word"), "left")
        .filter("flag IS NULL OR flag = false")
        .select("line")
        .rdd
    }.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
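Since this is billed as the SQL approach, it may help to see the same left join written as a literal SQL string over temp views instead of the DataFrame DSL. The following is a standalone sketch under that assumption; the temp-view names and the SqlFilterSketch object are invented for illustration, not taken from the original code.

import org.apache.spark.sql.SparkSession

object SqlFilterSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SqlFilterSketch").master("local[*]").getOrCreate()
    import spark.implicits._

    val clickDF = Seq(("spark", "0 spark"), ("java", "1 java")).toDF("word", "line")
    val blackDF = Seq(("spark", true)).toDF("word", "flag")

    clickDF.createOrReplaceTempView("clicks")
    blackDF.createOrReplaceTempView("black")

    // A left join keeps every click row; flag is NULL for words not on the blacklist
    spark.sql(
      """SELECT c.line
        |FROM clicks c LEFT JOIN black b ON c.word = b.word
        |WHERE b.flag IS NULL OR b.flag = false""".stripMargin)
      .show(false)   // prints: 1 java

    spark.stop()
  }
}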
Approach 2: Run Output
-------------------------------------------
Time: 1721619900000 ms
-------------------------------------------
[6 hbase]
[4 kafka]
[7 zookeeper]
[1 java]
[3 hadoop]
[5 hive]

The square brackets appear because the transform returns an RDD[Row]; print() shows each Row's toString rather than a bare string.
Approach 3: Direct Filtering
package icu.wzk

import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BlackListFilter3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("BlackListFilter3")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.sparkContext.setLogLevel("WARN")

    // Broadcast only the blacklisted words themselves
    val blackList = Array(("spark", true), ("scala", true))
    val blackListBC: Broadcast[Array[String]] = ssc
      .sparkContext
      .broadcast(blackList.filter(_._2).map(_._1))

    // Build a test DStream of "index word" lines
    val strArray: Array[String] = "spark java scala hadoop kafka hive hbase zookeeper"
      .split("\\s+")
      .zipWithIndex
      .map { case (word, index) => s"$index $word" }
    val rdd = ssc.sparkContext.makeRDD(strArray)
    val clickStream = new ConstantInputDStream(ssc, rdd)

    // Stream processing: drop any line whose word appears in the broadcast list;
    // no join, so no shuffle
    clickStream.map(value => (value.split(" ")(1), value))
      .filter { case (word, _) => !blackListBC.value.contains(word) }
      .map(_._2)
      .print()

    // Start the streaming job
    ssc.start()
    ssc.awaitTermination()
  }
}
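One possible refinement, not in the article's code: blackListBC.value.contains on an Array is a linear scan per record, so for a large blacklist it may be cheaper to broadcast a Set, whose contains is effectively constant-time. A sketch of that variant on a plain RDD follows; all names in it are illustrative.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast

object BroadcastSetSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("BroadcastSetSketch").setMaster("local[*]"))

    // Broadcast the blacklist once as a Set: each executor gets one read-only
    // copy, and Set.contains avoids the Array's linear scan
    val blackBC: Broadcast[Set[String]] = sc.broadcast(Set("spark", "scala"))

    val lines = sc.makeRDD(Seq("0 spark", "1 java", "2 scala", "3 hadoop"))
    lines.map(line => (line.split(" ")(1), line))
      .filter { case (word, _) => !blackBC.value.contains(word) }
      .map(_._2)
      .collect()
      .foreach(println)   // prints: 1 java and 3 hadoop

    sc.stop()
  }
}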
Approach 3: Run Output
-------------------------------------------
Time: 1721627600000 ms
-------------------------------------------
1 java
3 hadoop
4 kafka
5 hive
6 hbase
7 zookeeper

... (the next batch follows)