开发者社区 问答 正文

datastream union各个topic的数据后,数据有丢失

大家好, 我最近用datastream union各个topic的数据后,数据有丢失,比单独消费每个topic的数据少很多 代码如下: /** * 将多个流合并成一个进行处理 * * @param topics 通过配置文件传入的 Topic 和 消费者GroupID * @param env Flink运行环境 * @return 将多个Topic数据流整合成一个流 */ def getUnionConsumer(topics: List[String], env: StreamExecutionEnvironment): DataStream[String] = { var total: DataStream[String] = null for (str <- topics) { val topicName = str.split(":")(0) val groupId = str.split(":")(1) val source_data = getSourceData(topicName, groupId, env) if (total != null) { total = total.union(source_data) } else { total = source_data } } total }*来自志愿者整理的flink邮件归档

展开
收起
EXCEED 2021-12-02 14:46:53 590 分享 版权
1 条回答
写回答
取消 提交回答
  • 已经解决了,去掉循环,把每个kafka topic单独处理,再union*来自志愿者整理的FLINK邮件归档

    2021-12-02 15:01:43
    赞同 展开评论
问答地址: