Redis学习笔记~分布式的Pub/Sub模式

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介:

redis的客户端有很多,这次用它的pub/sub发布与订阅我选择了StackExchange.Redis,发布与订阅大家应该很清楚了,首先一个订阅者,订阅一个服务,服务执行一些处理程序(可能是写个日志,插入个数据,发个email)然后当另一个项目的某个业务发布这个服务后,被订阅的程序将会被执行,这个听起来很有意思,redis有好的实现了这个pub/sub功能。

看一下结构图

 

Sub订阅(消息消费者)

对于订阅方,这里类似于一个服务,它会长期运行着,被启动后动态订阅一些服务进来,以便以后再被其它服务调用

        //sub a function in A-project
            PubSubManager.Instance.Subscribe("UserLog", (msg) =>
            {
                //订阅者处理自己的业务逻辑,这相关于队列服务要干的事
                Console.WriteLine(msg);
            });

Pub发布(消息生产者)

而对于其它项目,如A网站,它可能在被在POST动作后发布这个UserLog的服务,这时上面的订阅方将会消费它(消费者模式),或者服务代码被执行!

     [HttpPost]
        public ActionResult Index(string user)
        {
            PubSubManager.Instance.Publish("UserLog", user + "这个用户提交表单了");
            return View();
        }

对于一直运行的服务,将会收到来自不同项目的消息,而它只负责消费它!

Lind.DDD.PublishSubscribe

封装了一些标准的pub/sub方法,它可以有多种实现方法,本例使用redis这个中间件

   /// <summary>
    /// 发布订阅的接口规则
    /// </summary>
    public interface IPubSub
    {
        /// <summary>
        /// 发布,有顺序,对象源是字符串
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="value"></param>
        void Publish(string channel, string value);
        /// <summary>
        /// 订阅,对象源是字符串
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="action"></param>
        void Subscribe(string channel, Action<string> action);
        /// <summary>
        /// 异步发布,无顺序,对象源是字符串
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="value"></param>
        void PublishAsync(string channel, string value);
        /// <summary>
        /// 异步订阅,无顺序,对象源是字符串
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="action"></param>
        void SubscribeAsync(string channel, Action<string> action);

        /// <summary>
        /// 发布,有顺序,对象源是Byte[]
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="value"></param>
        void PublishByte(string channel, byte[] value);
        /// <summary>
        /// 订阅,对象源是Byte[]
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="action"></param>
        void SubscribeByte(string channel, Action<byte[]> action);
        /// <summary>
        /// 异步发布,有顺序,对象源是Byte[]
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="value"></param>
        void PublishByteAsync(string channel, byte[] value);
        /// <summary>
        /// 异步订阅,对象源是Byte[]
        /// </summary>
        /// <param name="channel"></param>
        /// <param name="action"></param>
        void SubscribeByteAsync(string channel, Action<byte[]> action);

        /// <summary>
        /// 发布,有顺序,对象源是泛型对象
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="channel"></param>
        /// <param name="value"></param>
        void Publish<T>(string channel, T value);
        /// <summary>
        /// 订阅,对象源是泛型对象
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="channel"></param>
        /// <param name="action"></param>
        void Subscribe<T>(string channel, Action<T> action);
        /// <summary>
        /// 异步发布,有顺序,对象源是泛型对象
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="channel"></param>
        /// <param name="value"></param>
        void PublishAsync<T>(string channel, T value);
        /// <summary>
        /// 异步订阅,对象源是泛型对象
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="channel"></param>
        /// <param name="action"></param>
        void SubscribeAsync<T>(string channel, Action<T> action);
        /// <summary>
        /// 取消指定订阅
        /// </summary>
        /// <param name="channel"></param>
        void UnSubscribe(string channel);
        /// <summary>
        /// 取消所有订阅
        /// </summary>
        /// <param name="channel"></param>
        void UnSubscribeAll();
    }

 本文转自博客园张占岭(仓储大叔)的博客,原文链接:Redis学习笔记~分布式的Pub/Sub模式,如需转载请自行联系原博主。

