开发者社区> 问答> 正文

[严重问题]增量数据部分丢失数据

问题概述 线上环境发现有些表部分增量数据一致未同步,目前定位到com.alibaba.otter.node.etl.select.selector.MessageParser#internParse(com.alibaba.otter.shared.common.model.config.pipeline.Pipeline, com.alibaba.otter.canal.protocol.CanalEntry.Entry)中的rowChange = RowChange.parseFrom(entry.getStoreValue());为空导致数据直接丢弃。

日志 com.alibaba.otter.node.etl.select.selector.MessageParser#parse调整: 代码调整 public List parse(Long pipelineId, List datas) throws SelectException { List eventDatas = new ArrayList(); Pipeline pipeline = configClientService.findPipeline(pipelineId); List transactionDataBuffer = new ArrayList(); // hz为主站点,us->hz的数据,需要回环同步会us。并且需要开启回环补救算法 PipelineParameter pipelineParameter = pipeline.getParameters(); boolean enableLoopbackRemedy = pipelineParameter.isEnableRemedy() && pipelineParameter.isHome() && pipelineParameter.getRemedyAlgorithm().isLoopback(); boolean isLoopback = false; boolean needLoopback = false; // 判断是否属于需要loopback处理的类型,只处理正常otter同步产生的回环数据,因为会有业务方手工屏蔽同步的接口,避免回环

    String randStr = "";
    if(pipelineId == 23){
        randStr = RandomStringUtils.randomAlphanumeric(30);
    }

    long now = new Date().getTime();
    try {
        for (Entry entry : datas) {
            switch (entry.getEntryType()) {
                case TRANSACTIONBEGIN:
                    isLoopback = false;
                    break;
                case ROWDATA:
                    String tableName = entry.getHeader().getTableName();
                    // 判断是否是回环表retl_mark
                    boolean isMarkTable = tableName.equalsIgnoreCase(pipeline.getParameters().getSystemMarkTable());
                    if (isMarkTable) {
                        RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                        if (!rowChange.getIsDdl()) {

// int loopback = checkLoopback(pipeline, rowChange.getRowDatas(0)); int loopback = 0; if (rowChange.getRowDatasCount() > 0) { loopback = checkLoopback(pipeline, rowChange.getRowDatas(0)); } if (loopback == 2) { needLoopback |= true; // 只处理正常同步产生的回环数据 }

                            isLoopback |= loopback > 0;
                        }
                    }

                    // 检查下otter3.0的回环表,对应的schmea会比较随意,所以不做比较
                    boolean isCompatibleLoopback = tableName.equalsIgnoreCase(compatibleMarkTable);

                    if (isCompatibleLoopback) {
                        RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                        if (!rowChange.getIsDdl()) {

// int loopback = checkCompatibleLoopback(pipeline, rowChange.getRowDatas(0)); int loopback = 0; if (rowChange.getRowDatasCount() > 0) { loopback = checkCompatibleLoopback(pipeline, rowChange.getRowDatas(0)); } if (loopback == 2) { needLoopback |= true; // 只处理正常同步产生的回环数据 } isLoopback |= loopback > 0; } }

                    // debug调试日志代码
                    if(pipelineId == 23 && ("global_drug_relate_indication".equalsIgnoreCase(tableName) || "drug_indications_relation".equalsIgnoreCase(tableName))){
                        logger.warn("binlogdebug-messageRowParser randStr:{},entry:{},"
                            + "isLoopback:{},enableLoopbackRemedy:{},needLoopback:{},isMarkTable:{},isCompatibleLoopback:{}",randStr,entry.getHeader().getLogfileOffset(),
                            isLoopback,enableLoopbackRemedy,needLoopback,isMarkTable,isCompatibleLoopback);
                    }

                    if ((!isLoopback || (enableLoopbackRemedy && needLoopback)) && !isMarkTable
                        && !isCompatibleLoopback) {
                        transactionDataBuffer.add(entry);
                    }
                    break;
                case TRANSACTIONEND:
                    if (!isLoopback || (enableLoopbackRemedy && needLoopback)) {
                        // 添加数据解析
                        for (Entry bufferEntry : transactionDataBuffer) {
                            List<EventData> parseDatas = internParse(pipeline, bufferEntry);
                            if (CollectionUtils.isEmpty(parseDatas)) {// 可能为空,针对ddl返回时就为null
                                continue;
                            }

                            // 初步计算一下事件大小
                            long totalSize = bufferEntry.getHeader().getEventLength();
                            long eachSize = totalSize / parseDatas.size();
                            for (EventData eventData : parseDatas) {
                                if (eventData == null) {
                                    continue;
                                }

                                eventData.setSize(eachSize);// 记录一下大小
                                if (needLoopback) {// 针对需要回环同步的
                                    // 如果延迟超过指定的阀值,则设置为需要反查db
                                    if (now - eventData.getExecuteTime() > 1000 * pipeline.getParameters()
                                        .getRemedyDelayThresoldForMedia()) {
                                        eventData.setSyncConsistency(SyncConsistency.MEDIA);
                                    } else {
                                        eventData.setSyncConsistency(SyncConsistency.BASE);
                                    }
                                    eventData.setRemedy(true);
                                }
                                eventDatas.add(eventData);
                            }
                        }
                        if(pipeline.getId() == 23){
                            logger.warn("binlogdebug-messageRowInteralParser1 randStr:{} transactionDataBuffer:{},eventDatas:{}",randStr,transactionDataBuffer.size(),eventDatas.size());
                        }
                    }

                    isLoopback = false;
                    needLoopback = false;
                    transactionDataBuffer.clear();
                    break;
                default:
                    break;
            }
        }

        // 添加最后一次的数据,可能没有TRANSACTIONEND
        if (!isLoopback || (enableLoopbackRemedy && needLoopback)) {
            // 添加数据解析
            for (Entry bufferEntry : transactionDataBuffer) {
                List<EventData> parseDatas = internParse(pipeline, bufferEntry);
                if (CollectionUtils.isEmpty(parseDatas)) {// 可能为空,针对ddl返回时就为null
                    continue;
                }

                // 初步计算一下事件大小
                long totalSize = bufferEntry.getHeader().getEventLength();
                long eachSize = totalSize / parseDatas.size();
                for (EventData eventData : parseDatas) {
                    if (eventData == null) {
                        continue;
                    }

                    eventData.setSize(eachSize);// 记录一下大小
                    if (needLoopback) {// 针对需要回环同步的
                        // 如果延迟超过指定的阀值,则设置为需要反查db
                        if (now - eventData.getExecuteTime() > 1000 * pipeline.getParameters()
                            .getRemedyDelayThresoldForMedia()) {
                            eventData.setSyncConsistency(SyncConsistency.MEDIA);
                        } else {
                            eventData.setSyncConsistency(SyncConsistency.BASE);
                        }
                    }
                    eventDatas.add(eventData);
                }
            }
            if(pipeline.getId() == 23){
                logger.warn("binlogdebug-messageRowInteralParser2 randStr:{} transactionDataBuffer:{},eventDatas:{}",randStr,transactionDataBuffer.size(),eventDatas.size());
            }
        }

        // debug调试日志代码
        if(pipelineId == 23){
            if(datas.size() == 2 && EntryType.TRANSACTIONBEGIN.getNumber() == datas.get(0).getEntryType().getNumber() &&
                EntryType.TRANSACTIONEND.getNumber() == datas.get(1).getEntryType().getNumber()){
            }else{
                logger.warn("binlogdebug-messageParser randStr:{},pid:{},size:{},binlog:{}",randStr,pipelineId,datas.size(),datas.toString());
                logger.warn("binlogdebug-messageParser randStr:{},eventDatas:{}",randStr,null == eventDatas?"":eventDatas.toString());
            }
        }
    } catch (Exception e) {
        throw new SelectException(e);
    }

    return eventDatas;

日志:

Sep 15 07:01:18 otter2 otter-node-prd[397902]: entryType: ROWDATA Sep 15 07:01:18 otter2 otter-node-prd[397902]: storeValue: "\020\aZ\027SET INSERT_ID = 1014152" Sep 15 07:01:18 otter2 otter-node-prd[397902]: , header { Sep 15 07:01:18 otter2 otter-node-prd[397902]: version: 1 Sep 15 07:01:18 otter2 otter-node-prd[397902]: logfileName: "mysql-bin-3306.020092" Sep 15 07:01:18 otter2 otter-node-prd[397902]: logfileOffset: 924171204 Sep 15 07:01:18 otter2 otter-node-prd[397902]: serverId: 67 Sep 15 07:01:18 otter2 otter-node-prd[397902]: serverenCode: "UTF-8" Sep 15 07:01:18 otter2 otter-node-prd[397902]: executeTime: 1600124478000 Sep 15 07:01:18 otter2 otter-node-prd[397902]: sourceType: MYSQL Sep 15 07:01:18 otter2 otter-node-prd[397902]: schemaName: "drug" Sep 15 07:01:18 otter2 otter-node-prd[397902]: tableName: "drug_indications_relation" Sep 15 07:01:18 otter2 otter-node-prd[397902]: eventLength: 177 Sep 15 07:01:18 otter2 otter-node-prd[397902]: eventType: INSERT Sep 15 07:01:18 otter2 otter-node-prd[397902]: } Sep 15 07:01:18 otter2 otter-node-prd[397902]: entryType: ROWDATA Sep 15 07:01:18 otter2 otter-node-prd[397902]: storeValue: "\020\001Z_insert into drug_indications_relation (drug_id, indications_id, type) values (2099547, 1630, 6)r\004drug" 解释:插入表drug_indications_relation的一条数据,主键为1014152,偏移量为924171204。

com.alibaba.otter.node.etl.select.selector.MessageParser#internParse(com.alibaba.otter.shared.common.model.config.pipeline.Pipeline, com.alibaba.otter.canal.protocol.CanalEntry.Entry) 代码调整:

private List internParse(Pipeline pipeline, Entry entry) { RowChange rowChange = null; try { rowChange = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new SelectException("parser of canal-event has an error , data:" + entry.toString(), e); }

    if(23 == pipeline.getId() && ("global_drug_relate_indication".equalsIgnoreCase(entry.getHeader().getTableName())
        || "drug_indications_relation".equalsIgnoreCase(entry.getHeader().getTableName()) )){
        logger.warn("binlogdebug-interparse entryposition:{} entryType:{}",entry.getHeader().getLogfileOffset(),
            null==rowChange?"null":rowChange.getEventType());
        if(null != rowChange.getRowDatasList() && rowChange.getRowDatasList().size()>0){
            for(RowData rowData:rowChange.getRowDatasList()){
                logger.warn("binlogdebug-interparse entryposition:{} rowData:{}",entry.getHeader().getLogfileOffset(),
                    rowData.toString());
            }
        }else{
            logger.warn("binlogdebug-interparse entryposition:{} rowDatasList is null",entry.getHeader().getLogfileOffset());
        }
    }

    if (rowChange == null) {
        return null;
    }

    String schemaName = entry.getHeader().getSchemaName();
    String tableName = entry.getHeader().getTableName();
    EventType eventType = EventType.valueOf(rowChange.getEventType().name());

    // 处理下DDL操作
    if (eventType.isQuery()) {
        // 直接忽略query事件
        return null;
    }

    // 首先判断是否为系统表
    if (StringUtils.equalsIgnoreCase(pipeline.getParameters().getSystemSchema(), schemaName)) {
        // do noting
        if (eventType.isDdl()) {
            return null;
        }

        if (StringUtils.equalsIgnoreCase(pipeline.getParameters().getSystemDualTable(), tableName)) {
            // 心跳表数据直接忽略
            return null;
        }
    } else {
        if (eventType.isDdl()) {
            boolean notExistReturnNull = false;
            if (eventType.isRename()) {
                notExistReturnNull = true;
            }

            DataMedia dataMedia = ConfigHelper.findSourceDataMedia(pipeline,
                schemaName,
                tableName,
                notExistReturnNull);
            // 如果EventType是CREATE/ALTER,需要reload
            // DataMediaInfo;并且把CREATE/ALTER类型的事件丢弃掉.
            if (dataMedia != null && (eventType.isCreate() || eventType.isAlter() || eventType.isRename())) {
                DbDialect dbDialect = dbDialectFactory.getDbDialect(pipeline.getId(),
                    (DbMediaSource) dataMedia.getSource());
                dbDialect.reloadTable(schemaName, tableName);// 更新下meta信息
            }

            boolean ddlSync = pipeline.getParameters().getDdlSync();
            if (ddlSync) {
                // 处理下ddl操作
                EventData eventData = new EventData();
                eventData.setSchemaName(schemaName);
                eventData.setTableName(tableName);
                eventData.setEventType(eventType);
                eventData.setExecuteTime(entry.getHeader().getExecuteTime());
                eventData.setSql(rowChange.getSql());
                eventData.setDdlSchemaName(rowChange.getDdlSchemaName());
                eventData.setTableId(dataMedia.getId());
                return Arrays.asList(eventData);
            } else {
                return null;
            }
        }
    }

    List<EventData> eventDatas = new ArrayList<EventData>();
    for (RowData rowData : rowChange.getRowDatasList()) {
        EventData eventData = internParse(pipeline, entry, rowChange, rowData);
        if (eventData != null) {
            eventDatas.add(eventData);
        }
        if(23 == pipeline.getId() && ("global_drug_relate_indication".equalsIgnoreCase(entry.getHeader().getTableName())
            || "drug_indications_relation".equalsIgnoreCase(entry.getHeader().getTableName()) )){
            logger.warn("binlogdebug-interparse entryposition:{} eventData:{}",entry.getHeader().getLogfileOffset(),null==eventData?"null":eventData.getKeys().toString());
        }
    }

    return eventDatas;
}

根据定位到上个insert的偏移量为924171204定位到解析rowChange日志如下:

Sep 15 07:01:18 otter2 otter-node-prd[397902]: 2020-09-15 07:01:18.065 [pipelineId = 23,taskName = ProcessSelect] WARN com.alibaba.otter.node.etl.select.selector.MessageParser - binlogdebug-messageRowParser randStr:oTQo55Qa3d5Yyq0uVkzKcG4uCf4hp7,entry:924171204,isLoopback:false,enableLoopbackRemedy:false,needLoopback:false,isMarkTable:false,isCompatibleLoopback:false Sep 15 07:01:18 otter2 otter-node-prd[397902]: 2020-09-15 07:01:18.065 [pipelineId = 23,taskName = ProcessSelect] WARN com.alibaba.otter.node.etl.select.selector.MessageParser - binlogdebug-interparse entryposition:924171204 entryType:INSERT Sep 15 07:01:18 otter2 otter-node-prd[397902]: 2020-09-15 07:01:18.065 [pipelineId = 23,taskName = ProcessSelect] WARN com.alibaba.otter.node.etl.select.selector.MessageParser - binlogdebug-interparse entryposition:924171204 rowDatasList is null 发现没有解析到rowchange,导致binlog直接被过滤而造成数据丢失!!!

已确认点 本来以为是反序列化rowchange造成的数据丢失,但是发现但序列化的时候做了异常捕获和抛出,日志中也没有发现异常日志,所以应该不是反序列化的问题。 otter的版本:4.1.17版本基础的定制版本,但是我们messageparse这块没有修改调整过。 canal版本升级至1.1.4版本 mysql数据库版本5.6 疑问点 为什么会出现拿到binlog,然后解析rowchange为空?? 会不会是canal中的bug?

原提问者GitHub用户wuqiu-ai

展开
收起
云上静思 2023-06-14 23:31:30 108 0
3 条回答
写回答
取消 提交回答
  • 是binlog-format格式的问题,由于DBA修改了ROW之后,由于是线上环境,不能做mysql重启,在加上业务也没重新部署,导致部分数据库连接的format格式还是之前的statement格式,最终导致增量失败

    原回答者Github用户 wuqiu-ai

    2023-06-16 15:19:53
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    问题可能是由于Canal日志中缺少特定的RowChange数据导致的。具体来说,当Canal解析binlog并将数据转换为RowChange对象时,如果缺少特定的数据,则可能会导致RowChange对象为空,从而导致数据丢失。

    为了解决这个问题,您可以尝试以下方法:

    检查Canal Server的配置:请检查您的Canal Server的配置,确保它已正确配置,并且Canal日志可以正确解析binlog数据。您可以尝试重新配置Canal Server,以确保它已正确设置。

    检查Canal日志文件:请检查Canal日志文件,以查看是否有任何错误或警告消息,这些消息可能会提供更多关于数据丢失的信息。您可以尝试使用Canal提供的命令行工具来查看Canal日志文件。

    检查Canal版本:请检查您使用的Canal版本是否与您的数据库版本兼容。如果版本不兼容,可能会导致数据丢失或解析错误。

    检查数据库的写入:请检查您的数据库是否在写入数据。如果您的数据库在写入数据时发生了故障或错误,可能会导致数据丢失。

    检查Otter的配置:请检查您的Otter配置,确保它已正确设置,并且可以正确处理Canal解析的数据。您可以尝试重新配置Otter,以确保它已正确设置。

    2023-06-15 08:06:22
    赞同 展开评论 打赏
  • 根据问题描述,该问题是在线上环境中发现有些表的部分增量数据没有同步,而在定位问题时发现是由于com.alibaba.otter.node.etl.select.selector.MessageParser#internParse(com.alibaba.otter.shared.common.model.config.pipeline.Pipeline, com.alibaba.otter.canal.protocol.CanalEntry.Entry)方法中的rowChange = RowChange.parseFrom(entry.getStoreValue());返回为空导致数据丢失。为了解决该问题,日志com.alibaba.otter.node.etl.select.selector.MessageParser#parse进行了调整。

    然而,该问题的具体原因仍需要进一步排查,例如是否由于数据源结构变更、网络传输问题等原因导致数据丢失。建议对具体情况进行更深入的分析和排查,以确定问题的根本原因并采取相应措施进行修复。

    2023-06-15 08:05:30
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载