海量监控日志基于EMR Spark Streaming SQL进行实时聚合

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 从EMR-3.21.0 版本开始将提供Spark Streaming SQL的预览版功能,支持使用SQL来开发流式分析作业。结果数据可以实时写入Tablestore。 本文以LogHub为数据源,收集ECS上的日志数据,通过Spark Streaming SQL进行聚合后,将流计算结果数据实时写入Tablestore,展示一个简单的日志监控场景。

作者:伯箫,阿里云高级开发工程师。现在在阿里云表格存储团队,负责管控系统的开发,对NOSQL类数据库系统有一些了解。

前言


从EMR-3.21.0 版本开始将提供Spark Streaming SQL的预览版功能,支持使用SQL来开发流式分析作业。结果数据可以实时写入Tablestore。
本文以LogHub为数据源,收集ECS上的日志数据,通过Spark Streaming SQL进行聚合后,将流计算结果数据实时写入Tablestore,展示一个简单的日志监控场景。

image

场景设计


假设有一个商品表Goods,商品信息开放给用户浏览,用户浏览完以后会产生以下格式的日志数据:

"RequestId":"c85df119-f6db-449f-89bb-6773d2468f89",
"Time":2019-07-30 12:05:28,
"

我们需要将原始日志数据,根据GoodsName、OperationType和时间,聚合成一分钟一个点的监控数据。用于监控各商品的访问情况。
最终需要的监控数据格式如下:
image
过程如下图所示,比较简单。
image

技术选型


本文主要来看一下结果数据存放数据库的选型。对于本文的监控场景,结果数据的量级,取决于商品数量。真实情况下,可能还要增加商品的规格、颜色等监控指标,结果数据量会比较大。此外,对于时间粒度比较小的监控数据,一般都只需要保留最近的,时间比较久的历史数据需要删除。
将Tablestore与传统关系型数据库MySQL进行对比,Tablestore有以下优势:

  • 支持海量数据,无缝扩展
    表格存储通过数据分片和负载均衡技术,实现了无缝扩展。
  • 支持数据自动过期
    数据生命周期(Time To Live,简称 TTL)是数据表的一个属性,即数据的存活时间,单位为秒。表格存储会在后台对超过存活时间的数据进行清理,以减少用户的数据存储空间,降低存储成本。监控场景下比较适用,不需要手动去删除数据。

预备工作


创建EMR集群

开通EMR之前,先要对云账号进行实名认证,然后创建默认的EMR角色并授予 AliyunEMRDefaultRole和AliyunEmrEcsDefaultRole这两个角色。
EMR暂时还不支持在官网控制台上配置写入Tablestore的任务,需要登录到MER集群的机器上去操作,所以开通EMR集群的时候,请选择自定义购买,并在最后一步打开挂载公网、远程登录、密码方式登录,自己设置一个密码,如下图。
image
开通完以后,进入到ECS控制台,会看到有一台Master节点机器有弹性IP。后面步骤中,操作EMR Spark Streaming需要使用这个IP地址远程登录到机器上。
image

开通ECS和日志服务

详细步骤请参考官方文档。
其中ilogtail配置如下
image
日志收集到LogHub以后,可以通过官网控制台查看。本文示例中收集到的数据如下:
image

创建Tablestore结果表

image
Count列,作为属性列,不需要定义在主键中。

数据处理流程


1、下载支持数据源需要的jar包

下载地址
https://github.com/aliyun/aliyun-emapreduce-sdk/blob/master-2.x/jars/datasources/latest/emr-datasources_shaded_2.11-1.7.0.jar

2、进入streaming-sql执行环境

下载完jar包以下,在包所在目录执行以下命令进入交互式开发环境。

注意:jar包要上传到集群机器上,不支持远程引用oss文件。
image

3、创建LogHub数据源表

创建LogHub数据源表之前,需要手动开通日志服务的project和logStore,并将日志数据收集好,具体参考【预备工作】一节中,开通LogHub的官方文档。
然后再到EMR集群机器的交互式执行环境中创建LogHub数据源表,示例如下:

CREATE TABLE loghub_source(GoodsName string,OperationType string,RequestId string,__time__ timestamp)
USING loghub
OPTIONS (
sls.project = '{your project name}',
sls.store = '{store name}',
access.key.id = '{access-key}',
access.key.secret = '{access-key-secret}',
endpoint = 'http://{your project name}.cn-hangzhou-intranet.log.aliyuncs.com');

