使用阿里云InfluxDB®和Spark Streaming实时处理时序数据

本文涉及的产品
可观测可视化 Grafana 版,10个用户账号 1个月
简介: 本文重点介绍怎样利用阿里云InfluxDB®和spark structured streaming来实时计算、存储和可视化数据。下面将介绍如何购买和初始化阿里云InfluxDB®,扩展spark foreach writer,以及设计阿里云InfluxDB®数据库时需要注意的事项。

本文重点介绍怎样利用阿里云InfluxDB®和spark structured streaming来实时计算、存储和可视化数据。下面将介绍如何购买和初始化阿里云InfluxDB®,扩展spark foreach writer,以及设计阿里云InfluxDB®数据库时需要注意的事项。
image
在大数据处理中,一个主要的趋势是人们希望看到metric是如何随着时间变化发展。这使得管理和处理时序数据(数值随时间变化的数据)成为数据科学家非常重要的研究方向。目前,已经有非常多的时序处理数据库产品,如OpenTSDB,TimeScaleDB,InfluxDB以及Druid等。InfluxDB因为完整的生态、类SQL的查询语言以及简单快捷的布署而非常受用户喜爱,居于DBEngine时序数据排列首位。阿里云已经将其进行开源托管,并且完善了TIG(Telegraf/InfluxDB/Grafana)生态,即将推出托管的Kapacitor流处理报警组件。

阿里云InfluxDB®

关于时序数据的一些重要概念和如何购买阿里云InfluxDB®可以参考之前的文章<阿里云InfluxDB®教你玩转A股数据>和官方文档。这里补充一下阿里云InfluxDB®提供的实例规格和管理帐号的创建。
image
当前,阿里云InfluxDB®大致提供2C8G/4C16G/8C32G/16C64G/32C128G/64C256G等大致6种规格,每种规格的读写能力参考如上图所示。阿里云InfluxDB®开放了开源版的几乎全部功能,用户可以在控制台创建管理员帐号,该帐号可以通过客户端和SDK进行所有的操作。
image

Writing Data From Spark

Spark是目前大数据处理领域中最流行、最高效的开源工具,通过spark structured streaming写数据到InfluxDB的开源适配器主要有chroniclerreactive-influx。chronicler与reactive-influx的区别是,在写入数据点之前,chronicler必须要将数据格式转换成influxdb行协议,在处理大量field字段和字符串值时会变得相当棘手,相较而言reactive-influx比较方便。
在sbt项目中引入reactive-influx:

libraryDependencies ++= Seq(
"com.pygmalios" % "reactiveinflux-spark_2.11" % "1.4.0.10.0.5.1",
"com.typesafe.netty" % "netty-http-pipelining" % "1.1.4"
)

InfluxDB entry 配置,其中阿里云InfluxDB®内网和公网的URL可以在控制台上找到:

reactiveinflux {
  url = "ts-xxxxx.influxdata.rds.aliyuncs.com:3242/"
  spark {
    batchSize = 1000 // No of records to be send in each batch
  }
}

扩展spark foreach writer, enable spark stuctured streaming 向阿里云InfluxDB®写数据的伪代码如下:

import com.pygmalios.reactiveinflux._
import com.pygmalios.reactiveinflux.spark._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.DateTime
import com.pygmalios.reactiveinflux.{ReactiveInfluxConfig, ReactiveInfluxDbName}
import com.pygmalios.reactiveinflux.sync.{SyncReactiveInflux, SyncReactiveInfluxDb}
import scala.concurrent.duration._
class influxDBSink(dbName: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
        
    var  db:SyncReactiveInfluxDb = _
    implicit val awaitAtMost = 1.second
        
    // Define the database connection here
    def open(partitionId: Long, version: Long): Boolean = {
    val syncReactiveInflux =
    SyncReactiveInflux(ReactiveInfluxConfig(None))
    db = syncReactiveInflux.database(dbName);
    db.create() // create the database 
    true
  }
  
    // Write the process logic, and database commit code here
    def process(value: org.apache.spark.sql.Row): Unit = {
    val point = Point(
      time = time,  // system or event time 
      measurement = "measurement1",
      tags = Map(
        "t1" -> "A", 
        "t2" -> "B"
      ),
      fields = Map(
        "f1" -> 10.3, // BigDecimal field
        "f2" -> "x",  // String field
        "f3" -> -1,   // Long field
        "f4" -> true) // Boolean field
    )
    
    db.write(point)
  }
  
  // Close connection here
  def close(errorOrNull: Throwable): Unit = {
  }
}

引入Writer:

val influxWriter = new influxDBSink("dbName")
val influxQuery = ifIndicatorData
                                    .writeStream
                                    .foreach(influxWriter)
                                    .outputMode("append")
                                    .start()

可视化

数据写入InfluxDB之后,便可以利用各种工具进行数据可视化,如Grafana,Chronograf等。一个简单的可视化展示如下:
image
当前阿里云InfluxDB®已经自带Grafana数据可视化,用户只需要在控制台一键开通既可,具体可以参考<5分钟快速完成监控系统搭建之实践篇>

