Flink配置问题之配置时区失败如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink算子类在多个subtask中是各自初始化1个实例对象吗?

Hi,all:

flink算子在多个并行度的job中,每个算子(假如1个算子都有一个对应的java类)在多个subtask中会共享1个java类实例吗?还是每个subtask都会各自的实例?

我做了一个简单测试,写了一个flatmap类,在构造方法和open方法中打印类的toString方法,发现输出都不同,是否可以证明每个subtask都初始化了自己的类实例?

希望有朋友能解释下算子在job运行中初始化的过程。

测试相关代码如下:

// flink 1.10.2版本,并行度为3 @Slf4j public class PersonFlatMap extends RichFlatMapFunction<Tuple2<String, String>, Person> { private transient ValueState state;

public PersonFlatMap(){ log.info(String.format("PersonFlatMap【%s】: 创建实例",this.toString())); }

@Override public void open(Configuration parameters) throws IOException { //略去无关代码... log.info(String.format("PersonFlatMap【%s】:初始化状态!", this.toString())); }

@Override

public void flatMap(Tuple2<String, String> t, Collector collector) throws Exception { Person p = JSONUtil.toObject(t.f1,Person.class); collector.collect(p); if(state.value() == null){state.update(0);} state.update(state.value() + 1); log.info("state: "+state.value()); } }

//测试日志输出 ... flink-10.2 - [2020-11-16 13:41:54.360] - INFO [main] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@ba8d91c】: 创建实例

//此处略去无关日志...

flink-10.2 - [2020-11-16 13:42:00.326] - INFO [Flat Map -> Sink: Print to Std. Out (1/3)] org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory. flink-10.2 - [2020-11-16 13:42:00.351] - INFO [Flat Map -> Sink: Print to Std. Out (1/3)] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c9d895d】:初始化状态! flink-10.2 - [2020-11-16 13:42:00.354] - INFO [Flat Map -> Sink: Print to Std. Out (3/3)] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c489c40】:初始化状态! flink-10.2 - [2020-11-16 13:42:00.356] - INFO [Flat Map -> Sink: Print to Std. Out (2/3)] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@1cd4f5c9】:初始化状态! ...*来自志愿者整理的flink邮件归档



参考答案:

可以这么认为,大体上你可以认为每个并发有自己的环境。

技术上,算子对象是每个并发会实例化一个,而 static 变量的【共享】程度跟你设置的 slot per TM

值还有其他一些调度相关的参数有关,但是最好不要依赖这种实现层面的东西。

一种常见的误解是我创建一个 static HashMap 就神奇地拥有了全局的键值存储,这当然是不对的,只有在同一个 JVM 实例上也就是同一个 TM

上的任务才会看到同一个 HashMap 对象,而这几乎是不可控的。

可以看一下这篇文档[1]对物理部署的实际情况有一个基本的认知。

Best,

tison.

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/concepts/flink-architecture.html

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364551?spm=a2c6h.13066369.question.13.6ad26382p9RmWX



问题二:flink 1.11.2 如何配置时区

你好!       我使用的是flink sql 1.11.2版本,通过proctime()在源上添加处理时间,发现生成的时间为UTC时间,而我需要的是+08的时间;而我通过设计env.java.opts参数设计jvm的时区参数也没有解决,请问我如何配置才可以拿到+08的时间?

我的程序的数据是json格式输出*来自志愿者整理的flink邮件归档



参考答案:

  1. 现在 proctime() 在设计上确实有问题,目前返回类型是 timestamp, 而不是 timestamp with local time

zone, 所以不会考虑 session time zone,转成 string 会用 utc 时区。这个问题会在 FLINK-20162 1

中修复。

  1. 可以看下这个文档[2].

Best,

Jark

[2]:

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/json.html#json-timestamp-format-standard

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364550?spm=a2c6h.13066369.question.14.6ad26382jcSTQP



问题三:flink1.11 sql ddl 连接kafka读取数据,使用事件时间窗口,无法触发,相同的sql

您好,请教您一个问题

flink1.11 sql ddl连接kafka,使用事件事件,无法触发窗口,使用process_time系统时间就可以正常触发 create table kafka_table ( log_id  string, event_date timestamp(3), process_time as PROCTIME(), ts as event_date, watermark for ts as ts - interval '1' second ) with (  'connector' = 'kafka',  'topic' = 'kafka_table',  'properties.bootstrap.servers' = '10.2.12.3:9092',  'properties.group.id' = 'tmp-log-consumer003',  'format' = 'json',  'scan.startup.mode' = 'latest-offset' ) 执行的sql是 select TUMBLE_START(kafka_table.event_date, INTERVAL '10' SECOND),TUMBLE_END(kafka_table.event_date, INTERVAL '10' SECOND),src_ip,count(dest_ip) from kafka_table group by TUMBLE(kafka_table.event_date, INTERVAL '10' SECOND),kafka_table.src_ip

select log_id,process_time,ts from kafka_table查询的表结构如下 表结构为 root  |-- log_id: STRING  |-- process_time: TIMESTAMP(3) NOT NULL PROCTIME  |-- ts: TIMESTAMP(3) ROWTIME

输入数据为 log_id,process_time,ts 13547876357,2020-11-14T08:22:08.699,2020-11-07T08:23:09.806 13547876358,2020-11-14T08:22:08.857,2020-11-07T08:23:09.806 13547876359,2020-11-14T08:22:09.061,2020-11-07T08:23:09.806 13547876360,2020-11-14T08:22:09.310,2020-11-07T08:23:09.806 13547876361,2020-11-14T08:22:09.526,2020-11-07T08:23:09.806 13552070656,2020-11-14T08:22:09.772,2020-11-07T08:23:09.806*来自志愿者整理的flink邮件归档



参考答案:

重复的问题。我将刚刚的回答也贴在这里。

如果要测试事件时间窗口,请保证以下几点,否则窗口不会触发:

  1. 保证所有 partition 都有数据。
  2. 且每个 partition 数据的 event time 都在前进
  3. 且 event time 前进的距离要超过 window size + watermark offset, 即你的例子中的 10s+1s =

11s

以上如果不满足,则系统不会认为窗口结束,所以窗口就不会触发。

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364549?spm=a2c6h.13066369.question.15.6ad263820deoHJ



问题四:Re: flink-1.11 使用 application 模式时 jobid 问题

这个问题目前有 Issue 在跟踪吗?在 1.12 测试发现还是有这个问题,有办法解决吗?*来自志愿者整理的flink邮件归档



参考答案:

可以参考一下这篇文章: https://mp.weixin.qq.com/s/S_Spm88eDtbza1QoLKiWlg

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364548?spm=a2c6h.13066369.question.14.6ad26382qM0Gyw



问题五:Flink与Yarn的状态一致性问题

最近在使用Flink-1.11.1 On Yarn Per Job模式提交简单的kafka->mysql任务时,发现当有脏数据或mysql主键限制等等原因导致Flink任务状态置为Failed时,Yarn application仍处于运行状态

疑问是Flink任务处于Failed或Finished时,不会反馈自身状态给Yarn吗?期待大佬解惑,谢谢*来自志愿者整理的flink邮件归档



参考答案:

PerJob 模式下,在作业完全挂掉之后,是会上报 YARN RM 应用自己失败的状态的。

当然,在 FLINK 察觉自己 FAILED 到上报给 YARN 是有一定的时延的,也有可能因为网络等问题上报失败。

你这个是短暂的不一致时间窗口,还是说 FLINK 集群已经退了,YARN 的状态还没有变化呢?

*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/364547?spm=a2c6h.13066369.question.15.6ad26382asj0Fk

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
7天前
|
资源调度 调度 流计算
Flink 细粒度资源管理问题之为不同的SSG配置资源如何解决
Flink 细粒度资源管理问题之为不同的SSG配置资源如何解决
|
7天前
|
存储 NoSQL 分布式数据库
Flink 细粒度资源管理问题之调整 slot 配置来提高资源利用效率如何解决
Flink 细粒度资源管理问题之调整 slot 配置来提高资源利用效率如何解决
|
12天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何配置Connector来保持与MySOL一致
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
16天前
|
资源调度 关系型数据库 MySQL
【Flink on YARN + CDC 3.0】神操作!看完这篇教程,你也能成为数据流处理高手!从零开始,一步步教会你在Flink on YARN模式下如何配置Debezium CDC 3.0,让你的数据库变更数据瞬间飞起来!
【8月更文挑战第15天】随着Apache Flink的普及,企业广泛采用Flink on YARN部署流处理应用,高效利用集群资源。变更数据捕获(CDC)工具在现代数据栈中至关重要,能实时捕捉数据库变化并转发给下游系统处理。本文以Flink on YARN为例,介绍如何在Debezium CDC 3.0中配置MySQL连接器,实现数据流处理。首先确保YARN上已部署Flink集群,接着安装Debezium MySQL连接器并配置Kafka Connect。最后,创建Flink任务消费变更事件并提交任务到Flink集群。通过这些步骤,可以构建出从数据库变更到实时处理的无缝数据管道。
43 2
|
2天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版产品使用问题之使用CTAS同步MySQL到Hologres时出现的时区差异,该如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
消息中间件 NoSQL Redis
实时计算 Flink版产品使用问题之配置了最大连续失败数不为1,在Kafka的精准一次sink中,如果ck失败了,这批数据是否会丢失
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL 分布式计算 关系型数据库
实时计算 Flink版产品使用问题之在使用FlinkCDC与PostgreSQL进行集成时,该如何配置参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
实时计算 Flink版产品使用问题之在使用FlinkCDC与PostgreSQL进行集成时,该如何配置参数
|
2月前
|
存储 关系型数据库 MySQL
实时计算 Flink版产品使用问题之处理包含时间戳并且希望在摄取、转换或输出时考虑特定时区的CDC数据,该如何操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
存储 SQL 关系型数据库
实时计算 Flink版产品使用问题之要配置MySQL集群存储节点,该如何配置
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL 关系型数据库 数据处理
实时计算 Flink版产品使用问题之使用的是JsonDebeziumDeserializationSchema,如何更改时区
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章

相关产品

  • 实时计算 Flink版
  • 下一篇
    云函数