上面的sql语句,创建一个database为helloemr,tablename为loghub_source的表,shema中有GoodsName、OperationType、Time、RequestId、__time__五个字段,其中__time__字段是在配置ilogtail的时候解析日志中的Time字段得到的,等同于Time字段。
需要注意的是,Endpoint请使用内网域名,公网域名速度上会慢很多。

4、创建Tablestore结果表

同样的,创建Tablestore结果表之前,也要先到官网控制台建到tablestore的实例和表。
创建EMR的Tablestore结果表,示例如下

CREATE TABLE tablestore_sink
USING tablestore
OPTIONS(
endpoint="https://sparkStreaming.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="{access-key}",
access.key.secret="{access-key-secret}",
table.name="tablestore_sink",
instance.name="sparkStreaming",
catalog='{"columns":{"GoodsName":{"col":"GoodsName","type":"string"},"OperationType":{"col":"OperationType","type":"string"},"Time":{"col":"Time","type":"long"},"Count":{"col":"Count","type":"long"}}}');

以上sql代码,创建了一个表名为tablestore_sink的表,实例名是sparkStreaming。注意:Endpoint请使用vpc域名。
创建成功以后,使用desc tablestore_sink; 查看表结构如下。
image

5、结果数据写入到Tablestore

通过GoodsName、OperationType聚合一分钟内的请求次数。这里要用到Spark Streaming的滚动窗口函数,取window.start作为聚合后的时间。

SET spark.sql.streaming.checkpointLocation.loghub_source=/home/helloemr;
SET spark.sql.streaming.query.trigger.loghub_source=ProcessingTime;
SET spark.sql.streaming.query.trigger.intervalMs.loghub_source=10000;
INSERT INTO tablestore_sink
SELECT GoodsName,OperationType,count(*) as Count,to_unix_timestamp(window.start, 'yyyy-MM-dd HH:mm:ss') as Time from loghub_source
where delay(__time__)<"2 minute" 
GROUP BY TUMBLING (__time__, interval 1 minute),GoodsName,OperationType;

示例SQL中只填了几个必填参数,具体可以参考作业模板。其中checkpointLocation代表本次流式查询作业的checkpoint路径,需要设置一个绝对路径值。
最终,Tablestore中的结果数据如下:

image

性能调优


Spark Streaming 是基于Spark的流式处理引擎,其基本原理是把输入数据以某一时间间隔批量的处理,当批处理间隔缩短到秒级时,便可以用于处理实时数据流。它本质上是微批处理。
使用上一节的示例代码,实际测试下来,一次作业需要耗费1秒左右的时间。在实际实例中,由于源表、目标数据表、数据量大小以及Sql的复杂程度不同,耗费的时间也会不同。

数据读取

本文使用LogHub为数据源,Shard数量多,Spark Streaming的作业并发度也会多,但需要设置合理的Shard数,具体请参考日志服务分区设置。
同时,创建LogHub数据源表的时候,请使用内网Endpoint。
Spark Streaming作业调优

设置合理的批处理时间

trigger.intervalMs代表批次间隔,单位毫秒,默认为0L。运行任务的时候,当间隔时间比一次任务的运行时间短的时候,任务会打印WARN日志。一般这个值的大小如果能够使得Streaming作业刚好处理好上一个的批处理的数据,那么这个就是最优值。
image

增加作业资源

EMR官网控制台里面有的监控大盘,可以看到作业占用的资源情况,可以根据实际情况调整分配给作业的资源大小。

数据写入

对于表格存储(Tablestore)来说,合理的主键设计,是提高写入性能的关键因素。具体可以参考表格存储最佳实践。一个设计良好的主键,需要避免访问压力集中在一个小范围的连续的分片键上,也就是说避免热点分片。设计良好的表结构,整张表的访问压力能够均匀的分散在各个分片上,这样才能充分利用后端服务器的能力。
如果是新建的Tablestore表,而且数据写入量比较大,最好联系下@表格存储技术支持,对表进行预分区。可以提高初始状态下的写入性能。
_

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
24天前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
118 2
|
2月前
|
SQL 分布式计算 Serverless
阿里云 EMR Serverless Spark 版正式开启商业化
阿里云 EMR Serverless Spark 版正式开启商业化,内置 Fusion Engine,100% 兼容开源 Spark 编程接口,相比于开源 Spark 性能提升300%;提供 Notebook 及 SQL 开发、调试、发布、调度、监控诊断等一站式数据开发体验!
149 3
阿里云 EMR Serverless Spark 版正式开启商业化
|
1月前
|
存储 监控 固态存储
如何监控和优化 WAL 日志文件的存储空间使用?
如何监控和优化 WAL 日志文件的存储空间使用?
|
1月前
|
监控 网络协议 CDN
阿里云国际监控查询流量、用量查询流量与日志统计流量有差异?
阿里云国际监控查询流量、用量查询流量与日志统计流量有差异?
|
2月前
|
运维 Kubernetes 监控
Loki+Promtail+Grafana监控K8s日志
综上,Loki+Promtail+Grafana 监控组合对于在 K8s 环境中优化日志管理至关重要,它不仅提供了强大且易于扩展的日志收集与汇总工具,还有可视化这些日志的能力。通过有效地使用这套工具,可以显著地提高对应用的运维监控能力和故障诊断效率。
294 0
|
3月前
|
SQL 数据库 Java
Hibernate 日志记录竟藏着这些秘密?快来一探究竟,解锁调试与监控最佳实践
【8月更文挑战第31天】在软件开发中,日志记录对调试和监控至关重要。使用持久化框架 Hibernate 时,合理配置日志可帮助理解其内部机制并优化性能。首先,需选择合适的日志框架,如 Log4j 或 Logback,并配置日志级别;理解 Hibernate 的多级日志,如 DEBUG 和 ERROR,以适应不同开发阶段需求;利用 Hibernate 统计功能监测数据库交互情况;记录自定义日志以跟踪业务逻辑;定期审查和清理日志避免占用过多磁盘空间。综上,有效日志记录能显著提升 Hibernate 应用的性能和稳定性。
50 0
|
3月前
|
开发者 前端开发 编解码
Vaadin解锁移动适配新境界:一招制胜,让你的应用征服所有屏幕!
【8月更文挑战第31天】在移动互联网时代,跨平台应用开发备受青睐。作为一款基于Java的Web应用框架,Vaadin凭借其组件化设计和强大的服务器端渲染能力,助力开发者轻松构建多设备适应的Web应用。本文探讨Vaadin与移动设备的适配策略,包括响应式布局、CSS媒体查询、TouchKit插件及服务器端优化,帮助开发者打造美观且实用的移动端体验。通过这些工具和策略的应用,可有效应对屏幕尺寸、分辨率及操作系统的多样性挑战,满足广大移动用户的使用需求。
66 0
|
3月前
|
存储 运维 监控
Entity Framework Core 实现审计日志记录超棒!多种方法助你跟踪数据变化、监控操作,超实用!
【8月更文挑战第31天】在软件开发中,审计日志记录对于跟踪数据变化、监控用户操作及故障排查至关重要。Entity Framework Core (EF Core) 作为强大的对象关系映射框架,提供了多种实现审计日志记录的方法。例如,可以使用 EF Core 的拦截器在数据库操作前后执行自定义逻辑,记录操作类型、时间和执行用户等信息。此外,也可通过在实体类中添加审计属性(如 `CreatedBy`、`CreatedDate` 等),并在保存实体时更新这些属性来记录审计信息。这两种方法都能有效帮助我们追踪数据变更并满足合规性和安全性需求。
78 0
|
3月前
|
存储 JSON 监控
FastAPI日志之谜:如何揭开Web应用监控与调试的面纱?
【8月更文挑战第31天】在现代Web开发中,日志记录对于监控应用状态、诊断问题和了解用户行为至关重要。FastAPI框架提供了强大的日志功能,使开发者能轻松集成日志记录。本文将详细介绍如何在FastAPI中设置和利用日志,包括基础配置、请求响应日志、错误处理和结构化日志等内容,帮助提升应用的可维护性和性能。
135 0
|
3月前
|
消息中间件 Prometheus 监控
Producer的监控与日志记录最佳实践
【8月更文第29天】在分布式系统中,消息队列作为关键组件之一,其稳定性和性能至关重要。生产者(Producer)负责生成并发送消息到消息队列中,因此确保生产者的健康运行是非常重要的。本文将探讨如何为生产者设置监控和日志记录,以跟踪其健康状况和性能指标。
62 0