大数据Spark Streaming集成Kafka

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 大数据Spark Streaming集成Kafka

1 整合Kafka 0.8.2

在实际项目中,无论使用Storm还是SparkStreaming与Flink,主要从Kafka实时消费数据进行处理分析,流式数据实时处理技术架构大致如下:


技术栈: Flume/SDK/Kafka Producer API -> KafKa —> SparkStreaming/Flink/Storm(Hadoop YARN) -> Redis -> UI

1)、阿里工具Canal:监控MySQL数据库binlog文件,将数据同步发送到Kafka Topic中
https://github.com/alibaba/canal
https://github.com/alibaba/canal/wiki/QuickStart
2)、Maxwell:实时读取MySQL二进制日志binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、
Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。
http://maxwells-daemon.io/
https://github.com/zendesk/maxwell

扩展:Kafka 相关常见面试题:

1)、Kafka 集群大小(规模),Topic分区函数名及集群配置?
2)、Topic中数据如何管理?数据删除策略是什么?
3)、如何消费Kafka数据?
4)、发送数据Kafka Topic中时,如何保证数据发送成功?

Apache Kafka: 最原始功能【消息队列】,缓冲数据,具有发布订阅功能(类似微信公众号)。

1.1 回顾 Kafka 概念

Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用与大数

据实时处理领域。

  • 消息队列:Kafka 本质上是一个 MQ(Message Queue),使用消息队列的好处?(面试会问):
  1. 解耦:允许我们独立的扩展或修改队列两边的处理过程;
  2. 可恢复性:即使一个处理消息的进程挂掉,加入队列中的消息仍可以在系统恢复后被处理
  3. 缓冲:有助于解决生产消息和消费消息的处理速度不一致的情况;
  4. 灵活性&峰值处理能力:不会因为突发的超负荷的请求而完全崩溃,消息队列能够使关键
    组件顶住突发的访问压力;
  1. 异步通信:消息队列允许用户把消息放入队列但不立即处理它;
  • 发布/订阅模式:

一对多,生产者将消息发布到 Topic 中,有多个消费者订阅该主题,发布到 Topic 的消息会被所有订阅者消费,被消费的数据不会立即从 Topic 清除。

Kafka 框架架构图如下所示:

Kafka 存储的消息来自任意多被称为 Producer 生产者的进程,数据从而可以被发布到不同的Topic 主题下的不同 Partition 分区。在一个分区内,这些消息被索引并连同时间戳存储在一起。其它被称为 Consumer 消费者的进程可以从分区订阅消息。Kafka 运行在一个由一台或多台服务器组成的集群上,并且分区可以跨集群结点分布。Kafka 一些重要概念:

1)、Producer: 消息生产者,向 Kafka Broker 发消息的客户端;

2)、Consumer:消息消费者,从 Kafka Broker 取消息的客户端;

3)、Consumer Group:消费者组(CG),消费者组内每个消费者负责消费不同分区的数据,

提高消费能力。一个分区只能由组内一个消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者;

4)、Broker:一台 Kafka 机器就是一个 Broker。一个集群由多个 Broker 组成。一个 Broker可以容纳多个 Topic;

5)、Topic:可以理解为一个队列,Topic 将消息分类,生产者和消费者面向的是同一个 Topic;

6)、Partition:为了实现扩展性,提高并发能力,一个非常大的 Topic 可以分布到多个 Broker(即服务器)上,一个 Topic 可以分为多个 Partition,每个 Partition 是一个 有序的队列;

7)、Replica:副本,为实现备份的功能,保证集群中的某个节点发生故障时,该节点上的 Partition数据不丢失,且 Kafka 仍然能够继续工作,Kafka 提供了副本机制,一个 Topic 的每个分区都有若干个副本,一个 Leader 和若干个 Follower;

8)、Leader:每个分区多个副本的“主”副本,生产者发送数据的对象,以及消费者消费数据的对象,都是 Leader;

9)、Follower:每个分区多个副本的“从”副本,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 还会成为新的 Leader;

10)、Offset:消费者消费的位置信息,监控数据消费到什么位置,当消费者挂掉再重新恢复的时候,可以从消费位置继续消费;

11)、Zookeeper:Kafka 集群能够正常工作,需要依赖于 Zookeeper,Zookeeper 帮助 Kafka存储和管理集群信息;

1.2 集成方式

Spark Streaming与Kafka集成,有两套API,原因在于Kafka Consumer API有两套,文档:

