Question 1: Flink 1.12.1 hits an error when sinking data to ES7
The error log is below. My Flink SQL job already uses flink-sql-connector-elasticsearch7, and the DataStream code uses flink-connector-elasticsearch7; when both run in the same Flink job, this error is thrown:
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:339)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:636)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:609)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:170)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:509)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: unexpected exception type
    at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1682)
    at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1254)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2076)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:323)
    ... 9 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
    at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248)
    ... 33 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
    at org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink$Builder.$deserializeLambda$(ElasticsearchSink.java:86)
    ... 42 more
*From the volunteer-compiled Flink mailing list archive
Reference answer:
You can take a look at [1]; if it is the same issue, change the dependency to flink-connector-elasticsearch.
[1] https://issues.apache.org/jira/browse/FLINK-18857
*From the volunteer-compiled Flink mailing list archive
For more answers to this question, see:
https://developer.aliyun.com/ask/370045?spm=a2c6h.13066369.question.13.33bf585fihaMnT
Question 2: Does Flink SQL 1.11 support writing data to Hive?
The official documentation says it is supported:
But when I look for the corresponding connector there is no Hive connector, only JDBC. Should I use JDBC?
*From the volunteer-compiled Flink mailing list archive
Reference answer:
Documentation for 1.11:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_read_write.html
Documentation for 1.12:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/
*From the volunteer-compiled Flink mailing list archive
For more answers to this question, see:
https://developer.aliyun.com/ask/370047?spm=a2c6h.13066369.question.12.33bf585fO4ML1p
Question 3: How do I use a RateLimiter in Flink 1.12?
My scenario is this: Kafka already holds a backlog of data, and when I start reading it everything is consumed at once. I want to simulate the data arriving as if it were fresh, e.g. if the data was produced at one record every 5 seconds, I want to read one record every 5 seconds.
*From the volunteer-compiled Flink mailing list archive
Reference answer:
If you are using FlinkKafkaConsumer010, you can call FlinkKafkaConsumer010#setRateLimiter, passing a GuavaFlinkConnectorRateLimiter whose rate is configured via setRate (sketched below): https://github.com/apache/flink/blob/fe3613574f76201a8d55d572a639a4ce7e18a9db/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java#L353
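A minimal sketch of that call, assuming a Flink release where FlinkKafkaConsumer010 is still available. The bootstrap address, group id, topic name, and rate value are placeholders; note that, to my understanding, GuavaFlinkConnectorRateLimiter#setRate takes a bytes-per-second budget rather than a records-per-second count, so it throttles throughput rather than giving exactly one record every 5 seconds.

import java.util.Properties;

import org.apache.flink.api.common.io.ratelimiting.GuavaFlinkConnectorRateLimiter;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class RateLimitedKafkaSource {

    public static FlinkKafkaConsumer010<String> build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "rate-limited-demo");       // placeholder group id

        // setRate is a bytes-per-second budget; pick a value that roughly matches
        // your record size and the pace you want to simulate.
        GuavaFlinkConnectorRateLimiter rateLimiter = new GuavaFlinkConnectorRateLimiter();
        rateLimiter.setRate(1024L);

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
        consumer.setRateLimiter(rateLimiter);
        return consumer;
    }
}

With this in place, env.addSource(RateLimitedKafkaSource.build()) should consume at roughly the configured byte rate instead of draining the backlog at once.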
PS: In the latest version, both FlinkKafkaConsumer010 and FlinkKafkaConsumer011 have been removed, leaving only a single consumer, which currently has no entry point for setting a rate limiter; you can follow issue [1].
[1] https://issues.apache.org/jira/browse/FLINK-18740
*From the volunteer-compiled Flink mailing list archive
For more answers to this question, see:
https://developer.aliyun.com/ask/370048?spm=a2c6h.13066369.question.15.33bf585fu7TcVY
Question 4: With RocksDB incremental checkpoints, a Flink job reports "physical memory exceeded" after running for a while
I would also like to ask a follow-up question:
In my program, every incoming record reads the MapState and then overwrites it with a new timestamp. I just noticed that a single record read out two identical timestamps. My guess is that one thread read the value and, before it could overwrite it, a second thread read it again, which produced the two identical timestamps.
So I'd like to ask: is MapState in Flink thread-safe?
*From the volunteer-compiled Flink mailing list archive
Reference answer:
Yes, it is thread-safe. MapState is also keyed state, and the state for a given key is always processed by the same thread.
*From the volunteer-compiled Flink mailing list archive
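As an illustration of that guarantee, here is a minimal sketch of the read-then-overwrite pattern described in the question, written as a KeyedProcessFunction over (id, timestamp) pairs. The class name, state name, and record layout are illustrative, not taken from the original thread.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Records are (id, timestamp) pairs; the function remembers the last timestamp per id.
public class LastTimestampFunction
        extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient MapState<String, Long> lastTsState;

    @Override
    public void open(Configuration parameters) {
        lastTsState = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("lastTs", String.class, Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> record, Context ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        // Read-then-overwrite without any locking: all records for the current key are
        // handled by the same task thread, one at a time, so no other thread can read
        // the state in between the get and the put.
        Long previous = lastTsState.get(record.f0); // null for the first record of this id
        lastTsState.put(record.f0, record.f1);
        out.collect(record);
    }
}

If you still see duplicate timestamps with this pattern, the cause is more likely upstream duplication than concurrent state access.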
For more answers to this question, see:
https://developer.aliyun.com/ask/370049?spm=a2c6h.13066369.question.16.33bf585fgGTSRN
Question 5: The Flink JDBC connector has a problem at checkpoint time
The JDBC connector does not seem to guarantee that all buffered data reaches the database at checkpoint time, because OutputFormat exposes no method that can be invoked during a checkpoint. Judging from the JDBC connector's code, you can only configure a flush interval, so there may still be cases where data is lost.
My question is: is there a way to force the buffered data to be written to the database?
@Public
public interface OutputFormat<IT> extends Serializable {

    /**
     * Configures this output format. Since output formats are instantiated generically and hence parameterless,
     * this method is the place where the output formats set their basic fields based on configuration values.
     *
     * This method is always called first on a newly instantiated output format.
     *
     * @param parameters The configuration with all parameters.
     */
    void configure(Configuration parameters);

    /**
     * Opens a parallel instance of the output format to store the result of its parallel instance.
     *
     * When this method is called, the output format it guaranteed to be configured.
     *
     * @param taskNumber The number of the parallel instance.
     * @param numTasks The number of parallel tasks.
     * @throws IOException Thrown, if the output could not be opened due to an I/O problem.
     */
    void open(int taskNumber, int numTasks) throws IOException;

    /**
     * Adds a record to the output.
     *
     * When this method is called, the output format it guaranteed to be opened.
     *
     * @param record The records to add to the output.
     * @throws IOException Thrown, if the records could not be added to to an I/O problem.
     */
    void writeRecord(IT record) throws IOException;

    /**
     * Method that marks the end of the life-cycle of parallel output instance. Should be used to close
     * channels and streams and release resources.
     * After this method returns without an error, the output is assumed to be correct.
     *
     * When this method is called, the output format it guaranteed to be opened.
     *
     * @throws IOException Thrown, if the input could not be closed properly.
     */
    void close() throws IOException;
}
-- *From the volunteer-compiled Flink mailing list archive
Reference answer:
Yes, I think you are right. JdbcOutputFormat is wrapped in an OutputFormatSinkFunction, and OutputFormatSinkFunction does not implement CheckpointedFunction, so there is no way to call format.flush at snapshotState time.
*From the volunteer-compiled Flink mailing list archive
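One possible workaround (a sketch, not the connector's built-in behavior) is to wrap the output format in a custom sink that also implements CheckpointedFunction and force a flush in snapshotState. The FlushableOutputFormat interface and its flush() method below are assumptions introduced for illustration; the actual JDBC output format in your Flink version may expose a different flush hook.

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Sketch of a sink that wraps an OutputFormat and flushes it on every checkpoint.
public class FlushingOutputFormatSink<T> extends RichSinkFunction<T> implements CheckpointedFunction {

    // Hypothetical interface: an OutputFormat that can flush its internal buffer.
    public interface FlushableOutputFormat<IN> extends OutputFormat<IN> {
        void flush() throws java.io.IOException;
    }

    private final FlushableOutputFormat<T> format;

    public FlushingOutputFormatSink(FlushableOutputFormat<T> format) {
        this.format = format;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        format.configure(parameters);
        format.open(getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        format.writeRecord(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Force buffered records into the database before the checkpoint completes.
        format.flush();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // No state to restore; the flush in snapshotState is all we need.
    }

    @Override
    public void close() throws Exception {
        format.close();
    }
}

With a wrapper like this, the checkpoint only completes after the buffered rows have been handed to the database, at the cost of a synchronous flush per checkpoint.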
For more answers to this question, see:
https://developer.aliyun.com/ask/370050?spm=a2c6h.13066369.question.15.33bf585fBiNQCm