实战干货:基于Redis6.0 部署迷你版本消息队列(上)

简介: 实战干货:基于Redis6.0 部署迷你版本消息队列(上)

技术研究背景


由于目前的研发团队处于公司初创阶段,尚未有能成熟的运维体系,对于市面上常见的成熟MQ搭建维护能力不足,但是又希望能有一款轻量级的消息系统供研发团队的成员使用,因此开展了对该方面相关的技术调研工作。


通过相关的技术调研后,决定挑选基于Redis实现消息系统。


具体技术选型原因:


  • 团队内部已经有搭建相关的Redis服务,并且具备一定的运维能力,可以节省技术成本


  • 业界有较多关于Redis搭建消息系统方面的技术文章


  • 目前的系统的整体吞吐量并不高,接入消息系统的主要目的只是为了实现系统之间的解耦


为了方便让读者们从0到1地学习这块内容,我将会从环节搭建开始介绍起。


基本环境的搭建


基于redis6.0.6版本搭建一套简单的消息队列系统。 环境部署:



docker run -p 6379:6379 --name redis_6_0_6 -d redis:6.0.6


  • 参数解释: -d 后台启动 -p 端口映射 -name 容器名称


如果本地没有相关镜像,可以尝试通过搭建下方命令进行镜像的拉取:


docker pull redis:6.0.6


当redis的基础环境配置好了之后,接下来便是基于redis内置的一些基本功能开发一款消息队列组件了。


下边我将分三种不同的技术方案来介绍如何实现一款轻量级的消息队列。


基于常规的队列结构来实现消息队列


这块的实现比较简单,主要是基于Redis内部的List结构来落地的,发送方将消息从队列的左边写入,然后消费方从队列的右边读取。


package org.idea.mq.redis.framework.mq.list;
import com.alibaba.fastjson.JSON;
import org.idea.mq.redis.framework.bean.MsgWrapper;
import org.idea.mq.redis.framework.mq.IMQTemplate;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @Author linhao
 * @Date created in 3:09 下午 2022/2/7
 */
@Component
public class RedisListMQTemplate implements IMQTemplate {
    @Resource
    private IRedisService iRedisService;
    @Override
    public boolean send(MsgWrapper msgWrapper) {
        try {
            String json = JSON.toJSONString(msgWrapper.getMsgInfo());
            iRedisService.lpush(msgWrapper.getTopic(),json);
            return true;
        }catch (Exception e){
            e.printStackTrace();
        }
        return false;
    }
}


问题思考


这里存在几个问题点需要思考下:


多个服务之间如何订阅同一个消息


这里我建议可以按照系统的项目名称前缀+业务标识来组织。


例如:用户系统中需要发布一条 会员已升级 的消息给到下游系统,此时可以将这条消息写入到名为:user-service:member-upgrade-list 的List集合中。


如果订单系统希望访问用户系统的消息,则需要在redis的key里指定user-service:member-upgrade-list关键字。


image.png


消息的监听机制如何实现?


对于List的消息可以采用轮询的方式获取,例如下边这段案例代码:


/**
 * 轮询的方式获取数据
 *
 * @param msgWrapper
 */
private void pollingGet(MsgWrapper msgWrapper) {
    while (true) {
        String value = iRedisService.rpop(msgWrapper.getTopic());
        if (!StringUtils.isEmpty(value)) {
            System.out.println(value);
        }
        //减少访问压力,定期睡眠一段时间
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


但是轮询的方式比较消耗性能,所以可以尝试使用Redis的阻塞式弹出指令,例如下边这种方式来监听消息的触发行为:


/**
 * 阻塞的方式获取数据
 */
private void blockGet(MsgWrapper msgWrapper) {
    while (true) {
        List<String> values = iRedisService.brpop(msgWrapper.getTopic());
        if (!CollectionUtils.isEmpty(values)) {
            values.forEach(value -> {
                System.out.println(value);
            });
        }
    }
}


消息的可靠性传输如何确保?


在设计消息队列的时候,我们非常看重的就是消息的可靠性保证。当一条消息发送到消费端之后,如果出现了异常,希望消息能够实现重新发送的效果。


对于这种场景的设计我们可以尝试使用 BRPOPLPUSH 这条指令,这条指令可以帮助我们在Redis内部将数据弹出时写入到另一个备份队列中,这样即使弹出的消息消费失败了,备份队列中还有一份备用消息可以使用,而且弹出和写入备份队列操作在Redis内部做了封装,外界调用可以视作为一个原子操作。


是否可以支持广播的模式?


从List集合的实现原理来看,Redis弹出的元素只能返回给一个客户端链接,因此无法支持广播这种效果的实现。


基于发布订阅功能实现消息队列


Redis的内部提供了一个叫做发布订阅的功能,通过subscibe命令和publish指令可以帮助我们实现关于消息发布和通知的功能。


使用subscibe/publish命令实现的效果和List结构最大的不同在于它的传输方式:


  • list更多的是实现点对点方式的传输(P2P方式)


  • subscibe/publish则是可以实现广播的方式和订阅者进行通信


publish部分的案例代码:


@Override
public boolean publish(String channel, String content) {
    try (Jedis jedis = iRedisFactory.getConnection()) {
        jedis.publish(channel, content);
        return true;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}


subscibe部分的代码:


@Override
public boolean subscribe(JedisPubSub jedisPubSub, String... channel) {
    try (Jedis jedis = iRedisFactory.getConnection()) {
        jedis.subscribe(jedisPubSub, channel);
        return true;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}


监听的部分可以通过额外开启一个线程来实现这部分效果:


@Component
public class RedisSubscribeMQListener implements IMQListener {
    @Resource
    private IRedisService iRedisService;
    class TestChannel extends JedisPubSub {
        @Override
        public void onMessage(String channel, String message) {
            super.onMessage(channel, message);
            System.out.println("channel " + channel + " 接收到消息:" + message);
        }
        @Override
        public void onSubscribe(String channel, int subscribedChannels) {
            System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
                    channel, subscribedChannels));
        }
        @Override
        public void onUnsubscribe(String channel, int subscribedChannels) {
            System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
                    channel, subscribedChannels));
        }
    }
    //所有频道的消息都监听
    @Override
    public void onMessageReach(MsgWrapper msgWrapper) {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                iRedisService.subscribe(new TestChannel(), msgWrapper.getTopic());
            }
        });
        thread.start();
    }
}


