Flink报错问题之flink 1.11 sql作业提交JM报错如何解决

简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink1.10.1 flink sql消费kafka当parallelism大于1时不产生wat

本人使用的flink版本为flink 1.10.1, flink sql消费kafka, 当parallelism为1时正常运行,但讲parallelism修改为2时,在yarn-session web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:

insert into

x.report.bi_report_fence_common_indicators

select

fence_id,

'finishedOrderCnt' as indicator_name,

TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,

count(1) as indicator_val

from

(

select

dt,

fence_id,

fence_coordinates_array,

c.driver_location

from

(

select

*

from

(

select

dt,

driver_location,

r1.f1.fence_info as fence_info

from

(

select

o.dt,

o.driver_location,

MD5(r.city_code) as k,

PROCTIME() as proctime

from

(

select

order_no,

dt,

driver_location,

PROCTIME() as proctime

from

x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner

where

_type = 'insert'

and event_code = 'arriveAndSettlement'

) o

LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS OF o.proctime AS r ON r.order_no = o.order_no

) o1

LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS r1 ON r1.k = o1.k

) a

where

fence_info is not null

) c

LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, fence_coordinates_array) ON TRUE

) as b

where

in_fence(fence_coordinates_array, driver_location)

group by

TUMBLE(dt, INTERVAL '5' MINUTE),

fence_id;

其中 x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner表中dt为watermark字段,建表语句如下:

CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(

_type STRING,

_old_id BIGINT,

id BIGINT,

_old_order_no STRING,

order_no STRING,

_old_event_code STRING,

event_code STRING,

_old_from_state TINYINT,

from_state TINYINT,

_old_to_state TINYINT,

to_state TINYINT,

_old_operator_type TINYINT,

operator_type TINYINT,

_old_passenger_location STRING,

passenger_location STRING,

_old_driver_location STRING,

driver_location STRING,

_old_trans_time STRING,

trans_time STRING,

_old_create_time STRING,

create_time STRING,

_old_update_time STRING,

update_time STRING,

_old_passenger_poi_address STRING,

passenger_poi_address STRING,

_old_passenger_detail_address STRING,

passenger_detail_address STRING,

_old_driver_poi_address STRING,

driver_poi_address STRING,

_old_driver_detail_address STRING,

driver_detail_address STRING,

_old_operator STRING,

operator STRING,

_old_partition_index TINYINT,

partition_index TINYINT,

dt as TO_TIMESTAMP(trans_time),

WATERMARK FOR dt AS dt - INTERVAL '5' SECOND

) WITH (

'connector.type' = 'kafka',

'connector.properties.bootstrap.servers' = '*',

'connector.properties.zookeeper.connect' = '*',

'connector.version' = 'universal',

'format.type' = 'json',

'connector.properties.group.id' = 'testGroup',

'connector.startup-mode' = 'group-offsets',

'connector.topic' = 'xxxxx'

)

*来自志愿者整理的flink邮件归档



参考答案:

可以先看下 Kakfa topic 对应的partition有几个?是否每个分区都有数据。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370157?spm=a2c6h.12873639.article-detail.103.6f9243783Lv0fl



问题二:flink sql 侧输出

大佬们、我们这面主要基于blink sql完成转换计算,但是可能会有延迟数据,现在想把延迟数据通过侧输出保存下来,在table/sql

api中要怎么操作比较合理一点?或者有没有其他处理延迟数据的方式?

*来自志愿者整理的flink邮件归档



参考答案:

Flink SQL/Table 目前还不支持 side output。不过有一个实验性的功能可以处理延迟数据,

你可以给你的作业配上:

table.exec.emit.late-fire.enabled = true

table.exec.emit.late-fire.delay = 1min

同时 TableConfig#setIdleStateRetentionTime 需要配上,表示窗口状态允许保留多久,即 window

allowLateness 。

具体可以看下 org.apache.flink.table.planner.plan.utils.WindowEmitStrategy 这个类。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370156?spm=a2c6h.12873639.article-detail.104.6f9243783Lv0fl



问题三:flink 1.11 sql作业提交JM报错

我使用flink 1.11提交sql作业,从JM日志中看到有如下异常。我的作业里会通过tEnv.executeSQL执行多个ddl语句,通过tEnv.createStatementSet add多个dml语句,并执行execute。 如下异常可能原因是啥呢?还有个问题,这个异常虽然抛出来了,但是作业还是正常启动执行了。这又是为何?是不是不推荐在作业里同时使用executeSQL和statementset.execute?

Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment. at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57) ~[flink-table-blink_2.12-1.11.0.jar:1.11.0] at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699) ~[flink-table_2.12-1.11.0.jar:1.11.0] ... 24 more

*来自志愿者整理的flink邮件归档



参考答案:

简单说下这两个方法, TableEnvironment.executeSql(String statement)是为了用于执行单条的 sql 语句, SQL语句可以是 DDL/DML/DCL/DQL, DML(如insert)和DQL(如select)的执行是等 Flink job提交后返回该方法的执行结果,DDL(create table ...) 和 DCL(use database …) 的执行是对应的SQL语句执行完成就返回,理解起来就是需要提交 Flink job 的SQL需要等 job 提交后返回结果,其他是立即执行并返回。 Statementset.execute() 主要用于执行批量的 sql 语句,sql 语句只能是 insert xx,可以看接口的方法, 这个接口主要是为了 SQL 里有多个query的情况, (比如multiple sink:insert tableA from xx ;insert tableB from xx), 如果调用 TableEnvironment.executeSql(“insert tableA from xx”), TableEnvironment.executeSql(“insert tableA from xx”) 就会起两个 Flink job, 这应该不是用户需要的。 具体使用根据你的需要来使用。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370155?spm=a2c6h.12873639.article-detail.105.6f9243783Lv0fl



问题四:flink 1.11 local execution oom问题

我在使用1.11版本在本地idea起一个作业时,并发为1,抛出了如下关于内存的异常。。问题是之前从来没有显示配置过taskmanager的memory参数,这是为何? 感觉由1.10升级到1.11问题还是挺多的。。我尝试增加了JVM参数,增加DirectMemory内存配置,还是没有作用,请教大神帮忙看下。

Exception in thread "main" java.lang.OutOfMemoryError: Could not allocate enough memory segments for NetworkBufferPool (required (Mb): 64, allocated (Mb): 63, missing (Mb): 1). Cause: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...

*来自志愿者整理的flink邮件归档



参考答案:

这个问题可以看下是否和 releasenote[1] 中 memory configuration

相关的修改有关,具体到这个错误,你可以按照提示增加一些内存看看

[1]

https://flink.apache.org/news/2020/07/06/release-1.11.0.html#other-improvements

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370153?spm=a2c6h.12873639.article-detail.106.6f9243783Lv0fl



问题五:flink1.10.1在yarn上无法写入kafka的问题

请教各位:

flink任务在本机写入测试环境kafka集群没问题,

但是上传到yarn环境,就是写不进去,其他job运行在yarn可以写入测试环境的kafka

异常信息如下:

2020-07-09 19:17:33,126 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedProcess (1/2) (9449b1e3b758a40fb5e1e60cf84fd844) switched from DEPLOYING to RUNNING.

2020-07-09 19:17:33,164 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedProcess (2/2) (bc6eefd911cf44412121939d0afa6a81) switched from DEPLOYING to RUNNING.

2020-07-09 19:17:39,049 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - async wait operator -> Sink: Unnamed (1/2) (cfc31005099a8ad7e44a94dc617dd45f) switched from RUNNING to FAILE

D.

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka:

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1225)

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:767)

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)

at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)

at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)

at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:65)

at org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue.emitCompletedElement(OrderedStreamElementQueue.java:71)

*来自志愿者整理的flink邮件归档



参考答案:

应该是你提交到 Yarn 的环境,这个环境和你的测试环境的 kafka 连接不上,获取不到元数据。

这里你检查一下你的 Yarn 环境,Flink kafka broker 地址是否是测试环境的 kafka broker 地址。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370151?spm=a2c6h.12873639.article-detail.107.6f9243783Lv0fl

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
7月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
1058 43
|
7月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
466 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
8月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
1156 1
|
7月前
|
存储 分布式计算 数据处理
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
阿里云实时计算Flink团队,全球领先的流计算引擎缔造者,支撑双11万亿级数据处理,推动Apache Flink技术发展。现招募Flink执行引擎、存储引擎、数据通道、平台管控及产品经理人才,地点覆盖北京、杭州、上海。技术深度参与开源核心,打造企业级实时计算解决方案,助力全球企业实现毫秒洞察。
705 0
「48小时极速反馈」阿里云实时计算Flink广招天下英雄
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
4239 74
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
zdl
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
684 56
|
人工智能 Apache 流计算
Flink Forward Asia 2024 上海站|探索实时计算新边界
Flink Forward Asia 2024 即将盛大开幕!11 月 29 至 30 日在上海举行,大会聚焦 Apache Flink 技术演进与未来规划,涵盖流式湖仓、流批一体、Data+AI 融合等前沿话题,提供近百场专业演讲。立即报名,共襄盛举!官网:https://asia.flink-forward.org/shanghai-2024/
1506 33
Flink Forward Asia 2024 上海站|探索实时计算新边界
|
消息中间件 关系型数据库 MySQL
Flink CDC 在阿里云实时计算Flink版的云上实践
本文整理自阿里云高级开发工程师阮航在Flink Forward Asia 2024的分享,重点介绍了Flink CDC与实时计算Flink的集成、CDC YAML的核心功能及应用场景。主要内容包括:Flink CDC的发展及其在流批数据处理中的作用;CDC YAML支持的同步链路、Transform和Route功能、丰富的监控指标;典型应用场景如整库同步、Binlog原始数据同步、分库分表同步等;并通过两个Demo展示了MySQL整库同步到Paimon和Binlog同步到Kafka的过程。最后,介绍了未来规划,如脏数据处理、数据限流及扩展数据源支持。
870 0
Flink CDC 在阿里云实时计算Flink版的云上实践

相关产品

  • 实时计算 Flink版