Tair分布式锁 实践经验(160805更新)

简介: # 前言 在JVM中,怎么样防止因多线程并发造成的数据不一致或逻辑混乱?大家一定都能想到锁,能想到`java.util.concurrent`包下的各种各样的用法。 但在分布式环境下,不同的机器、不同的jvm,那些好用的并发锁工具,并不能满足需要。 所以我们需要有一个`分布式锁`,来解决分布式环境下的并发问题。而本文正是在这条道路上,走出的一些经验的总结。 我们按照待解决问题的

前言

在JVM中,怎么样防止因多线程并发造成的数据不一致或逻辑混乱?大家一定都能想到锁,能想到java.util.concurrent包下的各种各样的用法。
但在分布式环境下,不同的机器、不同的jvm,那些好用的并发锁工具,并不能满足需要。

所以我们需要有一个分布式锁,来解决分布式环境下的并发问题。而本文正是在这条道路上,走出的一些经验的总结。

我们按照待解决问题的场景,一步一步看下去。

问题一:如何实现一个分布式锁;

锁,大多是基于一个只能被1个线程占用、得到的资源来实现的。
JVM的锁,是基于CPU对于寄存器的修改指令cmpxchg,来保证旧值改为新值的操作,只有一个能成功。这里的旧值,就是这个被争夺的资源,大多数情况,并发时第一个线程对旧值修改成功后,其他线程就没有机会了。(当然,ABA是另外一个话题,这里就不说了。。)

所以,在分布式环境下,要实现一个锁,我们就要找个一个只能被1个线程占用的资源。
有经验的开发很快能想到,共享磁盘文件、缓存、mysql数据库,这些分布式环境下,数据表现为单份的,都应当能满足需求。

然而,基于文件、DB会遭遇各式各样的问题,性能,经常也会是瓶颈。
因此,我们这里使用的,是淘系用的最多的缓存中间件产品--Tair。

查看com.taobao.tair.TairManager接口,发现有以下几个接口或许适合使用:

  • TairManager.incr 加法计数器。首次使用时,返回值能等于默认值,而不被+n的机会,只有一个。貌似可以用。
  • TairManager.lock 看起来名字像是这个意思,姑且拿来一试。
  • TairManager.put 传入version版本进行校验,cas原则会保证只有一个能成功。貌似可以用。

(注:mdb有可能丢、且invalid时不保证跨机房一致性,所以这个锁肯定需要用ldb来实现的。)

在线上多机房情况下,做了一下测试,测试程序核心代码大约是:

void test(){           // 2个机房的jvm实例,每个实例n个线程同时执行本方法;
    while(true){
        if(tryLock()){     // tryLock\unLock 的实现对应有3套;
            try{
                logger.info("Got lock! Hostname:{} ThreadName:{}",getHostname(),Thread.currentThread().getName());
                Thread.sleep(1000);
            }finally {
                unLock();
                logger.info("Release lock! Hostname:{} ThreadName:{}",getHostname(),Thread.currentThread().getName());
            }
        }
    }
}

测试的结论如下:

  • TairManager.incr 初始几次锁的获取和释放没有问题,但是后来返回值很大,就谁也拿不到锁;怀疑和接口超时、或invalid机制有关;

    • 补充: 后面发现incr做锁的一种可靠方案是,使用值限定参数:int lowBound, int upperBound

      比如:lowBound=0, upperBound=1,则value一直在0与1之间切换,用过多次,还是很靠谱的。
  • TairManager.lock 完全不行,lock接口看来根本不是做这个用的;真正的使用场景是锁住一个key不容许更新,不是锁机制。

0

  • TairManager.put 非常稳定、靠谱;有个现象值得关注:A机房invalid之后,B机房会先拿到锁;因为invalid先从远程机房开始;

最后,给出ldb put实现的分布式锁的核心代码(__后面都基于ldb put来实现__):

