有大佬用过pulsar cdc功能吗,请问下,目前官网好像只有datastream api方式,sql方式还不支持,现在想消费datastream cdc的数据,将包含了u,d,i等数据映射成表,再在后面做sql业务指标的计算,这样可以实现嘛
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
可以实现您的需求。通过Pulsar的CDC功能,结合DataStream API消费变更数据,并进一步处理以支持SQL业务指标计算。以下是操作指导:
配置Pulsar CDC Source:
tableList
参数时需包含数据库名,例如:tableList("yourDatabaseName.yourTableName")
。这样可以捕获到表中的插入(I)、更新(U)和删除(D)等变更事件。映射变更数据到表结构:
map
或process
函数将原始变更事件转换为适合您后续SQL计算的结构化数据格式。这一步骤中,您需要定义如何将每种变更类型(U, D, I)映射到具体的表结构字段上。构建动态表进行SQL计算:
执行SQL业务指标计算:
注意事项:
综上所述,通过Pulsar CDC结合Flink的DataStream API及SQL能力,完全能够实现实时消费数据库变更数据,并在其基础上进行复杂的业务指标计算。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。