bin-log-distributor消费数据丢失问题解决记录

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
日志服务 SLS,月写入数据量 50GB 1个月
简介: bin-log-distributor项目简介 bin-log-distributor是凯京科技开源的Mysql数据库数据变动实时监听分发中间件,详情见码云开源地址, github开源地址 背景 线上反馈有bin-log-distributor客户端偶尔有丢失数据的情况。

bin-log-distributor项目简介

bin-log-distributor是凯京科技开源的Mysql数据库数据变动实时监听分发中间件,详情见github开源地址

背景

线上反馈有bin-log-distributor客户端偶尔有丢失数据的情况。

验证方法

在装mysql的服务器上,使用多个线程并发循环对bin-log-distributor监控的数据表进行增、删、改。

预期

bin-log-distributor客户端出现数据丢失。

定位

分析客户端收到的数据,发现不仅有数据丢失,还有数据重复消费。
将客户端日志打印更仔细,发现以下两点问题:

  1. 从redis队列peek数据后,remove抛出如下异常:

a

  1. Redis队列数据没消费完其他线程异常进行了消费:

b

分析如下消费redis数据方法源码:

private void doRunWithLock() {
    RLock rLock = redissonClient.getLock(dataKeyLock);
    EventBaseDTO dto;
    try {
        RQueue<EventBaseDTO> queue = redissonClient.getQueue(dataKey);
        // 尝试加锁,最多等待20秒,上锁以后30秒自动解锁
        boolean lockRes = rLock.tryLock(20 * 1000, 3 * retryInterval,TimeUnit.MILLISECONDS);
        //拿到锁且 队列不为空 进入
        if (lockRes && !queue.isEmpty()) {
            //让这个线程把队列里的全部处理完吧
            rLock.lock();
            //DATA_KEY_IN_PROCESS.add(dataKey);
            while ((dto = queue.peek()) != null) {
                //处理完毕,把数据从队列摘除
                boolean handleRes = doHandleWithLock(dto, 0);
                if (handleRes) {
                    queue.remove();
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
        log.severe("接收处理数据失败:" + e.toString());
    } finally {
        rLock.forceUnlock();
        rLock.delete();
        //DATA_KEY_IN_PROCESS.remove(dataKey);
    }
}

猜测是redis RLock并没有效,于是在rLock.tryLock() 和 rLock.delete() 方法后面分别打印出 acquireLock 和 releaseLock的日志,再次执行测试,得到如下日志:
c

发现 pool-2-thread-1线程成功获取锁后,pool-2-thread-2获取锁失败,并立刻将pool-2-thread-1 持有的锁release了。结合源码发现pool-2-thread-2获取锁失败后,执行了finally代码块中的rLock.forceUnlock()方法,查阅资料发现forceUnlock()方法可以直接释放相同key的其他线程持有的锁。

至此,定位到问题为:每次收到数据起一个线程执行doRunWithLock()方法,当数据库操作较频繁时,一个线程获取到的redis锁会被另一个线程release掉,导致redis锁内部本来应该是单例执行的消费redis队列数据变成了多个线程同时执行,从而出现多个线程消费队列数据冲突,引发数据丢失和重复消费的问题。

解决

确保每个线程获取到的Redis锁只能由当前线程release(),更改后的代码如下:

private void doRunWithLock() {
    RLock rLock = redissonClient.getLock(dataKeyLock);
    EventBaseDTO dto;
    boolean lockRes = false;
    try {
        // 尝试加锁,最多等待50ms(防止过多线程等待)
        // 上锁以后6个小时自动解锁(防止redis队列太长,当前拿到锁的线程处理时间过长)
        lockRes = rLock.tryLock(50, 6 * 3600 * 1000, TimeUnit.MILLISECONDS);
        if (!lockRes) {
            return;
        }
        DATA_KEY_IN_PROCESS.add(dataKey);
        //拿到锁之后再获取队列
        RQueue<EventBaseDTO> queue = redissonClient.getQueue(dataKey);
        if (!queue.isExists() || queue.isEmpty()) {
            return;
        }
        //拿到锁且 队列不为空 进入
        while ((dto = queue.peek()) != null) {
            //处理完毕,把数据从队列摘除
            boolean handleRes = doHandleWithLock(dto, 0);
            if (handleRes) {
                queue.remove();
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
        log.severe("接收处理数据失败:" + e.toString());
 
    } finally {
        //forceUnlock是可以释放别的线程拿到的锁的,需要判断是否是当前线程持有的锁
        if (lockRes) {
            rLock.forceUnlock();
            rLock.delete();
            DATA_KEY_IN_PROCESS.remove(dataKey);
        }
    }
}

回归

更改后执行继续执行相同的测试用例,得到如下结果日志:
d

pool-2-thread-2在获取到redis锁之后一直持续消费redis队列中的数据,其他线程尝试获取redis锁失败后,不再强制release该锁,客户端消费条数、类型结果正确。

完成

修改测试用例,使用更多线程、执行更多数据库操作,分析消费结果,结果数据条数正确,未出现数据丢失、重复消费等问题。
确认结果回归结果正确,提交代码。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
6月前
|
SQL 关系型数据库 MySQL
MySQL数据库,可以使用二进制日志(binary log)进行时间点恢复
对于MySQL数据库,可以使用二进制日志(binary log)进行时间点恢复。二进制日志是MySQL中记录所有数据库更改操作的日志文件。要进行时间点恢复,您需要执行以下步骤: 1. 确保MySQL配置文件中启用了二进制日志功能。在配置文件(通常是my.cnf或my.ini)中找到以下行,并确保没有被注释掉: Copy code log_bin = /path/to/binary/log/file 2. 在需要进行恢复的时间点之前创建一个数据库备份。这将作为恢复的基准。 3. 找到您要恢复到的时间点的二进制日志文件和位置。可以通过执行以下命令来查看当前的二进制日志文件和位
591 1
|
SQL 关系型数据库 MySQL
如何使用MySQL Binlog Digger 4.14对binlog日志进行挖掘分析以便快速恢复误删除数据
MySQL Binlog Digger是一款运行在windows操作系统的挖掘与分析MySQL binlog的可视化工具,通过它可以快速打回被误操作时的数据,例如:delete, insert, update操作,并依据这些误操作生成相应的undo回滚语句,以便快速恢复数据,此外,它还可以支持离线binlog挖掘分析与binlog下载,它仅支持dml操作的回滚,但不支持ddl的回滚。
3828 1
如何使用MySQL Binlog Digger 4.14对binlog日志进行挖掘分析以便快速恢复误删除数据
|
6月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错之从specified offset启动没有问题,但是停机后从savepoint恢复则报错binlog被purge(实际文件还在),如何处理
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
6月前
|
关系型数据库 MySQL 数据管理
MySQL通过 bin-log 恢复从备份点到灾难点之间数据
MySQL通过 bin-log 恢复从备份点到灾难点之间数据
299 0
|
6月前
|
Java
log4j2定期删除日志文件的配置
确保将以上配置嵌入到你的Log4j 2配置文件中,并根据项目的需求进行适当的调整。
246 1
|
存储 SQL 机器学习/深度学习
MySQL 日志体系解析:保障数据一致性与恢复的三位英雄:Redo Log、Undo Log、Bin Log
MySQL 日志体系解析:保障数据一致性与恢复的三位英雄:Redo Log、Undo Log、Bin Log
267 0
|
消息中间件 Java Kafka
kafka log4j日志级别修改,一天生成一个日志文件
kafka log4j日志级别修改,一天生成一个日志文件
|
消息中间件 监控 Kafka
flume搜集日志:如何解决实时不断追加的日志文件及不断增加的文件个数问题
flume搜集日志:如何解决实时不断追加的日志文件及不断增加的文件个数问题
339 0
flume搜集日志:如何解决实时不断追加的日志文件及不断增加的文件个数问题
|
SQL 存储 缓存
分析MySQL执行的流程(连接、缓存、分析、优化、执行、Undo Log、Binlog、Redo Log)
熟悉MySQL的都知道MySQL服务端实现主要分为Server层和存储引擎层。Server层负责接收和管理客户端连接、管理缓存、解析SQL、优化SQL、调用存储引擎执行SQL;存储引擎层主要负责存储、查询数据。
分析MySQL执行的流程(连接、缓存、分析、优化、执行、Undo Log、Binlog、Redo Log)
|
消息中间件 Kafka Scala
从源码和日志文件结构中分析 Kafka 重启失败事件
上次的 Kafka 重启失败事件,对为什么重启失败的原因似乎并没有解释清楚,那么我就在这里按照我对 Kafka 的认识,从源码和日志文件结构去尝试寻找原因。
243 0
从源码和日志文件结构中分析 Kafka 重启失败事件