在这个一切都需要进行加速的时代,流数据的使用变得越来越普遍。我们经常不再听到客户问:“我可以流式传输这些数据吗?”,更多的是问:“我们能以多快的速度流式传输这些数据?”,而诸如Kafka和Delta Lake之类技术的普及更突显了这一势头。我们认为传统流式数据传输的一种形式是以非常快的速度到达的半结构化或非结构化(例如JSON)数据,通常情况下一批数据的量也比较小。这种形式的工作场景横跨各行各业,举一个这样的客户案例,某个证券交易所和数据提供商,他们负责每分钟流式传输数十万个数据项目,包括股票行情、新闻、报价及其他财务数据。该顾客使用Databricks、Delta Lake以及Structured Streaming,实时高可用地处理和分析这些流式数据。但是,随着使用流式数据普遍性的提升,我们见到了另一种类型的客户,他们使用流式技术进行低频次、类批处理的数据处理方式。在这种架构下,流式数据处理扮演的角色通常为监控特定的目录、S3存储桶或其他存放数据的区域,并且会在数据到达之后立即自动处理数据,这种架构消除了传统调度的许多负担,特别是在任务失败或只需要处理部分数据的情况下。所有这些应用场景都表明,流式技术已经不再只是用于实时或类实时的数据计算。
尽管流式技术的出现有许多积极的方面,但这种体系结构也带来了一些麻烦。特别是,历史上一直存在着一个权衡:我们是要高质量的数据还是高速数据?实际上,这不是一个有意义的问题,对于所有实际操作来说,质量都必须与速度相关联,为了实现高速度,我们需要高质量的数据。毕竟,低质量、高速度的数据通常都需要分批进行进一步的处理;另一方面,高质量、低速度的数据不能满足许多现代场景的需要。随着越来越多的公司将流式传输数据作为其数据处理体系结构的关键,速度和质量都必须同时提高。
在本博文中,我们将深入探讨一种数据管理架构,该架构可以在数据到达时,通过主动监控和分析来检测流式数据中损坏或不良的数据,并且不会造成瓶颈。
构建流式数据分析和监控流程
在Databricks,我们看到客户中不断涌现出许多数据处理模式,这些新模式的产生推动了可能的极限,在速度和质量问题上也不例外。为了帮助解决这一矛盾,我们开始考虑使用正确的工具,不仅可以支持所需的数据速度,还可以提供可接受的数据质量水平。Structured Streaming和Delta Lake非常适合用于数据获取和存储层,因为他们能够配合创造一个具有扩展性、容错性和类实时的系统,并且具有exactly-once处理保证。
为企业数据质量分析找到可接受的工具要困难一些,特别是这个工具需要具有对数据质量指标的状态汇总的能力。另外,还需要能够对整个数据集进行检查(例如检测出多少比例的记录为空值),这些都会随着所提取的数据量的增加而增加计算成本。这对所有流式系统而言都是需要的,这一要求就排除了很多可用的工具。
在我们最初的解决方案中,我们选择了Amazon的数据质量检测工具Deequ,因为它能提供简单而强大的API,有对数据质量指标进行状态聚合的能力,以及对Scala的支持。将来,其他Spark原生的工具将提供额外的选择。
流式数据质量监控的实现
我们通过在EC2实例上运行一个小型的Kafka producer来模拟数据流,该实例将模拟的股票交易信息写入Kafka topic,并使用原生的Databricks连接器将这些数据导入到Delta Lake表当中。为了展示Spark Streaming中数据质量检查的功能,我们选择在整个流程中实现Deequ的不同功能:
- 根据历史数据生成约束条件;
- 使用foreachBatch算子对到达的数据进行增量质量分析;
- 使用foreachBatch算子对到达的数据执行(较小的)单元测试,并将质量不佳的batch隔离到质量不佳记录表中;
- 对于每个到达的batch,将最新的状态指标写入到Delta表当中;
- 对整个数据集定期执行(较大的)单元测试,并在MLFlow中跟踪结果;
- 根据验证结果发送通知(如通过电子邮件或Slack);
- 捕获MLFlow中的指标以进行可视化和记录。
我们结合了MLFlow来跟踪一段时间内数据性能指标的质量、Delta表的版本迭代以及结合了一个用于通知和告警的Slack连接器。整个流程可以用如下的图片进行表示:
由于Spark中具有统一的批处理/流式处理接口,因此我们能够在这个流程的任何位置提取报告、告警和指标,作为实时更新或批处理快照。这对于设置触发器或限制特别有用,因此,如果某个指标超过了阈值,则可以执行数据质量改善措施。还要注意的是,我们并没有对初始到达的原始数据造成影响,这些数据将立即提交到我们的Delta表,这意味着我们不会限制数据输入的速率。下游系统可以直接从该表中读取数据,如果超过了上述任何触发条件或质量阈值,则可能会中断。此外,我们可以轻松地创建一个排除质量不佳记录的view以提供一个干净的表。
在一个较高的层次,执行我们的数据质量跟踪和验证的代码如下所示:
spark.readStream
.table("trades_delta")
.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
// reassign our current state to the previous next state
val stateStoreCurr = stateStoreNext
// run analysis on the current batch, aggregate with saved state
val metricsResult = AnalysisRunner.run(data=batchDF, ...)
// verify the validity of our current microbatch
val verificationResult = VerificationSuite()
.onData(batchDF)
.addCheck(...).run()
// if verification fails, write batch to bad records table
if (verificationResult.status != CheckStatus.Success) {...}
// write the current results into the metrics table
Metric_results.write
.format("delta")
.mode("overwrite")
.saveAsTable("deequ_metrics")
}
.start()
使用数据质量工具Deequ
在Databricks中使用Deequ是相对比较容易的事情,你需要首先定义一个analyzer,然后在dataframe上运行该analyzer。例如,我们可以跟踪Deequ本地提供的几个相关指标检查,包括检查数量和价格是否为非负数、原始IP地址是否不为空以及符号字段在所有事务中的唯一性。Deequ的StateProvider对象在流式数据配置中特别有用,它能允许用户将我们指标的状态保存在内存或磁盘中,并在以后汇总这些指标。这意味着每个处理的批次仅分析该批次中的数据记录,而不会分析整个表。即使随着数据大小的增长,这也可以使性能保持相对稳定,这在长时间运行的生产环境中很重要,因为生产环境需要在任意数量的数据上保持一致。
MLFlow还可以很好地跟踪指标随时间的演变,在我们的notebook中,我们跟踪在foreachBatch代码中分析的所有Deequ约束作为指标,并使用Delta的versionID和时间戳作为参数。在Databricks的notebook中,集成的MLFlow服务对于指标跟踪特别方便。
通过使用Structured Streaming、Delta Lake和Deequ,我们能够消除传统情况下数据质量和速度之间的权衡,而专注于实现两者的可接受水平。这里特别重要的是灵活性——不仅在如何处理不良记录(隔离、报错、告警等),而且在体系结构上(例如何时以及在何处执行检查?)和生态上(如何使用我们的数据?)。开源技术(如Delta Lake、Structured Streaming和Deequ)是这种灵活性的关键。随着技术的发展,能够使用最新最、最强大的解决方案是提升其竞争优势的驱动力。最重要的是,你的数据的速度和质量一定不能对立,而要保持一致,尤其是在流式数据处理越来越靠近核心业务运营时。很快,这将不会是一种选择,而是一种期望和要求,我们正朝着这个未来方向一次一小步地不断前进。