Flink内存问题之超出物理内存如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink1.12.1 Sink数据到ES7,遇到 问题

报错日志如下:我的flink sql 已用的 flink-sql-connector-elasticsearch7,代码里使用的flink-connector-elasticsearch7,然后在同一个flink运行,就会报这个错误

Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:339) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:636) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:609) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:549) at org.apache.flink.streaming.runtime.tasks.OperatorChain. (OperatorChain.java:170) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:509) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: unexpected exception type at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1682) at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1254) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2076) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587) at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541) at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:323) ... 9 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248) ... 33 more Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization at org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink$Builder.$deserializeLambda$(ElasticsearchSink.java:86) ... 42 more*来自志愿者整理的flink邮件归档



参考答案:

可以参考下[1], 如果是相同的问题,将依赖改为flink-connector-elasticsearch

[1] https://issues.apache.org/jira/browse/FLINK-18857来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370045?spm=a2c6h.13066369.question.13.33bf585fihaMnT



问题二:Flink SQL 1.11支持将数据写入到Hive吗?

看官网介绍是支持的:

但是找对应的连接器是没有Hive,是JDBC?

*来自志愿者整理的flink邮件归档



参考答案:

1.11的文档:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_read_write.html

1.12的文档:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370047?spm=a2c6h.13066369.question.12.33bf585fO4ML1p



问题三:请问flink 1.12如何使用RateLimiter哈?

我这里有个场景是这样的,kafka里已经有一段时间的数据,读取的时候会一股脑的都读进来,我想模拟数据是刚进来的状态,比如数据是5秒一条,我就5秒钟读1条

*来自志愿者整理的flink邮件归档



参考答案:

如果使用 FlinkKafkaConsumer010 的话,可以调用 FlinkKafkaConsumer010#setRateLimiter(new GuavaFlinkConnectorRateLimiter().setRate) https://github.com/apache/flink/blob/fe3613574f76201a8d55d572a639a4ce7e18a9db/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java#L353

PS: 目前最新版本已经将 FlinkKafkaConsumer010, FlinkKafkaConsumer011 都删除了,只留一个Consumer,目前没有可以设置的入口,可关注 issue[1]. [1]https://issues.apache.org/jira/browse/FLINK-18740*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370048?spm=a2c6h.13066369.question.15.33bf585fu7TcVY



问题四:flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

这面还想多请教一下:

我程序中每来一条数据都会去读MapState然后覆盖写入新的时间戳,刚刚发现某一条数据读出了两条一样的时间戳,我推断是第一个线程读出来后还没等覆盖掉,第二个线程又读了一遍,导致出现两条一样的时间戳;

所以想请问flink中MapState是线程安全的吗? *来自志愿者整理的flink邮件归档



参考答案:

是线程安全的,mapstate也是keyed state,同一个key的state肯定是同一个线程处理的*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370049?spm=a2c6h.13066369.question.16.33bf585fgGTSRN



问题五:flink jdbc connector 在checkpoint的时候出问题

JDBC connector似乎无法确保buffer中的数据在checkpoint的时候全部入库。这是因为 OutputFormat中没有一个接口,供checkpoint的时候调用。 从JDBC的connector的 代码来看,只能设定一个超时时间用以刷新数据,但还是可能存在丢数据的case。

我的问题是:是否有办法强制刷新buffer中的数据入库?

@Public public interface OutputFormat extends Serializable {

/** * Configures this output format. Since output formats are instantiated generically and hence parameterless, * this method is the place where the output formats set their basic fields based on configuration values. *

* This method is always called first on a newly instantiated output format. * * @param parameters The configuration with all parameters. */ void configure(Configuration parameters);

/** * Opens a parallel instance of the output format to store the result of its parallel instance. *

* When this method is called, the output format it guaranteed to be configured. * * @param taskNumber The number of the parallel instance. * @param numTasks The number of parallel tasks. * @throws IOException Thrown, if the output could not be opened due to an I/O problem. */ void open(int taskNumber, int numTasks) throws IOException;

/** * Adds a record to the output. *

* When this method is called, the output format it guaranteed to be opened. * * @param record The records to add to the output. * @throws IOException Thrown, if the records could not be added to to an I/O problem. */ void writeRecord(IT record) throws IOException;

/** * Method that marks the end of the life-cycle of parallel output instance. Should be used to close * channels and streams and release resources. * After this method returns without an error, the output is assumed to be correct. *

* When this method is called, the output format it guaranteed to be opened. * * @throws IOException Thrown, if the input could not be closed properly. */ void close() throws IOException; }

-- *来自志愿者整理的flink邮件归档



参考答案:

是的,感觉你是对的。 JdbcOutputFormat 会被 wrap 在 OutputFormatSinkFunction 中,而 OutputFormatSinkFunction 没有继承 CheckpointedFunction,所以没法在 snapshotState 时候调用format.flush。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370050?spm=a2c6h.13066369.question.15.33bf585fBiNQCm

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
存储 算法 关系型数据库
实时计算 Flink版产品使用合集之在Flink Stream API中,可以在任务启动时初始化一些静态的参数并将其存储在内存中吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
96 4
|
10天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之idea本地测试代码,要增大 Flink CDC 在本地 IDEA 测试环境中的内存大小如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
算法 内存技术
深入理解操作系统内存管理:从虚拟内存到物理内存的旅程
【5月更文挑战第24天】 在现代计算机系统中,操作系统的内存管理是确保系统高效稳定运行的关键组成部分。本文将探讨操作系统是如何通过虚拟内存到物理内存的映射机制,实现对内存资源的高效管理和保护。我们将剖析分页和分段两种主要的内存管理技术,并讨论它们如何协同工作以提供内存抽象、重定位、共享和保护。文章还将涉及虚拟内存的技术细节,包括页面置换算法和内存分配策略,以及它们对系统性能的影响。
|
1月前
|
关系型数据库 MySQL Java
实时计算 Flink版操作报错之整内存和cpu分配之后启动报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
1月前
|
消息中间件 SQL Java
实时计算 Flink版产品使用合集之管理内存webui上一直是百分百是什么导致的
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL 资源调度 关系型数据库
实时计算 Flink版产品使用合集之可以使用高并发大内存的方式读取存量数据吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
关系型数据库 MySQL Java
实时计算 Flink版产品使用合集之是否支持内存表的创建
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL Java 中间件
实时计算 Flink版产品使用合集之在进行全量拉取时,任务完成之后内存没有被完全释放如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
27天前
|
存储 缓存 算法
深入理解操作系统内存管理:从虚拟内存到物理内存
【5月更文挑战第30天】操作系统的心脏——内存管理,在系统性能和稳定性中扮演着关键角色。本文将深入探讨操作系统中的内存管理机制,特别是虚拟内存与物理内存之间的映射关系、分页机制以及内存分配策略。通过分析现代操作系统如何处理内存资源,我们可以更好地理解计算机系统的内部工作原理,并掌握提升系统性能的关键因素。

热门文章

最新文章

相关产品

  • 实时计算 Flink版