http://spark.apache.org/docs/2.4.5/streaming-kafka-integration.html


方式一:Kafka 0.8.x版本

  1. 老的Old Kafka Consumer API
  2. 文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-8-integration.html
  3. 老的Old消费者API,有两种方式:
  4. 第一种:高级消费API(Consumer High Level API),Receiver接收器接收数据
  5. 第二种:简单消费者API(Consumer Simple Level API) ,Direct 直接拉取数据

方式二:Kafka 0.10.x版本

  1. 新的 New Kafka Consumer API
  2. 文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-10-integration.html
  3. 核心API:KafkaConsumer、ConsumerRecorder

1.3 两种方式区别

使用Kafka Old Consumer API集成两种方式,虽然实际生产环境使用Direct方式获取数据,但

是在面试的时候常常问到两者区别。

文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-8-integration.html


Receiver-based Approach:基于接收器方式,消费Kafka Topic数据,但是企业中基本上不再使用;

Receiver作为常驻的Task运行在Executor等待数据,但是一个Receiver效率低,需要开启

多个,再手动合并数据(union),再进行处理,很麻烦;Receiver那台机器挂了,可能会丢失数据,所以需要开启WAL(预写日志)保证数据安全,那么效率又会降低;

Receiver方式是通过zookeeper来连接kafka队列,调用Kafka高阶API,offset存储在

zookeeper,由Receiver维护;

Spark在消费的时候为了保证数据不丢也会在Checkpoint中存一份offset,可能会出现数据

不一致;

Direct Approach (No Receivers):

直接方式,Streaming中每批次的每个job直接调用Simple Consumer API获取对应Topic数据,此种方式使用最多,面试时被问的最多;

Direct方式是直接连接kafka分区来获取数据,从每个分区直接读取数据大大提高并行能力

Direct方式调用Kafka低阶API(底层API),offset自己存储和维护,默认由Spark维护在checkpoint中,消除了与zk不一致的情况;

当然也可以自己手动维护,把offset存在MySQL、Redis和Zookeeper中;

上述两种方式区别,如下图所示:

2 Direct 方式集成

使用Kafka Old Consumer API方式集成Streaming,采用Direct方式,调用Old Simple

Consumer API,文档:

http://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers

2.1 编码实现

Direct方式会定期地从Kafka的topic下对应的partition中查询最新的偏移量,再根据偏移量范

围,在每个batch里面处理数据,Spark通过调用kafka简单的消费者API读取一定范围的数据。


Direct方式没有receiver层,其会周期性的获取Kafka中每个topic的每个partition中的最新

offsets,再根据设定的maxRatePerPartition来处理每个batch。较于Receiver方式的优势:


其一、简化的并行:Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据;

其二、高效:no need for Write Ahead Logs;

其三、精确一次:直接使用simple Kafka API,Offsets则利用Spark Streaming的checkpoints进

行记录。

添加相关MAVEN依赖:

<!-- Spark Streaming 集成Kafka 0.8.2.1 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.4.5</version>
</dependency>

提供工具类KafkaUtils专门从Kafka消费数据,其中函数【createDirectStream】消费数据:

创建Topic及Console控制台发送数据,命令如下:

# 1. 启动Zookeeper 服务
zookeeper-daemon.sh start
# 2. 启动Kafka 服务
kafka-daemon.sh start
# 3. Create Topic
kafka-topics.sh --create --topic wc-topic \
--partitions 3 --replication-factor 1 --zookeeper node1.oldlu.cn:2181/kafka200
# List Topics
kafka-topics.sh --list --zookeeper node1.oldlu.cn:2181/kafka200
# Producer
kafka-console-producer.sh --topic wc-topic --broker-list node1.oldlu.cn:9092
# Consumer
kafka-console-consumer.sh --topic wc-topic \
--bootstrap-server node1.oldlu.cn:9092 --from-beginning

具体演示代码如下:

import java.util.Date
import kafka.serializer.StringDecoder
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * 集成Kafka,采用Direct式读取数据,对每批次(时间为1秒)数据进行词频统计,将统计结果输出到控制台。
 */
