问题一:flink1.10.1 flink sql消费kafka当parallelism大于1时不产生wat
本人使用的flink版本为flink 1.10.1, flink sql消费kafka, 当parallelism为1时正常运行,但讲parallelism修改为2时,在yarn-session web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:
insert into
x.report.bi_report_fence_common_indicators
select
fence_id,
'finishedOrderCnt' as indicator_name,
TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,
count(1) as indicator_val
from
(
select
dt,
fence_id,
fence_coordinates_array,
c.driver_location
from
(
select
*
from
(
select
dt,
driver_location,
r1.f1.fence_info as fence_info
from
(
select
o.dt,
o.driver_location,
MD5(r.city_code) as k,
PROCTIME() as proctime
from
(
select
order_no,
dt,
driver_location,
PROCTIME() as proctime
from
x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner
where
_type = 'insert'
and event_code = 'arriveAndSettlement'
) o
LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS OF o.proctime AS r ON r.order_no = o.order_no
) o1
LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS r1 ON r1.k = o1.k
) a
where
fence_info is not null
) c
LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, fence_coordinates_array) ON TRUE
) as b
where
in_fence(fence_coordinates_array, driver_location)
group by
TUMBLE(dt, INTERVAL '5' MINUTE),
fence_id;
其中 x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner表中dt为watermark字段,建表语句如下:
CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(
_type STRING,
_old_id BIGINT,
id BIGINT,
_old_order_no STRING,
order_no STRING,
_old_event_code STRING,
event_code STRING,
_old_from_state TINYINT,
from_state TINYINT,
_old_to_state TINYINT,
to_state TINYINT,
_old_operator_type TINYINT,
operator_type TINYINT,
_old_passenger_location STRING,
passenger_location STRING,
_old_driver_location STRING,
driver_location STRING,
_old_trans_time STRING,
trans_time STRING,
_old_create_time STRING,
create_time STRING,
_old_update_time STRING,
update_time STRING,
_old_passenger_poi_address STRING,
passenger_poi_address STRING,
_old_passenger_detail_address STRING,
passenger_detail_address STRING,
_old_driver_poi_address STRING,
driver_poi_address STRING,
_old_driver_detail_address STRING,
driver_detail_address STRING,
_old_operator STRING,
operator STRING,
_old_partition_index TINYINT,
partition_index TINYINT,
dt as TO_TIMESTAMP(trans_time),
WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.properties.bootstrap.servers' = '*',
'connector.properties.zookeeper.connect' = '*',
'connector.version' = 'universal',
'format.type' = 'json',
'connector.properties.group.id' = 'testGroup',
'connector.startup-mode' = 'group-offsets',
'connector.topic' = 'xxxxx'
)
*来自志愿者整理的flink邮件归档
参考答案:
可以先看下 Kakfa topic 对应的partition有几个?是否每个分区都有数据。
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370157?spm=a2c6h.12873639.article-detail.103.6f9243783Lv0fl
问题二:flink sql 侧输出
大佬们、我们这面主要基于blink sql完成转换计算,但是可能会有延迟数据,现在想把延迟数据通过侧输出保存下来,在table/sql
api中要怎么操作比较合理一点?或者有没有其他处理延迟数据的方式?
*来自志愿者整理的flink邮件归档
参考答案:
Flink SQL/Table 目前还不支持 side output。不过有一个实验性的功能可以处理延迟数据,
你可以给你的作业配上:
table.exec.emit.late-fire.enabled = true
table.exec.emit.late-fire.delay = 1min
同时 TableConfig#setIdleStateRetentionTime 需要配上,表示窗口状态允许保留多久,即 window
allowLateness 。
具体可以看下 org.apache.flink.table.planner.plan.utils.WindowEmitStrategy 这个类。
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370156?spm=a2c6h.12873639.article-detail.104.6f9243783Lv0fl
问题三:flink 1.11 sql作业提交JM报错
我使用flink 1.11提交sql作业,从JM日志中看到有如下异常。我的作业里会通过tEnv.executeSQL执行多个ddl语句,通过tEnv.createStatementSet add多个dml语句,并执行execute。 如下异常可能原因是啥呢?还有个问题,这个异常虽然抛出来了,但是作业还是正常启动执行了。这又是为何?是不是不推荐在作业里同时使用executeSQL和statementset.execute?
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment. at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:139) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:127) ~[flink-dist_2.12-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57) ~[flink-table-blink_2.12-1.11.0.jar:1.11.0] at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699) ~[flink-table_2.12-1.11.0.jar:1.11.0] ... 24 more
*来自志愿者整理的flink邮件归档
参考答案:
简单说下这两个方法, TableEnvironment.executeSql(String statement)是为了用于执行单条的 sql 语句, SQL语句可以是 DDL/DML/DCL/DQL, DML(如insert)和DQL(如select)的执行是等 Flink job提交后返回该方法的执行结果,DDL(create table ...) 和 DCL(use database …) 的执行是对应的SQL语句执行完成就返回,理解起来就是需要提交 Flink job 的SQL需要等 job 提交后返回结果,其他是立即执行并返回。 Statementset.execute() 主要用于执行批量的 sql 语句,sql 语句只能是 insert xx,可以看接口的方法, 这个接口主要是为了 SQL 里有多个query的情况, (比如multiple sink:insert tableA from xx ;insert tableB from xx), 如果调用 TableEnvironment.executeSql(“insert tableA from xx”), TableEnvironment.executeSql(“insert tableA from xx”) 就会起两个 Flink job, 这应该不是用户需要的。 具体使用根据你的需要来使用。
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370155?spm=a2c6h.12873639.article-detail.105.6f9243783Lv0fl
问题四:flink 1.11 local execution oom问题
我在使用1.11版本在本地idea起一个作业时,并发为1,抛出了如下关于内存的异常。。问题是之前从来没有显示配置过taskmanager的memory参数,这是为何? 感觉由1.10升级到1.11问题还是挺多的。。我尝试增加了JVM参数,增加DirectMemory内存配置,还是没有作用,请教大神帮忙看下。
Exception in thread "main" java.lang.OutOfMemoryError: Could not allocate enough memory segments for NetworkBufferPool (required (Mb): 64, allocated (Mb): 63, missing (Mb): 1). Cause: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
*来自志愿者整理的flink邮件归档
参考答案:
这个问题可以看下是否和 releasenote[1] 中 memory configuration
相关的修改有关,具体到这个错误,你可以按照提示增加一些内存看看
[1]
https://flink.apache.org/news/2020/07/06/release-1.11.0.html#other-improvements
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370153?spm=a2c6h.12873639.article-detail.106.6f9243783Lv0fl
问题五:flink1.10.1在yarn上无法写入kafka的问题
请教各位:
flink任务在本机写入测试环境kafka集群没问题,
但是上传到yarn环境,就是写不进去,其他job运行在yarn可以写入测试环境的kafka
异常信息如下:
2020-07-09 19:17:33,126 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedProcess (1/2) (9449b1e3b758a40fb5e1e60cf84fd844) switched from DEPLOYING to RUNNING.
2020-07-09 19:17:33,164 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedProcess (2/2) (bc6eefd911cf44412121939d0afa6a81) switched from DEPLOYING to RUNNING.
2020-07-09 19:17:39,049 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - async wait operator -> Sink: Unnamed (1/2) (cfc31005099a8ad7e44a94dc617dd45f) switched from RUNNING to FAILE
D.
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka:
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1225)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:767)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:65)
at org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue.emitCompletedElement(OrderedStreamElementQueue.java:71)
*来自志愿者整理的flink邮件归档
参考答案:
应该是你提交到 Yarn 的环境,这个环境和你的测试环境的 kafka 连接不上,获取不到元数据。
这里你检查一下你的 Yarn 环境,Flink kafka broker 地址是否是测试环境的 kafka broker 地址。
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370151?spm=a2c6h.12873639.article-detail.107.6f9243783Lv0fl