PHP使用Redis的List(列表)命令实现消息队列

简介: 本篇笔记旨在使用Redis的List(列表)命令实现消息队列,生产者使用lPush命令发布消息,消费者使用rpoplpush命令获取消息,同时将消息放入监听队列,如果处理超时,监听者将把消息弹回消息队列

1.用到的List(列表)命令

命令 作用
lPush 将一个或多个值插入到列表头部
rpoplpush 弹出列表最后一个值,同时插入到另一个列表头部,并返回该值
lRem 删除列表内的给定值
lIndex 按索引获取列表内的值

2.队列的组成

名称 职责
生产者 发布消息
消费者 获取并处理消息
监听者 监听超时的消息,弹回原消息队列,确保消费者挂掉后或处理失败后消息能被其他消费者处理

3.php实现代码

生产者Producter.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/26
 * Time: 0:13
 */
try {
    //声明消息队列-list的键名
    $queueKey = 'testQueueKey';
    $redis = new Redis();
    $redis->connect('192.168.75.132', 6379);
    //向列表中push10条消息
    for ($i = 0;$i < 10;$i++){
        //为消息生成唯一标识
        $uniqid = uniqid(mt_rand(10000, 99999).getmypid().memory_get_usage(), true);
        $ret = $redis->lPush($queueKey, json_encode(array('uniqid' => $uniqid, 'key' => 'key-'.$i, 'value' => 'data')));
        var_dump($ret);
    }

} catch (Exception $e){
    echo $e->getMessage();
}

消费者Consumer.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/26
 * Time: 0:14
 */
try {
    //声明消息队列-list的键名
    $queueKey = 'testQueueKey';
    //声明监听者队列-list的键名
    $watchQueueKey = 'watchQueueKey';
    $redis = new Redis();
    $redis->connect('192.168.75.132', 6379);
    //队列先进先出,弹出最先加入的消息,同时放入监听队列
    while (true){
        $ret = $redis->rpoplpush($queueKey, $watchQueueKey);
        if ($ret === false){
            sleep(1);
        } else {
            $retArray = json_decode($ret, true);
            //将唯一id写入缓存设置有效期
            $redis->setex($retArray['uniqid'], 60, 0);
            //模拟失败
            $rand = mt_rand(0,9);
            if ($rand < 3){
                echo "failure:".$ret."\n";
            } else {
                //todo
                //处理成功移除消息
                $redis->lRem($watchQueueKey, $ret, 0);
                echo "success:".$ret."\n";
            }
        }
    }

} catch (Exception $e){
    echo $e->getMessage();
}

监听者Watcher.php

<?php
/**
 * Created by PhpStorm.
 * User: jmsite.cn
 * Date: 2019/1/26
 * Time: 0:15
 */
try {
    //声明消息队列-list的键名
    $queueKey = 'testQueueKey';
    //声明监听者队列-list的键名
    $watchQueueKey = 'watchQueueKey';
    $redis = new Redis();
    $redis->connect('192.168.75.132', 6379);

    while (true){
        //取出列表尾部的一个值
        $ret = $redis->lIndex($watchQueueKey, -1);
        //如果不存在则休眠1秒
        if ($ret === false){
            sleep(1);
        } else {
            $retArray = json_decode($ret, true);
            $idCache = $redis->get($retArray['uniqid']);
            if ($idCache === false){
                //如果已过期,表示任务超时,弹回原队列
                $redis->rpoplpush($watchQueueKey, $queueKey);
                echo "rpoplpush:".$ret."\n";
            } else {
                //处理中,继续等待
                sleep(1);
            }
        }
    }

} catch (Exception $e){
    echo $e->getMessage();
}

4.执行队列

开启监听者php Watcher.php
开启消费者php Consumer.php
执行生产者php Producter.php
生产者输出

int(1)
int(2)
int(3)
int(4)
int(5)
int(6)
int(7)
int(8)
int(9)
int(10)

监听者输出

rpoplpush:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
rpoplpush:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
rpoplpush:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
rpoplpush:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
rpoplpush:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}

