3.2.1 Usage and Description
To collect data from a custom source, extend Receiver and implement the onStart and onStop methods.
3.2.2 Hands-on Example
Requirement: define a custom data source that monitors a given port and collects the content received on it.
1) Custom data source
package streaming

import java.util.UUID

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object SpariStreaming03_DStream_diy {

  def main(args: Array[String]): Unit = {
    //SparkCore      : SparkContext
    //SparkSQL       : SparkSession
    //SparkStreaming : StreamingContext
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val receiverDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver)
    receiverDS.print()

    // Start the receiver
    ssc.start()
    // Wait for the receiver to terminate
    ssc.awaitTermination()
    //ssc.stop()
  }

  /**
   * Custom receiver:
   * 1. Extend Receiver
   * 2. Specify the type parameter
   * 3. Pass the storage level to the parent constructor
   * 4. Implement onStart / onStop
   */
  class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {

    private var flag = true

    // The receiver collects the data; onStart must not block, so start the collection in a new thread
    override def onStart(): Unit = {
      // socket, file, kafka, flume ...
      new Thread(new Runnable {
        override def run(): Unit = {
          while (flag) {
            Thread.sleep(1000)
            val uuid: String = UUID.randomUUID().toString
            // Store the collected data
            store(uuid)
          }
        }
      }).start()
    }

    override def onStop(): Unit = {
      flag = false
    }
  }
}
3.3 Kafka Data Source (a key topic for interviews and development)
3.3.1 Version Selection
ReceiverAPI: a dedicated Executor receives the data and then passes it to other Executors for computation. The problem is that the receiving Executor and the computing Executors may run at different speeds; in particular, if data is received faster than it can be processed, the computing nodes can run out of memory. This approach was provided in early versions and is no longer used in current versions.
DirectAPI: the computing Executors consume the Kafka data themselves, so each controls its own consumption rate.
First, start Kafka on the Linux side.
Check the existing topics, then create the topic for the example:
bin/kafka-topics.sh --bootstrap-server node1:9092 --list
bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic atguiguNew --partitions 3 --replication-factor 2
Produce data:
bin/kafka-console-producer.sh --broker-list node1:9092 --topic atguiguNew
Consume the data with Spark Streaming:
package streaming

import java.util.UUID

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object SpariStreaming04_DStream_kafka {

  def main(args: Array[String]): Unit = {
    //SparkCore      : SparkContext
    //SparkSQL       : SparkSession
    //SparkStreaming : StreamingContext
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaParams: mutable.Map[String, String] = mutable.Map[String, String]()
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092")
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "abcx")
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("atguiguNew"), kafkaParams)
    )

    // e.g. "hello world a b c"
    val kfValueDS: DStream[String] = kafkaDS.map(_.value())
    //val wordcountDs: DStream[(String, Int)] = kfValueDS.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    //wordcountDs.print()
    kfValueDS.print()

    // Start the receiver
    ssc.start()
    // Wait for the receiver to terminate
    ssc.awaitTermination()
    //ssc.stop()
  }
}
Check the consumer group's progress:
bin/kafka-consumer-groups.sh --describe --bootstrap-server node1:9092 --group abcx
DStream Transformations
Operations on DStreams are similar to those on RDDs and fall into two categories: transformations and output operations. The transformations also include some special primitives, such as updateStateByKey(), transform(), and the various window-related primitives.
Stateless Transformations
Stateless transformations simply apply ordinary RDD transformations to every batch, that is, to each RDD inside the DStream. Common examples include map(), flatMap(), filter(), repartition(), reduceByKey(), and groupByKey(). Note that transformations on key-value DStreams (such as reduceByKey()) require import StreamingContext._ to be used in Scala.
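For example, a word count that resets every batch uses only stateless operations. A minimal sketch (assuming a socket source on localhost:9999, as in the later examples): each 3-second batch is reduced independently, so counts never accumulate across batches.

package streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StatelessWordCountSketch {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

    // flatMap/map/reduceByKey are applied to the RDD of each batch separately
    val wordCount: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    wordCount.print()

    ssc.start()
    ssc.awaitTermination()
  }
}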
Stateful Transformations
UpdateStateByKey
The updateStateByKey primitive is used to keep history across batches. Sometimes we need to maintain state in a DStream across batches (for example, accumulating a word count over the whole stream). For such cases, updateStateByKey() gives access to a state variable for key-value DStreams: given a DStream of (key, event) pairs and a function that specifies how to update each key's state from the new events, it builds a new DStream whose contents are (key, state) pairs.
The result of updateStateByKey() is a new DStream whose internal RDD sequence consists of the (key, state) pairs of each time interval.
updateStateByKey lets us keep arbitrary state and continuously update it with new information. Using it requires two steps:
1. Define the state; the state can be of any data type.
2. Define the state update function, which specifies how to combine the previous state with the new values from the input stream into the new state.
Using updateStateByKey also requires configuring a checkpoint directory, because checkpoints are used to save the state.
package streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreaming05_SStream {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("cp")

    // Stateless operations only process the data of the current batch.
    // In some scenarios we need to keep the accumulated result (state) across batches.
    val datas: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

    val wordToOne: DStream[(String, Int)] = datas.map((_, 1))
    // val wordTocount: DStream[(String, Int)] = wordToOne.reduceByKey(_ + _)

    // (Seq[V], Option[S]) => Option[S]
    // (Seq[Int], Option[S]) => Option[S]
    // Seq[V]    : all values of a key in the current batch
    // Option[S] : the value recorded for that key in previous batches
    // Without ssc.checkpoint(...) this fails with:
    // "The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()."
    val wordcountDS: DStream[(String, Int)] = wordToOne.updateStateByKey(
      (currentValues: Seq[Int], buffValue: Option[Int]) => {
        val currentValueSum: Int = currentValues.sum
        val old: Int = buffValue.getOrElse(0)
        Option[Int](currentValueSum + old)
      }
    )
    // An output operation is required, otherwise ssc.start() throws
    // "No output operations registered, so nothing to execute"
    wordcountDS.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Stateless Transformation: transform
package streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SpariStreaming06_state_transform {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

    // transform exposes the underlying RDD of each batch so it can be operated on directly.
    // It is useful when:
    // 1. the DStream API does not offer the operation you need
    // 2. some code has to be executed periodically, once per batch
    val newDs: DStream[String] = lines.transform(
      rdd => {
        // code here runs on the Driver, once per batch
        rdd.map(
          str => {
            // code here runs on the Executors
            str
          }
        )
      }
    )

    newDs.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
DStream Transformations - Stateless Operation: join
package streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SpariStreaming06_state_join {

  def main(args: Array[String]): Unit = {
    //SparkCore      : SparkContext
    //SparkSQL       : SparkSession
    //SparkStreaming : StreamingContext
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val socketDS1: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)
    val socketDS2: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 8888)

    val kv1: DStream[(String, Int)] = socketDS1.map((_, 1))
    val kv2: DStream[(String, Int)] = socketDS2.map((_, 1))

    // join matches the keys of the two DStreams within the same batch
    val joinDS: DStream[(String, (Int, Int))] = kv1.join(kv2)

    joinDS.print()

    // Start the receivers
    ssc.start()
    // Wait for termination
    ssc.awaitTermination()
    //ssc.stop()
  }
}
Window Operations
Window operations let you set a window length and a sliding interval to dynamically obtain the state of the current streaming computation. All window-based operations take two parameters: the window duration and the slide duration.
Window duration: the time range over which data is aggregated;
Slide duration: how often the windowed computation is triggered.
Note: both must be integer multiples of the batch (collection) interval.
WordCount, version 3: 3-second batches, a 12-second window, sliding every 6 seconds.
package streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SpariStreaming08_state_window {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("ssccp")

    val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)

    val wordtoOneRDD: DStream[(String, Int)] = socketDS.flatMap(_.split(" ")).map((_, 1))
    //val result: DStream[(String, Int)] = wordtoOneRDD.reduceByKey(_+_)

    // The windowed computation is triggered once per slide duration
    // (the default slide duration is the batch interval).
    // Both window and slide duration must be integer multiples of the batch interval.
    // 12-second window, sliding every 6 seconds
    val result: DStream[(String, Int)] = wordtoOneRDD.window(Seconds(12), Seconds(6)).reduceByKey(_ + _)

    result.print()

    // Start the receiver
    ssc.start()
    // Wait for termination
    ssc.awaitTermination()
    //ssc.stop()
  }
}
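The same aggregation can also be written with reduceByKeyAndWindow. The variant that takes an inverse function reuses the previous window's result (subtracting the data that slides out) and requires the checkpoint directory that is already set above. A minimal sketch, reusing wordtoOneRDD from the example:

// add the values that enter the window, subtract the values that leave it
val windowed: DStream[(String, Int)] = wordtoOneRDD.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,
  (a: Int, b: Int) => a - b,
  Seconds(12), // window duration
  Seconds(6)   // slide duration
)
windowed.print()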
Obtaining KafkaRDD Offsets in Spark Streaming
package streaming

import java.util.UUID

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/*
 * Disable the consumer's automatic offset commit
 * and read the offsets from the RDD ourselves.
 */
object KafkaStreamingWordCountManageOffice {

  def main(args: Array[String]): Unit = {
    //SparkCore      : SparkContext
    //SparkSQL       : SparkSession
    //SparkStreaming : StreamingContext
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "node1:9092,node2:9092,node3:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "g003",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean) // the consumer no longer commits offsets automatically
    )

    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](List("bigdata210701"), kafkaParams)
    )

    // Call foreachRDD directly on the DStream returned by createDirectStream:
    // only the KafkaRDD carries the offset information.
    kafkaDS.foreachRDD(rdd => {
      // println(rdd)
      // Cast the RDD to HasOffsetRanges to read the offsets of every partition [on the Driver]
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // Print the offset range of each partition
      for (range <- offsetRanges) {
        println(s"topic:${range.topic} partition:${range.partition} fromOffset:${range.fromOffset}->untilOffset:${range.untilOffset}")
      }
    })

    // Start the receiver
    ssc.start()
    // Wait for termination
    ssc.awaitTermination()
  }
}
After a batch has been processed, commit its offsets back to Kafka.
package streaming

import java.util.UUID

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis
import util.JedisConnectionPool

import scala.collection.mutable

/*
 * Disable the consumer's automatic offset commit,
 * manage the offsets ourselves and commit them after each batch.
 */
object KafkaStreamingWordCountManageOffice {

  def main(args: Array[String]): Unit = {
    //SparkCore      : SparkContext
    //SparkSQL       : SparkSession
    //SparkStreaming : StreamingContext
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "node1:9092,node2:9092,node3:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "g003",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean) // the consumer no longer commits offsets automatically
    )

    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](List("bigdata210701"), kafkaParams)
    )

    // Call foreachRDD directly on the DStream returned by createDirectStream:
    // only the KafkaRDD carries the offset information.
    kafkaDS.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // Cast the RDD to HasOffsetRanges to read the offsets of every partition [on the Driver]
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // Print the offset range of each partition
        for (range <- offsetRanges) {
          println(s"topic:${range.topic} partition:${range.partition} fromOffset:${range.fromOffset}->untilOffset:${range.untilOffset}")
        }
        // The offsets could be written to any store [Kafka, Redis, MySQL];
        // here they are committed back to Kafka once the batch has been processed.

        // Process the RDD
        // Transformations start
        val lines: RDD[String] = rdd.map(_.value())
        val words: RDD[String] = lines.flatMap(_.split(" "))
        val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
        val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
        // Transformations end

        // Trigger the action
        reduced.foreachPartition(
          it => {
            // Get a Redis connection on the Executor
            val jedis: Jedis = JedisConnectionPool.getConnection()
            // Write this partition's results to Redis
            it.foreach(t => {
              jedis.hincrBy("wc_adv", t._1, t._2)
            })
            // Return the connection to the pool
            jedis.close()
          }
        )

        // Finally commit the offsets of every partition of this batch.
        // The asynchronous commit writes them to Kafka's internal offsets topic.
        kafkaDS.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    })

    // Start the receiver
    ssc.start()
    // Wait for termination
    ssc.awaitTermination()
  }
}
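The code above imports util.JedisConnectionPool, which is not shown in this document. A minimal sketch of such a helper, assuming the Jedis client is added as an extra dependency (it is not in the pom listed later) and that Redis runs on node1:6379:

package util

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

// Minimal connection pool used by the offset example; host and port are assumptions.
object JedisConnectionPool {

  private val config = new JedisPoolConfig()
  config.setMaxTotal(20) // maximum number of connections
  config.setMaxIdle(10)  // maximum number of idle connections

  private val pool = new JedisPool(config, "node1", 6379)

  // Borrow a connection; jedis.close() returns it to the pool
  def getConnection(): Jedis = pool.getResource
}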
Graceful Shutdown
Streaming jobs run 24/7, but sometimes the code must be upgraded and the program stopped deliberately. Since it is a distributed program, you cannot simply kill the processes one by one, so configuring a graceful shutdown is essential.
Use an external file system to control the shutdown of the running program.
package streaming

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

object SpariStreaming11_close {

  def main(args: Array[String]): Unit = {
    /*
     * Closing a thread:
     *   val thread = new Thread()
     *   thread.start()
     *   thread.stop()  // forced shutdown
     */
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming").setMaster("local[*]")
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("ssccp")

    val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)
    val wordcountDS: DStream[(String, Int)] = socketDS.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    wordcountDS.print()

    // new Thread(new StopThread(ssc)).start()

    // Start the receiver
    ssc.start()

    // To stop the receiver, a separate monitoring thread is needed
    new Thread(
      new Runnable {
        override def run(): Unit = {
          // Graceful shutdown: stop accepting new data, finish processing the data
          // already received, then shut the context down.
          while (true) {
            // `true` is a placeholder for an external shutdown flag (see the sketch below)
            if (true) {
              // Check the current state of the StreamingContext
              val state: StreamingContextState = ssc.getState()
              if (state == StreamingContextState.ACTIVE) {
                // stopSparkContext = true, stopGracefully = true
                ssc.stop(true, true)
                System.exit(0)
              }
            }
            Thread.sleep(5000)
          }
        }
      }
    ).start()

    // Wait for termination
    ssc.awaitTermination()
    //ssc.stop()
  }
}
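The if (true) in the monitoring thread is only a placeholder. Following the idea of controlling shutdown through an external file system, it could be replaced with a check for an HDFS marker file, using the FileSystem, Path and URI imports already present in the example. A minimal sketch, in which the NameNode address, the HDFS user and the /stopSpark path are assumptions:

// Replace `if (true)` with something like:
val fs: FileSystem = FileSystem.get(new URI("hdfs://node1:8020"), new Configuration(), "atguigu")
if (fs.exists(new Path("/stopSpark"))) {
  val state: StreamingContextState = ssc.getState()
  if (state == StreamingContextState.ACTIVE) {
    // stop receiving, finish the batches already received, then exit
    ssc.stop(true, true)
    System.exit(0)
  }
}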
// Recovering data from a checkpoint
val ssc: StreamingContext = StreamingContext.getActiveOrCreate("cp", () => {
  val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming").setMaster("local[*]")
  sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
  val ssc = new StreamingContext(sparkConf, Seconds(3))
  // must be the same directory that is passed to getActiveOrCreate
  ssc.checkpoint("cp")

  val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102", 9999)
  val wordcountDS: DStream[(String, Int)] = socketDS.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  wordcountDS.print()

  ssc
})
Spark Streaming Hands-on Project
Environment Setup
pom.xml dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
        <version>1.1.10</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.27</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.10.1</version>
    </dependency>
</dependencies>
Produce data (mock generator):
package streaming

import java.util.{Properties, Random}

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import scala.collection.mutable.ListBuffer

object SparkStreaming12_Req_mock {

  def main(args: Array[String]): Unit = {
    /*
     * Mocked data, one line per ad click:
     *   format: timestamp area city userid adid
     * A batch of lines is generated every two seconds and written to Kafka.
     */

    // Create the producer once, outside the loop
    val configs = new Properties()
    configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092")
    configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](configs)

    while (true) {
      Thread.sleep(2000)
      // Generate a batch of data
      val datas: ListBuffer[String] = mockdata()

      // Write the data to Kafka
      datas.foreach(println)
      for (data <- datas) {
        producer.send(new ProducerRecord[String, String]("atguiguNew", data))
        // println(data)
      }
    }

    def mockdata(): ListBuffer[String] = {
      val datas: ListBuffer[String] = ListBuffer[String]()
      val random = new Random()
      val areaList = Array("华北", "华东", "华南")
      val cityList = Array("北京", "上海", "深圳")

      for (elem <- 1 to random.nextInt(50)) {
        val ts = System.currentTimeMillis()
        val area = areaList(random.nextInt(3))
        val city = cityList(random.nextInt(3))
        val userid = random.nextInt(8) + 1
        val adid = random.nextInt(8) + 1
        val log = s"$ts $area $city $userid $adid"
        datas.append(log)
      }
      datas
    }
  }
}
Consume the data from Kafka:
package streaming

import java.util.UUID

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object SpariStreaming04_DStream_kafka {

  def main(args: Array[String]): Unit = {
    //SparkCore      : SparkContext
    //SparkSQL       : SparkSession
    //SparkStreaming : StreamingContext
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaParams: mutable.Map[String, String] = mutable.Map[String, String]()
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092")
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "abcx")
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("atguiguNew"), kafkaParams)
    )

    // e.g. "hello world a b c"
    val kfValueDS: DStream[String] = kafkaDS.map(_.value())
    //val wordcountDs: DStream[(String, Int)] = kfValueDS.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    //wordcountDs.print()
    kfValueDS.print()

    // Start the receiver
    ssc.start()
    // Wait for the receiver to terminate
    ssc.awaitTermination()
    //ssc.stop()
  }
}
Requirement 1: Advertisement Blacklist
Implement a real-time, dynamic blacklist mechanism: blacklist every user who clicks a given ad more than 100 times in one day.
Note: the blacklist is stored in MySQL.
7.3.1 Approach
1) After reading the data from Kafka, validate it against the blacklist stored in MySQL;
2) If the check passes, add one to the user's click count for that ad and write it to MySQL;
3) After writing to MySQL, check the data again: if a user exceeds 100 clicks in a single day, add that user to the blacklist.
package streaming

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import util.JDBCUtil

import java.sql.{Connection, PreparedStatement, ResultSet}
import java.text.SimpleDateFormat
import java.util.Date

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

object SpariStreaming12_BlackList {

  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaParams: mutable.Map[String, String] = mutable.Map[String, String]()
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092")
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "abcx")
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

    val kafkaDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("atguiguNew"), kafkaParams)
    )

    // Parse each line into an AdClickData record
    val adClickData: DStream[AdClickData] = kafkaDS.map(
      kafkadata => {
        val data = kafkadata.value()
        val datas = data.split(" ")
        AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
      }
    )

    val ds: DStream[((String, String, String), Int)] = adClickData.transform(
      rdd => {
        // Read the current blacklist from MySQL (on the Driver, once per batch)
        val blacklist = ListBuffer[String]()
        val conn: Connection = JDBCUtil.getConnection
        val pstat: PreparedStatement = conn.prepareStatement("select * from black_list")
        val rs: ResultSet = pstat.executeQuery()
        while (rs.next()) {
          blacklist.append(rs.getString(1))
        }
        rs.close()
        pstat.close()
        conn.close()

        // TODO Filter out the clicks of blacklisted users
        val filterRDD: RDD[AdClickData] = rdd.filter(
          data => {
            !blacklist.contains(data.user)
          }
        )

        // TODO For the remaining users, count clicks per (day, user, ad) within this batch
        filterRDD.map(
          data => {
            val sdf = new SimpleDateFormat("yyyy-MM-dd")
            val day: String = sdf.format(new Date(data.ts.toLong))
            val user: String = data.user
            val ad: String = data.ad
            ((day, user, ad), 1)
          }
        ).reduceByKey(_ + _)
      }
    )

    ds.foreachRDD(
      rdd => {
        rdd.foreach {
          case ((day, user, ad), count) => {
            println(s"${day} ${user} ${ad} ${count}")
            // Threshold: the requirement says 100; 30 is used here to make testing easier
            if (count >= 30) {
              // TODO If the count of this batch already exceeds the threshold (30), blacklist the user
              val conn = JDBCUtil.getConnection
              val pstat: PreparedStatement = conn.prepareStatement(
                // ON DUPLICATE KEY avoids inserting the same user twice
                """
                  |insert into black_list (userid) values (?)
                  |on DUPLICATE KEY
                  |UPDATE userid=?
                  |""".stripMargin
              )
              pstat.setString(1, user)
              pstat.setString(2, user)
              pstat.executeUpdate()
              pstat.close()
              conn.close()
            } else {
              // TODO Otherwise update the user's click count for the current day
              val conn = JDBCUtil.getConnection
              val pstat: PreparedStatement = conn.prepareStatement(
                """
                  |select
                  |  *
                  |from user_ad_count
                  |where dt=? and userid=? and adid=?
                  |""".stripMargin
              )
              pstat.setString(1, day)
              pstat.setString(2, user)
              pstat.setString(3, ad)
              val rs: ResultSet = pstat.executeQuery()

              // If a row already exists, update it
              if (rs.next()) {
                val pstat1: PreparedStatement = conn.prepareStatement(
                  """
                    |update user_ad_count
                    |set count=count + ?
                    |where dt=? and userid=? and adid=?
                    |""".stripMargin
                )
                pstat1.setInt(1, count)
                pstat1.setString(2, day)
                pstat1.setString(3, user)
                pstat1.setString(4, ad)
                pstat1.executeUpdate()
                pstat1.close()

                // TODO Check whether the updated count exceeds the threshold; if so, blacklist the user
                val pstat2: PreparedStatement = conn.prepareStatement(
                  """
                    |select
                    |  *
                    |from user_ad_count
                    |where dt=? and userid=? and adid=? and count>=30
                    |""".stripMargin
                )
                pstat2.setString(1, day)
                pstat2.setString(2, user)
                pstat2.setString(3, ad)
                val rs2: ResultSet = pstat2.executeQuery()
                if (rs2.next()) {
                  val pstat3: PreparedStatement = conn.prepareStatement(
                    """
                      |insert into black_list (userid) values (?)
                      |on DUPLICATE KEY
                      |UPDATE userid=?
                      |""".stripMargin
                  )
                  pstat3.setString(1, user)
                  pstat3.setString(2, user)
                  pstat3.executeUpdate()
                  pstat3.close()
                }
                rs2.close()
                pstat2.close()
              } else {
                // If no row exists yet, insert one
                val pstat1: PreparedStatement = conn.prepareStatement(
                  """
                    |insert into user_ad_count (dt,userid,adid,count) values(?,?,?,?)
                    |""".stripMargin
                )
                pstat1.setString(1, day)
                pstat1.setString(2, user)
                pstat1.setString(3, ad)
                pstat1.setInt(4, count)
                pstat1.executeUpdate()
                pstat1.close()
              }
              rs.close()
              pstat.close()
              conn.close()
            }
          }
        }
      }
    )

    // Start the receiver
    ssc.start()
    // Wait for termination
    ssc.awaitTermination()
  }

  // One ad click record
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}