Flink 实时计算 x SLS 存储下推:阿里云 OpenAPI 网关监控平台实践

简介: 本文由潘伟龙(阿里云可观测)、阮孝振(阿里云开放平台)撰写,介绍阿里云OpenAPI网关实时监控体系的构建实践。面对TB级日志、多维分析、秒级告警等挑战,采用Flink+SLS云原生方案,创新分层聚合+Source端谓词下推,实现60+地域、300+产品、200TB/日的高可用实时监控,故障发现从分钟级降至秒级。

作者:潘伟龙(阿里云可观测)、阮孝振(阿里云开放平台)

1. 背景与挑战

业务背景

image.png

阿里云开放平台(OpenAPI) 是开发者管理云上资源的标准入口。开放平台承载了几乎所有云产品的对外接口,关乎客户的自动化业务和各类管控需求。随着企业对自动化的依赖日益加深,OpenAPI 的稳定性建设变得至关重要。

监控体系的需求方包括:

  • 开放平台运维团队:负责网关整体可用性,需要全局视角的监控告警

  • 各云产品团队(ECS/RDS/SLB 等):需要查看自己产品的 API 调用指标和大盘,并配置细粒度告警

  • SRE 团队:需要快速定位故障,进行根因分析

任何接口的波动都可能影响客户的生产业务,因此必须建立全方位的指标监控体系,并配套及时的告警能力,以确保高可用性。

核心挑战

构建监控体系的核心数据源是 API 网关的访问日志。这些日志由分布式部署在各地域的网关节点产生,具有以下鲜明特点:

image.png

挑战 描述 影响
海量并发 核心网关集群分布式部署,每分钟产生数十 GB 的日志,日均数据量达 TB 级 传统批处理方案无法满足实时性要求
维度复杂 包含 Region、产品、业务域、租户、API、错误码等多维信息,且相互交织 需要灵活的多维聚合能力
实时性高 故障发现需要秒级响应,告警延迟直接影响 MTTR 必须采用流式计算架构
稳定性要求高 监控系统本身的可用性要求 99.99%+,不能因自身故障导致漏报 需要高可靠的数据链路和容错机制

2. 技术方案

针对上述挑战,我们采用 Flink + SLS(日志服务) 的云原生组合来构建实时监控体系。

技术选型

该方案的核心组件及选型理由如下:

组件 选型理由
阿里云实时计算Flink 业界领先的流计算引擎,支持 Exactly-Once 语义、窗口聚合和丰富的状态管理
SLS(日志服务) 阿里云原生日志平台,天然支持海量日志采集、存储和消费,提供 Logtail 一键接入
Flink SLS Connector 阿里云实时计算Flink内置的SLS连接器,由SLS和Flink双方产研团队共同打造,支持消费组模式,天然实现 Checkpoint 对齐
MetricStore SLS 原生时序存储,完美兼容 Prometheus 协议,可直接对接 Grafana

这套组合的核心优势:

  • 全托管运维:SLS 和 Flink(基于阿里云实时计算)均为全托管服务,无需运维基础设施

  • 弹性扩展:消费吞吐和计算资源可按需扩缩

  • 端到端保障:从采集到告警的全链路可观测

整体架构

image.png

整个数据处理链路采用地域化部署 + 中心化汇聚的架构设计,在各地域内完成日志采集与聚合计算,最终将指标汇聚到中心化的 MetricStore 实现全局监控。

地域内处理(Regional Processing)

每个地域独立部署完整的数据处理链路,实现就近计算、降低延迟:

  1. 数据采集:由 Logtail 实时采集本地域网关节点日志。Logtail 是阿里云自研的高性能日志采集 Agent,具备毫秒级延迟百万级 EPS 吞吐能力,确保海量日志的可靠传输。

  2. 日志存储:各地域的 SLS Logstore 存储本地域 OpenAPI 的原始访问日志,支持对请求明细的实时查询与分析,同时作为 Flink 流计算的数据源。

  3. 流式聚合计算:各地域独立部署 Flink Job 1(聚合作业),关联 MySQL 维表(存储网关机器的集群信息、API 业务域如 ECS 等元数据),进行局部维度的业务指标汇总。地域内处理可大幅减少跨域传输的数据量。

    跨地域汇聚(Cross-Region Aggregation)

    多个地域的聚合结果统一写入中心化的 MetricStore,实现全局视图:

  4. 指标汇聚:各地域的 Flink Job 2(指标转换) 将聚合结果转为时序指标格式,统一推送到中心地域的 SLS MetricStore。通过汇聚设计,运维团队可在单一视图查看全球所有地域的指标。

  5. 可视化与告警:基于 Grafana 对接中心化的 SLS MetricStore,通过标准 PromQL 实现多维度的指标大盘展示,并配置细粒度告警规则,实现从异常发现到通知的闭环。

