flink进行etl向doris 写入数据时,报错如下!这个会是什么原因?

flink进行etl向doris 写入数据时,报错如下image.png
,但是检查之后发现be运行一直正常,上次启动时间已经是8月31号的了?这个会是什么原因?代码结构相同的其余任务就没有发现这个问题?

展开
收起
真的很搞笑 2023-09-19 20:26:58 195 分享 版权
阿里云 AI 助理回答

根据您提供的报错信息以及知识库中的相关资料,以下是可能导致问题的原因及解决方案:


1. 报错原因分析

在Flink向Doris写入数据时,出现异常但BE(Backend)运行正常且上次启动时间较早(8月31日),可能的原因包括以下几种:

1.1 事务超限

报错信息中提到的current running txns on db 10006 is 100, larger than limit 100表明当前数据库的事务数超过了Doris的限制。
- 原因:Doris对每个数据库的并发事务数有限制,默认为100。如果事务未及时提交或回滚,可能会导致事务堆积。 - 检查点: - 检查是否有大量未完成的事务。 - 确认Flink任务是否频繁开启新事务而未正确关闭。

1.2 数据乱序

Flink写入Doris时,如果数据未按照主键进行Shuffle,可能会导致写入失败或性能下降。
- 原因:相同主键的数据分布在不同的Flink Task上,无法保证写入顺序。 - 检查点: - 确认Flink SQL逻辑中是否按照Doris表的主键进行了Shuffle。

1.3 脏数据问题

如果Flink导入过程中存在脏数据(如格式错误、字段缺失等),可能会导致写入失败。
- 原因:Doris对数据格式有严格要求,脏数据会导致写入中断。 - 检查点: - 检查Flink任务的日志,确认是否存在脏数据。 - 配置Flink跳过脏数据的参数(如max-error-rows)。

1.4 Thrift框架并发Bug

报错信息中提到的TApplicationException: get_next failed: out of sequence response可能是由于Thrift框架的并发Bug导致的。
- 原因:Flink Connector与Doris之间的通信可能存在兼容性问题。 - 检查点: - 确认使用的Flink Connector版本是否为最新。 - 确认Flink版本与Doris版本是否兼容。

1.5 BE节点状态异常

虽然BE节点显示运行正常,但可能存在隐性问题(如内存不足、磁盘空间不足等)。
- 原因:BE节点资源不足可能导致写入失败。 - 检查点: - 检查BE节点的磁盘空间和内存使用情况。 - 查看BE节点日志,确认是否存在异常。


2. 解决方案

2.1 处理事务超限问题

  • 操作步骤
    1. 登录Doris集群,执行以下SQL查询当前事务状态:
      SHOW PROC '/transactions/{db_id}';
      
    2. 如果发现大量未完成事务,手动清理超时事务:
      KILL TRANSACTION {transaction_id};
      
    3. 调整Doris的事务限制(如需):
      SET global max_running_txn_num_per_db = 200;
      

2.2 确保数据有序性

  • 操作步骤
    1. 在Flink SQL中,确保写入Doris时按照主键进行Shuffle:
      CREATE TABLE doris_sink (
       id BIGINT,
       name STRING,
       PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
       'connector' = 'doris',
       'fenodes' = 'doris_host:8030',
       'table.identifier' = 'db.table',
       'sink.enable-2pc' = 'true'
      );
      
    2. 确认Flink任务的并行度设置合理,避免数据分布不均。

2.3 跳过脏数据

  • 操作步骤
    1. 在Flink任务配置中添加以下参数,允许跳过一定数量的脏数据:
      sink.max-error-rows=1000
      
    2. 检查Flink任务日志,定位脏数据来源并修复上游数据源。

2.4 升级Flink Connector

  • 操作步骤
    1. 下载并使用最新版本的Flink-Doris Connector。
    2. 确认Flink版本与Connector版本兼容。

2.5 检查BE节点状态

  • 操作步骤
    1. 登录Doris集群,检查BE节点的磁盘和内存使用情况:
      df -h
      free -m
      
    2. 查看BE节点日志,确认是否存在异常:
      tail -f be.INFO
      

3. 其他建议

  • 监控与告警:建议为Doris集群配置监控和告警,及时发现资源不足或事务超限等问题。
  • 代码一致性检查:对比代码结构相同的其他任务,确认是否存在配置差异(如并行度、Sink参数等)。

通过以上步骤,您可以逐步排查并解决Flink向Doris写入数据时的异常问题。如果问题仍未解决,请提供更详细的报错日志以便进一步分析。您可以复制页面截图提供更多信息,我可以进一步帮您分析问题原因。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

还有其他疑问?
咨询AI助理