Apache Spark源码走读(六)Task运行期之函数调用关系分析 &存储子系统分析

简介: 本篇主要阐述在TaskRunner中执行的task其业务逻辑是如何被调用到的,另外试图讲清楚运行着的task其输入的数据从哪获取,处理的结果返回到哪里,如何返回;以及对存储子系统进行分析 。

<一>Task运行期之函数调用关系分析

概要

本篇主要阐述在TaskRunner中执行的task其业务逻辑是如何被调用到的,另外试图讲清楚运行着的task其输入的数据从哪获取,处理的结果返回到哪里,如何返回。

准备

  1. spark已经安装完毕
  2. spark运行在local mode或local-cluster mode

local-cluster mode

local-cluster模式也称为伪分布式,可以使用如下指令运行

MASTER=local[1,2,1024] bin/spark-shell

[1,2,1024] 分别表示,executor number, core number和内存大小,其中内存大小不应小于默认的512M

Driver Programme的初始化过程分析

初始化过程的涉及的主要源文件

  1. SparkContext.scala       整个初始化过程的入口
  2. SparkEnv.scala          创建BlockManager, MapOutputTrackerMaster, ConnectionManager, CacheManager
  3. DAGScheduler.scala       任务提交的入口,即将Job划分成各个stage的关键
  4. TaskSchedulerImpl.scala 决定每个stage可以运行几个task,每个task分别在哪个executor上运行
  5. SchedulerBackend
    1. 最简单的单机运行模式的话,看LocalBackend.scala
    2. 如果是集群模式,看源文件SparkDeploySchedulerBackend

初始化过程步骤详解

步骤1: 根据初始化入参生成SparkConf,再根据SparkConf来创建SparkEnv, SparkEnv中主要包含以下关键性组件 1. BlockManager 2. MapOutputTracker 3. ShuffleFetcher 4. ConnectionManager

 private[spark] val env = SparkEnv.create(
    conf,
    "",
    conf.get("spark.driver.host"),
    conf.get("spark.driver.port").toInt,
    isDriver = true,
    isLocal = isLocal)
  SparkEnv.set(env)

步骤2:创建TaskScheduler,根据Spark的运行模式来选择相应的SchedulerBackend,同时启动taskscheduler,这一步至为关键

  private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName)
  taskScheduler.start()

TaskScheduler.start目的是启动相应的SchedulerBackend,并启动定时器进行检测

override def start() {
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      import sc.env.actorSystem.dispatcher
      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
            SPECULATION_INTERVAL milliseconds) {
        checkSpeculatableTasks()
      }
    }
  }

步骤3:以上一步中创建的TaskScheduler实例为入参创建DAGScheduler并启动运行

@volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
  dagScheduler.start()

步骤4:启动WEB UI

ui.start()

RDD的转换过程

还是以最简单的wordcount为例说明rdd的转换过程

sc.textFile("README.md").flatMap(line=>line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

上述一行简短的代码其实发生了很复杂的RDD转换,下面仔细解释每一步的转换过程和转换结果

步骤1:val rawFile = sc.textFile("README.md")

textFile先是生成hadoopRDD,然后再通过map操作生成MappedRDD,如果在spark-shell中执行上述语句,得到的结果可以证明所做的分析

scala> sc.textFile("README.md")
14/04/23 13:11:48 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
14/04/23 13:11:48 INFO MemoryStore: ensureFreeSpace(119741) called with curMem=0, maxMem=311387750
14/04/23 13:11:48 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 116.9 KB, free 296.8 MB)
14/04/23 13:11:48 DEBUG BlockManager: Put block broadcast_0 locally took  277 ms
14/04/23 13:11:48 DEBUG BlockManager: Put for block broadcast_0 without replication took  281 ms
res0: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at :13

步骤2: val splittedText = rawFile.flatMap(line => line.split(" "))

flatMap将原来的MappedRDD转换成为FlatMappedRDD

 def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =                                                                                                  new FlatMappedRDD(this, sc.clean(f))

步骤3:val wordCount = splittedText.map(word => (word, 1))

利用word生成相应的键值对,上一步的FlatMappedRDD被转换成为MappedRDD

步骤4:val reduceJob = wordCount.reduceByKey(_ + _),这一步最复杂

步骤2,3中使用到的operation全部定义在RDD.scala中,而这里使用到的reduceByKey却在RDD.scala中见不到踪迹。reduceByKey的定义出现在源文件PairRDDFunctions.scala

