Redis的高级特性与应用场景(二)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: Redis的高级特性与应用场景(二)

Redis的高级特性与应用场景(二)

利用Pipeline管道处理多个命令

Redis是一种基于客户端-服务端模型以及请求/响应协议的TCP服务。

这意味着通常情况下一个请求会遵循以下步骤:

  • 客户端向服务端发送一个查询请求,并监听Socket返回,通常是以阻塞模式,等待服务端响应。
  • 服务端处理命令,并将结果返回给客户端。

一个命令的发送到处理,是需要往返时间的,如果是本地回环网络的话还会比较快,可如果是外网的话经常层层网络代理就不一定能接受了。管道的话可以一次将多个命令发送到服务器,而不用等待答复,最后在一个步骤中读取该答复。

例子

我们可以在批量插入和批量获取的时候使用,例如下面: 我批量获取了直播相关的数据

Redis::connection('cache')->pipeline(function ($pipe) use ($fn) {
    foreach ($data() as $v) {
        $likeKeyArr[] = sprintf(LiveRooms::REDIS_LIVE_LIKE_PREFIX, $v['id'], $v['start_live_time']);
        $commentKeyArr[] = sprintf(LiveRooms::REDIS_LIVE_COMMENT_PREFIX, $v['id'], $v['start_live_time']);
        $shareKeyArr[] = sprintf(LiveRooms::REDIS_LIVE_SHARE_PREFIX, $v['id'], $v['start_live_time']);
    }
    $pipe->mget($likeKeyArr);
    $pipe->mget($commentKeyArr);
    $pipe->mget($shareKeyArr);
})

Redis键空间通知

这个特性可以让我们订阅redis的操作,例如在redis中设置好key的生存时间后,希望key过期被删除后能给发一个通知

del key

例如上面删除了一个键, redis 会发送两种不同类型的数据,特定的事件会往特定的频道发送通知,我们只要订阅这个特定的频道等待通知即可.

PUBLISH __keyspace@0__:key del # 键空间通知
PUBLISH __keyevent@0__:del key # 键事件通知

我们可以只启用其中一种通知,以便只传递我们感兴趣的事件子集。

注意: 事件使用Redis普通发布/订阅层传递,由于Redis的发布/订阅是fire and forget,因此如果你的应用要求可靠的事件通知,目前还不能使用这个功能,也就是说,如果你的发布/订阅客户端断开连接,并在稍后重连,那么所有在客户端断开期间发送的事件将会丢失。

例子

我们可以监听0库里面键过期的事件

<?php
class RedisInstance
{
    private $redis;
    public function __construct($host = '127.0.0.1', $port = 6379)
    {
        $this->redis = new Redis();
        $this->redis->connect($host, $port);
    }
    public function expire($key = null, $time = 0)
    {
        return $this->redis->expire($key, $time);
    }
    public function psubscribe($patterns = array(), $callback)
    {
        $this->redis->psubscribe($patterns, $callback);
    }
    public function setOption()
    {
        $this->redis->setOption(\Redis::OPT_READ_TIMEOUT,-1);
    }
}
echo "程序开始执行..\n";
$redis = new RedisInstance();
$redis->setOption();
$redis->psubscribe(array('__keyevent@0__:expired'), 'callback');
//回调
function callback($redis, $pattern, $chan, $msg)
{
    echo "$pattern\n";
    echo "$chan\n";
    echo "$msg\n";
      /*业务逻辑*/
}

不支持rollback的事务

redis事务与关系型数据库事务不太一样,它的事务不支持回滚,这也使得redis的事务处理效率特别高,但是这个不支持rollback是不是会造成我们的数据混乱呢,这样的事务是不是没有意义呢?

redis的事务是不保证原子性的: redis事务只保证在命令格式只有在都正确的情况下才会都执行,要不就都不执行命令。但是事务的整体是不保证原子性的,且没有回滚,当事务中任意一个命令执行失败,其余的命令依然会执行。

