Flink内存问题之超出物理内存如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink1.12.1 Sink数据到ES7,遇到 问题

报错日志如下:我的flink sql 已用的 flink-sql-connector-elasticsearch7,代码里使用的flink-connector-elasticsearch7,然后在同一个flink运行,就会报这个错误

Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:339) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:636) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:609) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:549) at org.apache.flink.streaming.runtime.tasks.OperatorChain. (OperatorChain.java:170) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:509) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: unexpected exception type at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1682) at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1254) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2076) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587) at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541) at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:323) ... 9 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248) ... 33 more Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization at org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink$Builder.$deserializeLambda$(ElasticsearchSink.java:86) ... 42 more*来自志愿者整理的flink邮件归档



参考答案:

可以参考下[1], 如果是相同的问题,将依赖改为flink-connector-elasticsearch

[1] https://issues.apache.org/jira/browse/FLINK-18857来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370045?spm=a2c6h.13066369.question.13.33bf585fihaMnT



问题二:Flink SQL 1.11支持将数据写入到Hive吗?

看官网介绍是支持的:

但是找对应的连接器是没有Hive,是JDBC?

*来自志愿者整理的flink邮件归档



参考答案:

1.11的文档:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_read_write.html

1.12的文档:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370047?spm=a2c6h.13066369.question.12.33bf585fO4ML1p



问题三:请问flink 1.12如何使用RateLimiter哈?

我这里有个场景是这样的,kafka里已经有一段时间的数据,读取的时候会一股脑的都读进来,我想模拟数据是刚进来的状态,比如数据是5秒一条,我就5秒钟读1条

*来自志愿者整理的flink邮件归档



参考答案:

如果使用 FlinkKafkaConsumer010 的话,可以调用 FlinkKafkaConsumer010#setRateLimiter(new GuavaFlinkConnectorRateLimiter().setRate) https://github.com/apache/flink/blob/fe3613574f76201a8d55d572a639a4ce7e18a9db/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java#L353

PS: 目前最新版本已经将 FlinkKafkaConsumer010, FlinkKafkaConsumer011 都删除了,只留一个Consumer,目前没有可以设置的入口,可关注 issue[1]. [1]https://issues.apache.org/jira/browse/FLINK-18740*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370048?spm=a2c6h.13066369.question.15.33bf585fu7TcVY



问题四:flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

这面还想多请教一下:

我程序中每来一条数据都会去读MapState然后覆盖写入新的时间戳,刚刚发现某一条数据读出了两条一样的时间戳,我推断是第一个线程读出来后还没等覆盖掉,第二个线程又读了一遍,导致出现两条一样的时间戳;

所以想请问flink中MapState是线程安全的吗? *来自志愿者整理的flink邮件归档



参考答案:

是线程安全的,mapstate也是keyed state,同一个key的state肯定是同一个线程处理的*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370049?spm=a2c6h.13066369.question.16.33bf585fgGTSRN



问题五:flink jdbc connector 在checkpoint的时候出问题

JDBC connector似乎无法确保buffer中的数据在checkpoint的时候全部入库。这是因为 OutputFormat中没有一个接口,供checkpoint的时候调用。 从JDBC的connector的 代码来看,只能设定一个超时时间用以刷新数据,但还是可能存在丢数据的case。

我的问题是:是否有办法强制刷新buffer中的数据入库?

@Public public interface OutputFormat extends Serializable {

/** * Configures this output format. Since output formats are instantiated generically and hence parameterless, * this method is the place where the output formats set their basic fields based on configuration values. *

* This method is always called first on a newly instantiated output format. * * @param parameters The configuration with all parameters. */ void configure(Configuration parameters);

/** * Opens a parallel instance of the output format to store the result of its parallel instance. *

* When this method is called, the output format it guaranteed to be configured. * * @param taskNumber The number of the parallel instance. * @param numTasks The number of parallel tasks. * @throws IOException Thrown, if the output could not be opened due to an I/O problem. */ void open(int taskNumber, int numTasks) throws IOException;

/** * Adds a record to the output. *

* When this method is called, the output format it guaranteed to be opened. * * @param record The records to add to the output. * @throws IOException Thrown, if the records could not be added to to an I/O problem. */ void writeRecord(IT record) throws IOException;

/** * Method that marks the end of the life-cycle of parallel output instance. Should be used to close * channels and streams and release resources. * After this method returns without an error, the output is assumed to be correct. *

* When this method is called, the output format it guaranteed to be opened. * * @throws IOException Thrown, if the input could not be closed properly. */ void close() throws IOException; }

-- *来自志愿者整理的flink邮件归档



参考答案:

是的,感觉你是对的。 JdbcOutputFormat 会被 wrap 在 OutputFormatSinkFunction 中,而 OutputFormatSinkFunction 没有继承 CheckpointedFunction,所以没法在 snapshotState 时候调用format.flush。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370050?spm=a2c6h.13066369.question.15.33bf585fBiNQCm

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
5月前
|
数据采集 监控 Oracle
实时计算 Flink版产品使用问题之如何从Oracle物理备用库中进行实时数据抽取
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
关系型数据库 MySQL 数据库
实时计算 Flink版操作报错合集之网络缓冲池(NetworkBufferPool)中可用内存不足,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
Linux Shell 虚拟化
使用LiME收集主机物理内存的内容时发生宕机
使用LiME收集主机物理内存的内容时发生宕机
|
4月前
crash —— 获取物理内存布局信息
crash —— 获取物理内存布局信息
|
5月前
|
NoSQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之全量同步的内存释放该怎么实现
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 存储 关系型数据库
实时计算 Flink版产品使用问题之同步MySQL多张表的过程中,内存释放依赖于什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
开发者 Java
JVM内存问题之top命令的物理内存信息中,'used'和'free','avail Mem'分别表示什么
JVM内存问题之top命令的物理内存信息中,'used'和'free','avail Mem'分别表示什么
|
6月前
|
存储 缓存 监控
Flink内存管理机制及其参数调优
Flink内存管理机制及其参数调优
|
6月前
|
SQL Java 调度
实时计算 Flink版产品使用问题之使用Spring Boot启动Flink处理任务时,使用Spring Boot的@Scheduled注解进行定时任务调度,出现内存占用过高,该怎么办
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
缓存 关系型数据库 MySQL
实时计算 Flink版产品使用问题之缓存内存占用较大一般是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

相关产品

  • 实时计算 Flink版