开发者社区 问答 正文

使用了流应用中使用了mysql jdbc的sour,Execution处于FINISHED无反应

HI! 这边做测试时遇到一个问题: 在流应用中使用了一个mysql jdbc的source作为维表,为了优化处理效率使用了Lookup Cache,下面是注册的表: bsTableEnv.executeSql("CREATE TABLE tm_dealers (dealer_code STRING,is_valid DECIMAL(10,0),proctime AS PROCTIME(),PRIMARY KEY (dealer_code) NOT ENFORCED\n" + ") WITH (" + "'connector' = 'jdbc'," + "'url' = 'jdbc:mysql://10.0.15.83:3306/flink-test?useSSL=false'," + "'table-name' = 'tm_dealers'," + "'driver' = 'com.mysql.cj.jdbc.Driver'," + "'username' = 'root'," + "'password' = 'Cdh2020:1'," + "'lookup.cache.max-rows' = '500',"+ "'lookup.cache.ttl' = '1800s',"+ "'sink.buffer-flush.interval' = '60s'"+ ")");

我发现这样的话checkpoint配置会失效,不能触发检查点,日志报如下错误: job bad9f419433f78d24e703e659b169917 is notin state RUNNING but FINISHED instead. Aborting checkpoint.

进入WEB UI 看一下视图发现该Execution处于FINISHED状态,FINISHED状态无法进行checkpoint,这种有其它办法吗?

感谢大佬指导一下,拜谢!*来自志愿者整理的flink

展开
收起
雪哥哥 2021-12-05 12:04:02 648 分享 版权
阿里云 AI 助理回答

您好!遇到这个问题,看起来是由于您的Flink作业在尝试进行checkpoint时,其执行状态已经变成了FINISHED,这导致了checkpoint无法正常触发。针对您描述的情况,这里有几个可能的解决方向和建议:

  1. 检查作业逻辑:首先确认您的流处理作业是否设计为一次性(one-time)执行或者有误操作导致作业提前结束。如果作业逻辑中存在使得作业自然完成的条件,请检查并确保作业应该是长期运行的服务型作业,以便持续接受数据并能周期性地做checkpoint。

  2. Checkpoint配置验证:请再次检查您的Flink作业的checkpoint配置是否正确设置,并且确保相关的checkpoint策略(如checkpoint间隔、模式等)已正确定义。同时,确认CheckpointingMode是否被显式设置为适合您场景的模式,比如EXACTLY_ONCEAT_LEAST_ONCE

  3. 资源与配置问题:有时候,资源限制(如CPU、内存不足)或配置不当(例如超时时间设置过短)也可能导致作业意外终止。检查您的Flink集群资源分配情况,以及是否有任何异常日志提示资源不足或配置错误。

  4. 依赖库版本兼容性:确认使用的Flink版本与MySQL JDBC驱动以及其他相关依赖库之间的兼容性。有时,特定版本组合可能会引入未知的问题。

  5. 使用异步快照机制:Flink支持异步快照机制,可以提高checkpoint效率并减少对作业本身的影响。确保您的环境中启用了这一特性。

  6. 查看详细的日志信息:深入分析报错日志,特别是那些关于作业为何提前结束的具体原因的日志。这有助于定位问题的根本原因。

  7. 考虑使用Savepoints而非仅依赖Checkpoints:如果您需要在作业停止后能够恢复到某个确切的状态,除了定期的checkpoint外,还可以考虑使用savepoints。Savepoints提供了一种更灵活的手动保存作业状态的方式,可以在作业重启时从指定的savepoint恢复,而不受作业生命周期状态的影响。

如果以上建议仍不能解决问题,建议详细查阅Flink官方文档关于checkpoint和作业管理的部分,或者在Flink社区论坛提问,那里有更多的专业人士和开发者可以提供帮助。希望这些建议对您有所帮助!您可以复制页面截图提供更多信息,我可以进一步帮您分析问题原因。

有帮助
无帮助
AI 助理回答生成答案可能存在不准确,仅供参考
0 条回答
写回答
取消 提交回答