开发者社区> 问答> 正文

flink sql collect函数使用问题

使用场景如下,将kafka源表通过flink sql处理成mongodb汇表存入。按照班级进行group by,输出对应班级所有的学生数据集合。请问用flink sql自带的collect函数能实现吗?如果能的话要怎么写sql? 如果不能的话要怎么写UDAF,有例子参考吗?谢谢!

kafka源表: 班级 学号 姓名 年龄 1 20001 张三 15 2 20011 李四 16 1 20002 王五 16 2 20012 吴六 15

create table source_table ( class_no: INT, student_no: INT, name: STRING, age: INT ) with ( 'connector' = 'kafka', ... );

通过flink sql处理输出 ==>

mongodb目标表: 班级 学生信息 1 [{"student_no": 20001, "name":"张三", "age": 15}, {"student_no": 20002, "name":"王五", "age": 16}] 2 [{"student_no": 20011, "name":"李四", "age": 16}, {"student_no": 20012, "name":"吴六", "age": 15}]

create table sink_table ( class_no INT, students: ARRAY<ROW<student_no STRING, name STRING, age INT>> ) with ( 'connector' = 'mongodb', ... );*来自志愿者整理的flink

展开
收起
雪哥哥 2021-12-08 19:43:03 2640 0
1 条回答
写回答
取消 提交回答
  • af里acc为个list,merge的时候合并,输出的时候 list拼成string即可*来自志愿者整理的flink

    2021-12-08 19:44:23
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载