redis事务是将所有的命令发送到队列里面,最终exec才进行执行,redis的命令只会因为语法而失败,或是命令用在了错误类型键上面. 这也就是说,失败命令是由编程造成的,而这些错误应该在开发过程中被发现,而不应该出现在生产环境中. 鉴于更多的问题都是程序员自身的问题,redis直接采用无回滚方式来处理事务

乐观锁例子

我们津津乐道的转账问题,就可以利用事务来处理.

场景:

A余额100元

B余额100元

C余额100元

A准备给B转账50,再同时C要还50元给A,那么A这个余额怎么确定在转账完之后操作还是在转账前操作呢?这属于资源竞争,常见方式就是加锁了,排它锁的话比较消耗资源,我们可以利用watch来实现乐观锁.

watchkeyvalue发生改变的时候,exec事务会取消, 当 exec 被调用后, 所有的keys都将UNWATCH,不管这个事务会不会终止。

set A 100
set B 100
set C 100
watch A
multi
decrby A 50
incrby B 50
# 在exec之前可以启用第二个客户端,对A账号减少50元,查看watch乐观锁机制是否生效
exec   # 这里就会返回 <nil> 因为事务没有执行
get A  # 第二个客户端转账50元 所以最终为 150

分布式锁

锁的机制也是一个热门话题,不同的进程必须以独占资源的方式实现资源共享

首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

  • 互斥性。在任意时刻,只有一个客户端能持有锁。
  • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  • 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
  • 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了

下面找到一个例如大家可以看看: https://github.com/ronnylt/redlock-php

<?php
class RedLock
{
    private $retryDelay;
    private $retryCount;
    private $clockDriftFactor = 0.01;
    private $quorum;
    private $servers = array();
    private $instances = array();
    function __construct(array $servers, $retryDelay = 200, $retryCount = 3)
    {
        $this->servers = $servers;
        $this->retryDelay = $retryDelay;
        $this->retryCount = $retryCount;
        $this->quorum  = min(count($servers), (count($servers) / 2 + 1));
    }
    public function lock($resource, $ttl)
    {
        $this->initInstances();
        $token = uniqid();
        $retry = $this->retryCount;
        do {
            $n = 0;
            $startTime = microtime(true) * 1000;
            foreach ($this->instances as $instance) {
                if ($this->lockInstance($instance, $resource, $token, $ttl)) {
                    $n++;
                }
            }
            # Add 2 milliseconds to the drift to account for Redis expires
            # precision, which is 1 millisecond, plus 1 millisecond min drift
            # for small TTLs.
            $drift = ($ttl * $this->clockDriftFactor) + 2;
            $validityTime = $ttl - (microtime(true) * 1000 - $startTime) - $drift;
            if ($n >= $this->quorum && $validityTime > 0) {
                return [
                    'validity' => $validityTime,
                    'resource' => $resource,
                    'token'    => $token,
                ];
            } else {
                foreach ($this->instances as $instance) {
                    $this->unlockInstance($instance, $resource, $token);
                }
            }
            // Wait a random delay before to retry
            $delay = mt_rand(floor($this->retryDelay / 2), $this->retryDelay);
            usleep($delay * 1000);
            $retry--;
        } while ($retry > 0);
        return false;
    }
    public function unlock(array $lock)
    {
        $this->initInstances();
        $resource = $lock['resource'];
        $token    = $lock['token'];
        foreach ($this->instances as $instance) {
            $this->unlockInstance($instance, $resource, $token);
        }
    }
    private function initInstances()
    {
        if (empty($this->instances)) {
            foreach ($this->servers as $server) {
                list($host, $port, $timeout) = $server;
                $redis = new \Redis();
                $redis->connect($host, $port, $timeout);
                $this->instances[] = $redis;
            }
        }
    }
    private function lockInstance($instance, $resource, $token, $ttl)
    {
        return $instance->set($resource, $token, ['NX', 'PX' => $ttl]);
    }
    private function unlockInstance($instance, $resource, $token)
    {
        $script = '
            if redis.call("GET", KEYS[1]) == ARGV[1] then
                return redis.call("DEL", KEYS[1])
            else
                return 0
            end
        ';
        return $instance->eval($script, [$resource, $token], 1);
    }
}
$servers = [
    ['127.0.0.1', 6379, 0.01],
    ['127.0.0.1', 6389, 0.01],
    ['127.0.0.1', 6399, 0.01],
];
$redLock = new RedLock($servers);
while (true) {
    $lock = $redLock->lock('test', 10000);
    if ($lock) {
        print_r($lock);
    } else {
        print "Lock not acquired\n";
    }
}