细心的你一定会问reduceByKey不是MappedRDD的属性和方法啊,怎么能被MappedRDD调用呢?其实这背后发生了一个隐式的转换,该转换将MappedRDD转换成为PairRDDFunctions

implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
    new PairRDDFunctions(rdd)

这种隐式的转换是scala的一个语法特征,如果想知道的更多,请用关键字"scala implicit method"进行查询,会有不少的文章对此进行详尽的介绍。

接下来再看一看reduceByKey的定义:

  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
    reduceByKey(defaultPartitioner(self), func)
  }

  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
    combineByKey[V]((v: V) => v, func, func, partitioner)
  }

  def combineByKey[C](createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializerClass: String = null): RDD[(K, C)] = {
    if (getKeyClass().isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("Default partitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else if (mapSideCombine) {
      val combined = self.mapPartitionsWithContext((context, iter) => {
        aggregator.combineValuesByKey(iter, context)
      }, preservesPartitioning = true)
      val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
        .setSerializer(serializerClass)
      partitioned.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      // Don't apply map-side combiner.
      val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
      values.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    }
  }

reduceByKey最终会调用combineByKey, 在这个函数中PairedRDDFunctions会被转换成为ShuffleRDD,当调用mapPartitionsWithContext之后,shuffleRDD被转换成为MapPartitionsRDD

Log输出能证明我们的分析

res1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at reduceByKey at :13

RDD转换小结

小结一下整个RDD转换过程

HadoopRDD->MappedRDD->FlatMappedRDD->MappedRDD->PairRDDFunctions->ShuffleRDD->MapPartitionsRDD

整个转换过程好长啊,这一切的转换都发生在任务提交之前。

运行过程分析

数据集操作分类

在对任务运行过程中的函数调用关系进行分析之前,我们也来探讨一个偏理论的东西,作用于RDD之上的Transformantion为什么会是这个样子?

对这个问题的解答和数学搭上关系了,从理论抽象的角度来说,任务处理都可归结为“input->processing->output"。input和output对应于数据集dataset.

在此基础上作一下简单的分类

  1. one-one 一个dataset在转换之后还是一个dataset,而且dataset的size不变,如map
  2. one-one 一个dataset在转换之后还是一个dataset,但size发生更改,这种更改有两种可能:扩大或缩小,如flatMap是size增大的操作,而subtract是size变小的操作
  3. many-one 多个dataset合并为一个dataset,如combine, join
  4. one-many 一个dataset分裂为多个dataset, 如groupBy

Task运行期的函数调用

task的提交过程参考本系列中的第二篇文章。本节主要讲解当task在运行期间是如何一步步调用到作用于RDD上的各个operation

  • TaskRunner.run
    • Task.run
      • Task.runTask (Task是一个基类,有两个子类,分别为ShuffleMapTask和ResultTask)
        • RDD.iterator
          • RDD.computeOrReadCheckpoint
            • RDD.compute 

或许当看到RDD.compute函数定义时,还是觉着f没有被调用,以MappedRDD的compute定义为例

  override def compute(split: Partition, context: TaskContext) =                                                                                                      
    firstParent[T].iterator(split, context).map(f)  

注意,这里最容易产生错觉的地方就是map函数,这里的map不是RDD中的map,而是scala中定义的iterator的成员函数map, 请自行参考http://www.scala-lang.org/api/2.10.4/index.html#scala.collection.Iterator

堆栈输出

 80         at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:111)
 81         at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:154)
 82         at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
 83         at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
 84         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
 85         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
 86         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 87         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
 88         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
 89         at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
 90         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
 91         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
 92         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 93         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
 94         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
 95         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
 96         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
 97         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
 98         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
 99         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
100         at org.apache.spark.scheduler.Task.run(Task.scala:53)
101         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)

ResultTask

compute的计算过程对于ShuffleMapTask比较复杂,绕的圈圈比较多,对于ResultTask就直接许多。

override def runTask(context: TaskContext): U = {
    metrics = Some(context.taskMetrics)
    try {
      func(context, rdd.iterator(split, context))
    } finally {
      context.executeOnCompleteCallbacks()
    }
  } 

 计算结果的传递

上面的分析知道,wordcount这个job在最终提交之后,被DAGScheduler分为两个stage,第一个Stage是shuffleMapTask,第二个Stage是ResultTask.

