开发者社区> 问答> 正文

canal binlog 丢失

现网数据变更较大,发现数据存在遗漏的情况,定位在binlog未接收到事件信息。

对比发现binlog 29827255 未知的信息丢失

dump binlog

提问363.png

canal log

提问364.png

提问365.png

日志丢失的问题看源码找到原因了,但是binlog接收处确实未接收到,log每次会将接收到的binlog打印出来,29827255未被接收,处理代码如下

void execute() { long batchId; LOGGER.debug("execute destination : " + destination); while (true) { try { connector.connect(); connector.subscribe(filter); while (true) { Message message = connector.getWithoutAck(BATCH_SIZE); batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { try { Thread.sleep(SLEEP_TIME); } catch (InterruptedException ignored) { // ignored } } else { process(message.getEntries()); } connector.ack(batchId); } } catch (Exception e) { LOGGER.error("canal connect error, destination : " + destination, e); AlarmUtil.dcAlarm(App.DC_ID, "canal_connect_error", "canal connect error, destination : " + destination); } finally { connector.disconnect(); }

        try {
            Thread.sleep(1000 * 60);
        } catch (InterruptedException e) {
            // ignored
        }
    }
}


private void process(List<CanalEntry.Entry> entryList) {
    for (CanalEntry.Entry entry : entryList) {
        if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
            continue;
        }

        if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
            continue;
        }

        CanalEntry.RowChange rowChange;
        try {
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
        }

        CanalEntry.EventType eventType = rowChange.getEventType();
        if (eventType == CanalEntry.EventType.DELETE) {
            return;
        }

        if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE) {
            return;
        }

        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
        if (rowDataList == null || rowDataList.size() == 0) {
            continue;
        }

        List<BinlogColumnDTO> columns = new ArrayList<>();
        try {
            for (CanalEntry.RowData rowData : rowDataList) {
                columns = convertColumnList(rowData.getAfterColumnsList());
                long updatedCount = columns.stream().filter(BinlogColumnDTO::getUpdated).count();
                if (updatedCount < 1) {
                    return;
                }

                String binlogFileOffset = entry.getHeader().getLogfileName() + ":"
                        + entry.getHeader().getLogfileOffset() + ":"
                        + DateUtil.timestamp2DateTime(entry.getHeader().getExecuteTime());

                LOGGER.log(ACCESS, binlogFileOffset + ", eventType : {}, data : {}",
                        eventType, toJsonString(columns));

                if (eventType == CanalEntry.EventType.INSERT) {
                    syncService.insert(columns, binlogFileOffset);
                }
                else if (eventType == CanalEntry.EventType.UPDATE){
                    syncService.update(columns, binlogFileOffset);
                }
            }
        }
        catch (Exception e) {
            LOGGER.error("binlog handle error, data : " + JSONArray.toJSONString(columns), e);
            AlarmUtil.dcAlarm(App.DC_ID, "binlog_handle_error", e.toString());
        }

    }
}

跟踪源码,canal server打印debug日志,发现client发请求过来的时候是有读取到对应postion的,但就是没有返回事件内容过去。

另一个案例的截图

binlog file

提问366.png

canal server log

提问367.png

原提问者GitHub用户xiaopan0513

展开
收起
绿子直子 2023-05-09 12:00:50 152 0
1 条回答
写回答
取消 提交回答
  • 1、你这最后的日志是在位点定位的时候得,不是数据读取的日志

    2、canal log里记录的位点是以批次为单位,并没有精确到一条记录

    建议你们先用标准的example工程打印接收到的数据,如果有能复现的方式最好能提供一下

    原回答者GitHub用户agapple

    2023-05-10 10:25:45
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
PolarDB-X 2.0 全局 Binlog 与备份恢复能 立即下载
低代码开发师(初级)实战教程 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载