Flink报错问题之flink-1.11写hive报错如何解决

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:JobManager responsible for xxx lost the leadersh

JobManager responsible for ff2118284beed21ac220ee7cc0a639c0 lost the

leadership.

这种错误原因是什么,会导致任务重启,本身压力大,突然重启使用10分钟前的ckpt,压力更大了。*来自志愿者整理的flink邮件归档



参考答案:

你是 on-yarn 的模式吗? JobManager 并不是 worker,只是控制 Checkpoint ,接收 TM 的心跳等,可以看下在这个之前的其它日志。 还可以看下 ZK 是否正常等。 On-yarn 的话,也可以看下 NM 对这个AM处理 的日志。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371626?spm=a2c6h.13066369.question.79.6ad26382lyqdWM



问题二:keyBy的数据均衡性

我这边遇到一个情况比较奇怪。

(1)一整天数据的统计信息如下:

sid+subid+browser+ip: 13068577

sid+subid+browser+uid: 2962237

如上,sid和subid是渠道和子渠道,browser是浏览器,ip和uid都是一个相对变化较大的维度。

数字是distinct count信息。

(2)任务逻辑

流A,分别基于sid+subid+browser+ip和sid+subid+browser+uid组合维护做统计。window算子并发都是10。结果是sid+subid+browser+ip的window算子收到数据基本均衡(1.09G~1.48G),此处是指运行一段时间后。但sid+subid+browser+uid算子收到数据却很不均衡(230MB~6.84G)。

我的疑问是,虽然keyBy不能完全均衡,这很好理解。但是差距也太奇葩了。230MB和6.84G。

而且从统计信息看uid的确没有ip区分度大。但 sid+subid+browser+uid 的组合数达到 296w,并发才10,会这么不均衡的嘛?*来自志愿者整理的flink邮件归档



参考答案:

感觉好像有道理哈哈。

分析下:sid+subid+browser+uid 一共大约300w假设,*sid+subid+browser *则假设是300个。

那么uid=0的存在300种组合,即 *300w种组合 *中有 *300种组合(uid=0) *是相对大概率出现的。

那么这300种大概率出现的组合如果碰巧分布不够均衡,就会导致window算子部分不均衡。

之前我考虑了uid的问题,但想的是hash是一堆字段一起哈希,uid自身不均衡不会导致问题。但基于如上分析,貌似是有问题的。因为uid=0的组合数的

规模太小(300),如果这个规模稍微大点的话,uid的不均衡就不会导致这个问题了可能。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371627?spm=a2c6h.13066369.question.78.6ad26382oTAPra



问题三:union all 丢失部分数据

开发者好: 目前有此场景:算不同部门的总收入和所有部门的总收入,打算把两部分SQL union all ,但是实际情况发现 union all的时候会丢一部分数据,要么是各个部门的数据少了,要么是所有部门的总收入少了 如果把union all 的两段SQL 分别独立出来,插入同一张表,那么数据就是正常的,不知道是否是bug还是使用方法不对

原sql :

insert into dws_XXXX

select 0 as id ,cast (DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') as bigint) as ftime ,case when dept_name like '%XX%' then 'X1' when dept_name = 'xXX' then 'X2' else 'X3' end as paytype ,count(orderid) as paynum_h ,round(sum(amt)) as paymoney_h from dwd_XXX where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') group by DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH'), case when dept_name like '%XX%' then 'X1' when dept_name = 'xXX' then 'X2' else 'X3' end ;

union all

select 0 as id ,cast (DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') as int) as ftime ,'all' as paytype ,count(orderid) as paynum_h ,round(sum(amt)) as paymoney_h from dwd_XXX where write_time >=DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd') group by DATE_FORMAT(LOCALTIMESTAMP, 'yyyyMMddHH') ;*来自志愿者整理的flink邮件归档



参考答案:

你的 flink 版本是什么呢? 根据你的 SQL,如果是版本是 <= 1.10 的话,会根据 MetaDataHander 识别出你的 group by 后面的 key 作为 upsert key,这样就会产生覆盖的情况。 你看下结果是否是这种情况的?*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371628?spm=a2c6h.13066369.question.81.6ad26382TqkdaS



问题四:flink 1.11.0 chk超时

http://apache-flink.147419.n8.nabble.com/file/t538/QQ%E6%88%AA%E5%9B%BE20201105165123.jpg http://apache-flink.147419.n8.nabble.com/file/t538/QQ%E6%88%AA%E5%9B%BE20201105165200.jpg chk的历史如图,第三个subtask未能ack,同时在TM中只能找到如下信息:

2020-11-05 13:13:38,101 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 2 checkpointing for checkpoint with id=16 (max part counter=6). 2020-11-05 13:13:38,143 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets [] - Subtask 1 checkpointing for checkpoint with id=16 (max part counter=0). 2020-11-05 13:14:37,779 WARN org.apache.kafka.clients.NetworkClient [] - Connection to node -3 could not be established. Broker may not be available. 2020-11-05 13:14:37,786 WARN org.apache.kafka.clients.NetworkClient [] - Connection to node -2 could not be established. Broker may not be available. 2020-11-05 13:33:38,115 INFO org.apache.flink.runtime.taskmanager.Task [] - Attempting to cancel task Source: Custom Source -> Process -> (Sink: ***, Sink: *** sink, Sink: ***) (3/3) (68bfa6305a9aa5a7381b9ca4a8fef2fa).

