Flink报错问题之flink 1.11 sql作业提交JM报错如何解决

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink1.10.1 flink sql消费kafka当parallelism大于1时不产生wat

本人使用的flink版本为flink 1.10.1, flink sql消费kafka, 当parallelism为1时正常运行,但讲parallelism修改为2时,在yarn-session web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:

insert into

x.report.bi_report_fence_common_indicators

select

fence_id,

'finishedOrderCnt' as indicator_name,

TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,

count(1) as indicator_val

from

(

select

dt,

fence_id,

fence_coordinates_array,

c.driver_location

from

(

select

*

from

(

select

dt,

driver_location,

r1.f1.fence_info as fence_info

from

(

select

o.dt,

o.driver_location,

MD5(r.city_code) as k,

PROCTIME() as proctime

from

(

select

order_no,

dt,

driver_location,

PROCTIME() as proctime

from

x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner

where

_type = 'insert'

and event_code = 'arriveAndSettlement'

) o

LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS OF o.proctime AS r ON r.order_no = o.order_no

) o1

LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS r1 ON r1.k = o1.k

) a

where

fence_info is not null

) c

LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, fence_coordinates_array) ON TRUE

) as b

where

in_fence(fence_coordinates_array, driver_location)

group by

TUMBLE(dt, INTERVAL '5' MINUTE),

fence_id;

其中 x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner表中dt为watermark字段,建表语句如下:

CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(

_type STRING,

_old_id BIGINT,

id BIGINT,

_old_order_no STRING,

order_no STRING,

_old_event_code STRING,

event_code STRING,

_old_from_state TINYINT,

from_state TINYINT,

_old_to_state TINYINT,

to_state TINYINT,

_old_operator_type TINYINT,

operator_type TINYINT,

_old_passenger_location STRING,

passenger_location STRING,

_old_driver_location STRING,

driver_location STRING,

_old_trans_time STRING,

trans_time STRING,

_old_create_time STRING,

create_time STRING,

_old_update_time STRING,

update_time STRING,

_old_passenger_poi_address STRING,

passenger_poi_address STRING,

_old_passenger_detail_address STRING,

passenger_detail_address STRING,

_old_driver_poi_address STRING,

driver_poi_address STRING,

_old_driver_detail_address STRING,

driver_detail_address STRING,

_old_operator STRING,

operator STRING,

_old_partition_index TINYINT,

partition_index TINYINT,

dt as TO_TIMESTAMP(trans_time),

WATERMARK FOR dt AS dt - INTERVAL '5' SECOND

) WITH (

'connector.type' = 'kafka',

'connector.properties.bootstrap.servers' = '*',

'connector.properties.zookeeper.connect' = '*',

'connector.version' = 'universal',

'format.type' = 'json',

'connector.properties.group.id' = 'testGroup',

'connector.startup-mode' = 'group-offsets',

'connector.topic' = 'xxxxx'

)

*来自志愿者整理的flink邮件归档



参考答案:

可以先看下 Kakfa topic 对应的partition有几个?是否每个分区都有数据。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370157?spm=a2c6h.12873639.article-detail.103.6f9243783Lv0fl



问题二:flink sql 侧输出

大佬们、我们这面主要基于blink sql完成转换计算,但是可能会有延迟数据,现在想把延迟数据通过侧输出保存下来,在table/sql

api中要怎么操作比较合理一点?或者有没有其他处理延迟数据的方式?

*来自志愿者整理的flink邮件归档



参考答案:

Flink SQL/Table 目前还不支持 side output。不过有一个实验性的功能可以处理延迟数据,

你可以给你的作业配上:

table.exec.emit.late-fire.enabled = true

table.exec.emit.late-fire.delay = 1min

同时 TableConfig#setIdleStateRetentionTime 需要配上,表示窗口状态允许保留多久,即 window

allowLateness 。

具体可以看下 org.apache.flink.table.planner.plan.utils.WindowEmitStrategy 这个类。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370156?spm=a2c6h.12873639.article-detail.104.6f9243783Lv0fl



问题三:flink 1.11 sql作业提交JM报错

我使用flink 1.11提交sql作业,从JM日志中看到有如下异常。我的作业里会通过tEnv.executeSQL执行多个ddl语句,通过tEnv.createStatementSet add多个dml语句,并执行execute。 如下异常可能原因是啥呢?还有个问题,这个异常虽然抛出来了,但是作业还是正常启动执行了。这又是为何?是不是不推荐在作业里同时使用executeSQL和statementset.execute?

Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment. at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57) ~[flink-table-blink_2.12-1.11.0.jar:1.11.0] at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699) ~[flink-table_2.12-1.11.0.jar:1.11.0] ... 24 more

*来自志愿者整理的flink邮件归档



参考答案:

简单说下这两个方法, TableEnvironment.executeSql(String statement)是为了用于执行单条的 sql 语句, SQL语句可以是 DDL/DML/DCL/DQL, DML(如insert)和DQL(如select)的执行是等 Flink job提交后返回该方法的执行结果,DDL(create table ...) 和 DCL(use database …) 的执行是对应的SQL语句执行完成就返回,理解起来就是需要提交 Flink job 的SQL需要等 job 提交后返回结果,其他是立即执行并返回。 Statementset.execute() 主要用于执行批量的 sql 语句,sql 语句只能是 insert xx,可以看接口的方法, 这个接口主要是为了 SQL 里有多个query的情况, (比如multiple sink:insert tableA from xx ;insert tableB from xx), 如果调用 TableEnvironment.executeSql(“insert tableA from xx”), TableEnvironment.executeSql(“insert tableA from xx”) 就会起两个 Flink job, 这应该不是用户需要的。 具体使用根据你的需要来使用。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370155?spm=a2c6h.12873639.article-detail.105.6f9243783Lv0fl



问题四:flink 1.11 local execution oom问题

我在使用1.11版本在本地idea起一个作业时,并发为1,抛出了如下关于内存的异常。。问题是之前从来没有显示配置过taskmanager的memory参数,这是为何? 感觉由1.10升级到1.11问题还是挺多的。。我尝试增加了JVM参数,增加DirectMemory内存配置,还是没有作用,请教大神帮忙看下。

Exception in thread "main" java.lang.OutOfMemoryError: Could not allocate enough memory segments for NetworkBufferPool (required (Mb): 64, allocated (Mb): 63, missing (Mb): 1). Cause: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...

*来自志愿者整理的flink邮件归档



参考答案:

这个问题可以看下是否和 releasenote[1] 中 memory configuration

相关的修改有关,具体到这个错误,你可以按照提示增加一些内存看看

[1]

https://flink.apache.org/news/2020/07/06/release-1.11.0.html#other-improvements

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370153?spm=a2c6h.12873639.article-detail.106.6f9243783Lv0fl



问题五:flink1.10.1在yarn上无法写入kafka的问题

请教各位:

flink任务在本机写入测试环境kafka集群没问题,

但是上传到yarn环境,就是写不进去,其他job运行在yarn可以写入测试环境的kafka

异常信息如下:

2020-07-09 19:17:33,126 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedProcess (1/2) (9449b1e3b758a40fb5e1e60cf84fd844) switched from DEPLOYING to RUNNING.

2020-07-09 19:17:33,164 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedProcess (2/2) (bc6eefd911cf44412121939d0afa6a81) switched from DEPLOYING to RUNNING.

2020-07-09 19:17:39,049 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - async wait operator -> Sink: Unnamed (1/2) (cfc31005099a8ad7e44a94dc617dd45f) switched from RUNNING to FAILE

D.

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka:

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1225)

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:767)

at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)

at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)

at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)

at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)

at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)

at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:65)

at org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue.emitCompletedElement(OrderedStreamElementQueue.java:71)

*来自志愿者整理的flink邮件归档



参考答案:

应该是你提交到 Yarn 的环境,这个环境和你的测试环境的 kafka 连接不上,获取不到元数据。

这里你检查一下你的 Yarn 环境,Flink kafka broker 地址是否是测试环境的 kafka broker 地址。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370151?spm=a2c6h.12873639.article-detail.107.6f9243783Lv0fl

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
8月前
|
SQL 大数据 数据处理
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是为应对传统数据处理框架中流批分离的问题而诞生的,它融合了SQL的简洁性和Flink的强大流批处理能力,降低了大数据处理门槛。其核心工作原理包括生成逻辑执行计划、查询优化和构建算子树,确保高效执行。Flink SQL 支持过滤、投影、聚合、连接和窗口等常用算子,实现了流批一体处理,极大提高了开发效率和代码复用性。通过统一的API和语法,Flink SQL 能够灵活应对实时和离线数据分析场景,为企业提供强大的数据处理能力。
1542 27
|
9月前
|
SQL 存储 缓存
Flink SQL Deduplication 去重以及如何获取最新状态操作
Flink SQL Deduplication 是一种高效的数据去重功能,支持多种数据类型和灵活的配置选项。它通过哈希表、时间窗口和状态管理等技术实现去重,适用于流处理和批处理场景。本文介绍了其特性、原理、实际案例及源码分析,帮助读者更好地理解和应用这一功能。
671 14
|
11月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
204 0
|
SQL NoSQL Java
Flink SQL 问题之执行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
1012 2
|
SQL Java 关系型数据库
Flink SQL 问题之用代码执行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
1285 6
|
SQL 消息中间件 Oracle
Flink SQL 问题之写入ES报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
178 4
|
SQL JSON Java
Flink SQL 问题之重启报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
316 3
|
SQL 资源调度 分布式数据库
Flink SQL 问题之服务器报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
264 3
|
SQL 存储 数据处理
Flink SQL 问题之提交程序运行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
474 3
|
SQL 消息中间件 关系型数据库
Flink SQL 问题之提交执行报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
641 2

相关产品

  • 实时计算 Flink版