ElasticSearch悲观锁

简介: 介绍ElasticSearch悲观锁相关知识

ES在文档更新时默认使用的是乐观锁方案,而ES利用文档的一些create限制条件,也可以实现悲观锁的效果。

一、悲观锁与乐观锁

乐观锁

ES默认实现乐观锁,所有的数据更新迷人使用乐观锁机制。文档更新时,必须要带上current version,更新时与文档的version进行比较,如果相同进行更新,不相同表示已被其他线程进行了修改,此时更新失败,并且重新获取新的version再尝试更新。

悲观锁

我们举一个这样的例子:Elasticsearch存储文件系统的目录、文件名信息,有多个线程需要对/home/workspace/ReadMe.txt进行追加修改,而且是并发执行的,有先后顺序之分,跟之前的库存更新案例有点不一样,此时单纯使用乐观锁,可能会出现乱序的问题。


这种场景就需要使用悲观锁控制,保证线程的执行顺序,有一个线程在修改,其他的线程只能挂起等待。悲观锁通过/index/lock/实现,只有一个线程能做修改操作,其他线程block掉。


悲观锁有三种,分别对应三种粒度,由粗到细可为分:

  • 全局锁:最粗的锁,直接锁整个索引
  • document锁:指定id加锁,只锁一条数据,类似于数据库的行锁
  • 共享锁和排他锁:也叫读写锁,针对一条数据分读和写两种操作,一般共享锁允许多个线程对同一条数据进行加锁,排他锁只允许一个线程对数据加锁,并且排他锁和共享锁互斥。

锁的基本操作步骤

我们使用锁的基本步骤都是一样的,无论是关系型数据库、Redis/Memcache/Zookeeper分布式锁,还是今天介绍的Elasticsearch实现的锁机制,都有如下三步:

  • 上锁
  • 执行事务方法
  • 解锁

二、基于全局锁实现悲观锁的并发控制

1. 全局锁

全局锁,就是在某个要加锁的索引里面创建一个文档,利用文档创建的乐观锁,来间接实现悲观锁,全局锁会直接锁掉整个index索引。

2. 上锁步骤

第一步:上锁

PUT /fs/lock/global/_create
{}

fs:要上锁的索引index。 lock:对于这个index上全局锁的type。 global:全局锁对应的这个文档的id。 _create:强制必须创建,如果已存在,那么创建失败,直接报错。

第二步:上锁成功,执行相关操作。

第三步:删除锁 删除锁,只要删除对应的这个文档即可

DELETE /fs/lock/global

在没有删除锁的时候,其他线程来加锁,会直接报错无法加锁,如果加锁失败,就再次重复尝试上锁,

PUT /fs/lock/global/_create
{}
{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[lock][global]: version conflict, document already exists (current version [1])",
        "index_uuid": "IYbj0OLGQHmMUpLfbhD4Hw",
        "shard": "2",
        "index": "fs"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[lock][global]: version conflict, document already exists (current version [1])",
    "index_uuid": "IYbj0OLGQHmMUpLfbhD4Hw",
    "shard": "2",
    "index": "fs"
  },
  "status": 409
}

3.全局锁的优缺点

优点:操作简单,容易使用,成本低。

缺点:直接锁掉整个index,导致并发能力很低。

适用条件:加锁解锁的操作不是很频繁,每次上锁之后,执行的操作的耗时不会太长,可以使用这种加锁方式,操作简单方便。

三、基于document锁实现悲观锁并发控制

1.document锁

document锁是细粒度的一个锁,每次就锁你要进行操作的哪些doc,其他线程就能在对这些doc进行增删改操作了,但是其他线程对其他的doc还是可以上锁并进行增删改的。

2.上锁方式

document锁,使用脚本进行上锁操作。

POST /fs/lock/1/_update
{
  "upsert": { "process_id": 123 },
  "script": "if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';"
  "params": {
    "process_id": 123
  }
}

/fs/lock 固定值,表示fs索引下面lock type。专门用于进行上锁操作。

/fs/lock/1 1表示id,就是要进行上锁操作的那个doc文档对应的id,代表了某个doc数据对应的lock。


_update + upsert 执行upsert操作。

param 里面有一个process_id,process_id代表要进行增删改的进程的唯一id,比如说可以在java系统启动的时候,给每个线程都手动生成一个唯一的thread_id。在系统启动的时候给整个进程也分配一个唯一process_id,process_id + thread_id 就代表了某一个进程下的某个线程的唯一标识,也可以自己生成一个唯一id。