public boolean trylock(String key) {
    ResultCode code = ldbTairManager.put(NAMESPACE, key, "This is a Lock.", 2, 0);
    if (ResultCode.SUCCESS.equals(code))
        return true;
    else
        return false;
}
public boolean unlock(String key) {
    ldbTairManager.invalid(NAMESPACE, key);
}

问题二:lock之后,程序发布或者进程crash,trylock就永远false了;

每次发布,总发现有些异常数据,拿不到锁,不能继续向前走;
仔细分析,原来tair的lock,一直没能释放;

要解决这个问题,可以先不管原因,无脑的给tair put加上超时时间就行,这样业务至少可以自行恢复。
但是,这个超时时间需要仔细考虑把握,需要在业务承受范围之内。

注: 像程序发布、进程crash,这种情况,是无可避免的让锁没机会释放。还有其他可能性,大多是bug了。。

public boolean trylock(String key, int timeout) {
    ResultCode code = ldbTairManager.put(NAMESPACE, key, "This is a Lock.", 2, timeout);
    if (ResultCode.SUCCESS.equals(code))
        return true;
    else
        return false;
}

问题二:tair存在让人苦恼的超时问题,即使千分之1,本业务有时也不能容忍

tair的大神给的回复很简单:超时请重试

仔细想一下,put超时分两种情况:

  1. 上一次put其实已经成功了;那么重试肯定会失败;
  2. 上一次put其实失败了;那么若这一次顺利,就会成功;

那如何解决上面的第1点呢?

其实,锁应该是能够经得起复查的(类似偏向锁):A拿到的锁,没有unlock之前,无论A重试检查多少次,都是A的!

怎么实现?

既然用的是ldb缓存,它是key-value结构的,前面version控制等,都只用到了key。
这里,我们可以从tair value里做文章:让value包含机器ip+线程name,trylock内先get value做检查

于是,实现变为:

public boolean trylock(String key, int timeout) {
    Result<DataEntry> result = locker.ldbTairManager.get(NAMESPACE, lockKey);
    if (result == null) return null;
    if (ResultCode.DATANOTEXSITS.equals(result.getRc())) { // means lock is free
        ResultCode code = ldbTairManager.put(NAMESPACE, key, getLockValue(), 2, timeout);
        if (ResultCode.SUCCESS.equals(code))
            return true;
    }else if(result.getValue() != null && getLockValue().equals(result.getValue().getValue())){
        return true;
    }
    return false;
}

private String getLockValue(){  
    return Utils.getHostname() + ":" + Thread.currentThread().getName();
}

注意: 其实这里线程复用时,ThreadName有相同风险,可以改为uuid逻辑,需复用锁时传入uuid。

public boolean trylock(String key, int timeout, String uuid){ ... }

问题三:新方案其实多了一次get操作,若是get也超时怎么办?

超时无法避免,还是要靠重试!(前提是逻辑可以重试)

public boolean trylock(String key, int timeout) {
    Result<DataEntry> result = locker.ldbTairManager.get(NAMESPACE, key);
    if (result == null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc())) // get timeout retry case
        result = locker.ldbTairManager.get(NAMESPACE, key);
    if (result == null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc())){ // 还是超时,则留下日志痕迹
        logger.error("ldb tair get timeout. key:{}.",key);
        return false;
    }
    if (ResultCode.DATANOTEXSITS.equals(result.getRc())) { // means lock is free
        ResultCode code = ldbTairManager.put(NAMESPACE, key, getLockValue(), 2, timeout);
        if (ResultCode.SUCCESS.equals(code))
            return true;
        else if(code==null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc())){  // put timeout retry case
            return trylock(key, timeout); // 递归尝试
        }
    }else if(result.getValue() != null && getLockValue().equals(result.getValue().getValue())){
        return true;
    }
    return false;
}

// 【注意】其实这里线程复用时,ThreadName有相同风险,可以改为uuid逻辑,复用锁传入uuid。
private String getLockValue(){
    return Utils.getHostname() + ":" + Thread.currentThread().getName();
}