那么ShuffleMapTask的计算结果是如何被ResultTask取得的呢?这个过程简述如下

  1. ShffuleMapTask将计算的状态(注意不是具体的数据)包装为MapStatus返回给DAGScheduler
  2. DAGScheduler将MapStatus保存到MapOutputTrackerMaster中
  3. ResultTask在执行到ShuffleRDD时会调用BlockStoreShuffleFetcher的fetch方法去获取数据
    1. 第一件事就是咨询MapOutputTrackerMaster所要取的数据的location
    2. 根据返回的结果调用BlockManager.getMultiple获取真正的数据

BlockStoreShuffleFetcher的fetch函数伪码

    val blockManager = SparkEnv.get.blockManager

    val startTime = System.currentTimeMillis
    val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
    logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
      shuffleId, reduceId, System.currentTimeMillis - startTime))

    val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
    val itr = blockFetcherItr.flatMap(unpackBlock) 

注意上述代码中的getServerStatusesgetMultiple,一个是询问数据的位置,一个是去获取真正的数据。

有关Shuffle的详细解释,请参考”详细探究Spark的shuffle实现一文" http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/

<二>存储子系统分析

楔子

Spark计算速度远胜于Hadoop的原因之一就在于中间结果是缓存在内存而不是直接写入到disk,本文尝试分析Spark中存储子系统的构成,并以数据写入和数据读取为例,讲述清楚存储子系统中各部件的交互关系。

存储子系统概览

上图是Spark存储子系统中几个主要模块的关系示意图,现简要说明如下

  • CacheManager  RDD在进行计算的时候,通过CacheManager来获取数据,并通过CacheManager来存储计算结果
  • BlockManager   CacheManager在进行数据读取和存取的时候主要是依赖BlockManager接口来操作,BlockManager决定数据是从内存(MemoryStore)还是从磁盘(DiskStore)中获取
  • MemoryStore   负责将数据保存在内存或从内存读取
  • DiskStore        负责将数据写入磁盘或从磁盘读入
  • BlockManagerWorker  数据写入本地的MemoryStore或DiskStore是一个同步操作,为了容错还需要将数据复制到别的计算结点,以防止数据丢失的时候还能够恢复,数据复制的操作是异步完成,由BlockManagerWorker来处理这一部分事情
  • ConnectionManager 负责与其它计算结点建立连接,并负责数据的发送和接收
  • BlockManagerMaster 注意该模块只运行在Driver Application所在的Executor,功能是负责记录下所有BlockIds存储在哪个SlaveWorker上,比如RDD Task运行在机器A,所需要的BlockId为3,但在机器A上没有BlockId为3的数值,这个时候Slave worker需要通过BlockManager向BlockManagerMaster询问数据存储的位置,然后再通过ConnectionManager去获取,具体参看“数据远程获取一节”

支持的操作

由于BlockManager起到实际的存储管控作用,所以在讲支持的操作的时候,以BlockManager中的public api为例

  • put  数据写入
  • get      数据读取
  • remoteRDD 数据删除,一旦整个job完成,所有的中间计算结果都可以删除

启动过程分析

上述的各个模块由SparkEnv来创建,创建过程在SparkEnv.create中完成

    val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
      "BlockManagerMaster",
      new BlockManagerMasterActor(isLocal, conf)), conf)
    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf)

    val connectionManager = blockManager.connectionManager
    val broadcastManager = new BroadcastManager(isDriver, conf)
    val cacheManager = new CacheManager(blockManager)

这段代码容易让人疑惑,看起来像是在所有的cluster node上都创建了BlockManagerMasterActor,其实不然,仔细看registerOrLookup函数的实现。如果当前节点是driver则创建这个actor,否则建立到driver的连接。

    def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
      if (isDriver) {
        logInfo("Registering " + name)
        actorSystem.actorOf(Props(newActor), name = name)
      } else {
        val driverHost: String = conf.get("spark.driver.host", "localhost")
        val driverPort: Int = conf.getInt("spark.driver.port", 7077)
        Utils.checkHost(driverHost, "Expected hostname")
        val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
        val timeout = AkkaUtils.lookupTimeout(conf)
        logInfo(s"Connecting to $name: $url")
        Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
      }
    }

初始化过程中一个主要的动作就是BlockManager需要向BlockManagerMaster发起注册

数据写入过程分析