process_id很重要,会在lock中,设置对对应的doc加锁的进程的id,这样其他进程过来的时候,才知道,这条数据已经被别人给锁了

assert false 不是当前进程加锁的话,抛出异常。

ctx.op='noop' 不进行任何操作。


如果该document之前没有被锁,/fs/lock/1之前不存在,也就是doc id=1没有被别人上过锁; upsert的语法,那么执行index操作,创建一个/fs/lock/id这条数据,而且用params中的数据作为这个lock的数据。process_id被设置为123,script不执行。这个时候象征着process_id=123的进程已经锁了一个doc了。


如果document被锁了,就是说/fs/lock/1已经存在了,代表doc id=1已经被某个进程给锁了。那么执行update操作,script,此时会比对process_id,如果相同,就是说,某个进程,之前锁了这个doc,然后这次又过来,就可以直接对这个doc执行操作,说明是该进程之前锁的doc,则不报错,不执行任何操作,返回success; 如果process_id比对不上,说明doc被其他doc给锁了,此时报错。


操作完成。删除本条上锁记录,即可完成解锁操作。

DELETE /fs/lock/1

3. 加锁实验

1.创建脚本 在ES目录下面的config文件夹下面的scripts文件夹下面创建一个名为judge-lock.groovy的文件,写入以下脚本内容 if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';

2.进行实验 线程一加锁

POST /fs/lock/1/_update
{
  "upsert": { "process_id": 123 },
  "script": {
    "lang": "groovy",
    "file": "judge-lock", 
    "params": {
      "process_id": 123
    }
  }
}
## 加锁成功
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}

线程二加锁

POST /fs/lock/1/_update
{
  "upsert": { "process_id": 124 },
  "script": {
    "lang": "groovy",
    "file": "judge-lock", 
    "params": {
      "process_id": 124
    }
  }
}
## 加锁失败
{
  "error": {
    "root_cause": [
      {
        "type": "remote_transport_exception",
        "reason": "[qOWzntC][127.0.0.1:9300][indices:data/write/update[s]]"
      }
    ],
    "type": "illegal_argument_exception",
    "reason": "failed to execute script",
    "caused_by": {
      "type": "script_exception",
      "reason": "error evaluating judge-lock",
      "caused_by": {
        "type": "power_assertion_error",
        "reason": "assert false\n"
      },
      "script_stack": [],
      "script": "",
      "lang": "groovy"
    }
  },
  "status": 400
}

四、基于共享锁和排它锁实现悲观并发控制

1.共享锁与排它锁

共享锁:加了共享锁的数据是共享的,多个线程可以获取同一个数据的共享锁,然后对数据执行读操作。 排它锁:加了排它锁的数据是独占的,同一时间只能由一个线程获取到排它锁,然后执行增删改操作。


读写锁分离 如果只是要读取数据的话,那么多个线程都可以同时读取数据,每个线程都可以上一个共享锁,但是这时候如果有一个线程要对数据进行修改操作,那么需要尝试上排它锁,排它锁与共享锁互斥,也就是说,如果一个上了共享锁,就不能再上排它锁了,如果已经上了排它锁,那么也不能再上共享锁。


如果有人在修改数据,就是加了排他锁 那么其他线程过来要修改数据,也会尝试加排他锁,此时会失败,锁冲突,必须等待,同时只能有一个线程修改数据 如果有人过来同时要读取数据,那么会尝试加共享锁,此时会失败,因为共享锁和排他锁是冲突的 如果有在修改数据,就不允许别人来修改数据,也不允许别人来读取数据

2.上锁方式

共享锁上锁,采用脚本进行。


1.创建脚本


A.加锁脚本 在ES目录下面的config文件夹下面的scripts文件夹下面创建一个名为judge-lock-2.groovy的文件,写入以下脚本内容  if (ctx._source.lock_type == 'exclusive') { assert false }; ctx._source.lock_count++。 判断加锁类型是否为排它锁(exclusive),如果是直接失败,如果不是排它锁,锁数量加1,所有共享锁可以无限加锁。


B.解锁脚本 在ES目录下面的config文件夹下面的scripts文件夹下面创建一个名为unlock-shared.groovy的文件,写入以下脚本内容 if ( --ctx._source.lock_count == 0) { ctx.op = 'delete' } 将当前锁数量减1,判断锁是否解完,如果锁数量为0,表示所有共享锁都已经解锁,直接将其删除。

