大数据Spark物联网设备数据分析

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 大数据Spark物联网设备数据分析

1 设备监控数据

在物联网时代,大量的感知器每天都在收集并产生着涉及各个领域的数据。物联网提供源源不断的数据流,使实时数据分析成为分析数据的理想工具。


模拟一个智能物联网系统的数据统计分析,产生设备数据发送到Kafka,结构化流Structured

Streaming实时消费统计。对物联网设备状态信号数据,实时统计分析:


1)、信号强度大于30的设备;

2)、各种设备类型的数量;

3)、各种设备类型的平均信号强度;

编写程序模拟生成物联网设备监控数据,发送到Kafka Topic中,此处为了演示字段较少,实际

生产项目中字段很多。


1.1 创建 Topic

启动Kafka Broker服务,创建Topic【search-log-topic】,命令如下所示:

# 启动Zookeeper
/export/server/zookeeper/bin/zkServer.sh start
# 启动Kafka Broker
/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties
rm -rf /export/server/kafka/logs/*
# 创建topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1.oldlu.cn:2181/kafka200 --replication-fa
ctor 1 --partitions 3 --topic iotTopic
# 模拟生产者
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1.oldlu.cn:9092 --topic iotTopic
# 模拟消费者
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1.oldlu.cn:9092 --topic iotTopic
--from-beginning
# 删除topic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1.oldlu.cn:2181/kafka200 --topic iotTopic

1.2 模拟数据

模拟设备监控日志数据,字段信息封装到CaseClass样例类【DeviceData】类,代码如下


package cn.oldlu.spark.iot
/**
 * 物联网设备发送状态数据
 *
 * @param device     设备标识符ID
 * @param deviceType 设备类型,如服务器mysql, redis, kafka或路由器route
 * @param signal     设备信号
 * @param time       发送数据时间
 */
case class DeviceData(
                       device: String, //
                       deviceType: String, //
                       signal: Double, //
                       time: Long //
                     )

模拟产生日志数据类【MockIotDatas】具体代码如下:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json
import scala.util.Random
object MockIotDatas {
  def main(args: Array[String]): Unit = {
    // 发送Kafka Topic
    val props = new Properties()
    props.put("bootstrap.servers", "node1.oldlu.cn:9092")
    props.put("acks", "1")
    props.put("retries", "3")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)
    val deviceTypes = Array(
      "db", "bigdata", "kafka", "route", "bigdata", "db", "bigdata", "bigdata", "bigdata"
    )
    val random: Random = new Random()
    while (true) {
      val index: Int = random.nextInt(deviceTypes.length)
      val deviceId: String = s"device_${(index + 1) * 10 + random.nextInt(index + 1)}"
      val deviceType: String = deviceTypes(index)
      val deviceSignal: Int = 10 + random.nextInt(90)
      // 模拟构造设备数据
      val deviceData = DeviceData(deviceId, deviceType, deviceSignal, System.currentTimeMillis())
      // 转换为JSON字符串
      val deviceJson: String = new Json(org.json4s.DefaultFormats).write(deviceData)
      println(deviceJson)
      Thread.sleep(100 + random.nextInt(500))
      val record = new ProducerRecord[String, String]("iotTopic", deviceJson)
      producer.send(record)
    }
    // 关闭连接
    producer.close()
  }
}

相当于大机房中各个服务器定时发送相关监控数据至Kafka中,服务器部署服务有数据库db、大

数据集群bigdata、消息队列kafka及路由器route等等,数据样本:

{"device":"device_50","deviceType":"bigdata","signal":91.0,"time":1590660338429}
{"device":"device_20","deviceType":"bigdata","signal":17.0,"time":1590660338790}
{"device":"device_32","deviceType":"kafka","signal":93.0,"time":1590660338908}
{"device":"device_82","deviceType":"bigdata","signal":72.0,"time":1590660339380}
{"device":"device_32","deviceType":"kafka","signal":10.0,"time":1590660339972}
{"device":"device_96","deviceType":"bigdata","signal":18.0,"time":1590660343554}

2 基于DataFrame分析

按照业务需求,从Kafka消费日志数据,基于DataFrame数据结构调用函数分析,代码如下:

package cn.oldlu.spark.iot
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{DoubleType, LongType}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * 对物联网设备状态信号数据,实时统计分析:
 * 1)、信号强度大于30的设备
 * 2)、各种设备类型的数量
 * 3)、各种设备类型的平均信号强度
 */
