Blink实时计算:Explorer大基数表的写入性能优化

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 在研发实时数据的过程中碰到了需要update写入Explore的大基数实时数据表的场景。本文记录了经过一系列方式调优后,在流量正常的情况下,任务不再出现explorer链接失败报错和延迟的全过程。

一、背景

最近在研发实时数据的过程中碰到了需要update写入Explore的大基数实时数据表的场景。为了支持支付宝某个广告位的实时看数需求,需要计算实时累计的流量效果数据。比如曝光pv、点击pv,曝光uv、点击uv,pv曝光点击率。

  • 注:Explorer是一款蚂蚁自研海量数据规模下低延时响应的实时分析型(OLAP)数据库, 目标是提供聚合查询能力和一些高阶分析功能。对标业界ClickHouse和阿里的云原生数据仓库AnalyticDB。

现根据业务需求,需要将用户多次行为记录按特定去重规则合并成1条计算。因此设计了一张在user_id、request_id粒度上的大基数explorer表,目标写入数据量在50~80万行/min左右。希望能在此基础上计算出准确的实时累计曝光pv数据。

Blink任务上线后,sink节点数据写入量在30w行/min以上时会持续报错explorer链接失败,任务频繁失败重启,导致延迟持续上涨。本文总结了常用的优化思路和操作,供参考。

com.alibaba.blink.streaming.connectors.common.exception.BlinkRuntimeException: 
************
ERR_ID:
CON-02010602
CAUSE:
Explorer operation failed, msg: Cannot get a connection, pool error Timeout waiting for idle object
ACTION:
Flush to explorer table failed, contact blink/explorer admin for help.
DETAIL:

************

explorer表例子:

image.png

二、问题出现的直接原因

直接原因是explorer集群中负载不均匀。部分机器负载过高,大批量写入的时候cpu使用率很高导致写入响应慢,最终造成写入超时报错,Blink任务失败重启。比如,explorer表配置的shard数是10(hashBucket) * 2(shardConfig_task_replicants) = 20,假如explorer集群的机器只有14台,理论上讲会有6台机器的负载比其他机器高,大批量写的时候cpu使用率会很高,写入响应慢, 导致Blink写入超时报错,任务失败重启。


三、优化思路

3.1. 均匀机器负载:通过优化explorer表分片数

  • 表的分片数需要根据集群有多少机器进行调整。比如集群机器有14台,可以通过设置hash_bucket=14,让链接都均匀的分布在14台机器上, 不让部分机器负载过高。
{
  "shardConfig_partition_columns": "test_column", // 根据该列进行哈希运算
  "hash_bucket": "14", // 哈希分桶个数
  "update_model": "Row", // 更新模式:追加写入/覆盖写入
  "shardConfig_task_replicants": "2", -- 副本数量
  "storage_engine": "test_engine",
  "storage_explorer_tier": "test",
}

3.2. 调大超时时间配置

(最简单粗暴的方式,不能从本质上解决问题)

blink是通过jdbc链接的explorer,创建explorer结果表示例如下:

create table explorer_output(
  user_id varchar, 
  request_id varchar, 
  ...,
  primary key(rowkey)
) WITH
(
  -- 写入explorer时的各种参数配置
  `user`='test_name'
  ,`url`='jdbc:mysql:///${test_ip}/cheetah?characterEncoding=utf8&autoReconnect=true&connectTimeout=10000&socketTimeout=30000&rewriteBatchedStatements=true''
  ,`zdalpassword`='${test_password}'
  ,`tablename`='test_table'
  ,`type`='explorer'
  ,`cache`='ALL'
  ,`batchInsertSize`='20000'
  ,`partitionBy`='rowkey'
)

超时配置注意事项:

  • blink链接explorer的超时时间由url中connectTimeout和socketTimeout配置。connectTimeout 是blink与explorer TCP建联的超时时间,socketTimeout是blink到explorer TCP读写数据的超时时间。

经尝试,实际将socketTimeOut适当调大后,报错的频率会减少一些。

3.3. 减少写入数据量

3.3.1. Blink sql逻辑优化:通过去重减少输出到sink算子的量

  • 【方法1】having count(*) = 1, 使得同维度下有多条数据时,只取第一条。效果类似first_value() OVER partion by xxx order by窗口函数。但是经实测后发现有问题,会丢失数据。怀疑是开了miniBatch微批处理后第一次count就超过1,数据被过滤掉了。
SELECT
  `user_id`,
  `request_id`,
   ...