请路过的大佬们指点下排查方式,多谢(当前日志级别暂无法改成debug)*来自志愿者整理的flink邮件归档



参考答案:

CP 超时的原因一般是因任务而议的。从你提供的 2 张截图来看,卡在第二个 operator 的 subtask3 上。 上下两个 operator 之间的关系是 forworad 还是 reblance 呢?如果是 forward 的话,可以看下是不是数据倾斜,subtask3 需要处理的数据量比较多。 如果是 reblance 的话,以为 subtask1 和 subtask2 都成功了,所以上游的 barrier 应该都往下发了,所以 subtask3也收到了上游的 barrier,而 reblance 数据量都一样,所以可以看下是不是 sink 出去太慢导致。 查看任务一般可以看下任务的日志, GC,采堆栈,画火焰图等。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371633?spm=a2c6h.13066369.question.82.6ad26382EgonTS



问题五:flink-1.11 写 hive 报错

flink 读 kafka 写 hive,之前运行好好的。在IDEA也能正常运行,打成jar包,提交到 flink 集群,报错如下。请问是什么原因?

2020-11-05 15:34:36 org.apache.flink.connectors.hive.FlinkHiveException: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive RecordWriter at org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:159) at org.apache.flink.connectors.hive.write.HiveBulkWriterFactory.create(HiveBulkWriterFactory.java:47) at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:257) at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedBucketWriter.openNewInProgressFile(HadoopPathBasedPartFileWriter.java:230) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209) at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200) at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282) at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104) at org.apache.flink.table.filesystem.stream.StreamingFileWriter.processElement(StreamingFileWriter.java:118) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at StreamExecCalc$43.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperator.processElement(WatermarkAssignerOperator.java:123) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at StreamExecCalc$19.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive RecordWriter at org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:58) at org.apache.flink.connectors.hive.write.HiveWriterFactory.createRecordWriter(HiveWriterFactory.java:151) ... 40 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.GeneratedMethodAccessor125.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.table.catalog.hive.client.HiveShimV110.getHiveRecordWriter(HiveShimV110.java:55) ... 41 more Caused by: java.lang.NoSuchFieldError: IGNORE_CLIENT_LOCALITY at org.apache.hadoop.hdfs.DFSOutputStream. (DFSOutputStream.java:204) at org.apache.hadoop.hdfs.DFSOutputStream. (DFSOutputStream.java:247) at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:313) at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1182) at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1161) at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1099) at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:464) at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:461) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:475) at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:402) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1103) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1083) at org.apache.parquet.hadoop.ParquetFileWriter. (ParquetFileWriter.java:218) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:312) at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:288) at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper. (ParquetRecordWriterWrapper.java:67) at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:126) at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:115) at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:284) ... 45 more*来自志愿者整理的flink邮件归档



参考答案:

感觉像是依赖冲突,hive和Hadoop的版本是什么呢?*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/371647?spm=a2c6h.13066369.question.83.6ad26382ZFlwVi

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
6月前
|
SQL 分布式计算 数据库
【YashanDB 知识库】Hive 命令工具 insert 崖山数据库报错
【YashanDB 知识库】Hive 命令工具 insert 崖山数据库报错
|
6月前
|
SQL 分布式计算 关系型数据库
【YashanDB知识库】hive初始化崖山报错YAS-04209
【YashanDB知识库】hive初始化崖山报错YAS-04209
|
6月前
|
SQL 分布式计算 数据库
【YashanDB知识库】Hive 命令工具insert崖山数据库报错
【YashanDB知识库】Hive 命令工具insert崖山数据库报错
|
6月前
|
SQL 分布式计算 关系型数据库
【YashanDB知识库】hive初始化崖山报错YAS-04209
【YashanDB知识库】hive初始化崖山报错YAS-04209
|
SQL Java 关系型数据库
Hive常见的报错信息
文章列举了Hive常见的几种报错信息,并提供了错误复现、原因分析以及相应的解决方案。
1333 1
|
SQL 关系型数据库 HIVE
实时计算 Flink版产品使用问题之如何将PostgreSQL数据实时入库Hive并实现断点续传
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
SQL 数据采集 数据挖掘
大数据行业应用之Hive数据分析航班线路相关的各项指标
大数据行业应用之Hive数据分析航班线路相关的各项指标
340 1
|
4月前
|
SQL 分布式计算 大数据
大数据新视界 --大数据大厂之Hive与大数据融合:构建强大数据仓库实战指南
本文深入介绍 Hive 与大数据融合构建强大数据仓库的实战指南。涵盖 Hive 简介、优势、安装配置、数据处理、性能优化及安全管理等内容,并通过互联网广告和物流行业案例分析,展示其实际应用。具有专业性、可操作性和参考价值。
大数据新视界 --大数据大厂之Hive与大数据融合:构建强大数据仓库实战指南
|
11月前
|
SQL 分布式计算 Java
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
大数据-96 Spark 集群 SparkSQL Scala编写SQL操作SparkSQL的数据源:JSON、CSV、JDBC、Hive
219 0

相关产品

  • 实时计算 Flink版