object StreamingKafkaDirect {
  def main(args: Array[String]): Unit = {
    // 1、构建流式上下文实例对象StreamingContext,用于读取流式的数据和调度Batch Job执行
    val ssc: StreamingContext = {
      // a. 创建SparkConf实例对象,设置Application相关信息
      val sparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[3]")
        //TODO: 设置每秒钟读取Kafka中Topic最大数据量
        .set("spark.streaming.kafka.maxRatePerPartition", "10000")
      // b. 创建StreamingContext实例,传递Batch Interval(时间间隔:划分流式数据)
      val context: StreamingContext = new StreamingContext(sparkConf, Seconds(5))
      // c. 返回上下文对象
      context
    }
    // TODO: 2、从流式数据源读取数据
    /*
    def createDirectStream[
    K: ClassTag, // Topic中Key数据类型
    V: ClassTag,
    KD <: Decoder[K]: ClassTag, // 表示Topic中Key数据解码(从文件中读取,反序列化)
    VD <: Decoder[V]: ClassTag] (
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Set[String]
    ): InputDStream[(K, V)]
    */
    // 表示从Kafka Topic读取数据时相关参数设置
    val kafkaParams: Map[String, String] = Map(
      "bootstrap.servers" -> "node1.oldlu.cn:9092",
      // 表示从Topic的各个分区的哪个偏移量开始消费数据,设置为最大的偏移量开始消费数据
      "auto.offset.reset" -> "largest"
    )
    // 从哪些Topic中读取数据
    val topics: Set[String] = Set("wc-topic")
    // 采用Direct方式从Kafka 的Topic中读取数据
    val kafkaDStream: DStream[(String, String)] = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, //
        kafkaParams, //
        topics
      )
    // 3、对接收每批次流式数据,进行词频统计WordCount
    val resultDStream: DStream[(String, Int)] = kafkaDStream.transform(rdd => {
      rdd
        // 获取从Kafka读取数据的Message
        .map(tuple => tuple._2)
        // 过滤“脏数据”
        .filter(line => null != line && line.trim.length > 0)
        // 分割为单词
        .flatMap(line => line.trim.split("\\s+"))
        // 转换为二元组
        .mapPartitions{iter => iter.map(word => (word, 1))}
        // 聚合统计
        .reduceByKey((a, b) => a + b)
    })
    // 4、将分析每批次结果数据输出,此处答应控制台
    resultDStream.foreachRDD{ (rdd, time) =>
      // 使用lang3包下FastDateFormat日期格式类,属于线程安全的
      val batchTime = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
        .format(new Date(time.milliseconds))
      println("-------------------------------------------")
      println(s"Time: $batchTime")
      println("-------------------------------------------")
      // TODO: 先判断RDD是否有数据,有数据在输出哦
      if(!rdd.isEmpty()){
        rdd
          // 对于结果RDD输出,需要考虑降低分区数目
          .coalesce(1)
          // 对分区数据操作
          .foreachPartition{iter =>iter.foreach(item => println(item))}
      }
    }
    // 5、对于流式应用来说,需要启动应用,正常情况下启动以后一直运行,直到程序异常终止或者人为干涉
    ssc.start() // 启动接收器Receivers,作为Long Running Task(线程) 运行在Executor
    ssc.awaitTermination()
    // 结束Streaming应用执行
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}

从WEB UI监控页面可以看出如下信息:

2.2 底层原理

SparkStreaming集成Kafka采用Direct方式消费数据,如下三个方面优势:

  • 第一、简单的并行度(Simplified Parallelism)
  1. 读取topics的总的分区数目 = 每批次RDD中分区数目;
  2. topic中每个分区数据 被读取到 RDD中每个分区进行处理


第二、高效(Efficiency)

处理数据比使用Receiver接收数据高效很多

使用Receiver接收数据的时候,要将数据存储到Executor、为了可靠性还需要将数据存储文件系统中WAL

第三、Exactly-once semantics

能保证一次性语义,从Kafka消费数据仅仅被消费一次,不会重复消费或者不消费

在Streaming数据处理分析中,需要考虑数据是否被处理及被处理次数,称为消费语义

At most once:最多一次,比如从Kafka Topic读取数据最多消费一次,可能出现不消费,此时数据丢失;

At least once:至少一次,比如从Kafka Topic读取数据至少消费一次,可能出现多次消费数据;

Exactly once:精确一次,比如从Kafka topic读取数据当且仅当消费一次,不多不少,最好的状态

e37ab7adacb4445fb22f2f543100d2ab.png

深入剖析SparkStreaming采用Direct方式消费Kafka数据,底层原理:

官方:this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly
defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka’s
simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from
a file system).

