ElasticSearch悲观锁

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 介绍ElasticSearch悲观锁相关知识

ES在文档更新时默认使用的是乐观锁方案,而ES利用文档的一些create限制条件,也可以实现悲观锁的效果。

一、悲观锁与乐观锁

乐观锁

ES默认实现乐观锁,所有的数据更新迷人使用乐观锁机制。文档更新时,必须要带上current version,更新时与文档的version进行比较,如果相同进行更新,不相同表示已被其他线程进行了修改,此时更新失败,并且重新获取新的version再尝试更新。

悲观锁

我们举一个这样的例子:Elasticsearch存储文件系统的目录、文件名信息,有多个线程需要对/home/workspace/ReadMe.txt进行追加修改,而且是并发执行的,有先后顺序之分,跟之前的库存更新案例有点不一样,此时单纯使用乐观锁,可能会出现乱序的问题。


这种场景就需要使用悲观锁控制,保证线程的执行顺序,有一个线程在修改,其他的线程只能挂起等待。悲观锁通过/index/lock/实现,只有一个线程能做修改操作,其他线程block掉。


悲观锁有三种,分别对应三种粒度,由粗到细可为分:

  • 全局锁:最粗的锁,直接锁整个索引
  • document锁:指定id加锁,只锁一条数据,类似于数据库的行锁
  • 共享锁和排他锁:也叫读写锁,针对一条数据分读和写两种操作,一般共享锁允许多个线程对同一条数据进行加锁,排他锁只允许一个线程对数据加锁,并且排他锁和共享锁互斥。

锁的基本操作步骤

我们使用锁的基本步骤都是一样的,无论是关系型数据库、Redis/Memcache/Zookeeper分布式锁,还是今天介绍的Elasticsearch实现的锁机制,都有如下三步:

  • 上锁
  • 执行事务方法
  • 解锁

二、基于全局锁实现悲观锁的并发控制

1. 全局锁

全局锁,就是在某个要加锁的索引里面创建一个文档,利用文档创建的乐观锁,来间接实现悲观锁,全局锁会直接锁掉整个index索引。

2. 上锁步骤

第一步:上锁

PUT /fs/lock/global/_create
{}

fs:要上锁的索引index。 lock:对于这个index上全局锁的type。 global:全局锁对应的这个文档的id。 _create:强制必须创建,如果已存在,那么创建失败,直接报错。

第二步:上锁成功,执行相关操作。

第三步:删除锁 删除锁,只要删除对应的这个文档即可

DELETE /fs/lock/global

在没有删除锁的时候,其他线程来加锁,会直接报错无法加锁,如果加锁失败,就再次重复尝试上锁,

PUT /fs/lock/global/_create
{}
{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[lock][global]: version conflict, document already exists (current version [1])",
        "index_uuid": "IYbj0OLGQHmMUpLfbhD4Hw",
        "shard": "2",
        "index": "fs"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[lock][global]: version conflict, document already exists (current version [1])",
    "index_uuid": "IYbj0OLGQHmMUpLfbhD4Hw",
    "shard": "2",
    "index": "fs"
  },
  "status": 409
}

3.全局锁的优缺点

优点:操作简单,容易使用,成本低。

缺点:直接锁掉整个index,导致并发能力很低。

适用条件:加锁解锁的操作不是很频繁,每次上锁之后,执行的操作的耗时不会太长,可以使用这种加锁方式,操作简单方便。

三、基于document锁实现悲观锁并发控制

1.document锁

document锁是细粒度的一个锁,每次就锁你要进行操作的哪些doc,其他线程就能在对这些doc进行增删改操作了,但是其他线程对其他的doc还是可以上锁并进行增删改的。

2.上锁方式

document锁,使用脚本进行上锁操作。

POST /fs/lock/1/_update
{
  "upsert": { "process_id": 123 },
  "script": "if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';"
  "params": {
    "process_id": 123
  }
}

/fs/lock 固定值,表示fs索引下面lock type。专门用于进行上锁操作。

/fs/lock/1 1表示id,就是要进行上锁操作的那个doc文档对应的id,代表了某个doc数据对应的lock。


_update + upsert 执行upsert操作。

param 里面有一个process_id,process_id代表要进行增删改的进程的唯一id,比如说可以在java系统启动的时候,给每个线程都手动生成一个唯一的thread_id。在系统启动的时候给整个进程也分配一个唯一process_id,process_id + thread_id 就代表了某一个进程下的某个线程的唯一标识,也可以自己生成一个唯一id。


process_id很重要,会在lock中,设置对对应的doc加锁的进程的id,这样其他进程过来的时候,才知道,这条数据已经被别人给锁了

assert false 不是当前进程加锁的话,抛出异常。

ctx.op='noop' 不进行任何操作。


如果该document之前没有被锁,/fs/lock/1之前不存在,也就是doc id=1没有被别人上过锁; upsert的语法,那么执行index操作,创建一个/fs/lock/id这条数据,而且用params中的数据作为这个lock的数据。process_id被设置为123,script不执行。这个时候象征着process_id=123的进程已经锁了一个doc了。


