开发者社区> 问答> 正文

Flink SQL 1.11.3问题请教

我们使用Nifi将数据采集到kafka,然后使用Flink处理kafka中的数据,现在Nifi如果将多条数据当做一条记录(写入kafka中的数据格式为csv或者json)写入kafka后(就是一个offset中包含了多条数据),Flink只会处理其中的一条数据?有没有什么办法让Flink都处理一个offset中的多条数据?*来自志愿者整理的flink邮件归档

展开
收起
EXCEED 2021-12-02 14:26:56 573 0
1 条回答
写回答
取消 提交回答
  • 不知道下面场景是否与你描述的场景相同 ,假设采集到单条json消息格式为 {"name":"test"},将多条json消息合并为一条的格式为 [{"name":"test"},{"name":"test2"}]。 我的 Flink 任务是采用 FlinkSQL 编写,处理这种情况我的方式是通过编写一个 UDF (TableFunction), 之后在这个 UDF 中进行数据拆解,变为多行 (row),再输出到 sink。

    Row row = new Row(arity); collect(row);

    具体使用可参考:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/*来自志愿者整理的FLINK邮件归档

    2021-12-02 14:38:53
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载