Flink问题之隔断时间重启如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink-提交jar 隔断时间自己重启怎么办?


你好 参考官网 https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/jdbc.html 这边读取mysql jdbc数据报错Exception in thread "main" org.apache.flink.table.api.TableException: Only insert statement is supported now. String a = "-- register a MySQL table 'users' in Flink SQL\n" + "CREATE TABLE MyUserTable (\n" + " id BIGINT\n" + ") WITH (\n" + " 'connector' = 'jdbc',\n" + " 'url' = 'jdbc:mysql://***:3306/monitor',\n" + " 'table-name' = 't1',\n" + " 'username' = 'root',\n" + " 'password' = '***'\n" + ") ";

String b ="-- scan data from the JDBC table\n" + "SELECT id FROM MyUserTable\n";

tEnv.executeSql(a);

来自志愿者整理的flink邮件归档


参考回答:

没看懂问题。任务自动重启?失败了自然就重启了,restart策略设置的吧。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359041


问题二:flink-1.12.2 TM无法使用自定的serviceAccount访问configmap怎么办?


在 flink sql 中,可以使用 proc time 来进行 interval join,但是在 stream api 中,只能用 event time 进行 interval join,如何能使用 process time 呢?


参考回答:

您好:之前提交过一个关于这方面的issue,链接如下: http://apache-flink.147419.n8.nabble.com/flink1-12-k8s-session-TM-td10153.html

目前看还是没有fix对应的issue。 报错如下:

目前看jira上的issue已经关闭了, 请确认是否修复。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359039?


问题三:flink sql count distonct 优化应该怎么办?


在SQL中,如果开启了 local-global 参数:set table.optimizer.agg-phase-strategy=TWO_PHASE; 或者开启了Partial-Final 参数:set table.optimizer.distinct-agg.split.enabled=true; set table.optimizer.distinct-agg.split.bucket-num=1024; 还需要对应的将SQL改写为两段式吗? 例如: 原SQL: SELECT day, COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day,

对所需DISTINCT字段buy_id模1024自动打散后,SQL: SELECT day, SUM(cnt) total FROM ( SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt FROM T GROUP BY day, MOD(buy_id, 1024)) GROUP BY day

还是flink会帮我自动改写SQL,我不用关心?

另外,如果只设置开启上述参数,没有改写SQL,感觉没有优化,在flink web ui界面上也没有看到两阶段算子

来自志愿者整理的flink邮件归档


参考回答:

我看你的作业里面是window agg,目前 window agg 还不支持自动拆分。1.13 的基于 window tvf 的 window agg支持这个参数了。可以期待下。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359042


问题四:Flink 消费kafka ,怎么写ORC文件?


【现状如下】 Flink Job消费kafka消息,每半个小时将消费到的消息进行一系列聚合操作(flink 窗口聚合),然后写入一个orc文件。

据了解,flink写orc的桶分配策略[1],有两种:

一种是基于时间,即按时间为目录创建orc文件。[test/realtime/ : 为根目录]

test/realtime/ └── 2021-03-23--07 ├── part-0-0.orc ├── part-0-1.orc └── 2021-03-23--08 ├── part-0-0.orc ├── part-0-1.orc

一种是将所有部分文件放在一个目录下:

test/realtime/ ├── part-0-0.orc ├── part-0-1.orc ├── part-0-2.orc ├── part-0-3.orc

【问题】

最终需求是想按照partition将每半个小时的orc文件load到hive,hive表dt为分区字段,值为时间戳,如:

hive> show partitions table_demo;

OK dt=1616455800000 dt=1616457600000 dt=1616459400000 dt=1616461200001 dt=1616463000001

Time taken: 0.134 seconds, Fetched: 5 row(s)

因此希望每个orc文件的所在目录名都是dt=时间戳的格式:

http://apache-flink.147419.n8.nabble.com/file/t1162/dir.png

用flink实现这些功能后,发现这两种桶分配策略都不能实现上述需求。

不知如何实现?之前一直是自己写代码实现聚合、写orc的操作,目录文件名一切东西完全可控,现在用flink自带的功能实现,发现不太容易实现上述需求了

来自志愿者整理的flink邮件归档


参考回答:

官网有这么一段:我们可以在格式构建器上调用 .withBucketAssigner(assigner) 来自定义 BucketAssigner 链接: https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/file_sink.html#%E6%A1%B6%E5%88%86%E9%85%8D


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359046?spm=a2c6h.13262185.0.0.677f6c07q66JNp


问题五:interval join 如何用 process time?


在 flink sql 中,可以使用 proc time 来进行 interval join,但是在 stream api 中,只能用 event time 进行 interval join,如何能使用 process time 呢?

来自志愿者整理的flink邮件归档


参考回答:

你好,DataStream API 中的 Interval Join 目前还不支持 process time,参考 [1]. 不过如果不要去严格准确的 process time 的话,是否可以在 Join 之前把 process time 用某个字段带出来,当 event time 用?


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359047?spm=a2c6h.13262185.0.0.677f6c07q66JNp

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
7天前
|
API 数据库 流计算
有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
【2月更文挑战第27天】有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
67 3
|
7天前
|
Kubernetes 流计算 容器
在Rancher K8s上部署Flink时,TaskManager连接不上并不断重启可能是由多种原因导致的。
在Rancher K8s上部署Flink时,TaskManager连接不上并不断重启可能是由多种原因导致的。【1月更文挑战第22天】【1月更文挑战第106篇】
66 1
|
6天前
|
Kubernetes 流计算 Perl
在Rancher K8s上部署Flink时,TaskManager连接不上并不断重启可能是由多种原因导致的
在Rancher K8s上部署Flink时,TaskManager连接不上并不断重启可能是由多种原因导致的
43 7
|
4天前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版产品使用合集之在指定 db.* 后添加表就重启任务如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
14 1
|
7天前
|
Java 流计算
Flink任务管理器(TaskManager)在凌晨重启,可能是由于以下几种原因
【2月更文挑战第16天】Flink任务管理器(TaskManager)在凌晨重启,可能是由于以下几种原因
76 2
|
7天前
|
关系型数据库 MySQL 数据库
Flink CDC数据同步问题之用savepoint重启任务报错如何解决
Flink CDC数据同步是指利用Flink CDC实现不同数据源之间的实时数据同步任务;本合集旨在提供Flink CDC数据同步的操作指南、性能优化建议和常见问题处理,助力用户高效实施数据同步。
|
7天前
|
资源调度 关系型数据库 MySQL
Flink问题之应用程序重启如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
109 0
|
7天前
|
SQL JSON Java
Flink SQL 问题之重启报错如何解决
Flink SQL报错通常指在使用Apache Flink的SQL接口执行数据处理任务时遇到的问题;本合集将收集常见的Flink SQL报错情况及其解决方法,帮助用户迅速恢复数据处理流程。
48 3
|
7天前
|
NoSQL Java 关系型数据库
Flink 动态更新配置,不需要重启作业
Flink 动态更新配置,不需要重启作业
105 1
|
7天前
|
数据库连接 数据库 流计算
Flink CDC比如检查到挂了,我重启了,这个会重新连不,我刚刚重启了还是一样的错?
Flink CDC比如检查到挂了,我重启了,这个会重新连不,我刚刚重启了还是一样的错?
56 0

热门文章

最新文章

  • 1
    实时计算 Flink版操作报错合集之遇到报错:"An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency." ,该怎么办
    16
  • 2
    实时计算 Flink版操作报错合集之在连接Oracle 19c时报错如何解决
    23
  • 3
    实时计算 Flink版操作报错合集之写入 Kafka 报错 "Failed to send data to Kafka: Failed to allocate memory within the configured max blocking time 60000 ms",该怎么解决
    13
  • 4
    实时计算 Flink版操作报错合集之报错显示“Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT and DELETE"是什么意思
    17
  • 5
    实时计算 Flink版操作报错合集之报错io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot. 是什么原因
    17
  • 6
    实时计算 Flink版操作报错合集之本地打成jar包,运行报错,idea运行不报错,是什么导致的
    12
  • 7
    实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
    17
  • 8
    实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
    22
  • 9
    实时计算 Flink版操作报错合集之查询sqlserver ,全量阶段出现报错如何解决
    16
  • 10
    实时计算 Flink版操作报错合集之执行Flink job,报错“Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing”,该怎么办
    39
  • 相关产品

  • 实时计算 Flink版