分层设计理念

为什么要分两层?核心考虑是平衡实时性与资源效率

层级 部署方式 职责 设计考量
第一层:地域化处理 各地域独立部署 Logstore + Flink 关联维表、计算基础度量、写入本地域聚合日志 Logstore 数据减量:在数据源头(各地域)完成高维度的细节聚合,将 TB 级原始日志压缩为 GB 级聚合数据,大幅降低跨域带宽成本
第二层:中心化汇聚 各地域部署 Flink Job 2 + 中心 MetricStore 将聚合数据转换为指标格式,跨域写入中心 MetricStore 全局视图:通过中心化存储实现全球业务的统一监控与告警管理

为什么不直接一层聚合?

> 1. 数据倾斜(Data Skew):OpenAPI 流量分布极不均匀,某些大产品(如 ECS)的 QPS 是其他产品的数千倍。如果直接按 `Product` 进行 GroupBy,会导致特定 Flink Task 出现严重的数据倾斜和状态膨胀。

> 2. 资源效率:通过第一层单机维度的局部聚合,输出到下游的数据量可降低 90% 以上,大幅减轻了全局聚合作业的计算和存储开销。

3. 指标体系设计

我们需要生成的指标体系由 Metric Name(指标名称)Labels(标签) 组成,覆盖以下四个核心维度:

维度 指标前缀 (Prefix) 核心指标 (Metric) 关键标签 (Labels) 使用场景
产品维度 namespace_product_gw http_req (QPS), rt_mean, success_rate product, region_id, biz_region_id 各云产品团队监控自己产品的整体健康度
API 维度 namespace_api_gw http_req, http_5xx, slow_http_req, http503_rate product, api, version, priority 定位具体接口问题,分析慢调用
错误码 namespace_error_code_gw http_code, error_code product, api, error_code 错误分布分析,快速定位故障原因
租户维度 namespace_tenant_gw api_req_limiting_rate (限流比例) uid, gc_level (GC5/6/7) 大客户限流监控,容量规划

指标命名规范Prefix_MetricName。例如 Ecs 产品的 QPS 指标名为 namespace_product_gw_http_req

4. 核心作业实践

Job 1:聚合作业

设计意图:消费原始日志,关联 MySQL 维表补充元数据(网关集群信息、API 业务域等),并进行多阶段聚合:先进行细粒度的多维聚合(按产品/API/租户等)以减少下游数据量,再进行全局指标汇总。

1. 数据源:网关原始日志

原始日志由 Logtail 从网关节点采集。以下是一条典型的网关访问日志(已脱敏):

{
   
  "AK": "STS.NZD***Lgwc",
  "Api": "DescribeCustomResourceDetail",
  "CallerUid": "109837***3503",
  "ClientIp": "192.168.xx.xx",
  "Domain": "acc-vpc.cn-huhehaote.aliyuncs.com",
  "ErrorCode": "ResourceNotFound",
  "Ext5": "{\"logRegionId\":\"cn-huhehaote\",\"appGroup\":\"pop-region-cn-huhehaote\",\"callerInfo\":{...},\"headers\":{...}}",
  "HttpCode": "404",
  "LocalIp": "11.197.xxx.xxx",
  "Product": "acc",
  "RegionId": "cn-huhehaote",
  "RequestContent": "RegionId=cn-huhehaote;Action=DescribeCustomResourceDetail;Version=2024-04-02;...",
  "TotalUsedTime": "14",
  "Version": "2024-04-02",
  "__time__": "1768484243"
}

字段说明Ext5 包含嵌套的 JSON 结构(调用者信息、请求头等),RequestContent 是 KV 格式的请求参数。这些复杂结构需要在后续处理中解析。

基于上述日志结构,我们定义 Flink Source 表:

