开发者社区> 问答> 正文

能从canal能直接得到binlog的sql吗?

example中的例子,打印的是类似以下的形似,canal能给出执行insert,update,delete,create table等命令的具体sql语句吗?delete from sharddb. tab_shard where id =5;

BEGIN ----> Thread id: 18 ----------------> binlog[mysql-bin.000005:1200] , name[sharddb,tab_shard] , eventType : DELETE , executeTime : 1512015314000 , delay : 709ms id : 5 type=int(10) unsigned randomstr : 14504 type=varchar(100) create_time : 2017-11-28 05:00:35 type=timestamp

END ----> transaction id: 20565 ================> binlog[mysql-bin.000005:1251] , executeTime : 1512015314000 , delay : 709ms

原提问者GitHub用户bayannur

展开
收起
绿子直子 2023-05-09 11:58:26 157 0
2 条回答
写回答
取消 提交回答
  • binlog格式可以网上先了解一下

    原回答者GitHub用户agapple

    2023-05-10 10:25:45
    赞同 展开评论 打赏
  • 可以从Canal中获得binlog的SQL语句,Canal将binlog转换为JSON格式的事件,其中包括执行的SQL语句和受影响的行。 你可以使用Canal的客户端API来订阅binlog事件,并从事件中提取SQL语句。例如,在Canal的配置文件中设置producerType为json,然后使用Canal的Java客户端API订阅binlog事件,如下所示:

    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111),
        "example", "", "");
    connector.connect();
    connector.subscribe(".*\\..*");
    while (true) {
        Message message = connector.getWithoutAck(1000);
        long batchId = message.getId();
        int size = message.getEntries().size();
        if (batchId == -1 || size == 0) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            for (CanalEntry.Entry entry : message.getEntries()) {
                if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                    CanalEntry.RowChange rowChange = null;
                    try {
                        rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                    } catch (Exception e) {
                        throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                e);
                    }
                    if (rowChange.getEventType() == CanalEntry.EventType.QUERY || rowChange.getSqlType() == CanalEntry.EventType.CREATE) {
                        String sql = rowChange.getSql();
                        System.out.println(sql);
                    } else {
                        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                            String sql = generateSql(rowData, rowChange.getSqlType());
                            System.out.println(sql);
                        }
                    }
                }
            }
            connector.ack(batchId);
        }
    }
    

    在上面的示例中,你可以看到,当CanalEntry.EventType为QUERY或CREATE时,可以直接从RowChange对象中获取SQL语句。否则,你需要使用generateSql方法从RowData对象中生成SQL语句。

    2023-05-09 13:42:09
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
SQL Server 2017 立即下载
GeoMesa on Spark SQL 立即下载