FROM `expo_detail`
WHERE `request_id` IS NOT NULL
GROUP BY
  `user_id`,
  `request_id`,
   ...
having count(*) = 1 
  • 【方法2】后续改成row_number 方案,相同维度根据日志时间排序取第一条记录即可,示例代码如下:
SELECT 
      `user_id`,
      `behavior`,
      `request_id`,
        ...
      ROW_NUMBER() OVER (
        PARTITION BY   
          `behavior`,
          `request_id`,
            ...
          ORDER BY loged_time -- 顺序排就行
      ) AS rn
    FROM
    ( 
        SELECT
          `user_id`,
          `behavior`,
          `request_id`,
            ...
        FROM `expo_detail`
        WHERE `request_id` IS NOT NULL
    )
 ) WHERE rn <= 1 -- 只取第一条记录,不丢失即可

topN语句参考:https://help.aliyun.com/apsara/enterprise/v_3_15_0_20210816/sc/enterprise-developer-guide/topn-statement.html?spm=a2c4g.14484438.10001.163

3.3.2. Blink sql逻辑优化:过滤冗余数据不参与计算

  • 在读取上游数据时可以尽量过滤掉冗余数据,不参与后续算子的计算。
  • 本案例中的日志时间有两个,客户端事实发生的日志时间和服务端上报的日志时间。
  • 客户端日志时间会存在乱序的情况(比如几天前的数据延迟到达、由于时区不同导致的“未来”的时间)。通过限制log_time(客户端日志时间)在当天内且<=当前最新时间,可以过滤掉不需要的数据,减少一定数据量。
  • loged_time服务端上报时间相比客户端日志时间,乱序的情况会少一些。此处由于业务需要,以客户端日志时间为准。

3.3.3. Blink参数优化:限制读取日志流的tps

  • 可通过限制上游输入的tps,让数据稳定快速的被处理完输出出去。
  • 如果设置过大,在tps高峰出现时,source节点输入tps量会暴涨,给任务带来较大的性能压力,最终也会影响sink节点写入explorer的稳定性。
CREATE TABLE test_table (
  ....
) WITH
(
  -- blink参数配置
`batchGetSize`='5', -- 适当调小,缓解tps高峰时带来的性能压力
  ...
);

参数释义:https://help.aliyun.com/apsara/enterprise/v_3_15_0_20210816/sc/enterprise-developer-guide/create-a-log-service-source-table-1.html?spm=a2c4g.14484438.10001.114

3.3.4. Blink执行计划优化:减少source节点并发

  • 可以通过减少source节点的并发,减少下游算子压力
  • 如图,可以在Flink UI界面上查看source节点的并发数。一般source节点的并发和上游分片数保持一致。适当调小也可以减少下游写入压力。

image.png

3.4. 其他常见延迟调优方法

3.4.1. Blink参数优化:调整explorer写入参数

可以改配置控制Blink写入explorer的频率和一次写入的数据量,提高写入的效率。Explorer写入Blink参数配置说明:

参数 注释说明 备注
batchInsertSize 一次写入的条数 可选,默认200
flushPoolSize 写入线程数 可选,默认1,超过1,写数据会乱序, 如果是update表不能开启
workQueueSize executor的工作队列大小,和buffer大小成正比,调大就允许更多数据在缓存中 可选,默认是20
flushIntervalMs 刷数据周期,写入速度 1次/2s,30次/m 默认2s,表示每2s会刷一次数据
CREATE TABLE test_table (
`user_id`   VARCHAR,
`rowkey`    VARCHAR,
`loged_time`    TIMESTAMP,
 ...
primary key(`user_id`, `rowkey`)
) WITH
(
  -- 部分参数示例
`tablename`='test_table'
,`type`='explorer'
,`cache`='ALL'
,`batchInsertSize`='500' -- 一次写入的条数 默认200
,`partitionBy`='rowkey' 
,`workeQueueSize`='50' -- executor的工作队列大小,默认20
,`flushIntervalMs`='2000' -- 刷数据周期,默认2s,每2s刷一次数据
);

补充:TaskManager/subTask/slot和线程池/线程和insertBatchSize的关系

一个slot对应一个subTask,一个taskManager假设有32个slot,有32个subTask, 那么一个subTask对应一个线程。整个connectionPool共32个线程,会同时有活跃和非活跃的线程。Sink节点并发数和cpu数会影响subTask的个数。创建explorer结果表的配置insertBatchSize会影响一次写入的数据量。flushInterval影响写入的频率。update表一次写入的数据量增加,耗时会增大。

