Flink配置问题之配置时区失败如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink算子类在多个subtask中是各自初始化1个实例对象吗?

Hi,all:

flink算子在多个并行度的job中,每个算子(假如1个算子都有一个对应的java类)在多个subtask中会共享1个java类实例吗?还是每个subtask都会各自的实例?

我做了一个简单测试,写了一个flatmap类,在构造方法和open方法中打印类的toString方法,发现输出都不同,是否可以证明每个subtask都初始化了自己的类实例?

希望有朋友能解释下算子在job运行中初始化的过程。

测试相关代码如下:

// flink 1.10.2版本,并行度为3 @Slf4j public class PersonFlatMap extends RichFlatMapFunction<Tuple2<String, String>, Person> { private transient ValueState state;

public PersonFlatMap(){ log.info(String.format("PersonFlatMap【%s】: 创建实例",this.toString())); }

@Override public void open(Configuration parameters) throws IOException { //略去无关代码... log.info(String.format("PersonFlatMap【%s】:初始化状态!", this.toString())); }

@Override

public void flatMap(Tuple2<String, String> t, Collector collector) throws Exception { Person p = JSONUtil.toObject(t.f1,Person.class); collector.collect(p); if(state.value() == null){state.update(0);} state.update(state.value() + 1); log.info("state: "+state.value()); } }

//测试日志输出 ... flink-10.2 - [2020-11-16 13:41:54.360] - INFO [main] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@ba8d91c】: 创建实例

//此处略去无关日志...

flink-10.2 - [2020-11-16 13:42:00.326] - INFO [Flat Map -> Sink: Print to Std. Out (1/3)] org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory. flink-10.2 - [2020-11-16 13:42:00.351] - INFO [Flat Map -> Sink: Print to Std. Out (1/3)] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c9d895d】:初始化状态! flink-10.2 - [2020-11-16 13:42:00.354] - INFO [Flat Map -> Sink: Print to Std. Out (3/3)] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c489c40】:初始化状态! flink-10.2 - [2020-11-16 13:42:00.356] - INFO [Flat Map -> Sink: Print to Std. Out (2/3)] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@1cd4f5c9】:初始化状态! ...*来自志愿者整理的flink邮件归档



参考答案:

可以这么认为,大体上你可以认为每个并发有自己的环境。

技术上,算子对象是每个并发会实例化一个,而 static 变量的【共享】程度跟你设置的 slot per TM

值还有其他一些调度相关的参数有关,但是最好不要依赖这种实现层面的东西。

一种常见的误解是我创建一个 static HashMap 就神奇地拥有了全局的键值存储,这当然是不对的,只有在同一个 JVM 实例上也就是同一个 TM

上的任务才会看到同一个 HashMap 对象,而这几乎是不可控的。

可以看一下这篇文档[1]对物理部署的实际情况有一个基本的认知。

Best,

tison.

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364551?spm=a2c6h.13066369.question.13.6ad26382p9RmWX



问题二:flink 1.11.2 如何配置时区

你好!       我使用的是flink sql 1.11.2版本,通过proctime()在源上添加处理时间,发现生成的时间为UTC时间,而我需要的是+08的时间;而我通过设计env.java.opts参数设计jvm的时区参数也没有解决,请问我如何配置才可以拿到+08的时间?

我的程序的数据是json格式输出*来自志愿者整理的flink邮件归档



参考答案:

  1. 现在 proctime() 在设计上确实有问题,目前返回类型是 timestamp, 而不是 timestamp with local time

zone, 所以不会考虑 session time zone,转成 string 会用 utc 时区。这个问题会在 FLINK-20162 1

中修复。

  1. 可以看下这个文档[2].

Best,

Jark

[2]:

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/json.html#json-timestamp-format-standard

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364550?spm=a2c6h.13066369.question.14.6ad26382jcSTQP



问题三:flink1.11 sql ddl 连接kafka读取数据,使用事件时间窗口,无法触发,相同的sql

您好,请教您一个问题

flink1.11 sql ddl连接kafka,使用事件事件,无法触发窗口,使用process_time系统时间就可以正常触发 create table kafka_table ( log_id  string, event_date timestamp(3), process_time as PROCTIME(), ts as event_date, watermark for ts as ts - interval '1' second ) with (  'connector' = 'kafka',  'topic' = 'kafka_table',  'properties.bootstrap.servers' = '10.2.12.3:9092',  'properties.group.id' = 'tmp-log-consumer003',  'format' = 'json',  'scan.startup.mode' = 'latest-offset' ) 执行的sql是 select TUMBLE_START(kafka_table.event_date, INTERVAL '10' SECOND),TUMBLE_END(kafka_table.event_date, INTERVAL '10' SECOND),src_ip,count(dest_ip) from kafka_table group by TUMBLE(kafka_table.event_date, INTERVAL '10' SECOND),kafka_table.src_ip

