How do I use the LISTAGG aggregate function in Flink SQL?

Version: Flink SQL 1.11.0
Requirement: aggregate multiple rows into a single row.
Code:

import pandas as pd
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment

environmentSettings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(environment_settings=environmentSettings)
t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed", 'true')

a_df = pd.DataFrame({"id": ["1", "1", "1"], "uuid": ["1", "2", "3"]})
a_table = t_env.from_pandas(
    a_df,
    DataTypes.ROW([DataTypes.FIELD("id", DataTypes.STRING()),
                   DataTypes.FIELD("uuid", DataTypes.STRING())]))
t_env.create_temporary_view("table_a", a_table)

b_df = pd.DataFrame({"val": ["3", "4", "5", "6"], "uuid": ["1", "2", "3", "4"]})
table_b = t_env.from_pandas(
    b_df,
    DataTypes.ROW([DataTypes.FIELD("val", DataTypes.STRING()),
                   DataTypes.FIELD("uuid", DataTypes.STRING())]))
t_env.create_temporary_view("table_b", table_b)

t_env.sql_update("""
    CREATE TABLE mySink (
        b varchar,
        c varchar
    ) WITH (
        'connector' = 'print'
    )
""")

t_env.sql_update("""
    insert into mySink
    select t1.id, LISTAGG(t2.val, ',')
    from table_a t1
    left join table_b t2 on t1.uuid = t2.uuid
    group by t1.id
""")
t_env.execute("tutorial_job")
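For reference, the result this query is meant to produce can be sketched in plain Python, without Flink. This is only an illustration of the left join plus LISTAGG semantics on the sample data above, not a workaround for the error. The helper name `listagg_expected` is hypothetical, and returning `None` for a group with no matching values is an assumption modelled on how SQL aggregates usually treat NULL.

```python
from collections import defaultdict

# Same sample rows as table_a and table_b in the question.
table_a = [{"id": "1", "uuid": "1"}, {"id": "1", "uuid": "2"}, {"id": "1", "uuid": "3"}]
table_b = [{"val": "3", "uuid": "1"}, {"val": "4", "uuid": "2"},
           {"val": "5", "uuid": "3"}, {"val": "6", "uuid": "4"}]


def listagg_expected(left_rows, right_rows, sep=","):
    """Left-join on uuid, then concatenate val per id (hypothetical helper)."""
    # Index the right side by the join key.
    vals_by_uuid = defaultdict(list)
    for row in right_rows:
        vals_by_uuid[row["uuid"]].append(row["val"])

    vals_by_id = defaultdict(list)
    for row in left_rows:
        # An unmatched left row adds no value (like a skipped NULL),
        # but its id still appears as a group in the result.
        vals_by_id[row["id"]].extend(vals_by_uuid.get(row["uuid"], []))

    # Assumption: a group without any value maps to None (SQL NULL).
    return {key: (sep.join(vals) if vals else None)
            for key, vals in vals_by_id.items()}


expected = listagg_expected(table_a, table_b)
print(expected)
```

On the sample data the three rows with id "1" collapse into a single row whose value is "3,4,5". In a streaming job the concatenation order is not necessarily the input order, so a real result may list the same values in a different sequence.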

Error:

Caused by: java.lang.ClassCastException: org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to org.apache.flink.table.data.StringData
    at org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169)
    at org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139)
    at org.apache.flink.table.data.RowData.get(RowData.java:273)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205)
    at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$910/1247863497.runDefaultAction(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:745)

Benchao Li - Wednesday, August 12, 2020, 9:32:40 PM GMT+8 *From the Flink mailing list archive compiled by volunteers

EXCEED 2021-12-08 11:04:54
1 answer
  • This looks like a known bug [1]. It has already been fixed, but the fix has not been released yet.

    [1] https://issues.apache.org/jira/browse/FLINK-18862

    2021-12-08 11:14:03