使用阿里云InfluxDB®和Spark Streaming实时处理时序数据

本文涉及的产品
可观测可视化 Grafana 版,10个用户账号 1个月
简介: 本文重点介绍怎样利用阿里云InfluxDB®和spark structured streaming来实时计算、存储和可视化数据。下面将介绍如何购买和初始化阿里云InfluxDB®,扩展spark foreach writer,以及设计阿里云InfluxDB®数据库时需要注意的事项。

本文重点介绍怎样利用阿里云InfluxDB®和spark structured streaming来实时计算、存储和可视化数据。下面将介绍如何购买和初始化阿里云InfluxDB®,扩展spark foreach writer,以及设计阿里云InfluxDB®数据库时需要注意的事项。
image
在大数据处理中,一个主要的趋势是人们希望看到metric是如何随着时间变化发展。这使得管理和处理时序数据(数值随时间变化的数据)成为数据科学家非常重要的研究方向。目前,已经有非常多的时序处理数据库产品,如OpenTSDB,TimeScaleDB,InfluxDB以及Druid等。InfluxDB因为完整的生态、类SQL的查询语言以及简单快捷的布署而非常受用户喜爱,居于DBEngine时序数据排列首位。阿里云已经将其进行开源托管,并且完善了TIG(Telegraf/InfluxDB/Grafana)生态,即将推出托管的Kapacitor流处理报警组件。

阿里云InfluxDB®

关于时序数据的一些重要概念和如何购买阿里云InfluxDB®可以参考之前的文章<阿里云InfluxDB®教你玩转A股数据>和官方文档。这里补充一下阿里云InfluxDB®提供的实例规格和管理帐号的创建。
image
当前,阿里云InfluxDB®大致提供2C8G/4C16G/8C32G/16C64G/32C128G/64C256G等大致6种规格,每种规格的读写能力参考如上图所示。阿里云InfluxDB®开放了开源版的几乎全部功能,用户可以在控制台创建管理员帐号,该帐号可以通过客户端和SDK进行所有的操作。
image

Writing Data From Spark

Spark是目前大数据处理领域中最流行、最高效的开源工具,通过spark structured streaming写数据到InfluxDB的开源适配器主要有chroniclerreactive-influx。chronicler与reactive-influx的区别是,在写入数据点之前,chronicler必须要将数据格式转换成influxdb行协议,在处理大量field字段和字符串值时会变得相当棘手,相较而言reactive-influx比较方便。
在sbt项目中引入reactive-influx:

libraryDependencies ++= Seq(
"com.pygmalios" % "reactiveinflux-spark_2.11" % "1.4.0.10.0.5.1",
"com.typesafe.netty" % "netty-http-pipelining" % "1.1.4"
)

InfluxDB entry 配置,其中阿里云InfluxDB®内网和公网的URL可以在控制台上找到:

reactiveinflux {
  url = "ts-xxxxx.influxdata.rds.aliyuncs.com:3242/"
  spark {
    batchSize = 1000 // No of records to be send in each batch
  }
}

扩展spark foreach writer, enable spark stuctured streaming 向阿里云InfluxDB®写数据的伪代码如下:

import com.pygmalios.reactiveinflux._
import com.pygmalios.reactiveinflux.spark._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.DateTime
import com.pygmalios.reactiveinflux.{ReactiveInfluxConfig, ReactiveInfluxDbName}
import com.pygmalios.reactiveinflux.sync.{SyncReactiveInflux, SyncReactiveInfluxDb}
import scala.concurrent.duration._
class influxDBSink(dbName: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
        
    var  db:SyncReactiveInfluxDb = _
    implicit val awaitAtMost = 1.second
        
    // Define the database connection here
    def open(partitionId: Long, version: Long): Boolean = {
    val syncReactiveInflux =
    SyncReactiveInflux(ReactiveInfluxConfig(None))
    db = syncReactiveInflux.database(dbName);
    db.create() // create the database 
    true
  }
  
    // Write the process logic, and database commit code here
    def process(value: org.apache.spark.sql.Row): Unit = {
    val point = Point(
      time = time,  // system or event time 
      measurement = "measurement1",
      tags = Map(
        "t1" -> "A", 
        "t2" -> "B"
      ),
      fields = Map(
        "f1" -> 10.3, // BigDecimal field
        "f2" -> "x",  // String field
        "f3" -> -1,   // Long field
        "f4" -> true) // Boolean field
    )
    
    db.write(point)
  }
  
  // Close connection here
  def close(errorOrNull: Throwable): Unit = {
  }
}

引入Writer:

val influxWriter = new influxDBSink("dbName")
val influxQuery = ifIndicatorData
                                    .writeStream
                                    .foreach(influxWriter)
                                    .outputMode("append")
                                    .start()

