PHP使用Redis的List(列表)命令实现消息队列

简介: 本篇笔记旨在使用Redis的List(列表)命令实现消息队列,生产者使用lPush命令发布消息,消费者使用rpoplpush命令获取消息,同时将消息放入监听队列,如果处理超时,监听者将把消息弹回消息队列

1.用到的List(列表)命令

命令 作用
lPush 将一个或多个值插入到列表头部
rpoplpush 弹出列表最后一个值,同时插入到另一个列表头部,并返回该值
lRem 删除列表内的给定值
lIndex 按索引获取列表内的值

2.队列的组成

名称 职责
生产者 发布消息
消费者 获取并处理消息
监听者 监听超时的消息,弹回原消息队列,确保消费者挂掉后或处理失败后消息能被其他消费者处理

3.php实现代码

生产者Producter.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/26
 * Time: 0:13
 */
try {
    //声明消息队列-list的键名
    $queueKey = 'testQueueKey';
    $redis = new Redis();
    $redis->connect('192.168.75.132', 6379);
    //向列表中push10条消息
    for ($i = 0;$i < 10;$i++){
        //为消息生成唯一标识
        $uniqid = uniqid(mt_rand(10000, 99999).getmypid().memory_get_usage(), true);
        $ret = $redis->lPush($queueKey, json_encode(array('uniqid' => $uniqid, 'key' => 'key-'.$i, 'value' => 'data')));
        var_dump($ret);
    }

} catch (Exception $e){
    echo $e->getMessage();
}

消费者Consumer.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/26
 * Time: 0:14
 */
try {
    //声明消息队列-list的键名
    $queueKey = 'testQueueKey';
    //声明监听者队列-list的键名
    $watchQueueKey = 'watchQueueKey';
    $redis = new Redis();
    $redis->connect('192.168.75.132', 6379);
    //队列先进先出,弹出最先加入的消息,同时放入监听队列
    while (true){
        $ret = $redis->rpoplpush($queueKey, $watchQueueKey);
        if ($ret === false){
            sleep(1);
        } else {
            $retArray = json_decode($ret, true);
            //将唯一id写入缓存设置有效期
            $redis->setex($retArray['uniqid'], 60, 0);
            //模拟失败
            $rand = mt_rand(0,9);
            if ($rand < 3){
                echo "failure:".$ret."\n";
            } else {
                //todo
                //处理成功移除消息
                $redis->lRem($watchQueueKey, $ret, 0);
                echo "success:".$ret."\n";
            }
        }
    }

} catch (Exception $e){
    echo $e->getMessage();
}

监听者Watcher.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/26
 * Time: 0:15
 */
try {
    //声明消息队列-list的键名
    $queueKey = 'testQueueKey';
    //声明监听者队列-list的键名
    $watchQueueKey = 'watchQueueKey';
    $redis = new Redis();
    $redis->connect('192.168.75.132', 6379);

    while (true){
        //取出列表尾部的一个值
        $ret = $redis->lIndex($watchQueueKey, -1);
        //如果不存在则休眠1秒
        if ($ret === false){
            sleep(1);
        } else {
            $retArray = json_decode($ret, true);
            $idCache = $redis->get($retArray['uniqid']);
            if ($idCache === false){
                //如果已过期,表示任务超时,弹回原队列
                $redis->rpoplpush($watchQueueKey, $queueKey);
                echo "rpoplpush:".$ret."\n";
            } else {
                //处理中,继续等待
                sleep(1);
            }
        }
    }

} catch (Exception $e){
    echo $e->getMessage();
}

4.执行队列

开启监听者php Watcher.php
开启消费者php Consumer.php
执行生产者php Producter.php
生产者输出

int(1)
int(2)
int(3)
int(4)
int(5)
int(6)
int(7)
int(8)
int(9)
int(10)

监听者输出

rpoplpush:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
rpoplpush:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
rpoplpush:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
rpoplpush:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}

消费者输出

success:{"uniqid":"47280267323557445c4bde640dbfb4.78962728","key":"key-0","value":"data"}
failure:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
success:{"uniqid":"39394267323642245c4bde640de992.34641654","key":"key-2","value":"data"}
success:{"uniqid":"41335267323642245c4bde640df980.38466514","key":"key-3","value":"data"}
failure:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
failure:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
success:{"uniqid":"43817267323642245c4bde640ec189.44008738","key":"key-7","value":"data"}
success:{"uniqid":"69276267323642245c4bde640ecb91.04877522","key":"key-8","value":"data"}
failure:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
success:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
success:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
success:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
success:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
success:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}

我们看到消费者第一次执行时失败的消息,超时后又被弹回了消息队列,消费者有了再次执行的机会,监听者的职责就是确保消费者执行失败或挂掉后消息还能再弹回原队列得到再次执行
原文地址:https://www.jmsite.cn/blog-615.html

相关文章
|
缓存 NoSQL PHP
Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
269 5
|
存储 NoSQL 关系型数据库
PHP 使用 Redis
10月更文挑战第22天
213 6
|
存储 NoSQL PHP
PHP与Redis结合使用,提升数据存储性能
随着互联网应用的发展,PHP与Redis的结合成为提升数据存储性能的重要手段。PHP作为流行的服务器端语言,常用于网站开发;Redis作为高性能内存数据库,以其快速读写能力,有效优化数据访问速度,减轻数据库压力。两者结合通过缓存机制显著提升应用响应速度,支持高并发场景下的稳定性和可扩展性。
|
NoSQL 关系型数据库 MySQL
Redis 列表(List)
10月更文挑战第16天
190 2
|
存储 分布式计算 NoSQL
大数据-40 Redis 类型集合 string list set sorted hash 指令列表 执行结果 附截图
大数据-40 Redis 类型集合 string list set sorted hash 指令列表 执行结果 附截图
166 3
|
缓存 NoSQL Redis
Redis命令:列表模糊删除详解
Redis命令:列表模糊删除详解
471 3
|
存储 NoSQL Java
Redis命令:列表模糊删除详解
通过本文的介绍,我们详细探讨了如何在Redis中实现列表的模糊删除。虽然Redis没有直接提供模糊删除命令,但可以通过组合使用 `LRANGE`和 `LREM`命令,并在客户端代码中进行模糊匹配,来实现这一功能。希望本文能帮助你在实际应用中更有效地操作Redis列表。
454 0
|
存储 NoSQL Redis
Redis常见面试题:ZSet底层数据结构,SDS、压缩列表ZipList、跳表SkipList
String类型底层数据结构,List类型全面解析,ZSet底层数据结构;简单动态字符串SDS、压缩列表ZipList、哈希表、跳表SkipList、整数数组IntSet
|
缓存 NoSQL 数据处理
原生php实现redis缓存配置和使用方法
通过上述步骤,你可以在PHP项目中配置并使用Redis作为高性能的缓存解决方案。合理利用Redis的各种数据结构和特性,可以有效提升应用的响应速度和数据处理效率。记得在实际应用中根据具体需求选择合适的缓存策略,如设置合理的过期时间,以避免内存过度消耗。
382 0
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。