部署多个实例,就会监听重复啊。一个集群,部署了多个节点,每个节点都通过 Flink 来监听 Mysql (全库)或者某些表的变化,当Mysql 表变化时,每个实例都收到相同的变更消息
我想只保留一条消息,再分发给业务系统。部署多实例,就是想当一个实例重启时,另一个实例可以继续监听。
如果您想在 Flink CDC 中仅保留一条消息,并将其分发给业务系统,可以考虑使用 Flink 的窗口操作和聚合操作来实现。
具体来说,您可以使用 Flink 的滚动窗口(Tumbling Window)来将 CDC 数据流中的数据按照时间窗口进行划分,然后使用 Flink 的聚合操作(Aggregate)对窗口内的数据进行聚合,最后将聚合结果发送给业务系统。
假设您的 CDC 数据流中包含了订单数据,并且您希望每分钟仅保留一条订单数据,并将其发送给业务系统。您可以按照以下步骤来实现:
使用 Flink 的滚动窗口将 CDC 数据流按照 1 分钟进行划分:
java
Copy
DataStream orderStream = ... // CDC 数据流对象
DataStream orderWindowStream = orderStream
.keyBy(Order::getOrderId)
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
.aggregate(new MyAggregator());
在上述代码中,MyAggregator 是您实现的聚合器,用于对窗口内的订单数据进行聚合。
在聚合器中实现逻辑代码,仅保留每个窗口内的第一条订单数据并发送给业务系统:
java
Copy
public class MyAggregator implements AggregateFunction {
@Override
public Order createAccumulator() {
return new Order();
}
@Override
public Order add(Order value, Order accumulator) {
if (accumulator.getOrderId() == null) {
// 当前窗口没有数据,使用当前订单数据作为窗口结果
return value;
} else {
// 当前窗口已经有数据,不做处理
return accumulator;
}
}
@Override
public Order getResult(Order accumulator) {
return accumulator;
}
@Override
public Order merge(Order a, Order b) {
// 不需要合并,直接返回其中一个订单数据
return a;
}
@Override
public TypeInformation getProducedType() {
return TypeInformation.of(Order.class);
}
}
在上述代码中,add 方法用于判断当前窗口是否已经有数据,如果没有则使用当前订单数据作为窗口结果,否则不做处理。getResult 方法直接返回窗口结果,merge 方法和 getProducedType 方法可以不用实现。
你可以整库同步mysql数据到kafka,然后消费kafka的消息按照不同的group-id+table-Name组合消费数据
flink有自己的内存管理系统,你和spring强绑定,spring的业务系统影响到flink,这是个坑,本身这两个就需要接偶。此回答整理至钉群“Flink CDC 社区”。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。