Flink问题之本地集群报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:pyflink使用的一些疑问


你好, 最近项目想使用flink进行分布式计算,之前项目是Python的pandas项目,想尝试用pyflink进行项目改造,在使用dataset做批处理时,对于Java的版本没有相关map

reduce函数,所以有以下疑问:

1.Python flink的SDK还没支持dataset吗?

2.是不是有其他替代方法?

3.如果还没支持,有计划支持的时间吗?

4.flink table为啥不支持map reduce操作?

5.我们项目使用dataframe来处理数据,能放到flink上做分布式运算吗?dataframe直接转化为table的方式,table不支持map

reduce操作,对应pandas项目改造成flink,有什么好的建议么?

  1. datastream api为什么没有实现Windows方法?后面版本会支持吗?

非常感谢,十分看好flink,希望社区越做越大,辛苦了!*来自志愿者整理的flink邮件归档


参考回答:

您好,

目前同样在做pyflink 结合pandas的分布式计算调研和尝试,对于您的问题,仅有一些经验性的分享。

pyflink以后应该都会集成到DataStream,所以应该不会再支持DataSet;

不建议在计算中间采用 table.to_pandas()的方式进行table和dataFrame互转,会影响计算效率;

目前采用的计算效率较好的方式,是定义pandas类型的udf/udaf方式,但相较java版接口同样的方式,pyflink还是会慢很多;

个人感觉,pyflink耗时较多的地方,还是sql_query的操作,相同sql语句,执行效率上较java差别还是很大的。

以上仅个人使用感觉,若存在问题,欢迎路过大佬批评指正~

还有,因为调研相同领域,希望能交流调研新发现,感谢~祝好


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359317?spm=a2c6h.13262185.0.0.54e839c0D2mgIx


问题二:Pyflink 提交到本地集群报错怎么办?


我在terminal中用python xx.py文件就可以执行,然而用flink run -m localhost:8081 -py xx.py就会报上面的错误说没有pyflink的组件。 (base) huilin@huilin-Lenovo:~/Documents/Learning/experiment$ flink run -m localhost:8081 -py demo_predict.py Traceback (most recent call last): File "demo_predict.py", line 51, in from pyflink.common.serialization import SimpleStringEncoder ModuleNotFoundError: No module named 'pyflink.common.serialization'

我已经试了很多方法,创建了虚拟环境在里面安装了对应的包,还是不行。请问有什么解决办法?*来自志愿者整理的flink邮件归档


参考回答:

从报错看,似乎是作业运行的时候,找不到pyflink,如果确实是这样的话,有几个解决方案:

  • 通过API指定集群端的Python路径: set_python_executable,参考 [1]
  • 通过配置python.executable,参考[2]

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/dependency_management.html

[2]

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/python_config.html#python-executable


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359318?spm=a2c6h.13262185.0.0.54e839c0D2mgIx


问题三:flink sql 的 count(distinct )问题


各位大佬好,想问下flinksql里的count (distinct)默认是用哪种state保存的状态*来自志愿者整理的flink邮件归档


参考回答:

Hi,

你可以理解为用的是MapState来保存的状态。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359320?spm=a2c6h.13262185.0.0.54e839c0D2mgIx


问题四:Temproal Tables 请教下大家,官网中对于时态表的定义的案例是基于debezium的,我现在具基于canal这样定义有问题吗?


create table produce( id string, name string, price decimal(10,4) update_time timestamp(3) metadata from ‘timestamp’ virtual, primary key(id) not enforced, watermark for update_time as update_time )with( ‘connector’=‘Kafka’, ‘topic’=‘product’, ‘scan.startup.mode’=‘earliest-offset’, ‘properties.bootstrap.server’=‘localhost:9092’ ‘value.format’=‘canal-Jason’ );

主要是 metadata那块 填写timestamp 对吗?*来自志愿者整理的flink邮件归档


参考回答:

这个问题我解决了,这样定义应该是可以


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359323?spm=a2c6h.13262185.0.0.54e839c0D2mgIx


问题五:flink1.12版本,使用yarn-application模式提交任务失败怎么办?


通过脚本提交flink作业,提交命令: /bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://xx/flink120/" hdfs://xx/flink-example.jar --sqlFilePath /xxx/kafka2print.sql

flink使用的Lib及user jar已经上传到Hdfs路径,但是抛出以下错误:

The program finished with the following exception:

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn Application Cluster at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:465) at org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67) at org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1061) at org.apache.flink.client.cli.CliFrontend.lambdamainmainmain10(CliFrontend.java:1136) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136) Caused by: java.lang.IllegalArgumentException: Wrong FS: hdfs://xx/flink120/, expected: file:/// at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:648) at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:82) at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:606) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824) at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601) at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:428) at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1425) at org.apache.flink.yarn.YarnApplicationFileUploader.lambdagetAllFilesInProvidedLibDirsgetAllFilesInProvidedLibDirsgetAllFilesInProvidedLibDirs2(YarnApplicationFileUploader.java:469) at org.apache.flink.util.function.FunctionUtils.lambdauncheckedConsumeruncheckedConsumeruncheckedConsumer3(FunctionUtils.java:93) at java.util.ArrayList.forEach(ArrayList.java:1257) at org.apache.flink.yarn.YarnApplicationFileUploader.getAllFilesInProvidedLibDirs(YarnApplicationFileUploader.java:466) at org.apache.flink.yarn.YarnApplicationFileUploader. (YarnApplicationFileUploader.java:106) at org.apache.flink.yarn.YarnApplicationFileUploader.from(YarnApplicationFileUploader.java:381) at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:789) at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:592) at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:458) ... 9 more*来自志愿者整理的flink邮件归档


参考回答:

Hi 从你的日志看作业启动失败的原因是: Caused by: java.lang.IllegalArgumentException: Wrong FS: hdfs://xx/flink120/, expected: file:/// 看上去你设置的地址和 需要的 schema 不一样,你需要解决一下这个问题


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359324?spm=a2c6h.13046898.publish-article.7.4d546ffal22sZX

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
zdl
|
2月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
174 56
|
3月前
|
Kubernetes Cloud Native 流计算
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
103 3
|
3月前
|
存储 运维 监控
实时计算Flink版在稳定性、性能、开发运维、安全能力等等跟其他引擎及自建Flink集群比较。
实时计算Flink版在稳定性、性能、开发运维和安全能力等方面表现出色。其自研的高性能状态存储引擎GeminiStateBackend显著提升了作业稳定性,状态管理优化使性能提升40%以上。核心性能较开源Flink提升2-3倍,资源利用率提高100%。提供一站式开发管理、自动化运维和丰富的监控告警功能,支持多语言开发和智能调优。安全方面,具备访问控制、高可用保障和全链路容错能力,确保企业级应用的安全与稳定。
56 0
|
5月前
|
SQL Shell API
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
|
5月前
|
资源调度 监控 关系型数据库
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
|
5月前
|
消息中间件 监控 Java
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
联通实时计算平台问题之监控Kafka集群的断传和积压情况要如何操作
|
5月前
|
SQL Kubernetes 数据处理
实时计算 Flink版产品使用问题之如何把集群通过kubernetes进行部署
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL Java Apache
实时计算 Flink版操作报错合集之使用parquet时,怎么解决报错:无法访问到java.uti.Arrays$ArrayList类的私有字段
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错合集之遇到了关于MySqIValidator类缺失的错误,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
SQL 存储 资源调度
实时计算 Flink版操作报错合集之启动项目时报错缺少MySqlValidator类,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。

相关产品

  • 实时计算 Flink版