这个是redlock,但是这个也不能保证线程安全,进程由于各种原因pause,类似于上文说的多线程间的时间片切换,比如由于GC停顿等导致锁过期,但是进程并未感知到,同时另一个进程已经获取了该分布式锁,就会导致奇怪的结果发生.

这里有说明为什么redlock不安全原因: https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html 有兴趣的可以阅读一下

但是这种开源可以让我们更好的学习,对于数据要求强一致性的使用 redlock 还是需要慎重, 不推荐使用

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
13天前
|
消息中间件 缓存 NoSQL
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
50 6
|
2月前
|
缓存 NoSQL Java
Redis深度解析:解锁高性能缓存的终极武器,让你的应用飞起来
【8月更文挑战第29天】本文从基本概念入手,通过实战示例、原理解析和高级使用技巧,全面讲解Redis这一高性能键值对数据库。Redis基于内存存储,支持多种数据结构,如字符串、列表和哈希表等,常用于数据库、缓存及消息队列。文中详细介绍了如何在Spring Boot项目中集成Redis,并展示了其工作原理、缓存实现方法及高级特性,如事务、发布/订阅、Lua脚本和集群等,帮助读者从入门到精通Redis,大幅提升应用性能与可扩展性。
65 0
|
2天前
|
监控 NoSQL Java
场景题:百万数据插入Redis有哪些实现方案?
场景题:百万数据插入Redis有哪些实现方案?
13 1
场景题:百万数据插入Redis有哪些实现方案?
|
14天前
|
存储 缓存 NoSQL
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
37 4
|
16天前
|
存储 消息中间件 NoSQL
【redis】redis的特性和主要应用场景
【redis】redis的特性和主要应用场景
57 1
|
27天前
|
存储 消息中间件 缓存
深入探析Redis常见数据类型及应用场景
深入探析Redis常见数据类型及应用场景
34 2
|
2月前
|
Kubernetes NoSQL Redis
【Azure Redis】部署在AKS中的应用连接Redis时候出现Unable to connect to Redis server
【Azure Redis】部署在AKS中的应用连接Redis时候出现Unable to connect to Redis server
【Azure Redis】部署在AKS中的应用连接Redis时候出现Unable to connect to Redis server
|
3月前
|
消息中间件 缓存 NoSQL
Redis快速度特性及为什么支持多线程及应用场景
Redis快速度特性及为什么支持多线程及应用场景
99 11
|
2月前
|
NoSQL Java Redis
Spring Boot集成Redis全攻略:高效数据存取,打造性能飞跃的Java微服务应用!
【8月更文挑战第3天】Spring Boot是备受欢迎的微服务框架,以其快速开发与轻量特性著称。结合高性能键值数据库Redis,可显著增强应用性能。集成步骤包括:添加`spring-boot-starter-data-redis`依赖,配置Redis服务器参数,注入`RedisTemplate`或`StringRedisTemplate`进行数据操作。这种集成方案适用于缓存、高并发等场景,有效提升数据处理效率。
425 2
|
2月前
|
缓存 NoSQL Linux
【Azure Redis 缓存】应用中出现连接Redis服务错误(production.ERROR: Connection refused)的排查步骤
【Azure Redis 缓存】应用中出现连接Redis服务错误(production.ERROR: Connection refused)的排查步骤