可视化

数据写入InfluxDB之后,便可以利用各种工具进行数据可视化,如Grafana,Chronograf等。一个简单的可视化展示如下:
image
当前阿里云InfluxDB®已经自带Grafana数据可视化,用户只需要在控制台一键开通既可,具体可以参考<5分钟快速完成监控系统搭建之实践篇>

总结

目前InfluxDB已经在阿里云完全托管,被用户广泛使用。随着商业化时间的发展,我们在提高稳定性和性能的同时,功能也一步步丰富起来。当前已经提供了TIG(Telegraf/InfluxDB/Grafana)生态,下一步将完全兼容TICK(Telegraf/InfluxDB/Chorograf/Kapacitor)生态。覆盖的业务场景包括DevOps监控、车联网、智慧交通、金融和IOT传感器数据采集,欢迎大家试用并提供意见。
阿里云InfluxDB®为用户提供7*24小时服务,欢迎加入下面的钉钉群咨询。
image

参考文献

  1. Processing Time Series Data in Real-Time with InfluxDB and Structured Streaming
  2. 阿里云InfluxDB®教你玩转A股数据
  3. chronicler-spark
  4. reactiveinflux
目录
相关文章
|
1月前
|
关系型数据库 MySQL 数据挖掘
阿里云 SelectDB 携手 DTS ,一键实现 TP 数据实时入仓
DTS 作为阿里云核心的数据交互引擎,以其高效的实时数据流处理能力和广泛的数据源兼容性,为用户构建了一个安全可靠、可扩展、高可用的数据架构桥梁。阿里云数据库 SelectDB 通过与 DTS 联合,为用户提供了简单、实时、极速且低成本的事务数据分析方案。用户可以通过 DTS 数据传输服务,一键将自建 MySQL / RDS MySQL / PolarDB for MySQL 数据库,迁移或同步至阿里云数据库 SelectDB 的实例中,帮助企业在短时间内完成数据迁移或同步,并即时获得深度洞察。
阿里云 SelectDB 携手 DTS ,一键实现 TP 数据实时入仓
|
1月前
|
SQL 人工智能 数据挖掘
阿里云DMS,身边的智能化数据分析助手
生成式AI颠覆了人机交互的传统范式,赋予每个人利用AI进行低门槛数据分析的能力。Data Fabric与生成式AI的强强联合,不仅能够实现敏捷数据交付,还有效降低了数据分析门槛,让人人都能数据分析成为可能!阿里云DMS作为阿里云统一的用数平台,在2021年初就开始探索使用Data Fabric理念构建逻辑数仓来加速企业数据价值的交付,2023年推出基于大模型构建的Data Copilot,降低用数门槛,近期我们将Notebook(分析窗口)、逻辑数仓(Data Fabric)、Data Copilot(生成式AI)进行有机组合,端到端的解决用数难题,给用户带来全新的分析体验。
110110 118
阿里云DMS,身边的智能化数据分析助手
|
2月前
|
存储 数据可视化 数据管理
基于阿里云服务的数据平台架构实践
本文主要介绍基于阿里云大数据组件服务,对企业进行大数据平台建设的架构实践。
714 2
|
2月前
|
SQL 分布式计算 API
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
Spark学习------SparkSQL(概述、编程、数据的加载和保存)
55 2
|
5天前
|
分布式计算 大数据 数据处理
【Flink】Flink跟Spark Streaming的区别?
【4月更文挑战第17天】【Flink】Flink跟Spark Streaming的区别?
|
1月前
|
分布式计算 运维 大数据
阿里云 EMR Serverless Spark 版免费邀测中
阿里云 EMR Serverless Spark 版,以 Spark Native Engine 为基础,旨在提供一个全托管、一站式的数据开发平台。诚邀您参与 EMR Serverless Spark 版免费测试,体验 100% 兼容 Spark 的 Serverless 服务:https://survey.aliyun.com/apps/zhiliao/iscizrF54
392 0
阿里云 EMR Serverless Spark 版免费邀测中
|
1月前
|
SQL 分布式计算 Java
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
82 1
|
1月前
|
存储 分布式计算 Spark
实战|使用Spark Streaming写入Hudi
实战|使用Spark Streaming写入Hudi
39 0
|
1月前
|
存储 分布式计算 API
adb spark的lakehouse api访问内表数据,还支持算子下推吗
【2月更文挑战第21天】adb spark的lakehouse api访问内表数据,还支持算子下推吗
107 2
|
1月前
|
消息中间件 编解码 运维
阿里云 Serverless 异步任务处理系统在数据分析领域的应用
本文主要介绍异步任务处理系统中的数据分析,函数计算异步任务最佳实践-Kafka ETL,函数计算异步任务最佳实践-音视频处理等。
175304 348