select log_id,process_time,ts from kafka_table查询的表结构如下 表结构为 root  |-- log_id: STRING  |-- process_time: TIMESTAMP(3) NOT NULL PROCTIME  |-- ts: TIMESTAMP(3) ROWTIME

输入数据为 log_id,process_time,ts 13547876357,2020-11-14T08:22:08.699,2020-11-07T08:23:09.806 13547876358,2020-11-14T08:22:08.857,2020-11-07T08:23:09.806 13547876359,2020-11-14T08:22:09.061,2020-11-07T08:23:09.806 13547876360,2020-11-14T08:22:09.310,2020-11-07T08:23:09.806 13547876361,2020-11-14T08:22:09.526,2020-11-07T08:23:09.806 13552070656,2020-11-14T08:22:09.772,2020-11-07T08:23:09.806*来自志愿者整理的flink邮件归档



参考答案:

重复的问题。我将刚刚的回答也贴在这里。

如果要测试事件时间窗口,请保证以下几点,否则窗口不会触发:

  1. 保证所有 partition 都有数据。
  2. 且每个 partition 数据的 event time 都在前进
  3. 且 event time 前进的距离要超过 window size + watermark offset, 即你的例子中的 10s+1s =

11s

以上如果不满足,则系统不会认为窗口结束,所以窗口就不会触发。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364549?spm=a2c6h.13066369.question.15.6ad263820deoHJ



问题四:Re: flink-1.11 使用 application 模式时 jobid 问题

这个问题目前有 Issue 在跟踪吗?在 1.12 测试发现还是有这个问题,有办法解决吗?*来自志愿者整理的flink邮件归档



参考答案:

可以参考一下这篇文章: https://mp.weixin.qq.com/s/S_Spm88eDtbza1QoLKiWlg

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364548?spm=a2c6h.13066369.question.14.6ad26382qM0Gyw



问题五:Flink与Yarn的状态一致性问题

最近在使用Flink-1.11.1 On Yarn Per Job模式提交简单的kafka->mysql任务时,发现当有脏数据或mysql主键限制等等原因导致Flink任务状态置为Failed时,Yarn application仍处于运行状态

疑问是Flink任务处于Failed或Finished时,不会反馈自身状态给Yarn吗?期待大佬解惑,谢谢*来自志愿者整理的flink邮件归档



参考答案:

PerJob 模式下,在作业完全挂掉之后,是会上报 YARN RM 应用自己失败的状态的。

当然,在 FLINK 察觉自己 FAILED 到上报给 YARN 是有一定的时延的,也有可能因为网络等问题上报失败。

你这个是短暂的不一致时间窗口,还是说 FLINK 集群已经退了,YARN 的状态还没有变化呢?

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364547?spm=a2c6h.13066369.question.15.6ad26382asj0Fk

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL 资源调度 Java
Flink问题之动态配置如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
48 1
|
1月前
|
存储 SQL Oracle
flink cdc 时区问题之文档添加参数无效如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
1月前
|
XML Java Apache
Apache Flink自定义 logback xml配置
Apache Flink自定义 logback xml配置
151 0
|
1月前
|
分布式计算 网络安全 流计算
Flink【环境搭建 01】(flink-1.9.3 集群版安装、配置、验证)
【2月更文挑战第15天】Flink【环境搭建 01】(flink-1.9.3 集群版安装、配置、验证)
61 0
|
1月前
|
Oracle 关系型数据库 MySQL
Flink CDC产品常见问题之从EARLIEST_OFFSET启动就报错如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
2月前
|
Kubernetes Java 数据库连接
Flink问题之自定义分隔符写入如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
31 2
|
2月前
|
消息中间件 Oracle 关系型数据库
Flink CDC 数据源问题之参数配置如何解决
Flink CDC数据源指的是使用Apache Flink的CDC特性来连接并捕获外部数据库变更数据的数据源;本合集将介绍如何配置和管理Flink CDC数据源,以及解决数据源连接和同步过程中遇到的问题。
38 0
|
2月前
|
Java 流计算
【极数系列】Flink配置参数如何获取?(06)
【极数系列】Flink配置参数如何获取?(06)
|
2月前
|
SQL Oracle 关系型数据库
Flink cdc报错问题之时区报错如何解决
Flink CDC报错指的是使用Apache Flink的Change Data Capture(CDC)组件时遇到的错误和异常;本合集将汇总Flink CDC常见的报错情况,并提供相应的诊断和解决方法,帮助用户快速恢复数据处理任务的正常运行。
Flink cdc报错问题之时区报错如何解决
|
3月前
|
消息中间件 存储 Kafka
在Flink中,可以通过配置`KafkaConsumer`的`properties`参数来设置两个不同的SASL机制
【1月更文挑战第19天】【1月更文挑战第91篇】在Flink中,可以通过配置`KafkaConsumer`的`properties`参数来设置两个不同的SASL机制
78 3

相关产品

  • 实时计算 Flink版