Preface
1、Environment Setup
- Start the ZooKeeper and Kafka clusters.
- Add the dependencies:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.2.4</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.2.4</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.2.4</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.30</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.10</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.14.2</version>
</dependency>
2、Producing Mock Data
A loop continuously generates random records, and Kafka is used as the publish/subscribe channel to deliver them.
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import java.util.Properties
import scala.collection.mutable.ListBuffer
import scala.util.Random

// Produce mock data
object MockData {
  def main(args: Array[String]): Unit = {
    // Generated record format:  timestamp area city userid adid
    // Meaning:                  timestamp region city user  ad
    // Data flow: producer => Kafka => Spark Streaming => analysis

    // Kafka producer properties: bootstrap servers and key/value serializers
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)

    while (true) {
      mockData().foreach(
        (data: String) => {
          // Send the record to Kafka
          val record = new ProducerRecord[String, String]("testTopic", data)
          producer.send(record)
          println(record)
        }
      )
      Thread.sleep(2000)
    }
  }

  def mockData(): ListBuffer[String] = {
    val list = ListBuffer[String]()
    val areaList = ListBuffer[String]("华东", "华南", "华北", "华南")
    val cityList = ListBuffer[String]("北京", "西安", "上海", "广东")
    for (i <- 1 to 30) {
      val area = areaList(new Random().nextInt(4))
      val city = cityList(new Random().nextInt(4))
      val userid = new Random().nextInt(6) + 1
      val adid = new Random().nextInt(6) + 1
      list.append(s"${System.currentTimeMillis()} ${area} ${city} ${userid} ${adid}")
    }
    list
  }
}
3、Consuming the Mock Data
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Consume data
object Kafka_req1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka req1")
    val ssc = new StreamingContext(conf, Seconds(3))

    // Kafka parameters: broker addresses, consumer group id, key/value deserializers
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "lyh",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // Create a DStream by reading from Kafka
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,                                      // preferred locations
      ConsumerStrategies.Subscribe[String, String](Set("testTopic"), kafkaPara) // consumer strategy: (topics to subscribe, config)
    )

    // Extract the value of each message
    val valueDStream: DStream[String] = kafkaDStream.map(_.value())

    // Print the consumed values
    valueDStream.print()

    // Start the job
    ssc.start()
    ssc.awaitTermination()
  }
}
4、Requirement 1: Ad Blacklist
Implement a real-time, dynamic blacklist: any user who clicks a given ad more than 100 times in a single day is blacklisted. The blacklist is stored in MySQL. (To make the behavior easier to trigger, the code below lowers the threshold to 30.)
The per-batch flow:
- First check whether the user is already on the blacklist; if so, filter the click out.
- If not, check whether the click count in this batch already exceeds the threshold; if so, add the user to the blacklist.
- Otherwise, add the count to the user's daily total, re-read the latest total, and if it now exceeds the threshold, add the user to the blacklist; if not, do nothing.
Two tables are needed: the blacklist (black_list) and the per-day click-count table (user_ad_count).
create table black_list (userid char(1) primary key);
CREATE TABLE user_ad_count (
    dt varchar(255),
    userid CHAR(1),
    adid CHAR(1),
    count BIGINT,
    PRIMARY KEY (dt, userid, adid)
);
import com.alibaba.druid.pool.DruidDataSourceFactory

import java.sql.Connection
import java.util.Properties
import javax.sql.DataSource

object JDBCUtil {
  var dataSource: DataSource = init()

  // Initialize the connection pool
  def init(): DataSource = {
    val properties = new Properties()
    properties.setProperty("driverClassName", "com.mysql.cj.jdbc.Driver") // driver class for mysql-connector-java 8.x
    properties.setProperty("url", "jdbc:mysql://hadoop102:3306/spark-streaming?useUnicode=true&characterEncoding=UTF-8&useSSL=false")
    properties.setProperty("username", "root")
    properties.setProperty("password", "000000")
    properties.setProperty("maxActive", "50")
    DruidDataSourceFactory.createDataSource(properties)
  }

  // Get a connection from the pool
  def getConnection(): Connection = {
    dataSource.getConnection
  }
}
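A quick standalone sketch of how JDBCUtil is used (the object name JDBCUtilDemo is only for illustration); the same query-then-close pattern appears throughout the implementation below:

import java.sql.Connection
import scala.collection.mutable.ListBuffer

// Minimal usage sketch: load the current blacklist and print it.
object JDBCUtilDemo {
  def main(args: Array[String]): Unit = {
    val blackList = ListBuffer[String]()
    val con: Connection = JDBCUtil.getConnection()
    val stmt = con.prepareStatement("select userid from black_list")
    val rs = stmt.executeQuery()
    while (rs.next()) {
      blackList.append(rs.getString(1))
    }
    rs.close()
    stmt.close()
    con.close()
    println(blackList.mkString(", "))
  }
}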
Implementation:
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.ListBuffer

// Consume data
object Kafka_req1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka req1")
    val ssc = new StreamingContext(conf, Seconds(3))

    // Kafka parameters: broker addresses, consumer group id, key/value deserializers
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "lyh",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // Create a DStream by reading from Kafka
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,                                      // preferred locations
      ConsumerStrategies.Subscribe[String, String](Set("testTopic"), kafkaPara) // consumer strategy: (topics to subscribe, config)
    )

    // Parse each message into an AdClickData record
    val clickData: DStream[AdClickData] = kafkaDStream.map(
      kafkaData => {
        val data = kafkaData.value()
        val datas = data.split(" ")
        AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
      }
    )

    val ds: DStream[((String, String, String), Int)] = clickData.transform( // get the RDD of each batch
      rdd => {
        // todo Periodically reload the blacklist from MySQL
        val black_list = ListBuffer[String]()
        val con: Connection = JDBCUtil.getConnection()
        val stmt = con.prepareStatement("select * from black_list")
        val rs = stmt.executeQuery()
        while (rs.next()) {
          black_list.append(rs.getString(1))
        }
        rs.close()
        stmt.close()
        con.close()

        // todo Filter out clicks from users already on the blacklist
        val filterRDD = rdd.filter(
          data => {
            !black_list.contains(data.user)
          }
        )

        // todo For the remaining users, count clicks per (day, user, ad)
        filterRDD.map(
          data => {
            val sdf = new SimpleDateFormat("yyyy-MM-dd")
            val day = sdf.format(new Date(data.ts.toLong))
            val user = data.user
            val ad = data.ad
            ((day, user, ad), 1) // key-value pair
          }
        ).reduceByKey(_ + _)
      }
    )

    ds.foreachRDD(
      rdd => {
        rdd.foreach {
          case ((day, user, ad), count) => {
            println(s"$day $user $ad $count")
            if (count >= 30) {
              // todo The count in this batch alone exceeds the threshold (30): blacklist the user
              val con = JDBCUtil.getConnection()
              val stmt = con.prepareStatement(
                """
                  |insert into black_list values(?)
                  |on duplicate key
                  |update userid=?
                  |""".stripMargin
              )
              stmt.setString(1, user)
              stmt.setString(2, user)
              stmt.executeUpdate()
              stmt.close()
              con.close()
            } else {
              // todo Otherwise, add the count to today's total for this user/ad
              val con = JDBCUtil.getConnection()
              val stmt = con.prepareStatement(
                """
                  |select *
                  |from user_ad_count
                  |where dt=? and userid=? and adid=?
                  |""".stripMargin)
              stmt.setString(1, day)
              stmt.setString(2, user)
              stmt.setString(3, ad)
              val rs = stmt.executeQuery()
              if (rs.next()) { // a row already exists: update it
                val stmt1 = con.prepareStatement(
                  """
                    |update user_ad_count
                    |set count=count+?
                    |where dt=? and userid=? and adid=?
                    |""".stripMargin)
                stmt1.setInt(1, count)
                stmt1.setString(2, day)
                stmt1.setString(3, user)
                stmt1.setString(4, ad)
                stmt1.executeUpdate()
                stmt1.close()

                // todo If the updated total now exceeds the threshold, blacklist the user
                val stmt2 = con.prepareStatement(
                  """
                    |select *
                    |from user_ad_count
                    |where dt=? and userid=? and adid=? and count>=30
                    |""".stripMargin)
                stmt2.setString(1, day)
                stmt2.setString(2, user)
                stmt2.setString(3, ad)
                val rs1 = stmt2.executeQuery()
                if (rs1.next()) {
                  val stmt3 = con.prepareStatement(
                    """
                      |insert into black_list(userid) values(?)
                      |on duplicate key
                      |update userid=?
                      |""".stripMargin)
                  stmt3.setString(1, user)
                  stmt3.setString(2, user)
                  stmt3.executeUpdate()
                  stmt3.close()
                }
                rs1.close()
                stmt2.close()
              } else {
                // todo No row yet for (day, user, ad): insert one
                val stmt1 = con.prepareStatement(
                  """
                    |insert into user_ad_count(dt,userid,adid,count) values(?,?,?,?)
                    |""".stripMargin)
                stmt1.setString(1, day)
                stmt1.setString(2, user)
                stmt1.setString(3, ad)
                stmt1.setInt(4, count)
                stmt1.executeUpdate()
                stmt1.close()
              }
              rs.close()
              stmt.close()
              con.close()
            }
          }
        }
      }
    )

    // Start the job
    ssc.start()
    ssc.awaitTermination()
  }

  // An ad-click record
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}
5、Requirement 2: Real-Time Ad Click Statistics
Count ad clicks per day, region, city, and ad in real time, and accumulate the totals into the area_city_ad_count table in MySQL.
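The job below upserts into an area_city_ad_count table whose DDL is not given in the original; here is a minimal, assumed setup sketch. The schema is inferred from the insert statement that follows, and the composite primary key is what makes "on duplicate key update count=count+?" accumulate the counts rather than insert duplicates:

// One-off table setup for requirement 2 (assumed schema, run once).
object CreateAreaCityAdCount {
  def main(args: Array[String]): Unit = {
    val con = JDBCUtil.getConnection()
    val stmt = con.createStatement()
    stmt.executeUpdate(
      """
        |create table if not exists area_city_ad_count (
        |  dt    varchar(255),
        |  area  varchar(255),
        |  city  varchar(255),
        |  adid  char(1),
        |  count bigint,
        |  primary key (dt, area, city, adid)
        |)
        |""".stripMargin)
    stmt.close()
    con.close()
  }
}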
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

import java.text.SimpleDateFormat
import java.util.Date

object Kafka_req2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka req2")
    val ssc = new StreamingContext(conf, Seconds(3))

    // Kafka parameters: broker addresses, consumer group id, key/value deserializers
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "lyh",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // Create a DStream by reading from Kafka
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,                                      // preferred locations
      ConsumerStrategies.Subscribe[String, String](Set("testTopic"), kafkaPara) // consumer strategy: (topics to subscribe, config)
    )

    // Parse each message into an AdClickData record
    val clickData: DStream[AdClickData] = kafkaDStream.map(
      kafkaData => {
        val data = kafkaData.value()
        val datas = data.split(" ")
        AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
      }
    )

    // Count clicks per (day, area, city, ad) within each batch
    val ds: DStream[((String, String, String, String), Int)] = clickData.map(
      (data: AdClickData) => {
        val sdf = new SimpleDateFormat("yyyy-MM-dd")
        val day = sdf.format(new Date(data.ts.toLong))
        val area = data.area
        val city = data.city
        val ad = data.ad
        ((day, area, city, ad), 1)
      }
    ).reduceByKey(_ + _)

    // Upsert the per-batch counts into MySQL, one connection per partition
    ds.foreachRDD(
      rdd => {
        rdd.foreachPartition(
          iter => {
            val con = JDBCUtil.getConnection()
            val stmt = con.prepareStatement(
              """
                |insert into area_city_ad_count (dt,area,city,adid,count)
                |values (?,?,?,?,?)
                |on duplicate key
                |update count=count+?
                |""".stripMargin)
            iter.foreach {
              case ((day, area, city, ad), sum) => {
                println(s"$day $area $city $ad $sum")
                stmt.setString(1, day)
                stmt.setString(2, area)
                stmt.setString(3, city)
                stmt.setString(4, ad)
                stmt.setInt(5, sum)
                stmt.setInt(6, sum)
                stmt.executeUpdate()
              }
            }
            stmt.close()
            con.close()
          }
        )
      }
    )

    ssc.start()
    ssc.awaitTermination()
  }

  // An ad-click record
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}
6、Requirement 3: Ad Click Data Over a Recent Time Window
Note: the window duration and the slide duration must both be integer multiples of the batch interval (the interval at which the receiver collects data into an RDD)!
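As a minimal illustration of that constraint (separate from the requirement-3 job, which follows right after), here is a sketch with an assumed 5-second batch interval and a placeholder socket source:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Minimal sketch of the batch-interval / window / slide relationship.
// The socket source and the object name are placeholders, not part of the original job.
object WindowDurationDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("window duration demo")
    val ssc = new StreamingContext(conf, Seconds(5)) // batch interval: 5 s

    val counts = ssc.socketTextStream("localhost", 9999)
      .map(line => (line, 1))
      // Window = 60 s, slide = 10 s: both are multiples of the 5 s batch interval.
      // A value like Seconds(12) would be rejected at runtime because it is not.
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(10))

    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}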
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object Kafka_req3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka req3")
    val ssc = new StreamingContext(conf, Seconds(5)) // every 5 s the collected data forms one RDD, which is appended to the DStream

    // Kafka parameters: broker addresses, consumer group id, key/value deserializers
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "lyh",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    // Create a DStream by reading from Kafka
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,                                      // preferred locations
      ConsumerStrategies.Subscribe[String, String](Set("testTopic"), kafkaPara) // consumer strategy: (topics to subscribe, config)
    )

    val adClickData = kafkaDStream.map(
      (kafkaData: ConsumerRecord[String, String]) => {
        val data = kafkaData.value()
        val datas = data.split(" ")
        AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
      }
    )

    val ds = adClickData.map(
      data => {
        val ts = data.ts.toLong
        /**
         * To make the output easier to read: ts = 1698477282712 ms.
         * We want to report the last minute of data, refreshed every 10 s:
         *   15:10:00 ~ 15:11:00
         *   15:10:10 ~ 15:11:10
         *   15:10:20 ~ 15:11:20
         *   ...
         * ts / 1000 => 1698477282 s; round the seconds down to a multiple of 10:
         * 1698477282 / 10 * 10 = 1698477280 s; convert back to ms: * 1000 => 1698477280000 ms.
         * Combined into one step: newTs = ts / 10000 * 10000.
         */
        val newTs = ts / 10000 * 10000
        (newTs, 1)
      }
    ).reduceByKeyAndWindow((_: Int) + (_: Int), Seconds(60), Seconds(10)) // window and slide durations must both be multiples of the batch interval

    ds.print()

    ssc.start()
    ssc.awaitTermination()
  }

  // An ad-click record
  case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)
}
The resulting data: every 10 seconds the job prints the (10-second-aligned timestamp, click count) pairs covering the most recent minute.