如果document被锁了,就是说/fs/lock/1已经存在了,代表doc id=1已经被某个进程给锁了。那么执行update操作,script,此时会比对process_id,如果相同,就是说,某个进程,之前锁了这个doc,然后这次又过来,就可以直接对这个doc执行操作,说明是该进程之前锁的doc,则不报错,不执行任何操作,返回success; 如果process_id比对不上,说明doc被其他doc给锁了,此时报错。


操作完成。删除本条上锁记录,即可完成解锁操作。

DELETE /fs/lock/1

3. 加锁实验

1.创建脚本 在ES目录下面的config文件夹下面的scripts文件夹下面创建一个名为judge-lock.groovy的文件,写入以下脚本内容 if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';

2.进行实验 线程一加锁

POST /fs/lock/1/_update
{
  "upsert": { "process_id": 123 },
  "script": {
    "lang": "groovy",
    "file": "judge-lock", 
    "params": {
      "process_id": 123
    }
  }
}
## 加锁成功
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}

线程二加锁

POST /fs/lock/1/_update
{
  "upsert": { "process_id": 124 },
  "script": {
    "lang": "groovy",
    "file": "judge-lock", 
    "params": {
      "process_id": 124
    }
  }
}
## 加锁失败
{
  "error": {
    "root_cause": [
      {
        "type": "remote_transport_exception",
        "reason": "[qOWzntC][127.0.0.1:9300][indices:data/write/update[s]]"
      }
    ],
    "type": "illegal_argument_exception",
    "reason": "failed to execute script",
    "caused_by": {
      "type": "script_exception",
      "reason": "error evaluating judge-lock",
      "caused_by": {
        "type": "power_assertion_error",
        "reason": "assert false\n"
      },
      "script_stack": [],
      "script": "",
      "lang": "groovy"
    }
  },
  "status": 400
}

四、基于共享锁和排它锁实现悲观并发控制

1.共享锁与排它锁

共享锁:加了共享锁的数据是共享的,多个线程可以获取同一个数据的共享锁,然后对数据执行读操作。 排它锁:加了排它锁的数据是独占的,同一时间只能由一个线程获取到排它锁,然后执行增删改操作。


读写锁分离 如果只是要读取数据的话,那么多个线程都可以同时读取数据,每个线程都可以上一个共享锁,但是这时候如果有一个线程要对数据进行修改操作,那么需要尝试上排它锁,排它锁与共享锁互斥,也就是说,如果一个上了共享锁,就不能再上排它锁了,如果已经上了排它锁,那么也不能再上共享锁。


如果有人在修改数据,就是加了排他锁 那么其他线程过来要修改数据,也会尝试加排他锁,此时会失败,锁冲突,必须等待,同时只能有一个线程修改数据 如果有人过来同时要读取数据,那么会尝试加共享锁,此时会失败,因为共享锁和排他锁是冲突的 如果有在修改数据,就不允许别人来修改数据,也不允许别人来读取数据

2.上锁方式

共享锁上锁,采用脚本进行。


1.创建脚本


A.加锁脚本 在ES目录下面的config文件夹下面的scripts文件夹下面创建一个名为judge-lock-2.groovy的文件,写入以下脚本内容  if (ctx._source.lock_type == 'exclusive') { assert false }; ctx._source.lock_count++。 判断加锁类型是否为排它锁(exclusive),如果是直接失败,如果不是排它锁,锁数量加1,所有共享锁可以无限加锁。


B.解锁脚本 在ES目录下面的config文件夹下面的scripts文件夹下面创建一个名为unlock-shared.groovy的文件,写入以下脚本内容 if ( --ctx._source.lock_count == 0) { ctx.op = 'delete' } 将当前锁数量减1,判断锁是否解完,如果锁数量为0,表示所有共享锁都已经解锁,直接将其删除。

2.上锁测试

A. 加多个共享锁

## 线程一上共享锁
POST /fs/lock/1/_update 
{
  "upsert": { 
    "lock_type":  "shared",
    "lock_count": 1
  },
  "script": {
    "lang": "groovy",
    "file": "judge-lock-2"
  }
}
## 上锁成功
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 3,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}
## 线程二进行上共享锁
POST /fs/lock/1/_update 
{
  "upsert": { 
    "lock_type":  "shared",
    "lock_count": 1
  },
  "script": {
    "lang": "groovy",
    "file": "judge-lock-2"
  }
}
## 上锁成功
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 4,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}
## 查看锁状态
GET /fs/lock/1
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 4,
  "found": true,
  "_source": {
    "lock_type": "shared",
    "lock_count": 2
  }
}

从上面我们看出,两个线程对同一个doc进行上共享锁,两个都上锁成功,并且显示锁数量(lock_count)值为2,表示加了两把锁,并且锁类型为共享(lock_type)锁。所有如果已经上了共享锁,再上共享锁,不会出现问题,只会锁数量加1.

B. 加了共享锁再加排它锁

