I'm running Flink 1.10.1, using Flink SQL to consume from Kafka. The job runs fine with parallelism set to 1, but after changing parallelism to 2 the watermark metric no longer shows up on the yarn-session web UI and no results are emitted. The SQL is as follows:
insert into
  x.report.bi_report_fence_common_indicators
select
  fence_id,
  'finishedOrderCnt' as indicator_name,
  TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,
  count(1) as indicator_val
from (
  select
    dt,
    fence_id,
    fence_coordinates_array,
    c.driver_location
  from (
    select *
    from (
      select
        dt,
        driver_location,
        r1.f1.fence_info as fence_info
      from (
        -- temporal join: look up the order's city_code and derive the fence lookup key
        select
          o.dt,
          o.driver_location,
          MD5(r.city_code) as k,
          PROCTIME() as proctime
        from (
          select
            order_no,
            dt,
            driver_location,
            PROCTIME() as proctime
          from x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner
          where _type = 'insert'
            and event_code = 'arriveAndSettlement'
        ) o
        LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS OF o.proctime AS r
          ON r.order_no = o.order_no
      ) o1
      -- temporal join: look up fence_info for the city key
      LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS r1
        ON r1.k = o1.k
    ) a
    where fence_info is not null
  ) c
  -- UDTF fence_split expands fence_info into (fence_id, fence_coordinates_array) rows
  LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, fence_coordinates_array) ON TRUE
) as b
where in_fence(fence_coordinates_array, driver_location)
group by
  TUMBLE(dt, INTERVAL '5' MINUTE),
  fence_id;
In the table x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner, dt is the watermark (event-time) column; the DDL is as follows:
CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner (
  _type STRING,
  _old_id BIGINT,
  id BIGINT,
  _old_order_no STRING,
  order_no STRING,
  _old_event_code STRING,
  event_code STRING,
  _old_from_state TINYINT,
  from_state TINYINT,
  _old_to_state TINYINT,
  to_state TINYINT,
  _old_operator_type TINYINT,
  operator_type TINYINT,
  _old_passenger_location STRING,
  passenger_location STRING,
  _old_driver_location STRING,
  driver_location STRING,
  _old_trans_time STRING,
  trans_time STRING,
  _old_create_time STRING,
  create_time STRING,
  _old_update_time STRING,
  update_time STRING,
  _old_passenger_poi_address STRING,
  passenger_poi_address STRING,
  _old_passenger_detail_address STRING,
  passenger_detail_address STRING,
  _old_driver_poi_address STRING,
  driver_poi_address STRING,
  _old_driver_detail_address STRING,
  driver_detail_address STRING,
  _old_operator STRING,
  operator STRING,
  _old_partition_index TINYINT,
  partition_index TINYINT,
  dt as TO_TIMESTAMP(trans_time),                -- event-time column derived from trans_time
  WATERMARK FOR dt AS dt - INTERVAL '5' SECOND   -- watermark trails event time by 5 seconds
) WITH (
  'connector.type' = 'kafka',
  'connector.properties.bootstrap.servers' = '*',
  'connector.properties.zookeeper.connect' = '*',
  'connector.version' = 'universal',
  'format.type' = 'json',
  'connector.properties.group.id' = 'testGroup',
  'connector.startup-mode' = 'group-offsets',
  'connector.topic' = 'xxxxx'
)
*From the Flink mailing list archive, compiled by volunteers
You could first check how many partitions the Kafka topic has, and whether every partition is receiving data. *From the Flink mailing list archive, compiled by volunteers
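To verify this, the partition count and per-partition offsets can be read with the standard kafka-clients API. A minimal sketch, assuming the stock AdminClient/KafkaConsumer APIs; the broker address is a placeholder and "xxxxx" is the topic name from the DDL above:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class TopicDataCheck {
    public static void main(String[] args) throws Exception {
        String topic = "xxxxx";  // placeholder: the topic from the DDL
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder

        // 1) How many partitions does the topic have?
        try (AdminClient admin = AdminClient.create(props)) {
            int partitions = admin.describeTopics(Collections.singletonList(topic))
                    .all().get().get(topic).partitions().size();
            System.out.println("partitions = " + partitions);
        }

        // 2) Does every partition have data? Compare beginning and end offsets.
        try (KafkaConsumer<byte[], byte[]> consumer =
                     new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            List<TopicPartition> tps = consumer.partitionsFor(topic).stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());
            Map<TopicPartition, Long> begin = consumer.beginningOffsets(tps);
            Map<TopicPartition, Long> end = consumer.endOffsets(tps);
            for (TopicPartition tp : tps) {
                System.out.printf("%s: hasData=%b%n", tp, end.get(tp) > begin.get(tp));
            }
        }
    }
}

If the partition count is smaller than the job parallelism (e.g. 1 partition vs. parallelism 2), or some partition shows hasData=false, the symptom described above is expected.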
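For background: a downstream operator's watermark is the minimum of the watermarks of all its input subtasks. With parallelism 2 over a single-partition topic, one source subtask gets no partition (or a partition with no data), never advances its watermark, and therefore holds the global watermark at its initial value, so the 5-minute TUMBLE windows never fire. On Flink 1.10 the practical workaround is to keep the source parallelism no larger than the partition count, or to make sure every partition keeps receiving data. Flink 1.11 and later also let the planner mark idle sources; a sketch under that assumption (the table.exec.source.idle-timeout option and the bridge package exist from 1.11 onward, not in 1.10.1):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class IdleTimeoutExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Mark a source subtask as idle if it produces no data for 30 s, so it
        // stops holding back the global watermark (option added in Flink 1.11).
        tEnv.getConfig().getConfiguration().setString("table.exec.source.idle-timeout", "30 s");
    }
}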