Flink报错问题之flink-1.11写hive报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:JobManager responsible for xxx lost the leadersh

JobManager responsible for ff2118284beed21ac220ee7cc0a639c0 lost the

leadership.

这种错误原因是什么,会导致任务重启,本身压力大,突然重启使用10分钟前的ckpt,压力更大了。*来自志愿者整理的flink邮件归档



参考答案:

你是 on-yarn 的模式吗? JobManager 并不是 worker,只是控制 Checkpoint ,接收 TM 的心跳等,可以看下在这个之前的其它日志。 还可以看下 ZK 是否正常等。 On-yarn 的话,也可以看下 NM 对这个AM处理 的日志。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371626?spm=a2c6h.13066369.question.79.6ad26382lyqdWM



问题二:keyBy的数据均衡性

我这边遇到一个情况比较奇怪。

(1)一整天数据的统计信息如下:

sid+subid+browser+ip: 13068577

sid+subid+browser+uid: 2962237

如上,sid和subid是渠道和子渠道,browser是浏览器,ip和uid都是一个相对变化较大的维度。

数字是distinct count信息。

(2)任务逻辑

流A,分别基于sid+subid+browser+ip和sid+subid+browser+uid组合维护做统计。window算子并发都是10。结果是sid+subid+browser+ip的window算子收到数据基本均衡(1.09G~1.48G),此处是指运行一段时间后。但sid+subid+browser+uid算子收到数据却很不均衡(230MB~6.84G)。

我的疑问是,虽然keyBy不能完全均衡,这很好理解。但是差距也太奇葩了。230MB和6.84G。

而且从统计信息看uid的确没有ip区分度大。但 sid+subid+browser+uid 的组合数达到 296w,并发才10,会这么不均衡的嘛?*来自志愿者整理的flink邮件归档



参考答案:

感觉好像有道理哈哈。

分析下:sid+subid+browser+uid 一共大约300w假设,*sid+subid+browser *则假设是300个。

那么uid=0的存在300种组合,即 *300w种组合 *中有 *300种组合(uid=0) *是相对大概率出现的。

那么这300种大概率出现的组合如果碰巧分布不够均衡,就会导致window算子部分不均衡。

之前我考虑了uid的问题,但想的是hash是一堆字段一起哈希,uid自身不均衡不会导致问题。但基于如上分析,貌似是有问题的。因为uid=0的组合数的

规模太小(300),如果这个规模稍微大点的话,uid的不均衡就不会导致这个问题了可能。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371627?spm=a2c6h.13066369.question.78.6ad26382oTAPra



问题三:union all 丢失部分数据

开发者好: 目前有此场景:算不同部门的总收入和所有部门的总收入,打算把两部分SQL union all ,但是实际情况发现 union all的时候会丢一部分数据,要么是各个部门的数据少了,要么是所有部门的总收入少了 如果把union all 的两段SQL 分别独立出来,插入同一张表,那么数据就是正常的,不知道是否是bug还是使用方法不对

原sql :

insert into dws_XXXX

select 0 as id ,cast (DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') as bigint) as ftime ,case when dept_name like '%XX%' then 'X1' when dept_name = 'xXX' then 'X2' else 'X3' end as paytype ,count(orderid) as paynum_h ,round(sum(amt)) as paymoney_h from dwd_XXX where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') group by DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH'), case when dept_name like '%XX%' then 'X1' when dept_name = 'xXX' then 'X2' else 'X3' end ;

union all

select 0 as id ,cast (DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') as int) as ftime ,'all' as paytype ,count(orderid) as paynum_h ,round(sum(amt)) as paymoney_h from dwd_XXX where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') group by DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') ;*来自志愿者整理的flink邮件归档



参考答案:

你的 flink 版本是什么呢? 根据你的 SQL,如果是版本是 <= 1.10 的话,会根据 MetaDataHander 识别出你的 group by 后面的 key 作为 upsert key,这样就会产生覆盖的情况。 你看下结果是否是这种情况的?*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371628?spm=a2c6h.13066369.question.81.6ad26382TqkdaS



问题四:flink 1.11.0 chk超时

http://apache-flink.147419.n8.nabble.com/file/t538/QQ%E6%88%AA%E5%9B%BE20201105165123.jpg http://apache-flink.147419.n8.nabble.com/file/t538/QQ%E6%88%AA%E5%9B%BE20201105165200.jpg chk的历史如图,第三个subtask未能ack,同时在TM中只能找到如下信息:

2020-11-05 13:13:38,101 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 2 checkpointing for checkpoint with id=16 (max part counter=6). 2020-11-05 13:13:38,143 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 1 checkpointing for checkpoint with id=16 (max part counter=0). 2020-11-05 13:14:37,779 WARN org.apache.kafka.clients.NetworkClient [] - Connection to node -3 could not be established. Broker may not be available. 2020-11-05 13:14:37,786 WARN org.apache.kafka.clients.NetworkClient [] - Connection to node -2 could not be established. Broker may not be available. 2020-11-05 13:33:38,115 INFO org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task Source: Custom Source -> Process -> (Sink: ***, Sink: *** sink, Sink: ***) (3/3) (68bfa6305a9aa5a7381b9ca4a8fef2fa).

请路过的大佬们指点下排查方式,多谢(当前日志级别暂无法改成debug)*来自志愿者整理的flink邮件归档



参考答案:

CP 超时的原因一般是因任务而议的。从你提供的 2 张截图来看,卡在第二个 operator 的 subtask3 上。 上下两个 operator 之间的关系是 forworad 还是 reblance 呢?如果是 forward 的话,可以看下是不是数据倾斜,subtask3 需要处理的数据量比较多。 如果是 reblance 的话,以为 subtask1 和 subtask2 都成功了,所以上游的 barrier 应该都往下发了,所以 subtask3也收到了上游的 barrier,而 reblance 数据量都一样,所以可以看下是不是 sink 出去太慢导致。 查看任务一般可以看下任务的日志, GC,采堆栈,画火焰图等。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371633?spm=a2c6h.13066369.question.82.6ad26382EgonTS



问题五:flink-1.11 写 hive 报错

flink 读 kafka 写 hive,之前运行好好的。在IDEA也能正常运行,打成jar包,提交到 flink 集群,报错如下。请问是什么原因?

2020-11-05 15:34:36 org.apache.flink.connectors.hive.FlinkHiveException: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive RecordWriter at org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:159) at org.apache.flink.connectors.hive.write.HiveBulkWriterFactory.create(HiveBulkWriterFactory.java:47) at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:257) at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:230) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282) at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at StreamExecCalc$43.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at StreamExecCalc$19.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive RecordWriter at org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:58) at org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:151) ... 40 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.GeneratedMethodAccessor125.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:55) ... 41 more Caused by: java.lang.NoSuchFieldError: IGNORE_CLIENT_LOCALITY at org.apache.hadoop.hdfs.DFSOutputStream. (DFSOutputStream.java:204) at org.apache.hadoop.hdfs.DFSOutputStream. (DFSOutputStream.java:247) at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:313) at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1182) at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1161) at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1099) at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:464) at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:461) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:475) at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:402) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1103) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1083) at org.apache.parquet.hadoop.ParquetFileWriter. (ParquetFileWriter.java:218) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:312) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:288) at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper. (ParquetRecordWriterWrapper.java:67) at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:126) at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:115) at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:284) ... 45 more*来自志愿者整理的flink邮件归档



参考答案:

感觉像是依赖冲突,hive和Hadoop的版本是什么呢?*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371647?spm=a2c6h.13066369.question.83.6ad26382ZFlwVi

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之如何处理数据同步时(mysql->hive)报:Render instance failed
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
2月前
|
SQL Shell API
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
|
1月前
|
SQL Java 关系型数据库
Hive常见的报错信息
文章列举了Hive常见的几种报错信息,并提供了错误复现、原因分析以及相应的解决方案。
25 1
|
2月前
|
SQL Java Apache
实时计算 Flink版操作报错合集之使用parquet时,怎么解决报错:无法访问到java.uti.Arrays$ArrayList类的私有字段
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错合集之遇到了关于MySqIValidator类缺失的错误,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
SQL 分布式计算 数据处理
实时计算 Flink版产品使用问题之怎么将数据从Hive表中读取并写入到另一个Hive表中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 数据采集 数据挖掘
大数据行业应用之Hive数据分析航班线路相关的各项指标
大数据行业应用之Hive数据分析航班线路相关的各项指标
173 1
|
3月前
|
SQL 分布式计算 大数据
大数据处理平台Hive详解
【7月更文挑战第15天】Hive作为基于Hadoop的数据仓库工具,在大数据处理和分析领域发挥着重要作用。通过提供类SQL的查询语言,Hive降低了数据处理的门槛,使得具有SQL背景的开发者可以轻松地处理大规模数据。然而,Hive也存在查询延迟高、表达能力有限等缺点,需要在实际应用中根据具体场景和需求进行选择和优化。

相关产品

  • 实时计算 Flink版
  • 下一篇
    无影云桌面