"
我从2个来源读取消息,根据公共密钥进行连接并将其全部下载到kafka。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
...
source1
.keyBy(_.searchId)
.connect(source2.keyBy(_.searchId))
.process(new SearchResultsJoinFunction)
.addSink(KafkaSink.sink)
所以当我在本地启动它时它完全有效,它也可以在Parallelism设置为1的集群上工作,但是不再有3。
当我将它部署到1个作业管理器和3个任务管理器并使每个任务处于“RUNNING”状态时,在2分钟后(当没有任何东西正在下沉时),其中一个任务管理器获得以下日志:https:
//gist.github.com / zavalit / 1b1bf6621bed2a3848a05c1ef84c689c#文件gistfile1-TXT-L108
整个事件就这样关闭了。"
"问题似乎是这个任务管理器--flink-taskmanager-12-2qvcd(10.81.53.209) - 无法与至少一个其他任务管理器交谈,即flink-taskmanager-12-57jzd(10.81。 40.124:46240)。这就是为什么这份工作从未真正开始运作的原因。
我会检查其他任务管理器的日志以查看它的内容,我还会检查你的网络配置。也许防火墙正在阻碍?
"
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。