object IotStreamingOnline {
  def main(args: Array[String]): Unit = {
    // 1. 构建SparkSession会话实例对象,设置属性信息
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[3]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    // 导入隐式转换和函数库
    import org.apache.spark.sql.functions._
    import spark.implicits._
    // 2. 从Kafka读取数据,底层采用New Consumer API
    val iotStreamDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1.oldlu.cn:9092")
      .option("subscribe", "iotTopic")
      // 设置每批次消费数据最大值
      .option("maxOffsetsPerTrigger", "100000")
      .load()
    // 3. 对获取数据进行解析,封装到DeviceData中
    val etlStreamDF: DataFrame = iotStreamDF
      // 获取value字段的值,转换为String类型
      .selectExpr("CAST(value AS STRING)")
      // 将数据转换Dataset
      .as[String] // 内部字段名为value
      // 过滤数据
      .filter(line => null != line && line.trim.length > 0)
      // 解析JSON数据:{"device":"device_65","deviceType":"db","signal":12.0,"time":1589718910796}
      .select(
        get_json_object($"value", "$.device").as("device_id"),
        get_json_object($"value", "$.deviceType").as("device_type"),
        get_json_object($"value", "$.signal").cast(DoubleType).as("signal"),
        get_json_object($"value", "$.time").cast(LongType).as("time")
      )
    // 4. 依据业务,分析处理
    // TODO: signal > 30 所有数据,按照设备类型 分组,统计数量、平均信号强度
    val resultStreamDF: DataFrame = etlStreamDF
      // 信号强度大于10
      .filter($"signal" > 30)
      // 按照设备类型 分组
      .groupBy($"device_type")
      // 统计数量、评价信号强度
      .agg(
        count($"device_type").as("count_device"),
        round(avg($"signal"), 2).as("avg_signal")
      )
    // 5. 启动流式应用,结果输出控制台
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .option("numRows", "10")
      .option("truncate", "false")
      .start()
    query.awaitTermination()
    query.stop()
  }
}

其中使用函数get_json_object提取JSON字符串中字段值,将最终结果打印控制台。


3 基于SQL分析

按照业务需求,从Kafka消费日志数据,提取字段信息,将DataFrame注册为临时视图,编写

SQL执行分析,代码如下:

package cn.oldlu.spark.iot
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{DoubleType, LongType}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * 对物联网设备状态信号数据,实时统计分析,基于SQL编程
 * 1)、信号强度大于30的设备
 * 2)、各种设备类型的数量
 * 3)、各种设备类型的平均信号强度
 */
object IotStreamingOnlineSQL {
  def main(args: Array[String]): Unit = {
    // 1. 构建SparkSession会话实例对象,设置属性信息
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[3]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    import org.apache.spark.sql.functions._
    import spark.implicits._
    // 2. 从Kafka读取数据,底层采用New Consumer API
    val iotStreamDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1.oldlu.cn:9092")
      .option("subscribe", "iotTopic")
      // 设置每批次消费数据最大值
      .option("maxOffsetsPerTrigger", "100000")
      .load()
    // 3. 对获取数据进行解析,封装到DeviceData中
    val etlStreamDF: DataFrame = iotStreamDF
      // 获取value字段的值,转换为String类型
      .selectExpr("CAST(value AS STRING)")
      // 将数据转换Dataset
      .as[String] // 内部字段名为value
      // 过滤数据
      .filter(line => null != line && line.trim.length > 0)
      // 解析JSON数据:{"device":"device_65","deviceType":"db","signal":12.0,"time":1589718910796}
      .select(
        get_json_object($"value", "$.device").as("device_id"),
        get_json_object($"value", "$.deviceType").as("device_type"),
        get_json_object($"value", "$.signal").cast(DoubleType).as("signal"),
        get_json_object($"value", "$.time").cast(LongType).as("time")
      )
    // 4. 依据业务,分析处理
    // TODO: signal > 30 所有数据,按照设备类型 分组,统计数量、平均信号强度
    // 4.1 注册DataFrame为临时视图
    etlStreamDF.createOrReplaceTempView("view_tmp_stream_iots")
    // 4.2 编写SQL执行查询
    val resultStreamDF: DataFrame = spark.sql(
      """
        |SELECT
        | device_type, COUNT(device_type) AS count_device, ROUND(AVG(signal), 2) AS avg_signal
        |FROM view_tmp_stream_iots
        |WHERE signal > 30 GROUP BY device_type
        |""".stripMargin)
    // 5. 启动流式应用,结果输出控制台
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Complete())
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        println("===========================================")
        println(s"BatchId = ${batchId}")
        println("===========================================")
        if (!batchDF.isEmpty) batchDF.coalesce(1).show(20, truncate = false)
      }
      .start()
    query.awaitTermination()
    query.stop()
  }
}

