ES在文档更新时默认使用的是乐观锁方案,而ES利用文档的一些create限制条件,也可以实现悲观锁的效果。
一、悲观锁与乐观锁
乐观锁
ES默认实现乐观锁,所有的数据更新迷人使用乐观锁机制。文档更新时,必须要带上current version,更新时与文档的version进行比较,如果相同进行更新,不相同表示已被其他线程进行了修改,此时更新失败,并且重新获取新的version再尝试更新。
悲观锁
我们举一个这样的例子:Elasticsearch存储文件系统的目录、文件名信息,有多个线程需要对/home/workspace/ReadMe.txt进行追加修改,而且是并发执行的,有先后顺序之分,跟之前的库存更新案例有点不一样,此时单纯使用乐观锁,可能会出现乱序的问题。
这种场景就需要使用悲观锁控制,保证线程的执行顺序,有一个线程在修改,其他的线程只能挂起等待。悲观锁通过/index/lock/实现,只有一个线程能做修改操作,其他线程block掉。
悲观锁有三种,分别对应三种粒度,由粗到细可为分:
- 全局锁:最粗的锁,直接锁整个索引
- document锁:指定id加锁,只锁一条数据,类似于数据库的行锁
- 共享锁和排他锁:也叫读写锁,针对一条数据分读和写两种操作,一般共享锁允许多个线程对同一条数据进行加锁,排他锁只允许一个线程对数据加锁,并且排他锁和共享锁互斥。
锁的基本操作步骤
我们使用锁的基本步骤都是一样的,无论是关系型数据库、Redis/Memcache/Zookeeper分布式锁,还是今天介绍的Elasticsearch实现的锁机制,都有如下三步:
- 上锁
- 执行事务方法
- 解锁
二、基于全局锁实现悲观锁的并发控制
1. 全局锁
全局锁,就是在某个要加锁的索引里面创建一个文档,利用文档创建的乐观锁,来间接实现悲观锁,全局锁会直接锁掉整个index索引。
2. 上锁步骤
第一步:上锁
PUT /fs/lock/global/_create {}
fs:要上锁的索引index。 lock:对于这个index上全局锁的type。 global:全局锁对应的这个文档的id。 _create:强制必须创建,如果已存在,那么创建失败,直接报错。
第二步:上锁成功,执行相关操作。
第三步:删除锁 删除锁,只要删除对应的这个文档即可
DELETE /fs/lock/global
在没有删除锁的时候,其他线程来加锁,会直接报错无法加锁,如果加锁失败,就再次重复尝试上锁,
PUT /fs/lock/global/_create {} { "error": { "root_cause": [ { "type": "version_conflict_engine_exception", "reason": "[lock][global]: version conflict, document already exists (current version [1])", "index_uuid": "IYbj0OLGQHmMUpLfbhD4Hw", "shard": "2", "index": "fs" } ], "type": "version_conflict_engine_exception", "reason": "[lock][global]: version conflict, document already exists (current version [1])", "index_uuid": "IYbj0OLGQHmMUpLfbhD4Hw", "shard": "2", "index": "fs" }, "status": 409 }
3.全局锁的优缺点
优点:操作简单,容易使用,成本低。
缺点:直接锁掉整个index,导致并发能力很低。
适用条件:加锁解锁的操作不是很频繁,每次上锁之后,执行的操作的耗时不会太长,可以使用这种加锁方式,操作简单方便。
三、基于document锁实现悲观锁并发控制
1.document锁
document锁是细粒度的一个锁,每次就锁你要进行操作的哪些doc,其他线程就能在对这些doc进行增删改操作了,但是其他线程对其他的doc还是可以上锁并进行增删改的。
2.上锁方式
document锁,使用脚本进行上锁操作。
POST /fs/lock/1/_update { "upsert": { "process_id": 123 }, "script": "if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';" "params": { "process_id": 123 } }
/fs/lock 固定值,表示fs索引下面lock type。专门用于进行上锁操作。
/fs/lock/1 1表示id,就是要进行上锁操作的那个doc文档对应的id,代表了某个doc数据对应的lock。
_update + upsert 执行upsert操作。
param 里面有一个process_id,process_id代表要进行增删改的进程的唯一id,比如说可以在java系统启动的时候,给每个线程都手动生成一个唯一的thread_id。在系统启动的时候给整个进程也分配一个唯一process_id,process_id + thread_id 就代表了某一个进程下的某个线程的唯一标识,也可以自己生成一个唯一id。
process_id很重要,会在lock中,设置对对应的doc加锁的进程的id,这样其他进程过来的时候,才知道,这条数据已经被别人给锁了
assert false 不是当前进程加锁的话,抛出异常。
ctx.op='noop' 不进行任何操作。
如果该document之前没有被锁,/fs/lock/1之前不存在,也就是doc id=1没有被别人上过锁; upsert的语法,那么执行index操作,创建一个/fs/lock/id这条数据,而且用params中的数据作为这个lock的数据。process_id被设置为123,script不执行。这个时候象征着process_id=123的进程已经锁了一个doc了。
如果document被锁了,就是说/fs/lock/1已经存在了,代表doc id=1已经被某个进程给锁了。那么执行update操作,script,此时会比对process_id,如果相同,就是说,某个进程,之前锁了这个doc,然后这次又过来,就可以直接对这个doc执行操作,说明是该进程之前锁的doc,则不报错,不执行任何操作,返回success; 如果process_id比对不上,说明doc被其他doc给锁了,此时报错。
操作完成。删除本条上锁记录,即可完成解锁操作。
DELETE /fs/lock/1
3. 加锁实验
1.创建脚本 在ES目录下面的config文件夹下面的scripts文件夹下面创建一个名为judge-lock.groovy的文件,写入以下脚本内容 if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';
2.进行实验 线程一加锁
POST /fs/lock/1/_update { "upsert": { "process_id": 123 }, "script": { "lang": "groovy", "file": "judge-lock", "params": { "process_id": 123 } } } ## 加锁成功 { "_index": "fs", "_type": "lock", "_id": "1", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 } }
线程二加锁
POST /fs/lock/1/_update { "upsert": { "process_id": 124 }, "script": { "lang": "groovy", "file": "judge-lock", "params": { "process_id": 124 } } } ## 加锁失败 { "error": { "root_cause": [ { "type": "remote_transport_exception", "reason": "[qOWzntC][127.0.0.1:9300][indices:data/write/update[s]]" } ], "type": "illegal_argument_exception", "reason": "failed to execute script", "caused_by": { "type": "script_exception", "reason": "error evaluating judge-lock", "caused_by": { "type": "power_assertion_error", "reason": "assert false\n" }, "script_stack": [], "script": "", "lang": "groovy" } }, "status": 400 }
四、基于共享锁和排它锁实现悲观并发控制
1.共享锁与排它锁
共享锁:加了共享锁的数据是共享的,多个线程可以获取同一个数据的共享锁,然后对数据执行读操作。 排它锁:加了排它锁的数据是独占的,同一时间只能由一个线程获取到排它锁,然后执行增删改操作。
读写锁分离 如果只是要读取数据的话,那么多个线程都可以同时读取数据,每个线程都可以上一个共享锁,但是这时候如果有一个线程要对数据进行修改操作,那么需要尝试上排它锁,排它锁与共享锁互斥,也就是说,如果一个上了共享锁,就不能再上排它锁了,如果已经上了排它锁,那么也不能再上共享锁。
如果有人在修改数据,就是加了排他锁 那么其他线程过来要修改数据,也会尝试加排他锁,此时会失败,锁冲突,必须等待,同时只能有一个线程修改数据 如果有人过来同时要读取数据,那么会尝试加共享锁,此时会失败,因为共享锁和排他锁是冲突的 如果有在修改数据,就不允许别人来修改数据,也不允许别人来读取数据
2.上锁方式
共享锁上锁,采用脚本进行。
1.创建脚本
A.加锁脚本 在ES目录下面的config文件夹下面的scripts文件夹下面创建一个名为judge-lock-2.groovy的文件,写入以下脚本内容 if (ctx._source.lock_type == 'exclusive') { assert false }; ctx._source.lock_count++。 判断加锁类型是否为排它锁(exclusive),如果是直接失败,如果不是排它锁,锁数量加1,所有共享锁可以无限加锁。
B.解锁脚本 在ES目录下面的config文件夹下面的scripts文件夹下面创建一个名为unlock-shared.groovy的文件,写入以下脚本内容 if ( --ctx._source.lock_count == 0) { ctx.op = 'delete' } 将当前锁数量减1,判断锁是否解完,如果锁数量为0,表示所有共享锁都已经解锁,直接将其删除。
2.上锁测试
A. 加多个共享锁
## 线程一上共享锁 POST /fs/lock/1/_update { "upsert": { "lock_type": "shared", "lock_count": 1 }, "script": { "lang": "groovy", "file": "judge-lock-2" } } ## 上锁成功 { "_index": "fs", "_type": "lock", "_id": "1", "_version": 3, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 } } ## 线程二进行上共享锁 POST /fs/lock/1/_update { "upsert": { "lock_type": "shared", "lock_count": 1 }, "script": { "lang": "groovy", "file": "judge-lock-2" } } ## 上锁成功 { "_index": "fs", "_type": "lock", "_id": "1", "_version": 4, "result": "updated", "_shards": { "total": 2, "successful": 1, "failed": 0 } } ## 查看锁状态 GET /fs/lock/1 { "_index": "fs", "_type": "lock", "_id": "1", "_version": 4, "found": true, "_source": { "lock_type": "shared", "lock_count": 2 } }
从上面我们看出,两个线程对同一个doc进行上共享锁,两个都上锁成功,并且显示锁数量(lock_count)值为2,表示加了两把锁,并且锁类型为共享(lock_type)锁。所有如果已经上了共享锁,再上共享锁,不会出现问题,只会锁数量加1.
B. 加了共享锁再加排它锁
## 线程一上共享锁 POST /fs/lock/1/_update { "upsert": { "lock_type": "shared", "lock_count": 1 }, "script": { "lang": "groovy", "file": "judge-lock-2" } } ## 上锁成功 { "_index": "fs", "_type": "lock", "_id": "1", "_version": 3, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 } } ## 线程二上排它锁 PUT /fs/lock/1/_create { "lock_type": "exclusive" } ## 加排它锁失败 { "error": { "root_cause": [ { "type": "version_conflict_engine_exception", "reason": "[lock][1]: version conflict, document already exists (current version [4])", "index_uuid": "Teq7Y-m1S1qJ6F3yTbMMCg", "shard": "3", "index": "fs" } ], "type": "version_conflict_engine_exception", "reason": "[lock][1]: version conflict, document already exists (current version [4])", "index_uuid": "Teq7Y-m1S1qJ6F3yTbMMCg", "shard": "3", "index": "fs" }, "status": 409 } ## 解锁共享锁 POST /fs/lock/1/_update { "script": { "lang": "groovy", "file": "unlock-shared" } } ## 解锁成功 { "_index": "fs", "_type": "lock", "_id": "1", "_version": 5, "result": "updated", "_shards": { "total": 2, "successful": 1, "failed": 0 } } ## 解锁完成只后再排它锁 PUT /fs/lock/1/_create { "lock_type": "exclusive" } ## 排它锁加锁成功 { "_index": "fs", "_type": "lock", "_id": "1", "_version": 7, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "created": true }
C.上了排它锁,再上共享锁
## 线程一家排它锁 PUT /fs/lock/1/_create { "lock_type": "exclusive" } ## 排它锁加锁成功 { "_index": "fs", "_type": "lock", "_id": "1", "_version": 7, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "created": true } ## 线程二在加共享锁 POST /fs/lock/1/_update { "upsert": { "lock_type": "shared", "lock_count": 1 }, "script": { "lang": "groovy", "file": "judge-lock-2" } } ## 排它锁加锁失败 { "error": { "root_cause": [ { "type": "remote_transport_exception", "reason": "[qOWzntC][127.0.0.1:9300][indices:data/write/update[s]]" } ], "type": "illegal_argument_exception", "reason": "failed to execute script", "caused_by": { "type": "script_exception", "reason": "error evaluating judge-lock-2", "caused_by": { "type": "power_assertion_error", "reason": "assert false\n" }, "script_stack": [], "script": "", "lang": "groovy" } }, "status": 400 }
D:加了排它锁,在加排它锁
## 线程一家排它锁 PUT /fs/lock/1/_create { "lock_type": "exclusive" } ## 排它锁加锁成功 { "_index": "fs", "_type": "lock", "_id": "1", "_version": 7, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "created": true } ## 线程二在加排它锁 ## 线程一家排它锁 PUT /fs/lock/1/_create { "lock_type": "exclusive" } ## 排它锁加锁失败 { "error": { "root_cause": [ { "type": "version_conflict_engine_exception", "reason": "[lock][1]: version conflict, document already exists (current version [7])", "index_uuid": "Teq7Y-m1S1qJ6F3yTbMMCg", "shard": "3", "index": "fs" } ], "type": "version_conflict_engine_exception", "reason": "[lock][1]: version conflict, document already exists (current version [7])", "index_uuid": "Teq7Y-m1S1qJ6F3yTbMMCg", "shard": "3", "index": "fs" }, "status": 409 }
3.解锁方式
共享锁可以同时加多个,所以解锁需要使用脚本,一个一个的解锁。
## 解锁共享锁 POST /fs/lock/1/_update { "script": { "lang": "groovy", "file": "unlock-shared" } }
排它锁只会存在一个锁,所以直接删除即可。
DELETE /fs/lock/1