基于Redis6.0 部署迷你版本消息队列实战(上)

简介: 基于Redis6.0 部署迷你版本消息队列实战(上)

技术研究背景



由于目前的研发团队处于公司初创阶段,尚未有能成熟的运维体系,对于市面上常见的成熟MQ搭建维护能力不足,但是又希望能有一款轻量级的消息系统供研发团队的成员使用,因此开展了对该方面相关的技术调研工作。


通过相关的技术调研后,决定挑选基于Redis实现消息系统。


具体技术选型原因:


团队内部已经有搭建相关的Redis服务,并且具备一定的运维能力,可以节省技术成本

业界有较多关于Redis搭建消息系统方面的技术文章


目前的系统的整体吞吐量并不高,接入消息系统的主要目的只是为了实现系统之间的解耦


为了方便让读者们从0到1地学习这块内容,我将会从环节搭建开始介绍起。


基本环境的搭建



基于redis6.0.6版本搭建一套简单的消息队列系统。

环境部署:


docker run -p 6379:6379 --name redis_6_0_6 -d redis:6.0.6 
复制代码


  • 参数解释:
    -d 后台启动
    -p 端口映射
    -name 容器名称


如果本地没有相关镜像,可以尝试通过搭建下方命令进行镜像的拉取:


docker pull redis:6.0.6 
复制代码


当redis的基础环境配置好了之后,接下来便是基于redis内置的一些基本功能开发一款消息队列组件了。


下边我将分三种不同的技术方案来介绍如何实现一款轻量级的消息队列。


基于常规的队列结构来实现消息队列



这块的实现比较简单,主要是基于Redis内部的List结构来落地的,发送方将消息从队列的左边写入,然后消费方从队列的右边读取。


package org.idea.mq.redis.framework.mq.list;
import com.alibaba.fastjson.JSON;
import org.idea.mq.redis.framework.bean.MsgWrapper;
import org.idea.mq.redis.framework.mq.IMQTemplate;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @Author linhao
 * @Date created in 3:09 下午 2022/2/7
 */
@Component
public class RedisListMQTemplate implements IMQTemplate {
    @Resource
    private IRedisService iRedisService;
    @Override
    public boolean send(MsgWrapper msgWrapper) {
        try {
            String json = JSON.toJSONString(msgWrapper.getMsgInfo());
            iRedisService.lpush(msgWrapper.getTopic(),json);
            return true;
        }catch (Exception e){
            e.printStackTrace();
        }
        return false;
    }
}
复制代码


问题思考



这里存在几个问题点需要思考下:


多个服务之间如何订阅同一个消息


这里我建议可以按照系统的项目名称前缀+业务标识来组织。


例如:用户系统中需要发布一条 会员已升级 的消息给到下游系统,此时可以将这条消息写入到名为:user-service:member-upgrade-list 的List集合中。如果订单系统希望访问用户系统的消息,则需要在redis的key里指定user-service:member-upgrade-list关键字。


网络异常,图片无法展示
|


消息的监听机制如何实现?


对于List的消息可以采用轮询的方式获取,例如下边这段案例代码:


/**
 * 轮询的方式获取数据
 *
 * @param msgWrapper
 */
private void pollingGet(MsgWrapper msgWrapper) {
    while (true) {
        String value = iRedisService.rpop(msgWrapper.getTopic());
        if (!StringUtils.isEmpty(value)) {
            System.out.println(value);
        }
        //减少访问压力,定期睡眠一段时间
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
复制代码


但是轮询的方式比较消耗性能,所以可以尝试使用Redis的阻塞式弹出指令,例如下边这种方式来监听消息的触发行为:


/**
 * 阻塞的方式获取数据
 */
private void blockGet(MsgWrapper msgWrapper) {
    while (true) {
        List<String> values = iRedisService.brpop(msgWrapper.getTopic());
        if (!CollectionUtils.isEmpty(values)) {
            values.forEach(value -> {
                System.out.println(value);
            });
        }
    }
}
复制代码


消息的可靠性传输如何确保?


在设计消息队列的时候,我们非常看重的就是消息的可靠性保证。当一条消息发送到消费端之后,如果出现了异常,希望消息能够实现重新发送的效果。


对于这种场景的设计我们可以尝试使用 BRPOPLPUSH 这条指令,这条指令可以帮助我们在Redis内部将数据弹出时写入到另一个备份队列中,这样即使弹出的消息消费失败了,备份队列中还有一份备用消息可以使用,而且弹出和写入备份队列操作在Redis内部做了封装,外界调用可以视作为一个原子操作。


是否可以支持广播的模式?


从List集合的实现原理来看,Redis弹出的元素只能返回给一个客户端链接,因此无法支持广播这种效果的实现。


基于发布订阅功能实现消息队列



Redis的内部提供了一个叫做发布订阅的功能,通过subscibe命令和publish指令可以帮助我们实现关于消息发布和通知的功能。


使用subscibe/publish命令实现的效果和List结构最大的不同在于它的传输方式:

list更多的是实现点对点方式的传输(P2P方式)


subscibe/publish则是可以实现广播的方式和订阅者进行通信


publish部分的案例代码:


@Override
public boolean publish(String channel, String content) {
    try (Jedis jedis = iRedisFactory.getConnection()) {
        jedis.publish(channel, content);
        return true;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
复制代码


subscibe部分的代码:


@Override
public boolean subscribe(JedisPubSub jedisPubSub, String... channel) {
    try (Jedis jedis = iRedisFactory.getConnection()) {
        jedis.subscribe(jedisPubSub, channel);
        return true;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
复制代码


监听的部分可以通过额外开启一个线程来实现这部分效果:


@Component
public class RedisSubscribeMQListener implements IMQListener {
    @Resource
    private IRedisService iRedisService;
    class TestChannel extends JedisPubSub {
        @Override
        public void onMessage(String channel, String message) {
            super.onMessage(channel, message);
            System.out.println("channel " + channel + " 接收到消息:" + message);
        }
        @Override
        public void onSubscribe(String channel, int subscribedChannels) {
            System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
                    channel, subscribedChannels));
        }
        @Override
        public void onUnsubscribe(String channel, int subscribedChannels) {
            System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
                    channel, subscribedChannels));
        }
    }
    //所有频道的消息都监听
    @Override
    public void onMessageReach(MsgWrapper msgWrapper) {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                iRedisService.subscribe(new TestChannel(), msgWrapper.getTopic());
            }
        });
        thread.start();
    }
}
复制代码