要注意,回调通知的时候需要注入一个JedisPubSub的对象,这个对象的内部定义了接收消息之后的处理操作。




相关文章
kde
|
10天前
|
存储 NoSQL Redis
手把手教你用 Docker 部署 Redis
Redis是高性能内存数据库,支持多种数据结构,适用于缓存、消息队列等场景。本文介绍如何通过Docker快速拉取轩辕镜像并部署Redis,涵盖快速启动、持久化存储及docker-compose配置,助力开发者高效搭建稳定服务。
kde
302 6
|
1月前
|
存储 NoSQL 前端开发
Redis专题-实战篇一-基于Session和Redis实现登录业务
本项目基于SpringBoot实现黑马点评系统,涵盖Session与Redis两种登录方案。通过验证码登录、用户信息存储、拦截器校验等流程,解决集群环境下Session不共享问题,采用Redis替代Session实现数据共享与自动续期,提升系统可扩展性与安全性。
159 3
Redis专题-实战篇一-基于Session和Redis实现登录业务
|
1月前
|
存储 缓存 NoSQL
Redis专题-实战篇二-商户查询缓存
本文介绍了缓存的基本概念、应用场景及实现方式,涵盖Redis缓存设计、缓存更新策略、缓存穿透问题及其解决方案。重点讲解了缓存空对象与布隆过滤器的使用,并通过代码示例演示了商铺查询的缓存优化实践。
125 1
Redis专题-实战篇二-商户查询缓存
|
7月前
|
数据采集 存储 数据可视化
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
679 0
分布式爬虫框架Scrapy-Redis实战指南
|
4月前
|
缓存 监控 NoSQL
Redis 实操要点:Java 最新技术栈的实战解析
本文介绍了基于Spring Boot 3、Redis 7和Lettuce客户端的Redis高级应用实践。内容包括:1)现代Java项目集成Redis的配置方法;2)使用Redisson实现分布式可重入锁与公平锁;3)缓存模式解决方案,包括布隆过滤器防穿透和随机过期时间防雪崩;4)Redis数据结构的高级应用,如HyperLogLog统计UV和GeoHash处理地理位置。文章提供了详细的代码示例,涵盖Redis在分布式系统中的核心应用场景,特别适合需要处理高并发、分布式锁等问题的开发场景。
315 40
|
4月前
|
缓存 NoSQL 算法
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
1159 7
|
4月前
|
机器学习/深度学习 存储 NoSQL
基于 Flink + Redis 的实时特征工程实战:电商场景动态分桶计数实现
本文介绍了基于 Flink 与 Redis 构建的电商场景下实时特征工程解决方案,重点实现动态分桶计数等复杂特征计算。通过流处理引擎 Flink 实时加工用户行为数据,结合 Redis 高性能存储,满足推荐系统毫秒级特征更新需求。技术架构涵盖状态管理、窗口计算、Redis 数据模型设计及特征服务集成,有效提升模型预测效果与系统吞吐能力。
396 2
|
7月前
|
缓存 NoSQL Java
基于SpringBoot的Redis开发实战教程
Redis在Spring Boot中的应用非常广泛,其高性能和灵活性使其成为构建高效分布式系统的理想选择。通过深入理解本文的内容,您可以更好地利用Redis的特性,为应用程序提供高效的缓存和消息处理能力。
591 79
|
11月前
|
NoSQL 安全 测试技术
Redis游戏积分排行榜项目中通义灵码的应用实战
Redis游戏积分排行榜项目中通义灵码的应用实战
248 4
|
8月前
|
存储 NoSQL Redis
Docker 部署 Redis
在使用 Docker 部署 Redis 时,为实现数据持久化,需正确挂载容器内的数据目录到宿主机。推荐命令如下: ``` docker run -d --name redis -v /mnt/data/redis:/data -p 6379:6379 redis ``` 该命令将宿主机的 `/mnt/data/redis` 目录挂载到容器的 `/data` 目录,确保 Redis 数据持久化。此路径更通用,适合大多数场景。避免使用不匹配的挂载路径,如 `/var/lib/redis` 或 `/mnt/data/redis` 到非默认目录,以防止数据无法正确持久化。
下一篇
oss教程