目录
相关文章
|
4月前
|
存储 负载均衡 NoSQL
【赵渝强老师】Redis Cluster分布式集群
Redis Cluster是Redis的分布式存储解决方案,通过哈希槽(slot)实现数据分片,支持水平扩展,具备高可用性和负载均衡能力,适用于大规模数据场景。
342 2
|
4月前
|
存储 缓存 NoSQL
【📕分布式锁通关指南 12】源码剖析redisson如何利用Redis数据结构实现Semaphore和CountDownLatch
本文解析 Redisson 如何通过 Redis 实现分布式信号量(RSemaphore)与倒数闩(RCountDownLatch),利用 Lua 脚本与原子操作保障分布式环境下的同步控制,帮助开发者更好地理解其原理与应用。
275 6
|
5月前
|
存储 缓存 NoSQL
Redis核心数据结构与分布式锁实现详解
Redis 是高性能键值数据库,支持多种数据结构,如字符串、列表、集合、哈希、有序集合等,广泛用于缓存、消息队列和实时数据处理。本文详解其核心数据结构及分布式锁实现,帮助开发者提升系统性能与并发控制能力。
|
2月前
|
NoSQL 算法 Redis
【Docker】(3)学习Docker中 镜像与容器数据卷、映射关系!手把手带你安装 MySql主从同步 和 Redis三主三从集群!并且进行主从切换与扩容操作,还有分析 哈希分区 等知识点!
Union文件系统(UnionFS)是一种**分层、轻量级并且高性能的文件系统**,它支持对文件系统的修改作为一次提交来一层层的叠加,同时可以将不同目录挂载到同一个虚拟文件系统下(unite several directories into a single virtual filesystem) Union 文件系统是 Docker 镜像的基础。 镜像可以通过分层来进行继承,基于基础镜像(没有父镜像),可以制作各种具体的应用镜像。
353 5
|
3月前
|
NoSQL Java 调度
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
分布式锁是分布式系统中用于同步多节点访问共享资源的机制,防止并发操作带来的冲突。本文介绍了基于Spring Boot和Redis实现分布式锁的技术方案,涵盖锁的获取与释放、Redis配置、服务调度及多实例运行等内容,通过Docker Compose搭建环境,验证了锁的有效性与互斥特性。
217 0
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
|
3月前
|
缓存 NoSQL 关系型数据库
Redis缓存和分布式锁
Redis 是一种高性能的键值存储系统,广泛用于缓存、消息队列和内存数据库。其典型应用包括缓解关系型数据库压力,通过缓存热点数据提高查询效率,支持高并发访问。此外,Redis 还可用于实现分布式锁,解决分布式系统中的资源竞争问题。文章还探讨了缓存的更新策略、缓存穿透与雪崩的解决方案,以及 Redlock 算法等关键技术。
|
3月前
|
存储 运维 NoSQL
Redis集群模式
Redis集群是一种分布式存储方案,旨在解决数据存储容量不足的问题。它通过将数据分片存储在多个节点上,实现数据的横向扩展。常见的分片算法包括哈希求余、一致性哈希和哈希槽分区。其中,Redis采用哈希槽分区算法,将数据均匀分配到16384个槽位中,每个分片负责一部分槽位。当节点故障时,集群通过故障检测和主从切换机制,确保服务的高可用性。集群还支持自动的数据迁移和负载均衡,保障系统稳定运行。
|
5月前
|
NoSQL Redis
Lua脚本协助Redis分布式锁实现命令的原子性
利用Lua脚本确保Redis操作的原子性是分布式锁安全性的关键所在,可以大幅减少由于网络分区、客户端故障等导致的锁无法正确释放的情况,从而在分布式系统中保证数据操作的安全性和一致性。在将这些概念应用于生产环境前,建议深入理解Redis事务与Lua脚本的工作原理以及分布式锁的可能问题和解决方案。
209 8
|
6月前
|
缓存 NoSQL 算法
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
1607 7
|
7月前
|
NoSQL 算法 安全
redis分布式锁在高并发场景下的方案设计与性能提升
本文探讨了Redis分布式锁在主从架构下失效的问题及其解决方案。首先通过CAP理论分析,Redis遵循AP原则,导致锁可能失效。针对此问题,提出两种解决方案:Zookeeper分布式锁(追求CP一致性)和Redlock算法(基于多个Redis实例提升可靠性)。文章还讨论了可能遇到的“坑”,如加从节点引发超卖问题、建议Redis节点数为奇数以及持久化策略对锁的影响。最后,从性能优化角度出发,介绍了减少锁粒度和分段锁的策略,并结合实际场景(如下单重复提交、支付与取消订单冲突)展示了分布式锁的应用方法。
523 3