进一步的,我们还可以对get/put的retry做次数控制;
真实线上的情况,一般一次retry就能解决问题,次数多了,反而可能导致雪崩,需要慎重;

多一次Get的性能影响

有代码洁癖、性能洁癖的人可能会想:普通tair锁,一次put就能搞定,这里却要先get再put,浪费啊。。。

这里梳理一下:

  • 若是锁已经被持有,那么get之后,麻烦发现被持有,直接返回失败;这时,并不会再次put,开销是一样的;(甚至get的开销,比put要小,至少不会占用put的限流阈值)
  • 若是没人持有锁,确实这时get有些浪费的,但是为了锁可以复查这个特性(可重试)、为了能解决超时这个问题,我认为还是值得的。在实际场景中,开发者自己可以评估是否需要。比如:拿前面的uuid的样例API讲,若不需要这个特性时,就不传入uuid,那么实现代码里,可以自动降级为只有一个put的锁实现;

问题四:批量锁

批量锁,主要注意拿锁的顺序和释放锁相反,伪代码如下:

if(trylock("A") && trylock("B") && trylock("C")){
    try{
        // do something
    }finally{  // 注意这里的顺序要反过来
        unlock("C");
        unlock("B");
        unlock("A");
    }
}

最后总结下,给一个完整代码 :smile:

import com.taobao.tair.DataEntry;
import com.taobao.tair.Result;
import com.taobao.tair.ResultCode;
import com.taobao.tair.TairManager;
import org.apache.commons.lang.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.FormattingTuple;
import org.slf4j.helpers.MessageFormatter;

import javax.annotation.Resource;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class CommonLocker {

    private static final Logger logger = LoggerFactory.getLogger(CommonLocker.class);

    @Resource
    private TairManager ldbTairManager;

    private static final short NAMESPACE = 1310;

    private static CommonLocker locker;

    public void init() {
        if (locker != null) return;
        synchronized (CommonLocker.class) {
            if (locker == null)
                locker = this;
        }
    }

    public static Lock newLock(String format, Object... argArray) {
        FormattingTuple ft = MessageFormatter.arrayFormat(format, argArray);
        return newLock(ft.getMessage());
    }

    public static Lock newLock(String strKey) {
        String key = "_tl_" + strKey;
        return new TairLock(key, CommonConfig.lock_default_timeout);
    }

    public static Lock newLock(String strKey, int timeout) {
        String key = "_tl_" + strKey;
        return new TairLock(key, timeout);
    }

    private static class TairLock implements Lock {
        private String lockKey;
        private boolean gotLock = false;
        private int retryGet = 0;
        private int retryPut = 0;
        private int timeout;

        public TairLock(String key, int timeout) {
            this.lockKey = tokey(key);
            this.timeout = timeout;
        }

        public boolean tryLock() {
            return tryLock(timeout);
        }

        /**
         * need finally do unlock
         *
         * @return
         */
        public boolean tryLock(int timeout) {
            Result<DataEntry> result = locker.ldbTairManager.get(NAMESPACE, lockKey);
            while (retryGet++ < CommonConfig.lock_get_max_retry &&
                    (result == null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc()))) // 重试一次
                result = locker.ldbTairManager.get(NAMESPACE, lockKey);
            if (ResultCode.DATANOTEXSITS.equals(result.getRc())) { // lock is free
                // 已验证version 2表示为空,若不是为空,则返回version error
                ResultCode code = locker.ldbTairManager.put(NAMESPACE, lockKey, locker.getValue(), 2, timeout);
                if (ResultCode.SUCCESS.equals(code)) {
                    gotLock = true;
                    return true;
                } else if (retryPut++ < CommonConfig.lock_put_max_retry &&
                        (code == null || ResultCode.CONNERROR.equals(result.getRc()) || ResultCode.TIMEOUT.equals(result.getRc()) || ResultCode.UNKNOW.equals(result.getRc()))) {
                    return tryLock(timeout);
                }
            } else if (result.getValue() != null && locker.getValue().equals(result.getValue().getValue())) {
// 【注意】其实这里线程复用时,ThreadName有相同风险,可以改为uuid逻辑,复用锁传入uuid。
                // 若是自己的锁,自己继续用
                gotLock = true;
                return true;
            }
            // 到这里表示没有拿到锁
            return false;
        }

        public void unlock() {
            if (gotLock) {
                ResultCode invalidCode = locker.ldbTairManager.invalid(NAMESPACE, lockKey);
                gotLock = false;
            }
        }

        public void lock() {
            throw new NotImplementedException();
        }

        public void lockInterruptibly() throws InterruptedException {
            throw new NotImplementedException();
        }

        public boolean tryLock(long l, TimeUnit timeUnit) throws InterruptedException {
            throw new NotImplementedException();
        }

        public Condition newCondition() {
            throw new NotImplementedException();
        }
    }