采用Direct方式消费数据时,需要设置每批次处理数据的最大量,防止【波峰】时数据太多,

导致批次数据处理有性能问题:

  • 参数:spark.streaming.kafka.maxRatePerPartition
  • 含义:Topic中每个分区每秒中消费数据的最大值
  • 举例说明:
  1. BatchInterval:5s、Topic-Partition:3、maxRatePerPartition: 10000
  2. 最大消费数据量:10000 * 3 * 5 = 150000 条

3 集成Kafka 0.10.x

使用Kafka 0.10.+提供新版本Consumer API集成Streaming,实时消费Topic数据,进行处理。

http://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html

添加相关Maven依赖:

<!-- Spark Streaming 与Kafka 0.10.0 集成依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.5</version>
</dependency>

目前企业中基本都使用New Consumer API集成,优势如下:


第一、类似 Old Consumer API中Direct方式

直接到Kafka Topic中依据偏移量范围获取数据,进行处理分析;

The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct

Stream approach;

第二、简单并行度1:1

每批次中RDD的分区与Topic分区一对一关系;

It provides simple parallelism, 1:1 correspondence between Kafka partitions and Spark

partitions, and access to offsets and metadata;

获取Topic中数据的同时,还可以获取偏移量和元数据信息;工具类KafkaUtils中createDirectStream函数API使用说明(函数声明):

具体演示案例代码如下:

import java.util.Date
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Streaming通过Kafka New Consumer消费者API获取数据
 */
object StreamingSourceKafka {
  def main(args: Array[String]): Unit = {
    // 1. 构建StreamingContext流式上下文实例对象
    val ssc: StreamingContext = {
      // a. 创建SparkConf对象,设置应用配置信息
      val sparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[3]")
      // b.创建流式上下文对象, 传递SparkConf对象,TODO: 时间间隔 -> 用于划分流式数据为很多批次Batch
      val context = new StreamingContext(sparkConf, Seconds(5))
      // c. 返回
      context
    }
    // TODO: 2. 读取Kafka Topic中数据
    /*
    def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V]
    ): InputDStream[ConsumerRecord[K, V]]
    */
    // i.位置策略
    val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
    /*
    def Subscribe[K, V](
    topics: ju.Collection[jl.String],
    kafkaParams: ju.Map[String, Object]
    ): ConsumerStrategy[K, V]
    */
    // ii.读取哪些Topic数据
    val topics = Array("wc-topic")
    // iii.消费Kafka 数据配置参数
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node1.oldlu.cn:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group_id_streaming_0001",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // iv.消费数据策略
    val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(
      topics, kafkaParams
    )
    // v.采用新消费者API获取数据,类似于Direct方式
    val kafkaDStream: DStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc, locationStrategy, consumerStrategy
    )
    // 3. 对每批次的数据进行词频统计
    val resultDStream: DStream[(String, Int)] = kafkaDStream.transform(kafkaRDD => {
      val resultRDD: RDD[(String, Int)] = kafkaRDD
        .map(record => record.value()) // 获取Message数据
        // 过滤不合格的数据
        .filter(line => null != line && line.trim.length > 0)
        // 按照分隔符划分单词
        .flatMap(line => line.trim.split("\\s+"))
        // 转换数据为二元组,表示每个单词出现一次
        .map(word => (word, 1))
        // 按照单词分组,聚合统计
        .reduceByKey((tmp, item) => tmp + item)
      resultRDD
    })
    // 4. 将结果数据输出 -> 将每批次的数据处理以后输出
    resultDStream.foreachRDD{ (rdd, time) =>
      val batchTime: String = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
        .format(new Date(time.milliseconds))
      println("-------------------------------------------")
      println(s"Time: $batchTime")
      println("-------------------------------------------")
      // TODO: 先判断RDD是否有数据,有数据在输出哦
      if(!rdd.isEmpty()){
        rdd
          // 对于结果RDD输出,需要考虑降低分区数目
          .coalesce(1)
          // 对分区数据操作
          .foreachPartition{iter =>iter.foreach(item => println(item))}
      }
    }
    // 5. 对于流式应用来说,需要启动应用
    ssc.start()
    // 流式应用启动以后,正常情况一直运行(接收数据、处理数据和输出数据),除非人为终止程序或者程序异常停止
    ssc.awaitTermination()
    // 关闭流式应用(参数一:是否关闭SparkContext,参数二:是否优雅的关闭)
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}