2.上锁测试

A. 加多个共享锁

## 线程一上共享锁
POST /fs/lock/1/_update 
{
  "upsert": { 
    "lock_type":  "shared",
    "lock_count": 1
  },
  "script": {
    "lang": "groovy",
    "file": "judge-lock-2"
  }
}
## 上锁成功
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 3,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}
## 线程二进行上共享锁
POST /fs/lock/1/_update 
{
  "upsert": { 
    "lock_type":  "shared",
    "lock_count": 1
  },
  "script": {
    "lang": "groovy",
    "file": "judge-lock-2"
  }
}
## 上锁成功
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 4,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}
## 查看锁状态
GET /fs/lock/1
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 4,
  "found": true,
  "_source": {
    "lock_type": "shared",
    "lock_count": 2
  }
}

从上面我们看出,两个线程对同一个doc进行上共享锁,两个都上锁成功,并且显示锁数量(lock_count)值为2,表示加了两把锁,并且锁类型为共享(lock_type)锁。所有如果已经上了共享锁,再上共享锁,不会出现问题,只会锁数量加1.

B. 加了共享锁再加排它锁

## 线程一上共享锁
POST /fs/lock/1/_update 
{
  "upsert": { 
    "lock_type":  "shared",
    "lock_count": 1
  },
  "script": {
    "lang": "groovy",
    "file": "judge-lock-2"
  }
}
## 上锁成功
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 3,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}
## 线程二上排它锁
PUT /fs/lock/1/_create
{ "lock_type": "exclusive" }
## 加排它锁失败
{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[lock][1]: version conflict, document already exists (current version [4])",
        "index_uuid": "Teq7Y-m1S1qJ6F3yTbMMCg",
        "shard": "3",
        "index": "fs"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[lock][1]: version conflict, document already exists (current version [4])",
    "index_uuid": "Teq7Y-m1S1qJ6F3yTbMMCg",
    "shard": "3",
    "index": "fs"
  },
  "status": 409
}
## 解锁共享锁
POST /fs/lock/1/_update
{
  "script": {
    "lang": "groovy",
    "file": "unlock-shared"
  }
}
## 解锁成功
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 5,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}
## 解锁完成只后再排它锁
PUT /fs/lock/1/_create
{ "lock_type": "exclusive" }
## 排它锁加锁成功
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 7,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}

C.上了排它锁,再上共享锁

## 线程一家排它锁
PUT /fs/lock/1/_create
{ "lock_type": "exclusive" }
## 排它锁加锁成功
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 7,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}
## 线程二在加共享锁
POST /fs/lock/1/_update
{
  "upsert": { 
    "lock_type":  "shared",
    "lock_count": 1
  },
  "script": {
    "lang": "groovy",
    "file": "judge-lock-2"
  }
}
## 排它锁加锁失败
{
  "error": {
    "root_cause": [
      {
        "type": "remote_transport_exception",
        "reason": "[qOWzntC][127.0.0.1:9300][indices:data/write/update[s]]"
      }
    ],
    "type": "illegal_argument_exception",
    "reason": "failed to execute script",
    "caused_by": {
      "type": "script_exception",
      "reason": "error evaluating judge-lock-2",
      "caused_by": {
        "type": "power_assertion_error",
        "reason": "assert false\n"
      },
      "script_stack": [],
      "script": "",
      "lang": "groovy"
    }
  },
  "status": 400
}

D:加了排它锁,在加排它锁

## 线程一家排它锁
PUT /fs/lock/1/_create
{ "lock_type": "exclusive" }
## 排它锁加锁成功
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 7,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}
## 线程二在加排它锁
## 线程一家排它锁
PUT /fs/lock/1/_create
{ "lock_type": "exclusive" }
## 排它锁加锁失败
{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[lock][1]: version conflict, document already exists (current version [7])",
        "index_uuid": "Teq7Y-m1S1qJ6F3yTbMMCg",
        "shard": "3",
        "index": "fs"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[lock][1]: version conflict, document already exists (current version [7])",
    "index_uuid": "Teq7Y-m1S1qJ6F3yTbMMCg",
    "shard": "3",
    "index": "fs"
  },
  "status": 409
}

3.解锁方式

共享锁可以同时加多个,所以解锁需要使用脚本,一个一个的解锁。