// 【注意】其实这里线程复用时,ThreadName有相同风险,可以改为uuid逻辑,复用锁传入uuid。
    private String getValue() {
        return getHostname() + ":" + Thread.currentThread().getName();
    }

    /**
     * 获得机器名
     *
     * @return
     */
    public static String getHostname() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            return "[unknown]";
        }
    }

    public void setLdbTairManager(TairManager ldbTairManager) {
        this.ldbTairManager = ldbTairManager;
    }
}

使用样例

Lock lockA = CommonLocker.newLock("hs_room_{}_uid_{}", roomDo.getUuid(), roomDo.getMaster().getUid());
Lock lockB = CommonLocker.newLock("hs_room_{}_uid_{}", roomDo.getUuid(), roomDo.getPartnerList().get(0).getUid());
try {
    if (lockA.tryLock() && lockB.tryLock()) {// 分布式锁定本任务
        // do something....
    }
} finally {
    lockB.unlock();
    lockA.unlock();
}

祝大家用的愉快,Happy Coding!!

目录
相关文章
|
4月前
|
人工智能 安全 Java
分布式 Multi Agent 安全高可用探索与实践
在人工智能加速发展的今天,AI Agent 正在成为推动“人工智能+”战略落地的核心引擎。无论是技术趋势还是政策导向,都预示着一场深刻的变革正在发生。如果你也在探索 Agent 的应用场景,欢迎关注 AgentScope 项目,或尝试使用阿里云 MSE + Higress + Nacos 构建属于你的 AI 原生应用。一起,走进智能体的新世界。
1084 67
|
4月前
|
关系型数据库 Apache 微服务
《聊聊分布式》分布式系统基石:深入理解CAP理论及其工程实践
CAP理论指出分布式系统中一致性、可用性、分区容错性三者不可兼得,必须根据业务需求进行权衡。实际应用中,不同场景选择不同策略:金融系统重一致(CP),社交应用重可用(AP),内网系统可选CA。现代架构更趋向动态调整与混合策略,灵活应对复杂需求。
|
6月前
|
数据采集 消息中间件 监控
单机与分布式:社交媒体热点采集的实践经验
在舆情监控与数据分析中,单机脚本适合小规模采集如微博热榜,而小红书等大规模、高时效性需求则需分布式架构。通过Redis队列、代理IP与多节点协作,可提升采集效率与稳定性,适应数据规模与变化速度。架构选择应根据实际需求,兼顾扩展性与维护成本。
183 2
|
9月前
|
人工智能 安全 应用服务中间件
阿里巴巴 MCP 分布式落地实践:快速转换 HSF 到 MCP server
本文分享了阿里巴巴内部将大规模HSF服务快速转换为MCP Server的实践经验,通过Higress网关实现MCP协议卸载,无需修改代码即可接入MCP生态。文章分析了MCP生态面临的挑战,如协议快速迭代和SDK不稳定性,并详细介绍了操作步骤及组件功能。强调MCP虽非终极解决方案,但作为AI业务工程化的起点具有重要意义。最后总结指出,MCP只是AI原生应用发展的第一步,未来还有更多可能性值得探索。
1393 48
|
5月前
|
消息中间件 缓存 监控
中间件架构设计与实践:构建高性能分布式系统的核心基石
摘要 本文系统探讨了中间件技术及其在分布式系统中的核心价值。作者首先定义了中间件作为连接系统组件的&quot;神经网络&quot;,强调其在数据传输、系统稳定性和扩展性中的关键作用。随后详细分类了中间件体系,包括通信中间件(如RabbitMQ/Kafka)、数据中间件(如Redis/MyCAT)等类型。文章重点剖析了消息中间件的实现机制,通过Spring Boot代码示例展示了消息生产者的完整实现,涵盖消息ID生成、持久化、批量发送及重试机制等关键技术点。最后,作者指出中间件架构设计对系统性能的决定性影响,
|
9月前
|
监控 Linux 应用服务中间件
Linux多节点多硬盘部署MinIO:分布式MinIO集群部署指南搭建高可用架构实践
通过以上步骤,已成功基于已有的 MinIO 服务,扩展为一个 MinIO 集群。该集群具有高可用性和容错性,适合生产环境使用。如果有任何问题,请检查日志或参考MinIO 官方文档。作者联系方式vx:2743642415。
3175 57
|
9月前
|
安全 JavaScript 前端开发
HarmonyOS NEXT~HarmonyOS 语言仓颉:下一代分布式开发语言的技术解析与应用实践
HarmonyOS语言仓颉是华为专为HarmonyOS生态系统设计的新型编程语言,旨在解决分布式环境下的开发挑战。它以“编码创造”为理念,具备分布式原生、高性能与高效率、安全可靠三大核心特性。仓颉语言通过内置分布式能力简化跨设备开发,提供统一的编程模型和开发体验。文章从语言基础、关键特性、开发实践及未来展望四个方面剖析其技术优势,助力开发者掌握这一新兴工具,构建全场景分布式应用。
884 35
|
10月前
|
存储 负载均衡 测试技术
ACK Gateway with Inference Extension:优化多机分布式大模型推理服务实践
本文介绍了如何利用阿里云容器服务ACK推出的ACK Gateway with Inference Extension组件,在Kubernetes环境中为多机分布式部署的LLM推理服务提供智能路由和负载均衡能力。文章以部署和优化QwQ-32B模型为例,详细展示了从环境准备到性能测试的完整实践过程。
|
11月前
|
并行计算 PyTorch 算法框架/工具
融合AMD与NVIDIA GPU集群的MLOps:异构计算环境中的分布式训练架构实践
本文探讨了如何通过技术手段混合使用AMD与NVIDIA GPU集群以支持PyTorch分布式训练。面对CUDA与ROCm框架互操作性不足的问题,文章提出利用UCC和UCX等统一通信框架实现高效数据传输,并在异构Kubernetes集群中部署任务。通过解决轻度与强度异构环境下的挑战,如计算能力不平衡、内存容量差异及通信性能优化,文章展示了如何无需重构代码即可充分利用异构硬件资源。尽管存在RDMA验证不足、通信性能次优等局限性,但该方案为最大化GPU资源利用率、降低供应商锁定提供了可行路径。源代码已公开,供读者参考实践。
1015 3
融合AMD与NVIDIA GPU集群的MLOps:异构计算环境中的分布式训练架构实践
|
11月前
|
人工智能 运维 监控
领先AI企业经验谈:探究AI分布式推理网络架构实践
当前,AI行业正处于快速发展的关键时期。继DeepSeek大放异彩之后,又一款备受瞩目的AI智能体产品Manus横空出世。Manus具备独立思考、规划和执行复杂任务的能力,其多智能体架构能够自主调用工具。在GAIA基准测试中,Manus的性能超越了OpenAI同层次的大模型,展现出卓越的技术实力。

热门文章

最新文章