image.png

3.4.2. 避免频繁Full GC

垃圾回收期间,任务会中断执行,影响Blink性能。如何发现Full GC:通过监控Flink metrics可以直接发现。

image.png

也可以在Flink UI界面上点击某一个taskManager,点击Metric查看Old Generation GC次数,太大说明存在频繁GC的情况,该taskManager的heap内存不够。

image.png

怎么判断内存不够?一般需要缓存大量的数据的地方就需要调大Task Off-Heap堆内存。比如做开窗计算时,需要缓存window窗口段内的数据。补充:

image.png

Flink内存模型:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/deployment/memory/mem_setup_tm/

3.4.3. 其他内存怎么调


四、总结

经过以上方式调优后,在流量正常的情况下,任务不再出现explorer链接失败报错和延迟,写入explorer表的数据量稳定在60-80w行/min。

但从解决指标去重的业务问题的角度来讲,通过存uid、request_id级明细数据的方式计算实时累计曝光pv在这个场景下并不是一个最好的方案。

换个思路,该页面区块的曝光pv需要在uid、request_id级别上去重,看起来是一个多维度去重的问题,但实际上uid和request_id是1对多的关系,不是多对多,对于每一个uid,一个request_id下的多次曝光需要去重,可以转化为直接对request_id去重。计算uv是单维度去重问题的典型例子,参考常见的uv去重的方式,可以采用了hyperLogLog算法(原理见参考文献3)计算曝光pv数据。此处再附一个几种去重方案的性能对比数据,可以看出方案4 explorer存储15min级分时adm数据+hyperLogLog算法计算去重曝光pv比较好(前提是业务看数可以接受1-5%由hyperLogLog带来的误差)。

image.png

参考文档/文献

  1. Flink官方文档:https://nightlies.apache.org/flink/flink-docs-master/zh/docs/concepts/flink-architecture/
  2. 阿里云-实时计算Blink用户文档: https://help.aliyun.com/apsara/enterprise/v_3_15_0_20210816/sc/enterprise-user-guide/what-is-realtime-compute1.html?spm=a2c4g.14484438.10001.25
  3. Flajolet, P., Fusy, É., Gandouet, O., & Meunier, F. (2007). Hyperloglog: the analysis of a near-optimal cardinality estimation algorithm. Discrete mathematics & theoretical computer science, (Proceedings).



作者 | 怀璟

来源 | 阿里云开发者公众号


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
6月前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之通过flink同步kafka数据进到doris,decimal数值类型的在kafka是正常显示数值,但是同步到doris表之后数据就变成了整数,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
NoSQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之设置什么参数可以让多张表同时写入
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 缓存 监控
14个Flink SQL性能优化实践分享
【7月更文挑战第12天】 1. **合理设置并行度**: 根据数据量和资源调整以提高处理速度. 2. **优化数据源**: 使用分区表并进行预处理减少输入量. 3. **数据缓存**: 采用 `BROADCAST` 或 `REPARTITION` 缓存常用数据. 4. **索引和分区**: 创建索引并按常用字段分区. 5. **避免不必要的计算**: 检查并移除多余的计算步骤. 6. **调整内存配置**: 分配足够内存避免性能下降. 7. **优化连接操作**: 选择适合大表和小表的连接方式. 8. **数据类型优化**: 选择合适类型以节省资源. ........
149 1
|
6月前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之CTAS特性只支持新增表,不支持删除表吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 数据处理 流计算
实时计算 Flink版产品使用问题之怎么创建永久表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
实时计算 Flink版产品使用问题之怎么创建永久表
|
6月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用问题之任务在同步过程中新增同步表后选择全量初始化历史数据,是否会阻塞原先其余表的增量同步
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
实时计算 Flink版产品使用问题之任务在同步过程中新增同步表后选择全量初始化历史数据,是否会阻塞原先其余表的增量同步
|
5月前
|
SQL 监控 关系型数据库
实时计算 Flink版操作报错合集之在设置监控PostgreSQL数据库时,将wal_level设置为logical,出现一些表更新和删除操作报错,怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
SQL 分布式计算 Hadoop
实时计算 Flink版产品使用问题之建了一张upsert-kafka的flink表,但是数据为空,该如何排查
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 数据处理 API
实时计算 Flink版产品使用问题之设置什么可以控制非binlogSource表的轮询时间
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。