开发者社区> 问答> 正文

以超过1的并行度运行Flink

flink小助手 2018-11-28 15:54:09 885

"

我从2个来源读取消息,根据公共密钥进行连接并将其全部下载到kafka。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
...
source1
.keyBy(_.searchId)
.connect(source2.keyBy(_.searchId))
.process(new SearchResultsJoinFunction)
.addSink(KafkaSink.sink)
所以当我在本地启动它时它完全有效,它也可以在Parallelism设置为1的集群上工作,但是不再有3。

当我将它部署到1个作业管理器和3个任务管理器并使每个任务处于“RUNNING”状态时,在2分钟后(当没有任何东西正在下沉时),其中一个任务管理器获得以下日志:https:
//gist.github.com / zavalit / 1b1bf6621bed2a3848a05c1ef84c689c#文件gistfile1-TXT-L108

整个事件就这样关闭了。"

消息中间件 Kafka 流计算
分享到
取消 提交回答
全部回答(1)
  • flink小助手
    2019-07-17 23:16:47

    "问题似乎是这个任务管理器--flink-taskmanager-12-2qvcd(10.81.53.209) - 无法与至少一个其他任务管理器交谈,即flink-taskmanager-12-57jzd(10.81。 40.124:46240)。这就是为什么这份工作从未真正开始运作的原因。

    我会检查其他任务管理器的日志以查看它的内容,我还会检查你的网络配置。也许防火墙正在阻碍?
    "

    0 0
+ 订阅

构建可靠、高效、易扩展的技术基石

推荐文章
相似问题
推荐课程