## 线程一上共享锁
POST /fs/lock/1/_update 
{
  "upsert": { 
    "lock_type":  "shared",
    "lock_count": 1
  },
  "script": {
    "lang": "groovy",
    "file": "judge-lock-2"
  }
}
## 上锁成功
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 3,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}
## 线程二上排它锁
PUT /fs/lock/1/_create
{ "lock_type": "exclusive" }
## 加排它锁失败
{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[lock][1]: version conflict, document already exists (current version [4])",
        "index_uuid": "Teq7Y-m1S1qJ6F3yTbMMCg",
        "shard": "3",
        "index": "fs"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[lock][1]: version conflict, document already exists (current version [4])",
    "index_uuid": "Teq7Y-m1S1qJ6F3yTbMMCg",
    "shard": "3",
    "index": "fs"
  },
  "status": 409
}
## 解锁共享锁
POST /fs/lock/1/_update
{
  "script": {
    "lang": "groovy",
    "file": "unlock-shared"
  }
}
## 解锁成功
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 5,
  "result": "updated",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  }
}
## 解锁完成只后再排它锁
PUT /fs/lock/1/_create
{ "lock_type": "exclusive" }
## 排它锁加锁成功
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 7,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}

C.上了排它锁,再上共享锁

## 线程一家排它锁
PUT /fs/lock/1/_create
{ "lock_type": "exclusive" }
## 排它锁加锁成功
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 7,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}
## 线程二在加共享锁
POST /fs/lock/1/_update
{
  "upsert": { 
    "lock_type":  "shared",
    "lock_count": 1
  },
  "script": {
    "lang": "groovy",
    "file": "judge-lock-2"
  }
}
## 排它锁加锁失败
{
  "error": {
    "root_cause": [
      {
        "type": "remote_transport_exception",
        "reason": "[qOWzntC][127.0.0.1:9300][indices:data/write/update[s]]"
      }
    ],
    "type": "illegal_argument_exception",
    "reason": "failed to execute script",
    "caused_by": {
      "type": "script_exception",
      "reason": "error evaluating judge-lock-2",
      "caused_by": {
        "type": "power_assertion_error",
        "reason": "assert false\n"
      },
      "script_stack": [],
      "script": "",
      "lang": "groovy"
    }
  },
  "status": 400
}

D:加了排它锁,在加排它锁

## 线程一家排它锁
PUT /fs/lock/1/_create
{ "lock_type": "exclusive" }
## 排它锁加锁成功
{
  "_index": "fs",
  "_type": "lock",
  "_id": "1",
  "_version": 7,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}
## 线程二在加排它锁
## 线程一家排它锁
PUT /fs/lock/1/_create
{ "lock_type": "exclusive" }
## 排它锁加锁失败
{
  "error": {
    "root_cause": [
      {
        "type": "version_conflict_engine_exception",
        "reason": "[lock][1]: version conflict, document already exists (current version [7])",
        "index_uuid": "Teq7Y-m1S1qJ6F3yTbMMCg",
        "shard": "3",
        "index": "fs"
      }
    ],
    "type": "version_conflict_engine_exception",
    "reason": "[lock][1]: version conflict, document already exists (current version [7])",
    "index_uuid": "Teq7Y-m1S1qJ6F3yTbMMCg",
    "shard": "3",
    "index": "fs"
  },
  "status": 409
}

3.解锁方式

共享锁可以同时加多个,所以解锁需要使用脚本,一个一个的解锁。

## 解锁共享锁
POST /fs/lock/1/_update
{
  "script": {
    "lang": "groovy",
    "file": "unlock-shared"
  }
}

排它锁只会存在一个锁,所以直接删除即可。

DELETE /fs/lock/1
相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
6月前
|
API 网络架构 索引
Elasticsearch索引中数据的增删改查与并发控制
Elasticsearch索引中数据的增删改查与并发控制
|
5月前
|
存储 缓存 自然语言处理
【Elasticsearch】Elasticsearch倒排索引详解
【Elasticsearch】Elasticsearch倒排索引详解
180 12
|
6月前
|
数据采集 API 定位技术
elasticsearch pipelineI详解:原理与使用
elasticsearch pipelineI详解:原理与使用
|
API 索引
【Elasticsearch】学好Elasticsearch系列-索引的CRUD
【Elasticsearch】学好Elasticsearch系列-索引的CRUD
99 0
|
API 索引
Elasticsearch 中的骚操作你确定不看看?
Elasticsearch 中的骚操作你确定不看看?
|
存储 消息中间件 缓存
【ElasticSearch从入门到放弃系列 九】Elasticsearch原理机制探索
【ElasticSearch从入门到放弃系列 九】Elasticsearch原理机制探索
244 0
|
存储 监控 数据可视化
Elasticsearch是什么?底层原理是什么?
Elasticsearch是什么?底层原理是什么?
150 0
|
存储 SQL 自然语言处理
1.【Elasticsearch】Elasticsearch从入门到放弃-Elasticsearch概念篇
【Elasticsearch】Elasticsearch从入门到放弃-Elasticsearch概念篇
|
SQL 关系型数据库 MySQL
elasticsearch常用应用操作
elasticsearch常用应用操作,会批量从mysql批量导入数据。 会解决一些mysql不太好写的sql语句,如搜索多模糊查询。
128 1
|
存储 负载均衡 NoSQL