数据写入的简要流程:

  1. RDD.iterator是与storage子系统交互的入口
  2. CacheManager.getOrCompute调用BlockManager的put接口来写入数据
  3. 数据优先写入到MemoryStore即内存,如果MemoryStore中的数据已满则将最近使用次数不频繁的数据写入到磁盘
  4. 通知BlockManagerMaster有新的数据写入,在BlockManagerMaster中保存元数据
  5. 将写入的数据与其它slave worker进行同步,一般来说在本机写入的数据,都会另先一台机器来进行数据的备份,即replicanumber=1

序列化与否

写入的具体内容可以是序列化之后的bytes也可以是没有序列化的value. 此处有一个对scala的语法中Either, Left, Right关键字的理解。

数据读取过程分析

 def get(blockId: BlockId): Option[Iterator[Any]] = {
    val local = getLocal(blockId)
    if (local.isDefined) {
      logInfo("Found block %s locally".format(blockId))
      return local
    }
    val remote = getRemote(blockId)
    if (remote.isDefined) {
      logInfo("Found block %s remotely".format(blockId))
      return remote
    }
    None
  }

本地读取

首先在查询本机的MemoryStore和DiskStore中是否有所需要的block数据存在,如果没有则发起远程数据获取。

远程读取

远程获取调用路径, getRemote->doGetRemote, 在doGetRemote中最主要的就是调用BlockManagerWorker.syncGetBlock来从远程获得数据

def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
    val blockManager = blockManagerWorker.blockManager
    val connectionManager = blockManager.connectionManager
    val blockMessage = BlockMessage.fromGetBlock(msg)
    val blockMessageArray = new BlockMessageArray(blockMessage)
    val responseMessage = connectionManager.sendMessageReliablySync(
        toConnManagerId, blockMessageArray.toBufferMessage)
    responseMessage match {
      case Some(message) => {
        val bufferMessage = message.asInstanceOf[BufferMessage]
        logDebug("Response message received " + bufferMessage)
        BlockMessageArray.fromBufferMessage(bufferMessage).foreach(blockMessage => {
            logDebug("Found " + blockMessage)
            return blockMessage.getData
          })
      }
      case None => logDebug("No response message received")
    }
    null
  }

上述这段代码中最有意思的莫过于sendMessageReliablySync,远程数据读取毫无疑问是一个异步i/o操作,这里的代码怎么写起来就像是在进行同步的操作一样呢。也就是说如何知道对方发送回来响应的呢?

别急,继续去看看sendMessageReliablySync的定义

def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
      : Future[Option[Message]] = {
    val promise = Promise[Option[Message]]
    val status = new MessageStatus(
      message, connectionManagerId, s => promise.success(s.ackMessage))
    messageStatuses.synchronized {
      messageStatuses += ((message.id, status))
    }
    sendMessage(connectionManagerId, message)
    promise.future
  }

要是我说秘密在这里,你肯定会说我在扯淡,但确实在此处。注意到关键字Promise和Future没。

如果这个future执行完毕,返回s.ackMessage。我们再看看这个ackMessage是在什么地方被写入的呢。看一看ConnectionManager.handleMessage中的代码片段


