开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

谁能解答?从mysql的binlog读取数据到kafka,但是数据类型有Insert,updata,

谁能解答?从mysql的binlog读取数据到kafka,但是数据类型有Insert,updata,delete ,如何保证通过flink流式计算kafka出来的结果跟在mysql计算出来的一样呢?就比如,mysql是select id,sum(money) money from tb1 group by id; 那么此时flink sql应该怎么写呢?

展开
收起
游客3oewgrzrf6o5c 2022-08-04 17:18:45 388 0
1 条回答
写回答
取消 提交回答
  • nnn

    对于从MySQL的binlog读取数据到Kafka,并且需要保证通过Flink流式计算得到的结果与在MySQL中计算得到的结果相同,你可以按照以下步骤进行处理:

    配置MySQL的binlog读取:使用适当的工具或库(如Debezium等),配置MySQL的binlog读取,并确保监控并捕获Insert、Update和Delete操作的数据变化。

    将数据写入Kafka:将捕获到的数据以适当的格式发送到Kafka主题中。这可以通过编写自定义代码来实现,也可以使用现有的工具或库。

    使用Flink进行流式计算:在Flink中配置流式计算作业,以读取Kafka中的数据,并执行相应的聚合操作(如sum)和分组操作(如group by)。你可以使用Flink SQL或Flink的DataStream API来实现计算逻辑。

    与MySQL计算结果进行比较:为了确保Flink计算得到的结果与在MySQL中计算得到的结果相同,你可以定期或实时地将Flink计算结果与MySQL计算结果进行比较验证。这可以通过编写自定义代码来实现,以查询MySQL并与Flink计算结果进行比较。

    需要注意的是,在进行比较时,由于数据变化的性质(例如删除操作),可能存在一些差异。你可能需要针对特定情况进行一些处理,例如忽略已删除的数据或添加适当的过滤条件。

    总结起来,你需要配置MySQL的binlog读取,将数据写入Kafka,使用Flink进行流式计算并执行相应的聚合和分组操作,然后将计算结果与MySQL计算结果进行比较验证。这样可以确保通过Flink流式计算得到的结果与在MySQL中计算得到的结果一致。

    2023-07-06 10:35:24
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
Java Spring Boot开发实战系列课程【第16讲】:Spring Boot 2.0 实战Apache Kafka百万级高并发消息中间件与原理解析 立即下载
MaxCompute技术公开课第四季 之 如何将Kafka数据同步至MaxCompute 立即下载
消息队列kafka介绍 立即下载

相关镜像