bin-log-distributor消费数据丢失问题解决记录

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
日志服务 SLS,月写入数据量 50GB 1个月
简介: bin-log-distributor项目简介 bin-log-distributor是凯京科技开源的Mysql数据库数据变动实时监听分发中间件,详情见码云开源地址, github开源地址 背景 线上反馈有bin-log-distributor客户端偶尔有丢失数据的情况。

bin-log-distributor项目简介

bin-log-distributor是凯京科技开源的Mysql数据库数据变动实时监听分发中间件,详情见github开源地址

背景

线上反馈有bin-log-distributor客户端偶尔有丢失数据的情况。

验证方法

在装mysql的服务器上,使用多个线程并发循环对bin-log-distributor监控的数据表进行增、删、改。

预期

bin-log-distributor客户端出现数据丢失。

定位

分析客户端收到的数据,发现不仅有数据丢失,还有数据重复消费。
将客户端日志打印更仔细,发现以下两点问题:

  1. 从redis队列peek数据后,remove抛出如下异常:

a

  1. Redis队列数据没消费完其他线程异常进行了消费:

b

分析如下消费redis数据方法源码:

private void doRunWithLock() {
    RLock rLock = redissonClient.getLock(dataKeyLock);
    EventBaseDTO dto;
    try {
        RQueue<EventBaseDTO> queue = redissonClient.getQueue(dataKey);
        // 尝试加锁,最多等待20秒,上锁以后30秒自动解锁
        boolean lockRes = rLock.tryLock(20 * 1000, 3 * retryInterval,TimeUnit.MILLISECONDS);
        //拿到锁且 队列不为空 进入
        if (lockRes && !queue.isEmpty()) {
            //让这个线程把队列里的全部处理完吧
            rLock.lock();
            //DATA_KEY_IN_PROCESS.add(dataKey);
            while ((dto = queue.peek()) != null) {
                //处理完毕,把数据从队列摘除
                boolean handleRes = doHandleWithLock(dto, 0);
                if (handleRes) {
                    queue.remove();
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
        log.severe("接收处理数据失败:" + e.toString());
    } finally {
        rLock.forceUnlock();
        rLock.delete();
        //DATA_KEY_IN_PROCESS.remove(dataKey);
    }
}

猜测是redis RLock并没有效,于是在rLock.tryLock() 和 rLock.delete() 方法后面分别打印出 acquireLock 和 releaseLock的日志,再次执行测试,得到如下日志:
c

发现 pool-2-thread-1线程成功获取锁后,pool-2-thread-2获取锁失败,并立刻将pool-2-thread-1 持有的锁release了。结合源码发现pool-2-thread-2获取锁失败后,执行了finally代码块中的rLock.forceUnlock()方法,查阅资料发现forceUnlock()方法可以直接释放相同key的其他线程持有的锁。

至此,定位到问题为:每次收到数据起一个线程执行doRunWithLock()方法,当数据库操作较频繁时,一个线程获取到的redis锁会被另一个线程release掉,导致redis锁内部本来应该是单例执行的消费redis队列数据变成了多个线程同时执行,从而出现多个线程消费队列数据冲突,引发数据丢失和重复消费的问题。

解决

确保每个线程获取到的Redis锁只能由当前线程release(),更改后的代码如下:

private void doRunWithLock() {
    RLock rLock = redissonClient.getLock(dataKeyLock);
    EventBaseDTO dto;
    boolean lockRes = false;
    try {
        // 尝试加锁,最多等待50ms(防止过多线程等待)
        // 上锁以后6个小时自动解锁(防止redis队列太长,当前拿到锁的线程处理时间过长)
        lockRes = rLock.tryLock(50, 6 * 3600 * 1000, TimeUnit.MILLISECONDS);
        if (!lockRes) {
            return;
        }
        DATA_KEY_IN_PROCESS.add(dataKey);
        //拿到锁之后再获取队列
        RQueue<EventBaseDTO> queue = redissonClient.getQueue(dataKey);
        if (!queue.isExists() || queue.isEmpty()) {
            return;
        }
        //拿到锁且 队列不为空 进入
        while ((dto = queue.peek()) != null) {
            //处理完毕,把数据从队列摘除
            boolean handleRes = doHandleWithLock(dto, 0);
            if (handleRes) {
                queue.remove();
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
        log.severe("接收处理数据失败:" + e.toString());
 
    } finally {
        //forceUnlock是可以释放别的线程拿到的锁的,需要判断是否是当前线程持有的锁
        if (lockRes) {
            rLock.forceUnlock();
            rLock.delete();
            DATA_KEY_IN_PROCESS.remove(dataKey);
        }
    }
}

回归

更改后执行继续执行相同的测试用例,得到如下结果日志:
d

pool-2-thread-2在获取到redis锁之后一直持续消费redis队列中的数据,其他线程尝试获取redis锁失败后,不再强制release该锁,客户端消费条数、类型结果正确。

完成

修改测试用例,使用更多线程、执行更多数据库操作,分析消费结果,结果数据条数正确,未出现数据丢失、重复消费等问题。
确认结果回归结果正确,提交代码。

相关实践学习
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
【涂鸦即艺术】基于云应用开发平台CAP部署AI实时生图绘板
相关文章
|
存储 人工智能 索引
Yalmip入门教程(3)-约束条件的定义
Yalmip入门教程系列博客第三篇,约束条件的定义。
|
算法 Python
2022年全国大学生数学建模竞赛E题目-小批量物料生产安排详解+思路+Python代码时序预测模型(二)
2022年全国大学生数学建模竞赛E题目-小批量物料生产安排详解+思路+Python代码时序预测模型(二)
931 0
2022年全国大学生数学建模竞赛E题目-小批量物料生产安排详解+思路+Python代码时序预测模型(二)
|
消息中间件 运维 Kafka
【kafka问题】记一次kafka消费者未接收到消息问题
出现了这样一个问题, A说他的kafka消息发送了; B说它没有接收到; 那么问题来了: • A的消息是否发送了? • 如果A的消息发送成功了; B为何没有消费到? 好,带着上面的问题,我们来一步步排查一下问题所在
【kafka问题】记一次kafka消费者未接收到消息问题
|
存储 SQL NoSQL
如何破解HBase+Elasticsearch组合使用遇到的难题
面对海量数据的低成本存储+高效检索需求,业界通常使用HBase+ElasticSearch的组合方案,本文将介绍该方案的常见实现架构及其痛点,并提出一种更好的解决方法Lindorm Searchindex
4135 1
如何破解HBase+Elasticsearch组合使用遇到的难题
|
存储 编解码 人工智能
阿里云视频点播介绍
阿里云视频点播介绍
1114 0
阿里云视频点播介绍
|
数据可视化 数据挖掘 API
数据分析三剑客【AIoT阶段一(下)】(十万字博文 保姆级讲解)—Matplotlib—数据可视化进阶—Seaborn(1)(十四)
你好,感谢你能点进来本篇博客,请不要着急退出,相信我,如果你有一定的 Python 基础,想要学习 Python数据分析的三大库:numpy,pandas,matplotlib;这篇文章不会让你失望,本篇博客是 【AIoT阶段一(下)】 的内容:Python数据分析,
716 0
数据分析三剑客【AIoT阶段一(下)】(十万字博文 保姆级讲解)—Matplotlib—数据可视化进阶—Seaborn(1)(十四)
|
Unix Linux 网络安全
如何使用VNC远程Linux,这款开源神器你废吗?
如何使用VNC远程Linux,这款开源神器你废吗?
1147 0
如何使用VNC远程Linux,这款开源神器你废吗?
|
资源调度 Rust 搜索推荐
全球顶尖对冲基金薪资大调查!量化行业基本工资的上限:25万-30万美元!
在量化投资行业,基本工资的上限往往在25万至30万美元左右,其中大部分薪酬是以奖金形式支付的!
全球顶尖对冲基金薪资大调查!量化行业基本工资的上限:25万-30万美元!
|
网络协议 算法 安全
MQTT.fx接入物联网平台使用说明
通过MQTT.fx工具快速接入阿里云物联网平台
6188 0
MQTT.fx接入物联网平台使用说明
|
新零售 安全 算法
阿里云数据中台推出隐私增强计算产品DataTrust,让数据可用不可见
阿里云数据中台重磅推出隐私增强计算产品DataTrust,通过数据可用不可见技术,可实现产业间高效协同,帮助行业、机构实现数据价值的共享与协作。
阿里云数据中台推出隐私增强计算产品DataTrust,让数据可用不可见

热门文章

最新文章