Redis的高级特性与应用场景(二)
利用Pipeline管道处理多个命令
Redis
是一种基于客户端-服务端模型以及请求/响应协议的TCP
服务。
这意味着通常情况下一个请求会遵循以下步骤:
- 客户端向服务端发送一个查询请求,并监听
Socket
返回,通常是以阻塞模式,等待服务端响应。 - 服务端处理命令,并将结果返回给客户端。
一个命令的发送到处理,是需要往返时间的,如果是本地回环网络的话还会比较快,可如果是外网的话经常层层网络代理就不一定能接受了。管道的话可以一次将多个命令发送到服务器,而不用等待答复,最后在一个步骤中读取该答复。
例子
我们可以在批量插入和批量获取的时候使用,例如下面: 我批量获取了直播相关的数据
Redis::connection('cache')->pipeline(function ($pipe) use ($fn) { foreach ($data() as $v) { $likeKeyArr[] = sprintf(LiveRooms::REDIS_LIVE_LIKE_PREFIX, $v['id'], $v['start_live_time']); $commentKeyArr[] = sprintf(LiveRooms::REDIS_LIVE_COMMENT_PREFIX, $v['id'], $v['start_live_time']); $shareKeyArr[] = sprintf(LiveRooms::REDIS_LIVE_SHARE_PREFIX, $v['id'], $v['start_live_time']); } $pipe->mget($likeKeyArr); $pipe->mget($commentKeyArr); $pipe->mget($shareKeyArr); })
Redis键空间通知
这个特性可以让我们订阅redis
的操作,例如在redis
中设置好key
的生存时间后,希望key
过期被删除后能给发一个通知
del key
例如上面删除了一个键, redis
会发送两种不同类型的数据,特定的事件会往特定的频道发送通知,我们只要订阅这个特定的频道等待通知即可.
PUBLISH __keyspace@0__:key del # 键空间通知 PUBLISH __keyevent@0__:del key # 键事件通知
我们可以只启用其中一种通知,以便只传递我们感兴趣的事件子集。
注意: 事件使用Redis
的普通发布/订阅层
传递,由于Redis
的发布/订阅是fire and forget
,因此如果你的应用要求可靠的事件通知,目前还不能使用这个功能,也就是说,如果你的发布/订阅客户端断开连接,并在稍后重连,那么所有在客户端断开期间发送的事件将会丢失。
例子
我们可以监听0库里面键过期的事件
<?php class RedisInstance { private $redis; public function __construct($host = '127.0.0.1', $port = 6379) { $this->redis = new Redis(); $this->redis->connect($host, $port); } public function expire($key = null, $time = 0) { return $this->redis->expire($key, $time); } public function psubscribe($patterns = array(), $callback) { $this->redis->psubscribe($patterns, $callback); } public function setOption() { $this->redis->setOption(\Redis::OPT_READ_TIMEOUT,-1); } } echo "程序开始执行..\n"; $redis = new RedisInstance(); $redis->setOption(); $redis->psubscribe(array('__keyevent@0__:expired'), 'callback'); //回调 function callback($redis, $pattern, $chan, $msg) { echo "$pattern\n"; echo "$chan\n"; echo "$msg\n"; /*业务逻辑*/ }
不支持rollback的事务
redis
事务与关系型数据库事务不太一样,它的事务不支持回滚,这也使得redis
的事务处理效率特别高,但是这个不支持rollback
是不是会造成我们的数据混乱呢,这样的事务是不是没有意义呢?
redis
的事务是不保证原子性的: redis
事务只保证在命令格式只有在都正确的情况下才会都执行,要不就都不执行命令。但是事务的整体是不保证原子性的,且没有回滚,当事务中任意一个命令执行失败,其余的命令依然会执行。
redis
事务是将所有的命令发送到队列里面,最终exec
才进行执行,redis
的命令只会因为语法而失败,或是命令用在了错误类型键上面. 这也就是说,失败命令是由编程造成的,而这些错误应该在开发过程中被发现,而不应该出现在生产环境中. 鉴于更多的问题都是程序员自身的问题,redis
直接采用无回滚方式来处理事务
乐观锁例子
我们津津乐道的转账问题,就可以利用事务来处理.
场景:
A余额100元
B余额100元
C余额100元
A准备给B转账50,再同时C要还50元给A,那么A这个余额怎么确定在转账完之后操作还是在转账前操作呢?这属于资源竞争,常见方式就是加锁了,排它锁的话比较消耗资源,我们可以利用watch
来实现乐观锁.
当watch
的key
value
发生改变的时候,exec
事务会取消, 当 exec
被调用后, 所有的keys
都将UNWATCH
,不管这个事务会不会终止。
set A 100 set B 100 set C 100 watch A multi decrby A 50 incrby B 50 # 在exec之前可以启用第二个客户端,对A账号减少50元,查看watch乐观锁机制是否生效 exec # 这里就会返回 <nil> 因为事务没有执行 get A # 第二个客户端转账50元 所以最终为 150
分布式锁
锁的机制也是一个热门话题,不同的进程必须以独占资源的方式实现资源共享
首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
- 互斥性。在任意时刻,只有一个客户端能持有锁。
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
- 具有容错性。只要大部分的
Redis
节点正常运行,客户端就可以加锁和解锁。 - 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了
下面找到一个例如大家可以看看: https://github.com/ronnylt/redlock-php
<?php class RedLock { private $retryDelay; private $retryCount; private $clockDriftFactor = 0.01; private $quorum; private $servers = array(); private $instances = array(); function __construct(array $servers, $retryDelay = 200, $retryCount = 3) { $this->servers = $servers; $this->retryDelay = $retryDelay; $this->retryCount = $retryCount; $this->quorum = min(count($servers), (count($servers) / 2 + 1)); } public function lock($resource, $ttl) { $this->initInstances(); $token = uniqid(); $retry = $this->retryCount; do { $n = 0; $startTime = microtime(true) * 1000; foreach ($this->instances as $instance) { if ($this->lockInstance($instance, $resource, $token, $ttl)) { $n++; } } # Add 2 milliseconds to the drift to account for Redis expires # precision, which is 1 millisecond, plus 1 millisecond min drift # for small TTLs. $drift = ($ttl * $this->clockDriftFactor) + 2; $validityTime = $ttl - (microtime(true) * 1000 - $startTime) - $drift; if ($n >= $this->quorum && $validityTime > 0) { return [ 'validity' => $validityTime, 'resource' => $resource, 'token' => $token, ]; } else { foreach ($this->instances as $instance) { $this->unlockInstance($instance, $resource, $token); } } // Wait a random delay before to retry $delay = mt_rand(floor($this->retryDelay / 2), $this->retryDelay); usleep($delay * 1000); $retry--; } while ($retry > 0); return false; } public function unlock(array $lock) { $this->initInstances(); $resource = $lock['resource']; $token = $lock['token']; foreach ($this->instances as $instance) { $this->unlockInstance($instance, $resource, $token); } } private function initInstances() { if (empty($this->instances)) { foreach ($this->servers as $server) { list($host, $port, $timeout) = $server; $redis = new \Redis(); $redis->connect($host, $port, $timeout); $this->instances[] = $redis; } } } private function lockInstance($instance, $resource, $token, $ttl) { return $instance->set($resource, $token, ['NX', 'PX' => $ttl]); } private function unlockInstance($instance, $resource, $token) { $script = ' if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else return 0 end '; return $instance->eval($script, [$resource, $token], 1); } } $servers = [ ['127.0.0.1', 6379, 0.01], ['127.0.0.1', 6389, 0.01], ['127.0.0.1', 6399, 0.01], ]; $redLock = new RedLock($servers); while (true) { $lock = $redLock->lock('test', 10000); if ($lock) { print_r($lock); } else { print "Lock not acquired\n"; } }
这个是redlock
,但是这个也不能保证线程安全,进程由于各种原因pause
,类似于上文说的多线程间的时间片切换,比如由于GC
停顿等导致锁过期,但是进程并未感知到,同时另一个进程已经获取了该分布式锁,就会导致奇怪的结果发生.
这里有说明为什么redlock
不安全原因: https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html
有兴趣的可以阅读一下
但是这种开源可以让我们更好的学习,对于数据要求强一致性的使用 redlock
还是需要慎重, 不推荐使用