开发者社区> 问答> 正文

pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中 ?

使用pyflink的table api 能否根据 filed的值进行判断插入到不同的sink表中?

场景:使用pyflink通过filter进行条件过滤后插入到sink中, 比如以下两条消息,logType不同,可以使用filter接口进行过滤后插入到sink表中: { "logType":"syslog", "message":"sla;flkdsjf" } { "logType":"alarm", "message":"sla;flkdsjf" } t_env.from_path("source")
.filter("logType=syslog")
.insert_into("sink1") 有方法直接在上面的代码中通过判断logType字段的类型实现类似if else的逻辑吗: if logType=="syslog": insert_into(sink1) elif logType=="alarm": insert_into(sink2)

如果insert_into 有.filter .select等接口的返回值的话就好办了,可以接着往下通过filter进行判断,代码类似以下:

t_env.from_path("source")
.filter("logType=syslog")
.insert_into("sink1")
.filter("logType=alarm")
.insert_into("sink2") 请各位大牛指点,感谢

*来自志愿者整理的flink邮件归档

展开
收起
游客nnqbtnagn7h6s 2021-12-06 19:49:56 614 0
1 条回答
写回答
取消 提交回答
  • Table API 不用 if/else 直接用类似逻辑即可:

    val t1 = table.filter('x > 2).groupBy(..) val t2 = table.filter('x <= 2).groupBy(..) t1.insert_into("sink1) t2.insert_into("sink2")

    *来自志愿者整理的flink邮件归档

    2021-12-06 21:20:57
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Spring Boot2.0实战Redis分布式缓存 立即下载
CUDA MATH API 立即下载
API PLAYBOOK 立即下载