4 获取偏移量

当SparkStreaming集成Kafka时,无论是Old Consumer API中Direct方式还是New

Consumer API方式获取的数据,每批次的数据封装在KafkaRDD中,其中包含每条数据的

元数据信息。

文档:http://spark.apache.org/docs/2.4.5/streaming-kafka-0-10-integration.html#obtaining-offsets

具体说明如下:

获取偏移量信息代码如下:

代码演示获取每批次RDD中对应Kafka分区中数据偏移量信息:

具体演示代码如下:

import java.util.Date
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
/**
 * 集成Kafka,实时消费Topic中数据,获取每批次数据对应Topic各个分区数据偏移量
 */
object StreamingKafkaOffset {
  def main(args: Array[String]): Unit = {
    // TODO: 1. 构建StreamingContext流式上下文实例对象
    val ssc: StreamingContext = {
      // a. 创建SparkConf对象,设置应用配置信息
      val sparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[3]")
      // b.创建流式上下文对象, 传递SparkConf对象,TODO: 时间间隔 -> 用于划分流式数据为很多批次Batch
      val context = new StreamingContext(sparkConf, Seconds(5))
      // c. 返回
      context
    }
    // 2. 读取Kafka Topic中数据
    /*
    def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V]
    ): InputDStream[ConsumerRecord[K, V]]
    */
    // i.位置策略
    val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
    /*
    def Subscribe[K, V](
    topics: ju.Collection[jl.String],
    kafkaParams: ju.Map[String, Object]
    ): ConsumerStrategy[K, V]
    */
    // ii.读取哪些Topic数据
    val topics = Array("wc-topic")
    // iii.消费Kafka 数据配置参数
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node1.oldlu.cn:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group_id_streaming_0001",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // iv.消费数据策略
    val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe(
      topics, kafkaParams
    )
    // v.采用新消费者API获取数据,类似于Direct方式
    val kafkaDStream: DStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc, locationStrategy, consumerStrategy
    )
    // TODO:其一、定义数组存储每批次数据对应RDD中各个分区的Topic Partition中偏移量信息
    var offsetRanges: Array[OffsetRange] = Array.empty
    // 3. 对每批次的数据进行词频统计
    val resultDStream: DStream[(String, Int)] = kafkaDStream.transform(kafkaRDD => {
      // TODO:其二、直接从Kafka获取的每批次KafkaRDD中获取偏移量信息
      offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
      val resultRDD: RDD[(String, Int)] = kafkaRDD
        .map(record => record.value()) // 获取Message数据
        // 过滤不合格的数据
        .filter(line => null != line && line.trim.length > 0)
        // 按照分隔符划分单词
        .flatMap(line => line.trim.split("\\s+"))
        // 转换数据为二元组,表示每个单词出现一次
        .map(word => (word, 1))
        // 按照单词分组,聚合统计
        .reduceByKey((tmp, item) => tmp + item)
      resultRDD
    })
    // 4. 将结果数据输出 -> 将每批次的数据处理以后输出
    resultDStream.foreachRDD{ (rdd, time) =>
      val batchTime: String = FastDateFormat.getInstance("yyyy/MM/dd HH:mm:ss")
        .format(new Date(time.milliseconds))
      println("-------------------------------------------")
      println(s"Time: $batchTime")
      println("-------------------------------------------")
      // 先判断RDD是否有数据,有数据在输出
      if(!rdd.isEmpty()){
        rdd
          // 对于结果RDD输出,需要考虑降低分区数目
          .coalesce(1)
          // 对分区数据操作
          .foreachPartition{iter =>iter.foreach(item => println(item))}
      }
      // TODO: 其三、当DStream进行Output操作完成以后,更新偏移量至外部存储系统(如Zookeeper、Redis等)
      for (offsetRange <- offsetRanges) {
        println(s"topic: ${offsetRange.topic} partition: ${offsetRange.partition} offsets:${o
          ffsetRange.fromOffset} to ${offsetRange.untilOffset}")
      }
    }
    // 5. 对于流式应用来说,需要启动应用
    ssc.start()
    // 流式应用启动以后,正常情况一直运行(接收数据、处理数据和输出数据),除非人为终止程序或者程序异常停止
    ssc.awaitTermination()
    // 关闭流式应用(参数一:是否关闭SparkContext,参数二:是否优雅的关闭)
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}
目录
相关文章
|
1月前
|
机器学习/深度学习 分布式计算 大数据
Spark 适合解决多种类型的大数据处理问题
【9月更文挑战第1天】Spark 适合解决多种类型的大数据处理问题
39 3
|
2月前
|
分布式计算 大数据 Apache
跨越界限:当.NET遇上Apache Spark,大数据世界的新篇章如何谱写?
【8月更文挑战第28天】随着信息时代的发展,大数据已成为推动企业决策、科研与技术创新的关键力量。Apache Spark凭借其卓越的分布式计算能力和多功能数据处理特性,在大数据领域占据重要地位。然而,对于.NET开发者而言,如何在Spark生态中发挥自身优势成为一个新课题。为此,微软与Apache Spark社区共同推出了.NET for Apache Spark,使开发者能用C#、F#等语言编写Spark应用,不仅保留了Spark的强大功能,还融合了.NET的强类型系统、丰富库支持及良好跨平台能力,极大地降低了学习门槛并拓展了.NET的应用范围。
52 3
|
2月前
|
分布式计算 搜索推荐 物联网
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
|
2月前
|
分布式计算 大数据 数据处理
Apache Spark的应用与优势:解锁大数据处理的无限潜能
【8月更文挑战第23天】Apache Spark以其卓越的性能、易用性、通用性、弹性与可扩展性以及丰富的生态系统,在大数据处理领域展现出了强大的竞争力和广泛的应用前景。随着大数据技术的不断发展和普及,Spark必将成为企业实现数字化转型和业务创新的重要工具。未来,我们有理由相信,Spark将继续引领大数据处理技术的发展潮流,为企业创造更大的价值。
|
2月前
|
存储 分布式计算 供应链
Spark在供应链核算中应用问题之调整Spark读取ODPS离线表分区大小如何解决
Spark在供应链核算中应用问题之调整Spark读取ODPS离线表分区大小如何解决
|
2月前
|
Java Spring API
Spring框架与GraphQL的史诗级碰撞:颠覆传统,重塑API开发的未来传奇!
【8月更文挑战第31天】《Spring框架与GraphQL:构建现代API》介绍了如何结合Spring框架与GraphQL构建高效、灵活的API。首先通过引入`spring-boot-starter-data-graphql`等依赖支持GraphQL,然后定义查询和类型,利用`@GraphQLQuery`等注解实现具体功能。Spring的依赖注入和事务管理进一步增强了GraphQL服务的能力。示例展示了从查询到突变的具体实现,证明了Spring与GraphQL结合的强大潜力,适合现代API设计与开发。
59 0
|
2月前
|
消息中间件 Kafka 数据处理
实时数据流处理:Dask Streams 与 Apache Kafka 集成
【8月更文第29天】在现代数据处理领域,实时数据流处理已经成为不可或缺的一部分。随着物联网设备、社交媒体和其他实时数据源的普及,处理这些高吞吐量的数据流成为了一项挑战。Apache Kafka 作为一种高吞吐量的消息队列服务,被广泛应用于实时数据流处理场景中。Dask Streams 是 Dask 库的一个子模块,它为 Python 开发者提供了一个易于使用的实时数据流处理框架。本文将介绍如何将 Dask Streams 与 Apache Kafka 结合使用,以实现高效的数据流处理。
28 0
|
SQL JSON 分布式计算
日志服务(SLS)集成 Spark 流计算实战
日志服务集成 Spark 流式计算:使用Spark Streaming和Structured Streaming对采集到日志服务中的数据进行消费,计算并将结果写回到日志服务。
8195 0
日志服务(SLS)集成 Spark 流计算实战
|
3月前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
127 1
Spark快速大数据分析PDF下载读书分享推荐
|
2月前
|
分布式计算 资源调度 大数据
【决战大数据之巅】:Spark Standalone VS YARN —— 揭秘两大部署模式的恩怨情仇与终极对决!
【8月更文挑战第7天】随着大数据需求的增长,Apache Spark 成为关键框架。本文对比了常见的 Spark Standalone 与 YARN 部署模式。Standalone 作为自带的轻量级集群管理服务,易于设置,适用于小规模或独立部署;而 YARN 作为 Hadoop 的资源管理系统,支持资源的统一管理和调度,更适合大规模生产环境及多框架集成。我们将通过示例代码展示如何在这两种模式下运行 Spark 应用程序。
165 3

热门文章

最新文章

下一篇
无影云桌面