问题一:pyflink使用的一些疑问
你好, 最近项目想使用flink进行分布式计算,之前项目是Python的pandas项目,想尝试用pyflink进行项目改造,在使用dataset做批处理时,对于Java的版本没有相关map
reduce函数,所以有以下疑问:
1.Python flink的SDK还没支持dataset吗?
2.是不是有其他替代方法?
3.如果还没支持,有计划支持的时间吗?
4.flink table为啥不支持map reduce操作?
5.我们项目使用dataframe来处理数据,能放到flink上做分布式运算吗?dataframe直接转化为table的方式,table不支持map
reduce操作,对应pandas项目改造成flink,有什么好的建议么?
- datastream api为什么没有实现Windows方法?后面版本会支持吗?
非常感谢,十分看好flink,希望社区越做越大,辛苦了!*来自志愿者整理的flink邮件归档
参考回答:
您好,
目前同样在做pyflink 结合pandas的分布式计算调研和尝试,对于您的问题,仅有一些经验性的分享。
pyflink以后应该都会集成到DataStream,所以应该不会再支持DataSet;
不建议在计算中间采用 table.to_pandas()的方式进行table和dataFrame互转,会影响计算效率;
目前采用的计算效率较好的方式,是定义pandas类型的udf/udaf方式,但相较java版接口同样的方式,pyflink还是会慢很多;
个人感觉,pyflink耗时较多的地方,还是sql_query的操作,相同sql语句,执行效率上较java差别还是很大的。
以上仅个人使用感觉,若存在问题,欢迎路过大佬批评指正~
还有,因为调研相同领域,希望能交流调研新发现,感谢~祝好
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359317?spm=a2c6h.13262185.0.0.54e839c0D2mgIx
问题二:Pyflink 提交到本地集群报错怎么办?
我在terminal中用python xx.py文件就可以执行,然而用flink run -m localhost:8081 -py xx.py就会报上面的错误说没有pyflink的组件。 (base) huilin@huilin-Lenovo:~/Documents/Learning/experiment$ flink run -m localhost:8081 -py demo_predict.py Traceback (most recent call last): File "demo_predict.py", line 51, in from pyflink.common.serialization import SimpleStringEncoder ModuleNotFoundError: No module named 'pyflink.common.serialization'
我已经试了很多方法,创建了虚拟环境在里面安装了对应的包,还是不行。请问有什么解决办法?*来自志愿者整理的flink邮件归档
参考回答:
从报错看,似乎是作业运行的时候,找不到pyflink,如果确实是这样的话,有几个解决方案:
- 通过API指定集群端的Python路径: set_python_executable,参考 [1]
- 通过配置python.executable,参考[2]
[1]
[2]
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359318?spm=a2c6h.13262185.0.0.54e839c0D2mgIx
问题三:flink sql 的 count(distinct )问题
各位大佬好,想问下flinksql里的count (distinct)默认是用哪种state保存的状态*来自志愿者整理的flink邮件归档
参考回答:
Hi,
你可以理解为用的是MapState来保存的状态。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359320?spm=a2c6h.13262185.0.0.54e839c0D2mgIx
问题四:Temproal Tables 请教下大家,官网中对于时态表的定义的案例是基于debezium的,我现在具基于canal这样定义有问题吗?
create table produce( id string, name string, price decimal(10,4) update_time timestamp(3) metadata from ‘timestamp’ virtual, primary key(id) not enforced, watermark for update_time as update_time )with( ‘connector’=‘Kafka’, ‘topic’=‘product’, ‘scan.startup.mode’=‘earliest-offset’, ‘properties.bootstrap.server’=‘localhost:9092’ ‘value.format’=‘canal-Jason’ );
主要是 metadata那块 填写timestamp 对吗?*来自志愿者整理的flink邮件归档
参考回答:
这个问题我解决了,这样定义应该是可以
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359323?spm=a2c6h.13262185.0.0.54e839c0D2mgIx
问题五:flink1.12版本,使用yarn-application模式提交任务失败怎么办?
通过脚本提交flink作业,提交命令: /bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://xx/flink120/" hdfs://xx/flink-example.jar --sqlFilePath /xxx/kafka2print.sql
flink使用的Lib及user jar已经上传到Hdfs路径,但是抛出以下错误:
The program finished with the following exception:
org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn Application Cluster at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:465) at org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67) at org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1061) at org.apache.flink.client.cli.CliFrontend.lambdamainmainmain10(CliFrontend.java:1136) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1136) Caused by: java.lang.IllegalArgumentException: Wrong FS: hdfs://xx/flink120/, expected: file:/// at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:648) at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:82) at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:606) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824) at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601) at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:428) at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1425) at org.apache.flink.yarn.YarnApplicationFileUploader.lambdagetAllFilesInProvidedLibDirsgetAllFilesInProvidedLibDirsgetAllFilesInProvidedLibDirs2(YarnApplicationFileUploader.java:469) at org.apache.flink.util.function.FunctionUtils.lambdauncheckedConsumeruncheckedConsumeruncheckedConsumer3(FunctionUtils.java:93) at java.util.ArrayList.forEach(ArrayList.java:1257) at org.apache.flink.yarn.YarnApplicationFileUploader.getAllFilesInProvidedLibDirs(YarnApplicationFileUploader.java:466) at org.apache.flink.yarn.YarnApplicationFileUploader. (YarnApplicationFileUploader.java:106) at org.apache.flink.yarn.YarnApplicationFileUploader.from(YarnApplicationFileUploader.java:381) at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:789) at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:592) at org.apache.flink.yarn.YarnClusterDescriptor.deployApplicationCluster(YarnClusterDescriptor.java:458) ... 9 more*来自志愿者整理的flink邮件归档
参考回答:
Hi 从你的日志看作业启动失败的原因是: Caused by: java.lang.IllegalArgumentException: Wrong FS: hdfs://xx/flink120/, expected: file:/// 看上去你设置的地址和 需要的 schema 不一样,你需要解决一下这个问题
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/359324?spm=a2c6h.13046898.publish-article.7.4d546ffal22sZX