管道简单代码是研究员:
source = env.addSource(kafkaConsumer)
.map(func).setParallelism(2).sink()
如何确定顺序?
首先,让我们假设示例中的其他所有内容都具有一个并行性,并且只有map函数将并行运行。(虽然要真正实现这一点,但必须在某处配置;默认并行度高于1。)
我们还假设您的Kafka使用者正在使用一个分区读取单个主题,并且您正在询问如何实现保留输入中存在的排序的并行转换。
有了这些假设,答案就是你不能做很多事情。地图运算符的两个实例之间存在竞争,非并行接收器将以任意方式交错这两个传入流。
如果以某种方式标记流记录,比如使用递增的时间戳或ID,那么您可以假设引入一些缓冲并重新建立原始排序,可以是在自定义接收器中,也可以是在地图和接收器之间的非并行RichCoMap函数中运营商。
另一方面,如果您的源以某种方式进行分区或键控,并且您只需要在每个键的基础上维护或建立排序,那么就有更好的答案。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。