Flink报错问题之flink 1.11 sql作业提交JM报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink1.10.1 flink sql消费kafka当parallelism大于1时不产生wat

本人使用的flink版本为flink 1.10.1, flink sql消费kafka, 当parallelism为1时正常运行,但讲parallelism修改为2时,在yarn-session web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:

insert into

x.report.bi_report_fence_common_indicators

select

fence_id,

'finishedOrderCnt' as indicator_name,

TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,

count(1) as indicator_val

from

(

select

dt,

fence_id,

fence_coordinates_array,

c.driver_location

from

(

select

*

from

(

select

dt,

driver_location,

r1.f1.fence_info as fence_info

from

(

select

o.dt,

o.driver_location,

MD5(r.city_code) as k,

PROCTIME() as proctime

from

(

select

order_no,

dt,

driver_location,

PROCTIME() as proctime

from

x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner

where

_type = 'insert'

and event_code = 'arriveAndSettlement'

) o

LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS OF o.proctime AS r ON r.order_no = o.order_no

) o1

LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS r1 ON r1.k = o1.k

) a

where

fence_info is not null

) c

LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, fence_coordinates_array) ON TRUE

) as b

where

in_fence(fence_coordinates_array, driver_location)

group by

TUMBLE(dt, INTERVAL '5' MINUTE),

fence_id;

其中 x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner表中dt为watermark字段,建表语句如下:

CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(

_type STRING,

_old_id BIGINT,

id BIGINT,

_old_order_no STRING,

order_no STRING,

_old_event_code STRING,

event_code STRING,

_old_from_state TINYINT,

from_state TINYINT,

_old_to_state TINYINT,

to_state TINYINT,

_old_operator_type TINYINT,

operator_type TINYINT,

_old_passenger_location STRING,

passenger_location STRING,

_old_driver_location STRING,

driver_location STRING,

_old_trans_time STRING,

trans_time STRING,

_old_create_time STRING,

create_time STRING,

_old_update_time STRING,

update_time STRING,

_old_passenger_poi_address STRING,

passenger_poi_address STRING,

_old_passenger_detail_address STRING,

passenger_detail_address STRING,

_old_driver_poi_address STRING,

driver_poi_address STRING,

_old_driver_detail_address STRING,

driver_detail_address STRING,

_old_operator STRING,

operator STRING,

_old_partition_index TINYINT,

partition_index TINYINT,

dt as TO_TIMESTAMP(trans_time),

WATERMARK FOR dt AS dt - INTERVAL '5' SECOND

) WITH (

'connector.type' = 'kafka',

'connector.properties.bootstrap.servers' = '*',

'connector.properties.zookeeper.connect' = '*',

'connector.version' = 'universal',

'format.type' = 'json',

'connector.properties.group.id' = 'testGroup',

'connector.startup-mode' = 'group-offsets',

'connector.topic' = 'xxxxx'

)

*来自志愿者整理的flink邮件归档



参考答案:

可以先看下 Kakfa topic 对应的partition有几个?是否每个分区都有数据。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370157?spm=a2c6h.12873639.article-detail.103.6f9243783Lv0fl



问题二:flink sql 侧输出

大佬们、我们这面主要基于blink sql完成转换计算,但是可能会有延迟数据,现在想把延迟数据通过侧输出保存下来,在table/sql

api中要怎么操作比较合理一点?或者有没有其他处理延迟数据的方式?

*来自志愿者整理的flink邮件归档



参考答案:

Flink SQL/Table 目前还不支持 side output。不过有一个实验性的功能可以处理延迟数据,

你可以给你的作业配上:

table.exec.emit.late-fire.enabled = true

table.exec.emit.late-fire.delay = 1min

同时 TableConfig#setIdleStateRetentionTime 需要配上,表示窗口状态允许保留多久,即 window

allowLateness 。

具体可以看下 org.apache.flink.table.planner.plan.utils.WindowEmitStrategy 这个类。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370156?spm=a2c6h.12873639.article-detail.104.6f9243783Lv0fl



问题三:flink 1.11 sql作业提交JM报错

我使用flink 1.11提交sql作业,从JM日志中看到有如下异常。我的作业里会通过tEnv.executeSQL执行多个ddl语句,通过tEnv.createStatementSet add多个dml语句,并执行execute。 如下异常可能原因是啥呢?还有个问题,这个异常虽然抛出来了,但是作业还是正常启动执行了。这又是为何?是不是不推荐在作业里同时使用executeSQL和statementset.execute?

Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment. at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57) ~[flink-table-blink_2.12-1.11.0.jar:1.11.0] at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699) ~[flink-table_2.12-1.11.0.jar:1.11.0] ... 24 more

*来自志愿者整理的flink邮件归档



参考答案:

简单说下这两个方法, TableEnvironment.executeSql(String statement)是为了用于执行单条的 sql 语句, SQL语句可以是 DDL/DML/DCL/DQL, DML(如insert)和DQL(如select)的执行是等 Flink job提交后返回该方法的执行结果,DDL(create table ...) 和 DCL(use database …) 的执行是对应的SQL语句执行完成就返回,理解起来就是需要提交 Flink job 的SQL需要等 job 提交后返回结果,其他是立即执行并返回。 Statementset.execute() 主要用于执行批量的 sql 语句,sql 语句只能是 insert xx,可以看接口的方法, 这个接口主要是为了 SQL 里有多个query的情况, (比如multiple sink:insert tableA from xx ;insert tableB from xx), 如果调用 TableEnvironment.executeSql(“insert tableA from xx”), TableEnvironment.executeSql(“insert tableA from xx”) 就会起两个 Flink job, 这应该不是用户需要的。 具体使用根据你的需要来使用。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370155?spm=a2c6h.12873639.article-detail.105.6f9243783Lv0fl



问题四:flink 1.11 local execution oom问题

我在使用1.11版本在本地idea起一个作业时,并发为1,抛出了如下关于内存的异常。。问题是之前从来没有显示配置过taskmanager的memory参数,这是为何? 感觉由1.10升级到1.11问题还是挺多的。。我尝试增加了JVM参数,增加DirectMemory内存配置,还是没有作用,请教大神帮忙看下。

Exception in thread "main" java.lang.OutOfMemoryError: Could not allocate enough memory segments for NetworkBufferPool (required (Mb): 64, allocated (Mb): 63, missing (Mb): 1). Cause: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...

*来自志愿者整理的flink邮件归档



参考答案:

这个问题可以看下是否和 releasenote[1] 中 memory configuration

相关的修改有关,具体到这个错误,你可以按照提示增加一些内存看看

[1]

https://flink.apache.org/news/2020/07/06/release-1.11.0.html#other-improvements

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370153?spm=a2c6h.12873639.article-detail.106.6f9243783Lv0fl



问题五:flink1.10.1在yarn上无法写入kafka的问题

请教各位:

flink任务在本机写入测试环境kafka集群没问题,

但是上传到yarn环境,就是写不进去,其他job运行在yarn可以写入测试环境的kafka

异常信息如下:

2020-07-09 19:17:33,126 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedProcess (1/2) (9449b1e3b758a40fb5e1e60cf84fd844) switched from DEPLOYING to RUNNING.

2020-07-09 19:17:33,164 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedProcess (2/2) (bc6eefd911cf44412121939d0afa6a81) switched from DEPLOYING to RUNNING.

2020-07-09 19:17:39,049 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - async wait operator -> Sink: Unnamed (1/2) (cfc31005099a8ad7e44a94dc617dd45f) switched from RUNNING to FAILE

D.

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka:

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1225)

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:767)

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)

at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)

at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)

at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:65)

at org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue.emitCompletedElement(OrderedStreamElementQueue.java:71)

*来自志愿者整理的flink邮件归档



参考答案:

应该是你提交到 Yarn 的环境,这个环境和你的测试环境的 kafka 连接不上,获取不到元数据。

这里你检查一下你的 Yarn 环境,Flink kafka broker 地址是否是测试环境的 kafka broker 地址。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370151?spm=a2c6h.12873639.article-detail.107.6f9243783Lv0fl

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
24天前
|
SQL JSON Kubernetes
Seata常见问题之服务端 error日志没有输出,客户端执行sql报错如何解决
Seata 是一个开源的分布式事务解决方案,旨在提供高效且简单的事务协调机制,以解决微服务架构下跨服务调用(分布式场景)的一致性问题。以下是Seata常见问题的一个合集
97 0
|
1月前
|
SQL 数据库 索引
解决SQL报错:索引中丢失IN或OUT參数
解决SQL报错:索引中丢失IN或OUT參数
|
1月前
|
SQL 资源调度 Oracle
Flink CDC产品常见问题之sql运行中查看日志任务失败如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
消息中间件 Java Kafka
Apache Hudi + Flink作业运行指南
Apache Hudi + Flink作业运行指南
80 1
|
1月前
|
SQL 分布式计算 DataWorks
dataworks常见问题之通过sql查询查看任务依赖关系如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
28 1
|
1月前
|
SQL JSON 运维
dataworks常见问题之selectdb前置sql参数无法获取如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
28 0
|
1月前
|
SQL 关系型数据库 MySQL
Flink 提供了一种名为 Flink SQL 的查询语言,它支持多种数据库之间的 DDL 语句转换
【2月更文挑战第18天】Flink 提供了一种名为 Flink SQL 的查询语言,它支持多种数据库之间的 DDL 语句转换
171 2
|
1月前
|
SQL 存储 Apache
在 Apache Flink SQL 中,并没有内置的 GROUP_CONCAT 函数
【2月更文挑战第16天】在 Apache Flink SQL 中,并没有内置的 GROUP_CONCAT 函数
186 2
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
476 5
|
30天前
|
SQL Java API
官宣|Apache Flink 1.19 发布公告
Apache Flink PMC(项目管理委员)很高兴地宣布发布 Apache Flink 1.19.0。
1073 1
官宣|Apache Flink 1.19 发布公告

相关产品

  • 实时计算 Flink版