本文重点介绍怎样利用阿里云InfluxDB®和spark structured streaming来实时计算、存储和可视化数据。下面将介绍如何购买和初始化阿里云InfluxDB®,扩展spark foreach writer,以及设计阿里云InfluxDB®数据库时需要注意的事项。
在大数据处理中,一个主要的趋势是人们希望看到metric是如何随着时间变化发展。这使得管理和处理时序数据(数值随时间变化的数据)成为数据科学家非常重要的研究方向。目前,已经有非常多的时序处理数据库产品,如OpenTSDB,TimeScaleDB,InfluxDB以及Druid等。InfluxDB因为完整的生态、类SQL的查询语言以及简单快捷的布署而非常受用户喜爱,居于DBEngine时序数据排列首位。阿里云已经将其进行开源托管,并且完善了TIG(Telegraf/InfluxDB/Grafana)生态,即将推出托管的Kapacitor流处理报警组件。
阿里云InfluxDB®
关于时序数据的一些重要概念和如何购买阿里云InfluxDB®可以参考之前的文章<阿里云InfluxDB®教你玩转A股数据>和官方文档。这里补充一下阿里云InfluxDB®提供的实例规格和管理帐号的创建。
当前,阿里云InfluxDB®大致提供2C8G/4C16G/8C32G/16C64G/32C128G/64C256G等大致6种规格,每种规格的读写能力参考如上图所示。阿里云InfluxDB®开放了开源版的几乎全部功能,用户可以在控制台创建管理员帐号,该帐号可以通过客户端和SDK进行所有的操作。
Writing Data From Spark
Spark是目前大数据处理领域中最流行、最高效的开源工具,通过spark structured streaming写数据到InfluxDB的开源适配器主要有chronicler和reactive-influx。chronicler与reactive-influx的区别是,在写入数据点之前,chronicler必须要将数据格式转换成influxdb行协议,在处理大量field字段和字符串值时会变得相当棘手,相较而言reactive-influx比较方便。
在sbt项目中引入reactive-influx:
libraryDependencies ++= Seq(
"com.pygmalios" % "reactiveinflux-spark_2.11" % "1.4.0.10.0.5.1",
"com.typesafe.netty" % "netty-http-pipelining" % "1.1.4"
)
InfluxDB entry 配置,其中阿里云InfluxDB®内网和公网的URL可以在控制台上找到:
reactiveinflux {
url = "ts-xxxxx.influxdata.rds.aliyuncs.com:3242/"
spark {
batchSize = 1000 // No of records to be send in each batch
}
}
扩展spark foreach writer, enable spark stuctured streaming 向阿里云InfluxDB®写数据的伪代码如下:
import com.pygmalios.reactiveinflux._
import com.pygmalios.reactiveinflux.spark._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.DateTime
import com.pygmalios.reactiveinflux.{ReactiveInfluxConfig, ReactiveInfluxDbName}
import com.pygmalios.reactiveinflux.sync.{SyncReactiveInflux, SyncReactiveInfluxDb}
import scala.concurrent.duration._
class influxDBSink(dbName: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
var db:SyncReactiveInfluxDb = _
implicit val awaitAtMost = 1.second
// Define the database connection here
def open(partitionId: Long, version: Long): Boolean = {
val syncReactiveInflux =
SyncReactiveInflux(ReactiveInfluxConfig(None))
db = syncReactiveInflux.database(dbName);
db.create() // create the database
true
}
// Write the process logic, and database commit code here
def process(value: org.apache.spark.sql.Row): Unit = {
val point = Point(
time = time, // system or event time
measurement = "measurement1",
tags = Map(
"t1" -> "A",
"t2" -> "B"
),
fields = Map(
"f1" -> 10.3, // BigDecimal field
"f2" -> "x", // String field
"f3" -> -1, // Long field
"f4" -> true) // Boolean field
)
db.write(point)
}
// Close connection here
def close(errorOrNull: Throwable): Unit = {
}
}
引入Writer:
val influxWriter = new influxDBSink("dbName")
val influxQuery = ifIndicatorData
.writeStream
.foreach(influxWriter)
.outputMode("append")
.start()
可视化
数据写入InfluxDB之后,便可以利用各种工具进行数据可视化,如Grafana,Chronograf等。一个简单的可视化展示如下:
当前阿里云InfluxDB®已经自带Grafana数据可视化,用户只需要在控制台一键开通既可,具体可以参考<5分钟快速完成监控系统搭建之实践篇>
总结
目前InfluxDB已经在阿里云完全托管,被用户广泛使用。随着商业化时间的发展,我们在提高稳定性和性能的同时,功能也一步步丰富起来。当前已经提供了TIG(Telegraf/InfluxDB/Grafana)生态,下一步将完全兼容TICK(Telegraf/InfluxDB/Chorograf/Kapacitor)生态。覆盖的业务场景包括DevOps监控、车联网、智慧交通、金融和IOT传感器数据采集,欢迎大家试用并提供意见。
阿里云InfluxDB®为用户提供7*24小时服务,欢迎加入下面的钉钉群咨询。