SparkStreaming【实例演示】

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
云数据库 RDS MySQL Serverless,价值2615元额度,1个月
简介: SparkStreaming【实例演示】

前言

1、环境准备

  1. 启动Zookeeper和Kafka集群
  2. 导入依赖:
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.2.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.2.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.2.4</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.30</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.10</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.14.2</version>
        </dependency>

2、模拟生产数据

通过循环来不断生产随机数据、使用Kafka来发布订阅消息。

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.Properties
import scala.collection.mutable.ListBuffer
import scala.util.Random
// 生产模拟数据
object MockData {
  def main(args: Array[String]): Unit = {
    // 生成模拟数据
    // 格式: timestamp area city userid adid
    // 含义:   时间戳    省份  城市   用户  广告
    // 生产数据 => Kafka => SparkStreaming => 分析处理
    // 设置Zookeeper属性
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
    val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
    while (true){
      mockData().foreach(
        (data: String) => {
          // 向 Kafka 中生成数据
          val record = new ProducerRecord[String,String]("testTopic",data)
          producer.send(record)
          println(record)
        }
      )
      Thread.sleep(2000)
    }
  }
  def mockData(): ListBuffer[String] = {
    val list = ListBuffer[String]()
    val areaList = ListBuffer[String]("华东","华南","华北","华南")
    val cityList = ListBuffer[String]("北京","西安","上海","广东")
    for (i <- 1 to 30){
      val area = areaList(new Random().nextInt(4))
      val city = cityList(new Random().nextInt(4))
      val userid = new Random().nextInt(6) + 1
      val adid = new Random().nextInt(6) + 1
      list.append(s"${System.currentTimeMillis()} ${area} ${city} ${userid} ${adid}")
    }
    list
  }
}

3、模拟消费数据

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
// 消费数据
object Kafka_req1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka req1")
    val ssc = new StreamingContext(conf,Seconds(3))
    // 定义Kafka参数: kafka集群地址、消费者组名称、key序列化、value序列化
    val kafkaPara: Map[String,Object] = Map[String,Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG ->"lyh",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )
    // 读取Kafka数据创建DStream
    val kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,  //优先位置
      ConsumerStrategies.Subscribe[String,String](Set("testTopic"),kafkaPara) // 消费策略:(订阅多个主题,配置参数)
    )
    // 将每条消息的KV取出
    val valueDStream: DStream[String] = kafkaDStream.map(_.value())
    // 计算WordCount
    valueDStream.print()
   // 开启任务
    ssc.start()
    ssc.awaitTermination()
  }
}

4、需求1 广告黑名单

实现实时的动态黑名单机制:将每天对某个广告点击超过 100 次的用户拉黑。(黑名单保存到 MySQL 中。)

先判断用户是否已经在黑名单中?过滤:判断用户点击是否超过阈值?拉入黑名单:更新用户的点击数量,并获取最新的点击数据再判断是否超过阈值?拉入黑名单:不做处理


需要两张表:黑名单、点击数量表。

create table black_list (userid char(1));
CREATE TABLE user_ad_count (
dt varchar(255),
userid CHAR (1),
adid CHAR (1),
count BIGINT,
PRIMARY KEY (dt, userid, adid)
);

JDBC工具类

import com.alibaba.druid.pool.DruidDataSourceFactory
import java.sql.Connection
import java.util.Properties
import javax.sql.DataSource
object JDBCUtil {
  var dataSource: DataSource = init()
  //初始化连接池
  def init(): DataSource = {
    val properties = new Properties()
    properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
    properties.setProperty("url", "jdbc:mysql://hadoop102:3306/spark-streaming?useUnicode=true&characterEncoding=UTF-8&useSSL=false")
    properties.setProperty("username", "root")
    properties.setProperty("password", "000000")
    properties.setProperty("maxActive", "50")
    DruidDataSourceFactory.createDataSource(properties)
  }
  //获取连接对象
  def getConnection(): Connection ={
    dataSource.getConnection
  }
}