要注意,回调通知的时候需要注入一个JedisPubSub的对象,这个对象的内部定义了接收消息之后的处理操作。


问题思考



如何保证消息的可靠性传输?


通过subscibe/publish处理的消息没有持久化的特性,一旦出现网络中断,Redis宕机这类异常的时候就会导致消息丢失,而且也没有较好的机制取支持消息重复消费的问题。因此可靠性方面较差。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
2月前
|
NoSQL 安全 测试技术
Redis游戏积分排行榜项目中通义灵码的应用实战
Redis游戏积分排行榜项目中通义灵码的应用实战
63 4
|
3月前
|
消息中间件 缓存 NoSQL
Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
85 6
|
2月前
|
消息中间件 运维 UED
消息队列运维实战:攻克消息丢失、重复与积压难题
消息队列(MQ)作为分布式系统中的核心组件,承担着解耦、异步处理和流量削峰等功能。然而,在实际应用中,消息丢失、重复和积压等问题时有发生,严重影响系统的稳定性和数据的一致性。本文将深入探讨这些问题的成因及其解决方案,帮助您在运维过程中有效应对这些挑战。
41 1
|
3月前
|
NoSQL 关系型数据库 MySQL
MySQL与Redis协同作战:优化百万数据查询的实战经验
【10月更文挑战第13天】 在处理大规模数据集时,传统的关系型数据库如MySQL可能会遇到性能瓶颈。为了提升数据处理的效率,我们可以结合使用MySQL和Redis,利用两者的优势来优化数据查询。本文将分享一次实战经验,探讨如何通过MySQL与Redis的协同工作来优化百万级数据统计。
107 5
|
3月前
|
缓存 NoSQL Java
Spring Boot与Redis:整合与实战
【10月更文挑战第15天】本文介绍了如何在Spring Boot项目中整合Redis,通过一个电商商品推荐系统的案例,详细展示了从添加依赖、配置连接信息到创建配置类的具体步骤。实战部分演示了如何利用Redis缓存提高系统响应速度,减少数据库访问压力,从而提升用户体验。
167 2
|
4月前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
109 20
剖析 Redis List 消息队列的三种消费线程模型
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
大数据-41 Redis 类型集合(2) bitmap位操作 geohash空间计算 stream持久化消息队列 Z阶曲线 Base32编码
32 2
|
4月前
|
缓存 NoSQL 应用服务中间件
Redis实战篇
Redis实战篇
|
3月前
|
消息中间件
实践部署《云消息队列RabbitMQ实践》测评
《云消息队列RabbitMQ实践》解决方案原理清晰,尤其在异步通信和解耦方面解释详尽。对初学者而言,部分术语如消息持久化、确认机制及集群性能优化可更细致。部署过程文档详实,涵盖主要环节,但插件配置等细节存在环境问题,需查阅社区资料解决。该方案展示了RabbitMQ的高吞吐量、灵活路由和可靠消息传递能力,但在高可用性和消息丢失处理上可提供更深入配置建议。适用于高并发和解耦场景,如订单处理、日志收集,有助于提升系统可扩展性。总体部署体验良好,实用性较强。
58 0
|
3月前
|
消息中间件 存储 NoSQL
python 使用redis实现支持优先级的消息队列详细说明和代码
python 使用redis实现支持优先级的消息队列详细说明和代码
50 0