## 解锁共享锁
POST /fs/lock/1/_update
{
  "script": {
    "lang": "groovy",
    "file": "unlock-shared"
  }
}

排它锁只会存在一个锁,所以直接删除即可。

DELETE /fs/lock/1
相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。  
相关文章
|
存储 固态存储 架构师
【最佳实践】Elasticsearch Snapshot 备份的使用方法
常见的数据库都会提供备份的机制,以解决在数据库无法使用的情况下,可以开启新的实例,然后通过备份来恢复数据减少损失。
7354 0
【最佳实践】Elasticsearch Snapshot 备份的使用方法
|
存储 自然语言处理 数据可视化
可视化FAISS矢量空间并调整RAG参数提高结果精度
随着开源大型语言模型的性能不断提高,编写和分析代码、推荐、文本摘要和问答(QA)对的性能都有了很大的提高。但是当涉及到QA时,LLM通常会在未训练数据的相关的问题上有所欠缺,很多内部文件都保存在公司内部,以确保合规性、商业秘密或隐私。当查询这些文件时,会使得LLM产生幻觉,产生不相关、捏造或不一致的内容。
682 0
|
NoSQL Redis 数据安全/隐私保护
Redis 6.0 新特性详解
艺术致敬! 一、众多新模块(modules)API   Redis 6中模块API开发进展非常大,因为Redis Labs为了开发复杂的功能,从一开始就用上Redis模块。Redis可以变成一个框架,利用Modules来构建不同系统,而不需要从头开始写然后还要BSD许可。
10590 27
|
人工智能 自动驾驶 机器人
吴泳铭:AI最大的想象力不在手机屏幕,而是改变物理世界
过去22个月,AI发展速度超过任何历史时期,但我们依然还处于AGI变革的早期。生成式AI最大的想象力,绝不是在手机屏幕上做一两个新的超级app,而是接管数字世界,改变物理世界。
24888 73
吴泳铭:AI最大的想象力不在手机屏幕,而是改变物理世界
|
人工智能 自然语言处理 机器人
对话阿里云 CIO 蒋林泉:AI 时代,企业如何做好智能化系统建设?
10 月 18 日, InfoQ《C 位面对面》栏目邀请到阿里云 CIO 及 aliyun.com 负责人蒋林泉(花名:雁杨),就 AI 时代企业 CIO 的角色转变、企业智能化转型路径、AI 落地实践与人才培养等主题展开了讨论。
24575 69
对话阿里云 CIO 蒋林泉:AI 时代,企业如何做好智能化系统建设?
|
12月前
|
人工智能 JSON 安全
Spring Boot实现无感刷新Token机制
本文深入解析在Spring Boot项目中实现JWT无感刷新Token的机制,涵盖双Token策略、Refresh Token安全性及具体示例代码,帮助开发者提升用户体验与系统安全性。
1231 4
|
移动开发 前端开发 API
React 拖拽上传组件 Drag & Drop Upload
拖拽上传(Drag & Drop Upload)是现代文件上传方式,允许用户通过简单拖拽操作将文件上传至网页。本文介绍如何在React应用中实现拖拽上传组件,涵盖HTML5拖放API、React状态管理、组件构建及常见问题解决。包括视觉反馈、文件类型和大小限制等优化措施,确保组件的用户体验和稳定性。
667 27
|
移动开发 网络协议 前端开发
每日一博 - Server-Sent Events推送技术
每日一博 - Server-Sent Events推送技术
877 0
|
搜索推荐 算法 Linux
这款文本转语音(TTS)
【8月更文挑战第6天】Fish Speech是一款先进的开源文本转语音(TTS)工具,它能迅速将文字转换为流畅自然的语音,尤其适合镜头前感到紧张的人制作视频内容。Fish Speech支持中文、英文及日文等多种语言,可通过简单的原始语音样本快速克隆个性化声音。其架构设计高效,仅需4GB显存即可运行,采用Flash-Attn算法实现高性能语音合成。Fish Speech具备易用性,提供Web界面操作,并可在Linux与Windows系统上部署。用户可通过官网([https://fish.audio/zh-CN/](https://fish.audio/zh-CN/))直接体验其强大功能。
2064 0
|
SQL 缓存 监控
如何在数据库查询中使用参数化查询?
【4月更文挑战第30天】如何在数据库查询中使用参数化查询?
695 1

热门文章

最新文章