需求实现:

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import java.sql.Connection
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.ListBuffer
// 消费数据
object Kafka_req1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka req1")
    val ssc = new StreamingContext(conf,Seconds(3))
    // 定义Kafka参数: kafka集群地址、消费者组名称、key序列化、value序列化
    val kafkaPara: Map[String,Object] = Map[String,Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG ->"lyh",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )
    // 读取Kafka数据创建DStream
    val kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,  //优先位置
      ConsumerStrategies.Subscribe[String,String](Set("testTopic"),kafkaPara) // 消费策略:(订阅多个主题,配置参数)
    )
    val clickData: DStream[AdClickData] = kafkaDStream.map(
      kafkaData => {
        val data = kafkaData.value()
        val datas = data.split(" ")
        AdClickData(datas(0), datas(1), datas(2), datas(3),datas(4))
      }
    )
    val ds: DStream[((String,String,String),Int)] = clickData.transform( //周期性地拿到 RDD 数据
      rdd => {
        // todo 周期性获取黑名单数据,就要周期性读取MySQL中的数据
        val black_list = ListBuffer[String]()
        val con: Connection = JDBCUtil.getConnection()
        val stmt = con.prepareStatement("select * from black_list")
        val rs = stmt.executeQuery()
        while (rs.next()) {
          black_list.append(rs.getString(1))
        }
        rs.close()
        stmt.close()
        con.close()
        // todo 判断用户是否在黑名单当中,在就过滤掉
        val filterRDD = rdd.filter(
          data => {
            !black_list.contains(data.user)
          }
        )
        // todo 如果不在,那么统计点击数量
        filterRDD.map(
          data => {
            val sdf = new SimpleDateFormat("yyyy-MM-dd")
            val day = sdf.format(new Date(data.ts.toLong))
            val user = data.user
            val ad = data.ad
            ((day, user, ad), 1) // 返回键值对
          }
        ).reduceByKey(_ + _)
      }
    )
    ds.foreachRDD(
      rdd => {
        rdd.foreach {
          case ((day, user, ad), count) => {
            println(s"$day $user $ad $count")
            if (count>=30){
              // todo 如果统计数量超过点击阈值(30),拉入黑名单
              val con = JDBCUtil.getConnection()
              val stmt = con.prepareStatement(
                """
                  |insert into black_list values(?)
                  |on duplicate key
                  |update userid=?
                  |""".stripMargin
              )
              stmt.setString(1,user)
              stmt.setString(2,user)
              stmt.executeUpdate()
              stmt.close()
              con.close()
            }else{
              // todo 如果没有超过阈值,更新到当天点击数量
              val con = JDBCUtil.getConnection()
              val stmt = con.prepareStatement(
                """
                  |select *
                  |from user_ad_count
                  |where dt=? and userid=? and adid=?
                  |""".stripMargin)
              stmt.setString(1,day)
              stmt.setString(2,user)
              stmt.setString(3,ad)
              val rs = stmt.executeQuery()
              if (rs.next()){ //如果存在数据
                val stmt1 = con.prepareStatement(
                  """
                    |update user_ad_count
                    |set count=count+?
                    |where dt=? and userid=? and adid=?
                    |""".stripMargin)
                stmt1.setInt(1,count)
                stmt1.setString(2,day)
                stmt1.setString(3,user)
                stmt1.setString(4,ad)
                stmt1.executeUpdate()
                stmt1.close()
                // todo 如果更新后的点击数量超过阈值,拉入黑名单
                val stmt2 = con.prepareStatement(
                  """
                    |select *
                    |from user_ad_count
                    |where dt=? and userid=? and adid=?
                    |""".stripMargin)
                stmt2.setString(1,day)
                stmt2.setString(2,user)
                stmt2.setString(3,ad)
                val rs1 = stmt2.executeQuery()
                if (rs1.next()){
                  val stmt3 = con.prepareStatement(
                    """
                      |insert into black_list(userid) values(?)
                      |on duplicate key
                      |update userid=?
                      |""".stripMargin)
                  stmt3.setString(1,user)
                  stmt3.setString(2,user)
                  stmt3.executeUpdate()
                  stmt3.close()
                }
                rs1.close()
                stmt2.close()
              }else{
                // todo 如果不存在数据,那么新增
                val stmt1 = con.prepareStatement(
                  """
                    |insert into user_ad_count(dt,userid,adid,count) values(?,?,?,?)
                    |""".stripMargin)
                stmt1.setString(1,day)
                stmt1.setString(2,user)
                stmt1.setString(3,ad)
                stmt1.setInt(4,count)
                stmt1.executeUpdate()
                stmt1.close()
              }
              rs.close()
              stmt.close()
              con.close()
            }
          }
        }
      }
    )
    // 开启任务
    ssc.start()
    ssc.awaitTermination()
  }
  // 广告点击数据
  case class AdClickData(ts: String,area: String,city: String,user: String,ad: String)
}

