问题一:pyflink udf中发送rest api会导致udf被调用两次
我观察到一个现象:我定义了一个tumble window,调用一个python udf,在这个udf里面使用requests发送rest api。 log显示这个udf会被调用两次。相隔不到一秒。这个是什么原因?requests库跟beam冲突了?
2020-07-09 17:44:17,501 INFO flink_test_stream_time_kafka.py:22 [] - start to ad 2020-07-09 17:44:17,530 INFO flink_test_stream_time_kafka.py:63 [] - start to send rest api. 2020-07-09 17:44:17,532 INFO flink_test_stream_time_kafka.py:69 [] - Receive: {"Received": "successful"} 2020-07-09 17:44:17,579 INFO /home/sysadmin/miniconda3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:564 [] - Creating insecure state channel for localhost:57954. 2020-07-09 17:44:17,580 INFO /home/sysadmin/miniconda3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:571 [] - State channel established. 2020-07-09 17:44:17,584 INFO /home/sysadmin/miniconda3/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py:526 [] - Creating client data channel for localhost:60902 2020-07-09 17:44:17,591 INFO org.apache.beam.runners.fnexecution.data.GrpcDataService [] - Beam Fn Data client connected. 2020-07-09 17:44:17,761 INFO flink_test_stream_time_kafka.py:22 [] - start to ad 2020-07-09 17:44:17,810 INFO flink_test_stream_time_kafka.py:63 [] - start to send rest api. 2020-07-09 17:44:17,812 INFO flink_test_stream_time_kafka.py:69 [] - Receive: {"Received": "successful"}
参考回答:
Table API的作业在执行之前会经过一系列的rule优化,最终的执行计划,存在一个UDF调用多次的可能,你可以把执行计划打印出来看看(TableEnvironment#explain)。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372238
问题二:flink1.10升级到flink1.11 提交到yarn失败
原来用1.10使用per job模式,可以提交的作业,现在用1.11使用应用模式提交失败,看日志,也不清楚原因, yarn log: Log Type: jobmanager.err
Log Upload Time: Thu Jul 09 21:02:48 +0800 2020
Log Length: 785
SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/yarn/nm/usercache/hdfs/appcache/application_1594271580406_0010/filecache/11/data-flow-1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] log4j:WARN No appenders could be found for logger (org.apache.flink.runtime.entrypoint.ClusterEntrypoint). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Log Type: jobmanager.out
Log Upload Time: Thu Jul 09 21:02:48 +0800 2020
Log Length: 0
Log Type: prelaunch.err
Log Upload Time: Thu Jul 09 21:02:48 +0800 2020
Log Length: 0
Log Type: prelaunch.out
Log Upload Time: Thu Jul 09 21:02:48 +0800 2020
Log Length: 70
Setting up env variables Setting up job resources Launching container
本地log: 2020-07-09 21:02:41,015 INFO org.apache.flink.client.cli.CliFrontend [] - -------------------------------------------------------------------------------- 2020-07-09 21:02:41,020 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost 2020-07-09 21:02:41,020 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123 2020-07-09 21:02:41,021 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m 2020-07-09 21:02:41,021 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m 2020-07-09 21:02:41,021 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1 2020-07-09 21:02:41,021 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 1 2020-07-09 21:02:41,021 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region 2020-07-09 21:02:41,164 INFO org.apache.flink.runtime.security.modules.HadoopModule [] - Hadoop user set to hdfs (auth:SIMPLE) 2020-07-09 21:02:41,172 INFO org.apache.flink.runtime.security.modules.JaasModule [] - Jaas file will be created as /tmp/jaas-2213111423022415421.conf. 2020-07-09 21:02:41,181 INFO org.apache.flink.client.cli.CliFrontend [] - Running 'run-application' command. 2020-07-09 21:02:41,194 INFO org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer [] - Submitting application in 'Application Mode'. 2020-07-09 21:02:41,201 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/opt/flink-1.11.0/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file. 2020-07-09 21:02:41,537 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2020-07-09 21:02:41,665 INFO org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider [] - Failing over to rm220 2020-07-09 21:02:41,717 INFO org.apache.hadoop.conf.Configuration [] - resource-types.xml not found 2020-07-09 21:02:41,718 INFO org.apache.hadoop.yarn.util.resource.ResourceUtils [] - Unable to find 'resource-types.xml'. 2020-07-09 21:02:41,755 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Cluster specification: ClusterSpecification{masterMemoryMB=2048, taskManagerMemoryMB=4096, slotsPerTaskManager=1} 2020-07-09 21:02:42,723 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Submitting application master application_1594271580406_0010 2020-07-09 21:02:42,969 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl [] - Submitted application application_1594271580406_0010 2020-07-09 21:02:42,969 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Waiting for the cluster to be allocated 2020-07-09 21:02:42,971 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Deploying cluster, current state ACCEPTED 2020-07-09 21:02:47,619 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - YARN application has been deployed successfully. 2020-07-09 21:02:47,620 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface cdh003:38716 of application 'application_1594271580406_0010'
参考回答:
这个看上去是提交到 Yarn 了,具体的原因需要看下 JM log 是啥原因。另外是否是日志没有贴全,这里只看到本地 log,其他的就只有小部分 jobmanager.err 的 log。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372217
问题三:Flink DataStream 统计UV问题
想问下,现在在用DataStream的api来统计每天的UV,代码如下,有2个使用问题:
1、在使用Tumbling窗口的时候,由于使用窗口跨度是1天(Time.days(1)),只有以一天结束的时候,才能输出一个uv值,
这样时间等待太长了,所以加了一个trigger,每来一条都触发一次窗口,不知道这样的用法没有问题。
2、还有想问下在窗口结束后,里面的state状态会自动释放吗?还是要自己手动设置TTL的。
参考回答:
1.如果数据流量不是很大的话,按每条数据触发也没问题。另外,基于事件时间的情况,提前触发可以选择ContinuousEventTimeTrigger,可以查看Trigger接口的实现找到你想要的trigger。
2.窗口结束后会自动释放。一般对于Global窗口需要手动设置TTL
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372152
问题四:flink时间窗口
你好,我最近业务上需要处理一个流式的时间处理窗口,部分业务代码如下
SingleOutputStreamOperator<Tuple2<warningPojo, String>> operator = flatMap.keyBy(0,1)
.timeWindow(Time.minutes(1))
.process(new ProcessWindowFunction)
当我运行的时候只有第一分钟的时间窗口会有数据进来,之后便没有数据进来了,业务逻辑代码也没有报错,请问这是什么原因。
参考回答:
根据你的keyby字段来看,你是根据 warningPojo + String 进行了keyby,可以看下是否相同的key只有一条相同数据。 并且可以看下使用到的是处理时间还是事件时间? 如果是事件时间,可以看下 timestamp assigner 是否正确,上游数据和时间戳是否符合预期。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372149
问题五:kafka connector问题
官网给的kafka table配置里的scan.startup.mode CREATE TABLE kafkaTable ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3) ) WITH ( 'connector' = 'kafka', 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'format' = 'csv', 'scan.startup.mode' = 'earliest-offset' )看了总共有以下几总'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'如果我作业重启的话选择group-offsets能否从上次消费到的位置开始?这种情况下需要配置提交offset到kafka broker相关的东西吗?有没有从savepoint保存的offset继续消费的配置?
参考回答:
首先,从checkpoint/savepoint
恢复的话,一定会以checkpoint/savepoint中的offset为准,所以它的优先级是最高的,
不管你配置哪种startup mode。
如果你没有开启checkpoint,那么如果你用了group-offsets,那它就会从保存在kafka中的offset进行启动。
提交offset到kafka这个应该是默认就开了的。
关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372144