case bufferMessage: BufferMessage => {
        if (authEnabled) {
          val res = handleAuthentication(connection, bufferMessage)
          if (res == true) {
            // message was security negotiation so skip the rest
            logDebug("After handleAuth result was true, returning")
            return
          }
        }
        if (bufferMessage.hasAckId) {
          val sentMessageStatus = messageStatuses.synchronized {
            messageStatuses.get(bufferMessage.ackId) match {
              case Some(status) => {
                messageStatuses -= bufferMessage.ackId
                status
              }
              case None => {
                throw new Exception("Could not find reference for received ack message " +
                  message.id)
                null
              }
            }
          }
          sentMessageStatus.synchronized {
            sentMessageStatus.ackMessage = Some(message)
            sentMessageStatus.attempted = true
            sentMessageStatus.acked = true
            sentMessageStaus.markDone()
          }

注意,此处的所调用的sentMessageStatus.markDone就会调用在sendMessageReliablySync中定义的promise.Success. 不妨看看MessageStatus的定义。

 class MessageStatus(
      val message: Message,
      val connectionManagerId: ConnectionManagerId,
      completionHandler: MessageStatus => Unit) {

    var ackMessage: Option[Message] = None
    var attempted = false
    var acked = false

    def markDone() { completionHandler(this) }
  }

我想至此调用关系搞清楚了,scala中的Future和Promise理解起来还有有点费劲。

TachyonStore

 在Spark的最新源码中,Storage子系统引入了TachyonStore. TachyonStore是在内存中实现了hdfs文件系统的接口,主要目的就是尽可能的利用内存来作为数据持久层,避免过多的磁盘读写操作。

有关该模块的功能介绍,可以参考http://www.meetup.com/spark-users/events/117307472/

小结

一点点疑问,目前在Spark的存储子系统中,通信模块里传递的数据即有“心跳检测消息”,“数据同步的消息”又有“数据获取之类的信息流”。如果可能的话,要将心跳检测与数据同步即数据获取所使用的网卡分离以提高可靠性。

参考资料

  1. Spark源码分析之-Storage模块 http://jerryshao.me/architecture/2013/10/08/spark-storage-module-analysis/

  2. Tachyon  http://www.slideshare.net/rxin/a-tachyon-2013-0509sparkmeetup?qid=39ee582d-e0bf-41d2-ab01-dc2439abc626&v=default&b=&from_search=2
目录
相关文章
|
3月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
225 2
|
15天前
|
存储 SQL Apache
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
Apache Doris 是一个基于 MPP 架构的高性能实时分析数据库,以其极高的速度和易用性著称。它支持高并发点查询和复杂分析场景,适用于报表分析、即席查询、数据仓库和数据湖查询加速等。最新发布的 2.0.2 版本在性能、稳定性和多租户支持方面有显著提升。社区活跃,已广泛应用于电商、广告、用户行为分析等领域。
Apache Doris 开源最顶级基于MPP架构的高性能实时分析数据库
|
16天前
|
监控 Cloud Native BI
8+ 典型分析场景,25+ 标杆案例,Apache Doris 和 SelectDB 精选案例集(2024版)电子版上线
飞轮科技正式推出 Apache Doris 和 SelectDB 精选案例集 ——《走向现代化的数据仓库(2024 版)》,汇聚了来自各行各业的成功案例与实践经验。该书以行业为划分标准,辅以使用场景标签,旨在为读者提供一个高度整合、全面涵盖、分类清晰且易于查阅的学习资源库。
|
1月前
|
分布式计算 大数据 Java
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
24 1
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
|
29天前
|
分布式计算 大数据 Apache
利用.NET进行大数据处理:Apache Spark与.NET for Apache Spark
【10月更文挑战第15天】随着大数据成为企业决策和技术创新的关键驱动力,Apache Spark作为高效的大数据处理引擎,广受青睐。然而,.NET开发者面临使用Spark的门槛。本文介绍.NET for Apache Spark,展示如何通过C#和F#等.NET语言,结合Spark的强大功能进行大数据处理,简化开发流程并提升效率。示例代码演示了读取CSV文件及统计分析的基本操作,突显了.NET for Apache Spark的易用性和强大功能。
36 1
|
2月前
|
存储 JSON 物联网
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
本文我们将聚焦企业最普遍使用的 JSON 数据,分别介绍业界传统方案以及 Apache Doris 半结构化数据存储分析的三种方案,并通过图表直观展示这些方案的优势与不足。同时,结合具体应用场景,分享不同需求场景下的使用方式,帮助用户快速选择最合适的 JSON 数据存储及分析方案。
查询性能提升 10 倍、存储空间节省 65%,Apache Doris 半结构化数据分析方案及典型场景
|
1月前
|
消息中间件 druid 大数据
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(二)
32 2
|
1月前
|
消息中间件 分布式计算 druid
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
大数据-153 Apache Druid 案例 从 Kafka 中加载数据并分析(一)
54 1
|
3月前
|
存储 消息中间件 运维
招联金融基于 Apache Doris 数仓升级:单集群 QPS 超 10w,存储成本降低 70%
招联内部已有 40+ 个项目使用 Apache Doris ,拥有超百台集群节点,个别集群峰值 QPS 可达 10w+ 。通过应用 Doris ,招联金融在多场景中均有显著的收益,比如标签关联计算效率相较之前有 6 倍的提升,同等规模数据存储成本节省超 2/3,真正实现了降本提效。
招联金融基于 Apache Doris 数仓升级:单集群 QPS 超 10w,存储成本降低 70%
|
1月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
73 0

推荐镜像

更多