5、需求2 广告实时点击数据

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import java.text.SimpleDateFormat
import java.util.Date
object Kafka_req2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka req2")
    val ssc = new StreamingContext(conf,Seconds(3))
    // 定义Kafka参数: kafka集群地址、消费者组名称、key序列化、value序列化
    val kafkaPara: Map[String,Object] = Map[String,Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG ->"lyh",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )
    // 读取Kafka数据创建DStream
    val kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,  //优先位置
      ConsumerStrategies.Subscribe[String,String](Set("testTopic"),kafkaPara) // 消费策略:(订阅多个主题,配置参数)
    )
    // 对DStream进行转换操作
    val clickData: DStream[AdClickData] = kafkaDStream.map(
      kafkaData => {
        val data = kafkaData.value()
        val datas = data.split(" ")
        AdClickData(datas(0), datas(1), datas(2), datas(3),datas(4))
      }
    )
    val ds: DStream[((String, String, String, String), Int)] = clickData.map(
      (data: AdClickData) => {
        val sdf = new SimpleDateFormat("yyyy-MM-dd")
        val day = sdf.format(new Date(data.ts.toLong))
        val area = data.area
        val city = data.city
        val ad = data.ad
        ((day, area, city, ad), 1)
      }
    ).reduceByKey(_+_)
    ds.foreachRDD(
      rdd=>{
        rdd.foreachPartition(
          iter => {
            val con = JDBCUtil.getConnection()
            val stmt = con.prepareStatement(
              """
                |insert into area_city_ad_count (dt,area,city,adid,count)
                |values (?,?,?,?,?)
                |on duplicate key
                |update count=count+?
                |""".stripMargin)
            iter.foreach {
              case ((day, area, city, ad), sum) => {
                println(s"$day $area $city $ad $sum")
                stmt.setString(1,day)
                stmt.setString(2,area)
                stmt.setString(3,city)
                stmt.setString(4,ad)
                stmt.setInt(5,sum)
                stmt.setInt(6,sum)
                stmt.executeUpdate()
              }
            }
            stmt.close()
            con.close()
          }
        )
      }
    )
    ssc.start()
    ssc.awaitTermination()
  }
  // 广告点击数据
  case class AdClickData(ts: String,area: String,city: String,user: String,ad: String)
}

需求3、一段时间内的广告点击数据

注意:窗口范围和滑动范围必须是收集器收集数据间隔的整数倍!!

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
object Kafka_req3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("kafka req3")
    val ssc = new StreamingContext(conf,Seconds(5)) //每5s收集器收集一次数据形成一个RDD加入到DStream中
    // 定义Kafka参数: kafka集群地址、消费者组名称、key序列化、value序列化
    val kafkaPara: Map[String,Object] = Map[String,Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG ->"lyh",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )
    // 读取Kafka数据创建DStream
    val kafkaDStream: InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,  //优先位置
      ConsumerStrategies.Subscribe[String,String](Set("testTopic"),kafkaPara) // 消费策略:(订阅多个主题,配置参数)
    )
    val adClickData = kafkaDStream.map(
      (kafkaData: ConsumerRecord[String, String]) => {
        val data = kafkaData.value()
        val datas = data.split(" ")
        AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
      }
    )
    val ds = adClickData.map(
      data => {
        val ts = data.ts.toLong
        /**
         * 为了结果展示的时候更加美观: ts=1698477282712ms
         * 我们希望统计的数据的是近一分钟的数据(每10s展示一次):
         * 15:10:00 ~ 15:11:00
         * 15:10:10 ~ 15:11:10
         * 15:10:20 ~ 15:11:20
         * ...
         * ts/1000 => 1698477282s (我们把秒换成0好看点) ts/10*10=1698477280s => 转成ms ts*1000 = 1698477282000ms
         * 所以就是 ts / 10000 * 10000
         */
        val newTs = ts / 10000 * 10000
        (newTs, 1)
      }
    ).reduceByKeyAndWindow((_: Int)+(_:Int), Seconds(60), Seconds(10))  //windowDurations和slideDuration都必须是收集器收集频率的整数倍
    ds.print()
    ssc.start()
    ssc.awaitTermination()
  }
  // 广告点击数据
  case class AdClickData(ts: String,area: String,city: String,user: String,ad: String)
}

产生的数据

相关文章
|
6月前
|
SQL 存储 分布式计算
Spark1:概述
Spark1:概述
74 0
|
2月前
|
消息中间件 分布式计算 Kafka
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
|
2月前
|
SQL 分布式计算 Java
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
139 1
|
2月前
|
消息中间件 JSON Kafka
实战 | Apache Hudi回调功能简介及使用示例
实战 | Apache Hudi回调功能简介及使用示例
19 0
|
2月前
|
存储 测试技术 API
Apache Hudi 负载类Payload使用案例剖析
Apache Hudi 负载类Payload使用案例剖析
44 4
|
2月前
|
机器学习/深度学习 分布式计算 监控
典型的Spark应用实例
典型的Spark应用实例
49 1
|
9月前
|
消息中间件 存储 分布式计算
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(二)
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(二)
|
9月前
|
消息中间件 分布式计算 Kafka
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
|
9月前
|
SQL 存储 JSON
Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(二)
Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(二)
|
9月前
|
SQL 缓存 分布式计算
Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(一)
Spark学习---5、SparkSQL(概述、编程、数据的加载和保存)(一)