开发者社区> 问答> 正文

canal同步mysql数据到MQ如何做到MQ 中json数据不乱序

canal同步mysql数据到MQ如何做到MySQL层批量更新某字段1000条数据,然后完全按照binlog的顺序存放到mq的消息队列中

样例:mysql update 字段数据 update t_admin_login_log set cdate = '2021-06-05 00:00:00' where id < 1000;

mq topic对应的json格式数据 { "data": [ { "cdate": "2021-06-05 00:00:01", "login_time": "2020-09-07 17:19:47", "admin_id": "1355", "id": "11", "admin_name": "于家祺", "login_addr": "", "udate": "2020-09-07 17:19:47", "login_ip": "114.242.249.124", "status": "1" }, { "cdate": "2021-06-05 00:00:01", "login_time": "2020-09-07 17:19:47", "admin_id": "1535", "id": "22", "admin_name": "周虎", "login_addr": "", "udate": "2020-09-07 17:19:47", "login_ip": "114.242.247.124", "status": "1" }, { "cdate": "2021-06-05 00:00:01", "login_time": "2020-09-07 20:25:09", "admin_id": "12443", "id": "33", "admin_name": "冯德平", "login_addr": "", "udate": "2020-09-07 20:25:09", "login_ip": "222.128.15.250", "status": "1" }, { "cdate": "2021-06-05 00:00:01", "login_time": "2020-09-08 15:51:15", "admin_id": "12452", "id": "44", "admin_name": "范雪成", "login_addr": "", "udate": "2020-09-08 15:51:15", "login_ip": "222.128.15.250", "status": "1" }, { "cdate": "2021-06-05 00:00:01", "login_time": "2020-09-09 10:51:57", "admin_id": "12725", "id": "55", "admin_name": "蔡雪奎", "login_addr": "", "udate": "2020-09-09 10:51:57", "login_ip": "222.128.15.250", "status": "1" }, { "cdate": "2021-06-05 00:00:01", "login_time": "2020-09-10 16:15:11", "admin_id": "12710", "id": "66", "admin_name": "张晓林", "login_addr": "", "udate": "2020-09-10 16:15:11", "login_ip": "222.128.15.250", "status": "1" }, { "cdate": "2021-06-05 00:00:01", "login_time": "2020-09-10 16:48:22", "admin_id": "12624", "id": "77", "admin_name": "吴佳辉", "login_addr": "", "udate": "2020-09-10 16:48:22", "login_ip": "27.19.194.45", "status": "1" } ], "pkNames": [ "id" ], "old": [ {}, { "cdate": "2021-06-05 00:00:00" }, {}, { "cdate": "2021-06-05 00:00:00" }, { "cdate": "2021-06-05 00:00:00" }, { "cdate": "2021-06-05 00:00:00" }, { "cdate": "2021-06-05 00:00:00" } ], "type": "UPDATE", "es": 1637225320000, "sql": "", "database": "db_admin", "sqlType": { "cdate": 93, "login_time": 93, "admin_id": 4, "logout_time": 93, "id": 4, "admin_name": 12, "login_addr": 12, "udate": 93, "login_ip": 12, "status": -6 }, "mysqlType": { "cdate": "datetime", "login_time": "datetime", "admin_id": "int(10)", "logout_time": "datetime", "id": "int(11)", "admin_name": "varchar(16)", "login_addr": "varchar(64)", "udate": "datetime", "login_ip": "varchar(32)", "status": "tinyint(4)" }, "id": 12, "isDdl": false, "table": "t_admin_login_log", "ts": 1637225320944 }

想要的结果是:一个修该后的数据对应一个old值。 canal是否提供这样的参数来控制实现 { "cdate": "2021-06-05 00:00:01", "login_time": "2020-09-07 17:19:47", "admin_id": "1355", "id": "11", "admin_name": "于家祺", "login_addr": "", "udate": "2020-09-07 17:19:47", "login_ip": "114.242.249.124", "status": "1" },

"old": [ {}, { "cdate": "2021-06-05 00:00:00" },

原提问者GitHub用户wujianwei11

展开
收起
山海行 2023-04-27 11:19:01 133 0
1 条回答
写回答
取消 提交回答
  • 一个binlog会有多行记录,并非只有一行

    原回答者GitHub用户agapple

    2023-04-27 21:03:38
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
搭建电商项目架构连接MySQL 立即下载
搭建4层电商项目架构,实战连接MySQL 立即下载
PolarDB MySQL引擎重磅功能及产品能力盛大发布 立即下载

相关镜像