CREATE TABLE openapi_log_source (
  `__time__` BIGINT,
  LocalIp STRING,           -- 网关节点 IP
  Product STRING,           -- 产品 Code (如 Ecs, RDS)
  Api STRING,               -- API 名称 (如 DescribeInstances)
  Version STRING,           -- API 版本
  Domain STRING,            -- 请求域名
  AK STRING,                -- AccessKey
  CallerUid STRING,         -- 调用者 UID
  HttpCode STRING,          -- HTTP 状态码
  ErrorCode STRING,         -- 网关错误码
  TotalUsedTime BIGINT,     -- 请求耗时 (ms)
  ClientIp STRING,          -- 客户端 IP
  RegionId STRING,          -- 地域
  Ext5 STRING,              -- 扩展字段 (嵌套 JSON)
  RequestContent STRING,    -- 请求参数 (KV 格式)
  ts AS TO_TIMESTAMP_LTZ(`__time__` * 1000, 3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'sls',
  'project' = '*****',
  'logstore' = 'pop_rpc_trace_log',
  'endpoint' = 'cn-shanghai-intranet.log.aliyuncs.com'
);

Watermark 策略说明:这里设置 ts - INTERVAL '5' SECOND 表示允许最多 5 秒的乱序数据。该值需根据实际业务场景权衡:生产环境中,网关日志通过 Logtail 采集,端到端延迟通常在 2-3 秒内,5 秒的 Watermark 延迟可以覆盖绝大多数场景。对于跨地域同步场景,可适当放宽至 10-15 秒。

2. MySQL 维表:元数据富化

为了满足指标的标签需求(如 app_group, gc_level),需要关联维表:

-- 网关集群信息 (关联 LocalIp)
CREATE TABLE gateway_cluster_dim (
  local_ip STRING,
  app_group STRING,          -- 集群名称
  region_id STRING,          -- 物理 Region
  PRIMARY KEY (local_ip) NOT ENFORCED
) WITH ('connector' = 'jdbc', ...);

-- 租户等级信息 (关联 Uid)
CREATE TABLE user_level_dim (
  uid STRING,
  gc_level STRING,           -- 客户等级 (GC5/GC6/GC7)
  PRIMARY KEY (uid) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://xxx:3306/dim_db',
  'table-name' = 'user_level',
  'lookup.cache.max-rows' = '50000',       -- 缓存最大行数
  'lookup.cache.ttl' = '10min',            -- 缓存过期时间
  'lookup.max-retries' = '3'               -- 查询失败重试次数
);

维表缓存策略选择:生产环境中,gateway_cluster_dim 采用 ALL 策略,启动时全量加载并定时刷新;user_level_dim 采用 LRU 策略,缓存 5 万条热点租户数据,TTL 设为 10 分钟以平衡命中率和数据新鲜度。

3. Job 1 输出:写入本地域聚合日志

计算结果写入本地域的 SLS Logstore machine_agg_log,作为中间存储。

-- 定义本地聚合日志存储
CREATE TABLE machine_agg_log_sink (
  window_start TIMESTAMP(3),
  product STRING,
  api STRING,
  version STRING,
  caller_uid STRING,
  region_id STRING,
  app_group STRING,
  gc_level STRING,
  http_code STRING,
  error_code STRING,
  qps BIGINT,
  rt_mean DOUBLE,
  slow1s_count BIGINT,
  http_2xx BIGINT,
  http_5xx BIGINT,
  http_503 BIGINT
) WITH (
  'connector' = 'sls',
  'project' = '****',
  'logstore' = 'machine_agg_log',  -- 地域化部署,写入本地域 Logstore
  'endpoint' = 'cn-shanghai-intranet.log.aliyuncs.com' -- 实际部署时替换为各地域 Endpoint
);

-- 执行写入
INSERT INTO machine_agg_log_sink
SELECT 
  TUMBLE_START(l.ts, INTERVAL '10' SECOND),
  l.Product, l.Api, l.Version, l.CallerUid, g.region_id, g.app_group, u.gc_level, l.HttpCode, l.ErrorCode,
  COUNT(*) as qps,
  AVG(CAST(l.TotalUsedTime AS DOUBLE)),
  SUM(CASE WHEN l.TotalUsedTime > 1000 THEN 1 ELSE 0 END),
  SUM(CASE WHEN l.HttpCode >= '200' AND l.HttpCode < '300' THEN 1 ELSE 0 END),
  SUM(CASE WHEN l.HttpCode >= '500' THEN 1 ELSE 0 END),
  SUM(CASE WHEN l.HttpCode = '503' THEN 1 ELSE 0 END)
FROM openapi_log_source l
LEFT JOIN gateway_cluster_dim FOR SYSTEM_TIME AS OF l.ts AS g ON l.LocalIp = g.local_ip
LEFT JOIN user_level_dim FOR SYSTEM_TIME AS OF l.ts AS u ON l.CallerUid = u.uid
GROUP BY 
  TUMBLE(l.ts, INTERVAL '10' SECOND),
  l.Product, l.Api, l.Version, l.CallerUid, g.region_id, g.app_group, u.gc_level, l.HttpCode, l.ErrorCode;

Job 2:指标转换与跨域汇聚

设计意图:各地域独立部署 Job 2,消费本地域的聚合日志 machine_agg_log,将数据转换为时序格式,并跨域写入中心地域 (cn-shanghai) 的 MetricStore。

1. 数据源:消费本地聚合日志

CREATE TABLE machine_agg_log_source (
  window_start TIMESTAMP(3),
  product STRING,
  region_id STRING,
  -- ... 其他字段同 Sink 定义
  WATERMARK FOR window_start AS window_start - INTERVAL '5' SECOND
) WITH (
  'connector' = 'sls',
  'project' = '****',
  'logstore' = 'machine_agg_log',  -- 消费本地域 Logstore
  'endpoint' = 'cn-shanghai-intranet.log.aliyuncs.com'
);

2. 目标汇聚:中心化 MetricStore Sink

CREATE TABLE metricstore_sink (
  `__time_nano__` BIGINT,
  `__name__` STRING,
  `__labels__` STRING,
  `__value__` DOUBLE
) WITH (
  'connector' = 'sls',
  'project' = '****',      -- 中心化 Project
  'logstore' = 'openapi_metrics',            -- 中心化 MetricStore
  'endpoint' = 'cn-shanghai-intranet.log.aliyuncs.com' -- 统一指向中心地域 Endpoint
);

3. 计算与汇聚逻辑

Job 2 将聚合日志进一步汇总(如按 Product 维度)并格式化为 Metric 写入中心。

示例:计算产品维度 QPS 并汇聚

INSERT INTO metricstore_sink
SELECT 
  UNIX_TIMESTAMP(CAST(TUMBLE_START(window_start, INTERVAL '1' MINUTE) AS STRING)) * 1000000000,
  'namespace_product_gw_http_req',
  CONCAT('product=', product, '|region_id=', region_id), -- 保留地域标签,实现全球视角
  CAST(SUM(qps) AS DOUBLE)
FROM machine_agg_log_source
GROUP BY TUMBLE(window_start, INTERVAL '1' MINUTE), product, region_id;

架构优势

带宽节省:Job 1 将海量明细日志聚合为少量统计数据(数据量减少 99%),Job 2 仅跨域传输这些轻量级指标,极大降低了专线带宽成本。

隔离性:各地域计算独立,单地域故障不影响其他地域及中心监控的写入。

作业配置与调优

为了保障作业的稳定性和数据准确性,生产环境中我们对 Checkpoint 和状态后端进行了专项调优。

Checkpoint 配置与权衡

提供了两种配置策略,需根据业务对数据一致性服务可用性的偏好进行选择:

策略 A:标准与一致性优先(推荐通用场景)

适用于绝大多数对数据准确性有要求的监控场景。

SET 'execution.checkpointing.interval' = '60s';           -- 1分钟 Checkpoint
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';      -- 精确一次语义
SET 'execution.checkpointing.timeout' = '10min';

策略 B:高可用优化配置(本案例生产实践)

在 OpenAPI 网关这种超高并发对可用性极度敏感的场景下,为了避免 Checkpoint 过于频繁导致的性能抖动,同时又不希望完全牺牲数据可靠性,我们采取了“弱一致性 + 低频打点 + 允许失败”的组合策略:

SET 'execution.checkpointing.interval' = '180s';          -- 延长至 3 分钟,减少频率
SET 'execution.checkpointing.mode' = 'AT_LEAST_ONCE';     -- 降低 Barrier 对齐开销
SET 'execution.checkpointing.timeout' = '15min';          -- 放宽超时时间
SET 'execution.checkpointing.max-concurrent-checkpoints' = '1';
SET 'execution.checkpointing.tolerable-failed-checkpoints' = '10'; -- 允许连续失败,不因 CP 失败重启作业

优化思路

策略 一致性 稳定性/开销 适用场景
标准配置 Exactly-Once 中 (需对齐 Barrier) 计费、审计等核心强一致数据
高可用优化 At-Least-Once 高 (无对齐等待,允许失败) 超大规模监控、实时大屏、趋势分析

状态后端选择

阿里云实时计算 Flink 版提供了企业级的 GeminiStateBackend,相比开源 RocksDB,它在存算分离架构下针对大状态场景进行了深度优化。针对本案例中“状态大(GB级)、聚合Key多”的特点,我们开启了 KV 分离功能:

SET 'table.exec.state.backend' = 'gemini';                -- 使用企业级 GeminiStateBackend
SET 'state.backend.gemini.kv.separate.mode' = 'GLOBAL_ENABLE'; -- 开启 KV 分离

GeminiStateBackend 核心优势对比

特性 GeminiStateBackend RocksDB 业务价值
KV 分离 支持自动/手动开启 不支持 (混合存储) 吞吐提升 50%+:将大 Value 分离存储,大幅降低 Compaction 带来的写放大,显著提升复杂聚合与 Join 的性能。
自适应调参 引擎托管 (Adaptive Tuning) 人工调优 稳定性增强:根据流量和访问模式自动调整内存与 IO 参数,避免因配置不当导致的 OOM 或性能抖动。
状态迁移 按需迁移 (Lazy Migration) 全量迁移 秒级启动:Failover 或扩缩容时,无需等待全量数据下载即可启动计算,大幅缩短业务断流时间。

生产建议:对于网关日志聚合这种 State Size 较大且吞吐要求极高的场景,强烈推荐使用 GeminiStateBackend + KV 分离。实测开启后,作业在流量高峰期的 CPU 使用率下降了 20%,且 Checkpoint 耗时更加稳定。

5. 可视化与告警

监控效果展示

通过两个 Flink 作业的聚合,我们在 Grafana 中构建了多维度的 OpenAPI 监控大盘,实现了从产品全局视图到具体错误码的深度下钻。

image.png

image.png

image.png

image.png

自助查询与告警

在 Grafana 中添加 SLS MetricStore 作为数据源后,各云产品团队可以使用 PromQL 语法自助查询指标并配置告警规则:

常用查询示例

# QPS 趋势
sum(namespace_product_gw_http_req) by (product)

# 错误率环比(当前 1 分钟与 1 小时前对比)
(
  sum(rate(namespace_product_gw_http_5xx[1m])) / sum(rate(namespace_product_gw_http_req[1m]))
) / (
  sum(rate(namespace_product_gw_http_5xx[1m] offset 1h)) / sum(rate(namespace_product_gw_http_req[1m] offset 1h))
) > 2

# 平均延迟趋势
avg(namespace_product_gw_rt_mean) by (product)

告警规则示例

- alert: HighErrorRate
  expr: sum(namespace_product_gw_http_5xx) by (product) / sum(namespace_product_gw_http_req) by (product) > 0.01
  for: 2m
  labels:
    severity: warning
  annotations:
    summary: "{
   { $labels.product }} 错误率过高"
    description: "当前错误率: {
   {
    $value | printf \"%.2f\" }}%"

各云产品团队可以在 Grafana 中配置自己产品的监控大盘和告警规则,实现自主运维。

6. 规模化验证

该方案已在阿里云开放平台稳定运行,以下是生产环境的核心指标:

image.png

上图展示了系统在生产环境中的核心运行指标。得益于 Flink 的分布式计算能力和 SLS 的高吞吐存储,该方案成功支撑了阿里云开放平台全量 API 调用的实时监控,覆盖 60+ 个全球地域、300+ 个云产品,日均处理 200TB+ 压缩日志(原始日志约 2PB,单条日志约 4-5KB),生成 50 万+ 时序指标。

数据处理规模

指标 数值
日均日志量 200+ TB(压缩后)
峰值 QPS 200 万+/秒
覆盖地域 全球 60+ Region
接入云产品 300+

指标生成能力

指标类型 数量 更新频率
产品维度指标 5,000+ 20秒 ~ 1分钟
API 维度指标 200,000+ 20秒 ~ 1分钟
错误码维度指标 50,000+ 20秒 ~ 1分钟
租户维度指标 250,000+ 20秒 ~ 1分钟

系统稳定性

指标 表现
Flink 作业可用性 99.99%+
端到端延迟 P99 < 30 秒(从日志产生到指标可查)
告警触达时效 < 1 分钟
连续稳定运行 6 个月+ 无人工干预重启

业务价值

  • 故障发现提速:故障发现时间从分钟级缩短到秒级

  • 运维效率提升:300+ 云产品团队实现自助监控配置

在方案落地过程中,我们发现原始日志包含大量冗余字段和嵌套结构,而指标计算只需其中的核心字段。为此,我们引入了 Source 端谓词下推 技术,在数据进入 Flink 之前完成字段裁剪,有效降低了网络传输量并加速了 Flink 计算。

7. 进阶优化:Source 端谓词下推(Predicate Pushdown)

谓词下推概念与 Connector 能力对比

谓词下推(Predicate Pushdown) 是数据库和大数据领域的经典优化策略,核心思想是将过滤条件下推到数据源端执行,减少数据传输量和计算开销。

Flink 的下推能力取决于 Source Connector 的实现

Connector 下推能力 实现方式
Kafka ❌ 不支持 Kafka 本身不支持服务端过滤
JDBC ✅ 支持 将 WHERE 条件转为 SQL 下推到数据库
Hive ✅ 支持 分区裁剪 + 列裁剪
Iceberg/Hudi ✅ 支持 利用 Min/Max 统计信息跳过文件
SLS (日志服务) ✅ 通过SLS消费处理器支持 在服务端执行 SPL 语句

SLS 消费处理器:一种 Source 端下推实现

早期版本的 Flink SLS Connector 会默认全量拉取 SLS Logstore 的数据,但实际上很多字段是不需要的。借助 SLS 消费处理器,我们实现了真正的 Source 端谓词下推——过滤和转换逻辑在 SLS 服务端执行,Flink 只接收处理后的结果。

image.png

技术优势

  • SIMD 向量化引擎:SPL 底层采用向量化执行引擎,利用 CPU SIMD 指令集(如 AVX2/AVX-512)批量处理数据,相比逐行处理性能提升数倍

  • 同机房本地计算:数据处理在 SLS 存储节点本地完成,无需跨网络传输原始数据,避免了网络 I/O 成为瓶颈

  • 列式存储加速:SLS 底层采用列式存储,配合 project 列裁剪,只读取必要的列数据,大幅减少磁盘 I/O

  • 零拷贝传输:处理后的数据直接进入消费通道,减少内存拷贝开销

image.png

计费提示

普通消费:按传输的压缩数据量计费。

使用SPL消费处理器:按扫描的原始(未压缩)数据量计费。

详细定价及区别请参考 SLS 产品定价

SPL 配置示例

基于前文介绍的网关日志结构,我们通过 SPL 消费处理器实现 Source 端过滤。对比传统的 Flink 侧过滤:

-- 传统方式:Flink 侧过滤(需拉取全量数据)
SELECT * FROM openapi_log_source
WHERE Domain != 'popwarmup.aliyuncs.com'
  AND JSON_VALUE(Ext5, '$.logRegionId') NOT IN ('cn-shanghai', 'cn-beijing')

使用 SPL 消费处理器后,过滤和转换在 SLS 服务端完成:

-- 1. 行过滤:排除无效数据
* 
| where Domain != 'popwarmup.aliyuncs.com'

-- 2. 嵌套 JSON 分层展开(只对有效数据执行)
| parse-json -prefix='ext5_' Ext5  
| where ext5_logRegionId not in ('cn-shanghai', 'cn-beijing', 'cn-hangzhou')
| parse-json -prefix='callerInfo_' ext5_callerInfo  
| parse-json -prefix='headers_' ext5_headers  

-- 3. 正则提取 KV 格式字段
| parse-regexp RequestContent, '[;]RegionId=([^;]*)' as request_regionId  

-- 4. 列裁剪:只保留必要字段(放在最后,减少输出数据量)
| project LocalIp, Product, Version, Api, Domain, ErrorCode, HttpCode, 
         TotalUsedTime, AK, RegionId, ClientIp, 
         callerInfo_callerType, callerInfo_callerUid, callerInfo_ownerId,
         ext5_regionId, ext5_appGroup, ext5_stage, request_regionId

在 Flink SLS Source 中引用

在 Flink SQL 中,通过 processor 参数引用预先配置好的消费处理器:

CREATE TABLE openapi_log_source (
  `__time__` BIGINT,
  -- SPL 处理后的字段(已展开嵌套 JSON、已裁剪冗余列)
  LocalIp STRING,
  Product STRING,
  Version STRING,
  Api STRING,
  Domain STRING,
  ErrorCode STRING,
  HttpCode STRING,
  TotalUsedTime BIGINT,
  AK STRING,
  RegionId STRING,
  ClientIp STRING,
  callerInfo_callerType STRING,      -- 从 Ext5.callerInfo 展开
  callerInfo_callerUid STRING,
  callerInfo_ownerId STRING,
  ext5_regionId STRING,              -- 从 Ext5 展开
  ext5_appGroup STRING,
  ext5_stage STRING,
  request_regionId STRING,           -- 从 RequestContent 正则提取
  ts AS TO_TIMESTAMP_LTZ(`__time__` * 1000, 3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'sls',
  'project' = '****',
  'logstore' = 'pop_rpc_trace_log',
  'endpoint' = 'cn-shanghai-intranet.log.aliyuncs.com',
  'processor' = 'openapi-processor'  -- 引用消费处理器,实现过滤下推
);

优化效果

通过 SPL 源头预处理,我们在以下几个维度取得了显著提升:

优化项 优化前 优化后 提升效果
数据传输量 100GB/min 20GB/min 降低 80%,跨域同步开销显著降低
Checkpoint 大小 100% 20% 降低 80%,Failover 恢复时间大幅缩短
作业稳定性 偶发 OOM 稳定运行 状态压力减轻,GC 频率降低
开发效率 Flink 侧处理 SLS 侧配置 SPL 语法简洁,无需修改作业代码

总结

通过 Flink + SLS 的云原生组合,我们成功构建了阿里云 OpenAPI 网关的实时监控体系:

挑战 解决方案
海量并发 SLS 高吞吐存储 + Flink 分布式计算
维度复杂 分层聚合设计,灵活支持多维分析
实时性高 端到端秒级延迟,快速故障发现
稳定性要求高 全托管服务 + Exactly-Once 语义保障

Flink 核心技术要点

技术点 应用场景 关键配置
Watermark 与事件时间 乱序日志的窗口聚合 WATERMARK FOR ts AS ts - INTERVAL 'X' SECOND
Lookup Join 流表与维表关联 FOR SYSTEM_TIME AS OF
分层聚合设计 解决 Data Skew 问题 局部聚合 → 全局汇总
GeminiStateBackend 大状态 / 存算分离 table.exec.state.backend = gemini
Source 端谓词下推 减少数据传输与计算 SLS 消费处理器

架构设计启示

  1. 分层聚合缓解数据倾斜:流量分布不均时,先按物理节点局部聚合,再按业务维度全局汇总

  2. 谓词下推降低成本:将过滤逻辑下推到 Source 端(如 SLS 消费处理器),减少网络传输和计算资源消耗

  3. 选择企业级状态后端:大状态场景选用 GeminiStateBackend + KV 分离,显著提升 I/O 效率与作业稳定性

本案例的技术方案可推广至微服务调用链监控、CDN 日志分析、物联网数据聚合等类似场景。

开发者资源

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
23天前
|
机器学习/深度学习 SQL 人工智能
别再群发拜年消息了!三步微调AI,让它学会你的“独家语气”
每逢春节,通用AI祝福总显生硬空洞。本文探讨如何通过微调(LoRA),将“人情世故”转化为结构化数据(称呼/关系/细节/风格等),让AI真正学会你的语气与记忆,生成有温度、带梗、专属的个性化祝福——技术不是替代表达,而是帮你把来不及说的情意,说得恰到好处。(239字)
266 16
别再群发拜年消息了!三步微调AI,让它学会你的“独家语气”
|
3月前
|
消息中间件 Java Kafka
在 OpenAI 打造流处理平台:超大规模实时计算的实践与思考
本文介绍OpenAI构建流处理平台的实践与挑战。面对Kafka高可用、Python生态兼容、云环境限制等问题,团队基于PyFlink打造跨区域流处理架构,集成Kafka HA组、自研代理与控制平面,支撑实时Embedding生成、特征计算等场景,并推动开源协作与平台自动化演进。
236 1
在 OpenAI 打造流处理平台:超大规模实时计算的实践与思考
|
23天前
|
人工智能 自然语言处理 小程序
给AI拜年差点翻车后,我悟了:RAG和微调,到底谁更懂“人情世故”?
大家好,我是AI伙伴狸猫算君!本文以“AI写春节祝福”为切入点,深入剖析RAG与微调的技术差异:RAG依赖检索拼凑,难捕获独特人情;微调则通过高质量关系感知数据,将“称呼、细节、风格”内化为模型本能。手把手演示30分钟用LLaMA-Factory完成Qwen3微调,让祝福真正有温度、有梗、有你。
136 13
|
23天前
|
安全 C++
关系记忆不是越完整越好:chunk size 的隐性代价
本文揭示关系型RAG(如祝福/道歉生成)中一个反直觉真相:关系信息并非越完整越好。大chunk会将“可引用的触发点”异化为“需总结的材料”,诱使模型转向安全、抽象、概括性表达,丧失走心感。核心原则是——切分重在“可被直接引用”,而非“逻辑完整”。
|
7天前
|
Serverless
阿里云产品二月刊来啦
千问 Qwen3.5-Plus 重磅登场,百炼 Coding Plan 支持多款开闭源模型,桌面 Agent 工具 CoPaw 开源,函数计算 AgentRun 重磅上线知识库功能|产品二月刊
233 6
|
2月前
|
存储 缓存 数据建模
StarRocks + Paimon: 构建 Lakehouse Native 数据引擎
12月10日,Streaming Lakehouse Meetup Online EP.2重磅回归,聚焦StarRocks与Apache Paimon深度集成,探讨Lakehouse Native数据引擎的构建。活动涵盖架构统一、多源联邦分析、性能优化及可观测性提升,助力企业打造高效实时湖仓一体平台。
421 39
|
20天前
|
弹性计算 安全 应用服务中间件
阿里云服务器如何部署安装LNMP程序环境?超简单,看完就能上手!
本文详解阿里云ECS部署LNMP环境的两种方式:一是通过系统运维管理控制台“一键安装”扩展程序,快速完成部署;二是手动安装Linux+Nginx+MySQL+PHP,支持Alibaba Cloud Linux/CentOS/Ubuntu,满足WordPress等对配置与安全的定制化需求。含完整步骤、命令及验证方法。
|
1月前
|
存储 人工智能 缓存
立春破冰!阿里云Tair KVCache重磅发布:开源商业双轮驱动,击穿大模型“显存墙”
立春之际,阿里云瑶池发布 Tair KVCache——面向大模型推理的缓存加速方案,开源 KVCache Manager 与 HiSim 仿真工具,推出企业级云服务。联合 NVIDIA、SGLang 等共建“计算-存储-调度”一体化AI基础设施,突破显存瓶颈,降低90%+显存占用,助力高效低成本 AI 推理。
|
17天前
|
人工智能 语音技术 云计算
书尖 AI 功能实测|阿里云 AI 技术加持,与喜马拉雅听书体验深度对比
在阿里云AI赋能下,书尖AI实测展现三大优势:1.2亿册全品类书库、双人互动式AI播客、2分钟极速提炼书籍精华,并依托阿里云TTS实现自然听书体验。相较喜马拉雅,其AI深度解读与定制化能力更胜一筹。(239字)
|
3月前
|
人工智能 数据处理 Apache
Forrester发布流式数据平台报告:Flink 创始团队跻身领导者行列,实时AI能力获权威认可
Ververica,由Apache Flink创始团队创立、阿里云旗下企业,首次入选Forrester 2025流式数据平台领导者象限,凭借在实时AI与流处理领域的技术创新及全场景部署能力获高度认可,成为全球企业构建实时数据基础设施的核心选择。
222 10
Forrester发布流式数据平台报告:Flink 创始团队跻身领导者行列,实时AI能力获权威认可

热门文章

最新文章