总结

目前InfluxDB已经在阿里云完全托管,被用户广泛使用。随着商业化时间的发展,我们在提高稳定性和性能的同时,功能也一步步丰富起来。当前已经提供了TIG(Telegraf/InfluxDB/Grafana)生态,下一步将完全兼容TICK(Telegraf/InfluxDB/Chorograf/Kapacitor)生态。覆盖的业务场景包括DevOps监控、车联网、智慧交通、金融和IOT传感器数据采集,欢迎大家试用并提供意见。
阿里云InfluxDB®为用户提供7*24小时服务,欢迎加入下面的钉钉群咨询。
image

参考文献

  1. Processing Time Series Data in Real-Time with InfluxDB and Structured Streaming
  2. 阿里云InfluxDB®教你玩转A股数据
  3. chronicler-spark
  4. reactiveinflux
目录
相关文章
|
7月前
|
分布式计算 运维 搜索推荐
立马耀:通过阿里云 Serverless Spark 和 Milvus 构建高效向量检索系统,驱动个性化推荐业务
蝉妈妈旗下蝉选通过迁移到阿里云 Serverless Spark 及 Milvus,解决传统架构性能瓶颈与运维复杂性问题。新方案实现离线任务耗时减少40%、失败率降80%,Milvus 向量检索成本降低75%,支持更大规模数据处理,查询响应提速。
318 57
|
5月前
|
人工智能 分布式计算 DataWorks
一体系数据平台的进化:基于阿里云 EMR Serverless Spark 的持续演进
本文介绍了一体系汽配供应链平台如何借助阿里云EMR Serverless Spark实现从传统Hadoop平台向云原生架构的迁移。通过融合高质量零部件供应与创新互联网科技,一体系利用EMR Serverless Spark和DataWorks构建高效数据分析体系,解决大规模数据处理瓶颈。方案涵盖实时数据集成、Lakehouse搭建、数仓分层设计及BI/ML应用支持,显著提升数据处理性能与业务响应速度,降低运维成本,为数字化转型奠定基础。最终实现研发效率提升、运维压力减轻,并推动AI技术深度整合,迈向智能化云原生数据平台。
174 4
|
5月前
|
分布式计算 运维 监控
Fusion 引擎赋能:流利说如何用阿里云 Serverless Spark 实现数仓计算加速
本文介绍了流利说与阿里云合作,利用EMR Serverless Spark优化数据处理的全过程。流利说是科技驱动的教育公司,通过AI技术提升用户英语水平。原有架构存在资源管理、成本和性能等痛点,采用EMR Serverless Spark后,实现弹性资源管理、按需计费及性能优化。方案涵盖数据采集、存储、计算到查询的完整能力,支持多种接入方式与高效调度。迁移后任务耗时减少40%,失败率降低80%,成本下降30%。未来将深化合作,探索更多行业解决方案。
270 1
|
6月前
|
消息中间件 分布式计算 监控
从InfluxDB到StarRocks:Grab实现Spark监控平台10倍性能提升
Grab 是东南亚领先的超级应用,其 Spark 可观测平台 Iris 核心存储迁移到 StarRocks 后性能显著提升。新架构统一了实时与历史数据分析,减少多平台切换复杂性,查询速度提升 10 倍以上,资源使用效率提高 40%。通过物化视图、动态分区和直接 Kafka 摄取数据等优化,简化数据管道并降低运维成本。未来 Grab 将进一步增强推荐系统、集成机器学习,持续优化用户体验与系统可扩展性。
|
9月前
|
存储 分布式计算 物联网
美的楼宇科技基于阿里云 EMR Serverless Spark 构建 LakeHouse 湖仓数据平台
美的楼宇科技基于阿里云 EMR Serverless Spark 建设 IoT 数据平台,实现了数据与 AI 技术的有效融合,解决了美的楼宇科技设备数据量庞大且持续增长、数据半结构化、数据价值缺乏深度挖掘的痛点问题。并结合 EMR Serverless StarRocks 搭建了 Lakehouse 平台,最终实现不同场景下整体性能提升50%以上,同时综合成本下降30%。
668 58
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
206 0
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
257 0
|
9月前
|
机器学习/深度学习 分布式计算 大数据
阿里云 EMR Serverless Spark 在微财机器学习场景下的应用
面对机器学习场景下的训练瓶颈,微财选择基于阿里云 EMR Serverless Spark 建立数据平台。通过 EMR Serverless Spark,微财突破了单机训练使用的数据规模瓶颈,大幅提升了训练效率,解决了存算分离架构下 Shuffle 稳定性和性能困扰,为智能风控等业务提供了强有力的技术支撑。
395 15
|
9月前
|
SQL 分布式计算 Serverless
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
191 0

热门文章

最新文章

下一篇
开通oss服务