运行流式应用,结果如下图所示:

1f84542b51c641adb6a61b1d24acb8e0.png


4 时间概念

在SparkStreaming中窗口统计分析:Window Operation(设置窗口大小WindowInterval和滑动大小SlideInterval),按照Streaming 流式应用接收数据的时间进行窗口设计的,其实是不符合实际应用场景的。

例如,在物联网数据平台中,每个设备产生的数据,其中包含数据产生的时间,然而数据需要

经过一系列采集传输才能被流式计算框架处理:SparkStreaming,此过程需要时间的,再按照处理

时间来统计业务的时候,准确性降低,存在不合理性。

在结构化流Structured Streaming中窗口数据统计时间是基于数据本身事件时间EventTime字

段统计,更加合理性,官方文档:

http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#window-operations-on-event-time

在Streaming流式数据处理中,按照时间处理数据,其中时间有三种概念:


1)、事件时间EventTime,表示数据本身产生的时间,该字段在数据本身中;

2)、注入时间IngestionTime,表示数据到达流式系统时间,简而言之就是流式处理系统接收到

数据的时间;

3)、处理时间ProcessingTime,表示数据被流式系统真正开始计算操作的时间。

0483528fbc3c4b37ac0eba7cc0c5ebbe.png

不同流式计算框架支持时间不一样,SparkStreaming框架仅仅支持处理时间ProcessTime,StructuredStreaming支持事件时间和处理时间,Flink框架支持三种时间数据操作,实际项目中往往针对【事件时间EventTime】进行数据处理操作,更加合理化。

5 event-time 窗口分析

基于事件时间窗口聚合操作:基于窗口的聚合(例如每分钟事件数)只是事件时间列上特殊类型的分组和聚合,其中每个时间窗口都是一个组,并且每一行可以属于多个窗口/组。事件时间EventTime是嵌入到数据本身中的时间,数据实际真实产生的时间。例如,如果希望获得每分钟由物联网设备生成的事件数,那么可能希望使用生成数据的时间(即数据中的事件时间event time),而不是Spark接收数据的时间(receive time/archive time)。

这个事件时间很自然地用这个模型表示,设备中的每个事件(Event)都是表中的一行(Row),而事件时间(Event Time)是行中的一列值(Column Value)。

因此,这种基于事件时间窗口的聚合查询既可以在静态数据集(例如,从收集的设备事件日志中)上定义,也可以在数据流上定义,从而使用户的使用更加容易。修改词频统计程序,数据流包含每行数据以及生成每行行的时间。希望在10分钟的窗口内对单词进行计数,每5分钟更新一次,如下图所示:

6987ee7ff246416f8ae126895db6dfb1.png

单词在10分钟窗口【12:00-12:10、12:05-12:15、12:10-12:20】等之间接收的单词中计数。注意,

【12:00-12:10】表示处理数据的事件时间为12:00之后但12:10之前的数据。思考一下,12:07的一条数据,应该增加对应于两个窗口12:00-12:10和12:05-12:15的计数。基于事件时间窗口统计有两个参数索引:分组键(如单词)和窗口(事件时间字段)。

ec4307f9991645cbb750df2a263a0ee9.png

为了演示案例,将上述案例中的每5分钟统计最近10分钟窗口改为每5秒统计最近10秒窗口数

据,测试数据集:

2019-10-12 09:00:02,cat dog
2019-10-12 09:00:03,dog dog
2019-10-12 09:00:07,owl cat
2019-10-12 09:00:11,dog
2019-10-12 09:00:13,owl

案例中三个时间范围,说明如下:

1、触发时间间隔,trigger interval:5秒 (案例:5分钟)
2、事件时间窗口大小,window interval:10秒(案例:10分钟)
3、滑动大小,slider interval:5秒(案例:5分钟)

官方案例演示代码如下:

import java.sql.Timestamp
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * 基于Structured Streaming 模块读取TCP Socket读取数据,进行事件时间窗口统计词频WordCount,将结果打印到控制台
 * TODO:每5秒钟统计最近10秒内的数据(词频:WordCount)
 *
 * EventTime即事件真正生成的时间:
 * 例如一个用户在10:06点击 了一个按钮,记录在系统中为10:06
 * 这条数据发送到Kafka,又到了Spark Streaming中处理,已经是10:08,这个处理的时间就是process Time。
 *
 * 测试数据:
 * 2019-10-12 09:00:02,cat dog
 * 2019-10-12 09:00:03,dog dog
 * 2019-10-12 09:00:07,owl cat
 * 2019-10-12 09:00:11,dog
 * 2019-10-12 09:00:13,owl
 */
object StructuredWindow {
  def main(args: Array[String]): Unit = {
    // 1. 构建SparkSession实例对象,传递sparkConf参数
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    import org.apache.spark.sql.functions._
    import spark.implicits._
    // 2. 使用SparkSession从TCP Socket读取流式数据
    val inputStreamDF: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1.oldlu.cn")
      .option("port", 9999)
      .load()
    // 3. 针对获取流式DStream进行词频统计
    val resultStreamDF = inputStreamDF
      // 将DataFrame转换为Dataset操作,Dataset是类型安全,强类型
      .as[String]
      .filter(line => null != line && line.trim.length > 0)
      // 将每行数据进行分割单词: 2019-10-12 09:00:02,cat dog
      .flatMap { line =>
        val arr = line.trim.split(",")
        arr(1).split("\\s+").map(word => (Timestamp.valueOf(arr(0)), word))
      }
      // 设置列的名称
      .toDF("insert_timestamp", "word")
      // TODO:设置基于事件时间(event time)窗口 -> insert_timestamp, 每5秒统计最近10秒内数据
      /*
      1. 先按照窗口分组、2. 再对窗口中按照单词分组、 3. 最后使用聚合函数聚合
      */
      .groupBy(
        window($"insert_timestamp", "10 seconds", "5 seconds"), $"word"
      ).count()
      .orderBy($"window") // 按照窗口字段降序排序
    /*
    root
    |-- window: struct (nullable = true)
    | |-- start: timestamp (nullable = true)
    | |-- end: timestamp (nullable = true)
    |-- word: string (nullable = true)
    |-- count: long (nullable = false)
    */
    //resultStreamDF.printSchema()
    // 4. 将计算的结果输出,打印到控制台
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .option("numRows", "100")
      .option("truncate", "false")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()
    query.awaitTermination()
    query.stop()
  }
}

运行上述基于事件时间Event Time窗口统计流式应用,演示效果图如下所示:

f20a6aa185054dfa918cca3573508f3e.png

6 event-time 窗口生成

Structured Streaming中如何依据EventTime事件时间生成窗口的呢?查看类TimeWindowing源码中生成窗口规则:

org.apache.spark.sql.catalyst.analysis.TimeWindowing
// 窗口个数
/* 最大的窗口数 = 向上取整(窗口长度/滑动步长) */
maxNumOverlapping <- ceil(windowDuration / slideDuration)
for (i <- 0 until maxNumOverlapping)
/**
timestamp是event-time 传进的时间戳
startTime是window窗口参数,默认是0 second 从时间的0s
含义:event-time从1970年...有多少个滑动步长,如果说浮点数会向上取整
*/
windowId <- ceil((timestamp - startTime) / slideDuration)
/**
windowId * slideDuration 向上取能整除滑动步长的时间
(i - maxNumOverlapping) * slideDuration 每一个窗口开始时间相差一个步长
*/
windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
windowEnd <- windowStart + windowDuration
return windowStart, windowEnd

将【(event-time向上取 能整除 滑动步长的时间) - (最大窗口数×滑动步长)】作为"初始窗口"

的开始时间,然后按照窗口滑动宽度逐渐向时间轴前方推进,直到某个窗口不再包含该event-time

为止,最终以"初始窗口"与"结束窗口"之间的若干个窗口作为最终生成的 event-time 的时间窗口。

acf189903ab543dabd050801a0ddbe6c.png

每个窗口的起始时间start与结束时间end都是前闭后开(左闭右开)的区间,因此初始窗口和结束窗口都不会包含 event-time,最终不会被使用。假设数据为【2019-08-14 10:50:00, dog】,按照上述规则计算窗口示意图如下:26c42fa9044649329ba71164a8980ce7.png

得到窗口如下:

78b76cfac0d4496792d37996fa460a96.png

7 延迟数据处理

Structed Streaming与Spark Streaming相比一大特性就是支持基于数据中的时间戳的数据处理。也就是在处理数据时,可以对记录中的eventTime事件时间字段进行考虑。因为eventTime更好的代表数据本身的信息,且可以借助eventTime处理比预期晚到达的数据,但是需要有一个限度(阈值),不能一直等,应该要设定最多等多久。


7.1 延迟数据

在很多流计算系统中,数据延迟到达(the events arrives late to the application)的情况很常见,并且很多时候是不可控的,因为很多时候是外围系统自身问题造成的。Structured Streaming可以保证一条旧的数据进入到流上时,依然可以基于这些“迟到”的数据重新计算并更新计算结果。

30e498b4936a46bab533208d16eaf2e6.png

上图中在12:04(即事件时间)生成的单词可能在12:11被应用程序接收,此时,应用程序应使用时间12:04而不是12:11更新窗口12:00-12:10的旧计数。但是会出现如下两个问题:


问题一:延迟数据计算是否有价值

如果某些数据,延迟很长时间(如30分钟)才到达流式处理系统,数据还需要再次计算吗?

计算的结果还有价值吗?原因在于流式处理系统处理数据关键核心在于实时性;

实践表明,流计算关注的是近期数据,更新一个很早之前的状态往往已经不再具有很大的业务价值;

问题二:以前状态保存浪费资源

实时统计来说,如果保存很久以前的数据状态,很多时候没有作用的,反而浪费大量资源;

Spark 2.1引入的watermarking允许用户指定延迟数据的阈值,也允许引擎清除掉旧的状态。即根据watermark机制来设置和判断消息的有效性,如可以获取消息本身的时间戳,然后根据该时间戳来判断消息的到达是否延迟(乱序)以及延迟的时间是否在容忍的范围内(延迟的数据是否处理)。

7.2 Watermarking 水位

水位watermarking官方定义:

lets the engine automatically track the current event time in the data and attempt to clean up old state accordingly.

翻译:让Spark SQL引擎自动追踪数据中当前事件时间EventTime,依据规则清除旧的状态数据。通过指定event-time列(上一批次数据中EventTime最大值)和预估事件的延迟时间上限(Threshold)来定义一个查询的水位线watermark。

Watermark = MaxEventTime - Threshod

第一点:执行第一批次数据时,Watermarker为0,所以此批次中所有数据都参与计算;

第二点:Watermarker值只能逐渐增加,不能减少;

第三点:Watermark机制主要解决处理聚合延迟数据和减少内存中维护的聚合状态;

第四点:设置Watermark以后,输出模式OutputMode只能是Append和Update;

如下方式设置阈值Threshold,计算每批次数据执行时的水位Watermark:

f763d96785244ef6bebeabda2578b7ff.png

看一下官方案例:词频统计WordCount,设置阈值Threshold为10分钟,每5分钟触发执行一次。

b0f5db45baa1495e8b0624ddadc1d22a.png

延迟到达但没有超过watermark:(12:08, dog)

在12:20触发执行窗口(12:10-12:20)数据中,(12:08, dog) 数据是延迟数据,阈值Threshold设定为10分钟,此时水位线【Watermark = 12:14 - 10m = 12:04】,因为12:14是上个窗口(12:05-12:15)中接收到的最大的事件时间,代表目标系统最后时刻的状态,由于12:08在12:04之后,因此被视为“虽然迟到但尚且可以接收”的数据而被更新到了结果表中,也就是(12:00 - 12:10, dog, 2)和(12:05 - 12:11,dog, 3)。

fa94378955934611aa9366bd721e561d.png

超出watermark:(12:04, donkey)

在12:25触发执行窗口(12:15-12:25)数据中,(12:04, donkey)数据是延迟数据,上个窗口中接收到最大的事件时间为12:21,此时水位线【Watermark = 12:21 - 10m = 12:11】,而(12:04, donkey)比这个值还要早,说明它”太旧了”,所以不会被更新到结果表中了。

c9ac47cbca9743d4a877b5ab2bc261d4.png

设置水位线Watermark以后,不同输出模式OutputMode,结果输出不一样:

Update模式:总是倾向于“尽可能早”的将处理结果更新到sink,当出现迟到数据时,早期的某个计算结果将会被更新;

Append模式:推迟计算结果的输出到一个相对较晚的时刻,确保结果是稳定的,不会再被更新,

比如:12:00 - 12:10窗口的处理结果会等到watermark更新到12:11之后才会写入到sink。如果用于接收处理结果的sink不支持更新操作,则只能选择Append模式。

7.3 官方案例演示

编写代码,演示官方案例,如下几点注意:

1、该outputMode为update模式,即只会输出那些有更新的数据!!
2、该开窗窗口长度为10 min,步长5 min,水印为eventtime-10 min,(需理解开窗规则)
3、官网案例trigger(Trigger.ProcessingTime("5 minutes")),但是测试的时候不建议使用这个
4、未输出数据不代表已经在内存中被剔除,只是由于update模式的原因
5、建议比对append理解水印

测试数据:


dog,2019-10-10 12:00:07
owl,2019-10-10 12:00:08
dog,2019-10-10 12:00:14
cat,2019-10-10 12:00:09
cat,2019-10-10 12:00:15
dog,2019-10-10 12:00:08
owl,2019-10-10 12:00:13
owl,2019-10-10 12:00:21
owl,2019-10-10 12:00:17

具体案例代码如下:

import java.sql.Timestamp
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * 基于Structured Streaming 读取TCP Socket读取数据,事件时间窗口统计词频,将结果打印到控制台
 * TODO:每5秒钟统计最近10秒内的数据(词频:WordCount),设置水位Watermark时间为10秒
 * dog,2019-10-10 12:00:07
 * owl,2019-10-10 12:00:08
 * dog,2019-10-10 12:00:14
 * cat,2019-10-10 12:00:09
 * cat,2019-10-10 12:00:15
 * dog,2019-10-10 12:00:08
 * owl,2019-10-10 12:00:13
 * owl,2019-10-10 12:00:21
 * owl,2019-10-10 12:00:17
 */
object StructuredWatermarkUpdate {
  def main(args: Array[String]): Unit = {
    // 1. 构建SparkSession实例对象,传递sparkConf参数
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    // b. 导入隐式转换及函数库
    import org.apache.spark.sql.functions._
    import spark.implicits._
    // 2. 使用SparkSession从TCP Socket读取流式数据
    val inputStreamDF: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1.oldlu.cn")
      .option("port", 9999)
      .load()
    // 3. 针对获取流式DStream设置EventTime窗口及Watermark水位限制
    val resultStreamDF = inputStreamDF
      // 将DataFrame转换为Dataset操作,Dataset是类型安全,强类型
      .as[String]
      // 过滤无效数据
      .filter(line => null != line && line.trim.length > 0)
      // 将每行数据进行分割单词: 2019-10-12 09:00:02,cat dog
      .map { line =>
        val arr = line.trim.split(",")
        (arr(0), Timestamp.valueOf(arr(1)))
      }
      // 设置列的名称
      .toDF("word", "time")
      // TODO:设置水位Watermark
      .withWatermark("time", "10 seconds")
      // TODO:设置基于事件时间(event time)窗口 -> time, 每5秒统计最近10秒内数据
      .groupBy(
        window($"time", "10 seconds", "5 seconds"),
        $"word"
      ).count()
    // 4. 将计算的结果输出,打印到控制台
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Update())
      .format("console")
      .option("numRows", "100")
      .option("truncate", "false")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start() // 流式DataFrame,需要启动
    // 查询器一直等待流式应用结束
    query.awaitTermination()
    query.stop()
  }
}


目录
相关文章
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
130 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
Web App开发 网络协议 安全
基于Web攻击的方式发现并攻击物联网设备介绍
基于Web攻击的方式发现并攻击物联网设备介绍
40 4
|
1月前
|
安全 物联网 物联网安全
智能物联网安全:物联网设备的防护策略与最佳实践
【10月更文挑战第26天】随着物联网(IoT)技术的快速发展,智能设备已广泛应用于智能家居、工业控制和智慧城市等领域。然而,设备数量的激增也带来了严重的安全问题,如黑客攻击、数据泄露和恶意控制,对个人隐私、企业运营和国家安全构成威胁。因此,加强物联网设备的安全防护至关重要。
88 7
|
1月前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
83 6
|
1月前
|
存储 人工智能 大数据
物联网、大数据、云计算、人工智能之间的关系
物联网、大数据、云计算、人工智能之间的关系是紧密相连、相互促进的。这四者既有各自独立的技术特征,又能在不同层面上相互融合,共同推动信息技术的发展和应用。
489 0
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
110 2
|
1月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
75 1
|
1月前
|
分布式计算 Java 开发工具
阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析
本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
69 1
|
2月前
|
物联网
物联网卡不能使用在什么设备上
“物联网卡不能使用在什么设备上”这一操作或规定,通常基于物联网卡的特性、使用条款以及设备兼容性等因素。以下是对这一问题的详细分析和操作建议:
下一篇
DataWorks