消费者输出

success:{"uniqid":"47280267323557445c4bde640dbfb4.78962728","key":"key-0","value":"data"}
failure:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
success:{"uniqid":"39394267323642245c4bde640de992.34641654","key":"key-2","value":"data"}
success:{"uniqid":"41335267323642245c4bde640df980.38466514","key":"key-3","value":"data"}
failure:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
failure:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
success:{"uniqid":"43817267323642245c4bde640ec189.44008738","key":"key-7","value":"data"}
success:{"uniqid":"69276267323642245c4bde640ecb91.04877522","key":"key-8","value":"data"}
failure:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
success:{"uniqid":"28580267323642245c4bde640dd8f3.30292468","key":"key-1","value":"data"}
success:{"uniqid":"10258267323642245c4bde640e1cd9.95656605","key":"key-4","value":"data"}
success:{"uniqid":"43356267323642245c4bde640e88e9.50566706","key":"key-5","value":"data"}
failure:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}
success:{"uniqid":"83293267323642245c4bde640ed753.04622366","key":"key-9","value":"data"}
success:{"uniqid":"59823267323642245c4bde640e98b5.51512314","key":"key-6","value":"data"}

我们看到消费者第一次执行时失败的消息,超时后又被弹回了消息队列,消费者有了再次执行的机会,监听者的职责就是确保消费者执行失败或挂掉后消息还能再弹回原队列得到再次执行
原文地址:https://www.jmsite.cn/blog-615.html

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2月前
|
消息中间件 存储 负载均衡
Redis使用ZSET实现消息队列使用总结二
Redis使用ZSET实现消息队列使用总结二
58 0
|
17天前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
59 20
剖析 Redis List 消息队列的三种消费线程模型
|
3月前
|
消息中间件 负载均衡 NoSQL
Redis系列学习文章分享---第七篇(Redis快速入门之消息队列--List实现消息队列 Pubsub实现消息队列 stream的单消费模式 stream的消费者组模式 基于stream消息队列)
Redis系列学习文章分享---第七篇(Redis快速入门之消息队列--List实现消息队列 Pubsub实现消息队列 stream的单消费模式 stream的消费者组模式 基于stream消息队列)
51 0
|
28天前
|
PHP 开发者 UED
PHP中的异常处理:从基础到高级探索Python中的列表推导式:简洁而强大的工具
【8月更文挑战第30天】在PHP编程的世界中,异常处理是确保代码健壮性和可靠性的关键机制。本文将引导您了解PHP异常处理的基本概念,并通过实际示例展示如何有效地捕获和处理异常。我们将一起探索try-catch结构、自定义异常类以及高级异常处理技术,让您的PHP代码更加稳固和易于维护。
|
1月前
|
消息中间件 NoSQL Redis
Redis Stream消息队列之基本语法与使用方式
这篇文章详细介绍了Redis Stream消息队列的基本语法和使用方式,包括消息的添加、读取、删除、修剪以及消费者组的使用和管理,强调了其在消息持久化和主备复制方面的优势。
45 0
|
2月前
|
消息中间件 存储 NoSQL
Redis使用ZSET实现消息队列使用总结一
Redis使用ZSET实现消息队列使用总结一
66 0
|
3月前
|
安全 Java
java线程之List集合并发安全问题及解决方案
java线程之List集合并发安全问题及解决方案
467 1
|
2月前
|
Java API Apache
怎么在在 Java 中对List进行分区
本文介绍了如何将列表拆分为给定大小的子列表。尽管标准Java集合API未直接支持此功能,但Guava和Apache Commons Collections提供了相关API。
|
2月前
|
运维 关系型数据库 Java
PolarDB产品使用问题之使用List或Range分区表时,Java代码是否需要进行改动
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。
|
2月前
|
存储 安全 Java
详解Java中集合的List接口实现的ArrayList方法 | Set接口实现的HashSet方法
详解Java中集合的List接口实现的ArrayList方法 | Set接口实现的HashSet方法