AspNetCore结合Redis实践消息队列

简介: 这是年中首发在博客园上的文章,个人觉得是AspNetCore结合Redis做的一次比较优秀的消息队列重构,其中对于点对点/发布-订阅的思路应该也是面试必考题。

引言


.Net TPL Dataflow是一个进程内数据流管道,应对高并发、低延迟的要求非常有效, 但在实际Docker部署的过程中, 有一个问题一直无法回避:


单体程序部署的瞬间(服务不可用)会有少量流量无法处理;


更糟糕的情况下,迭代部署的这个版本有问题,上线后无法工作, 导致更多流量没有处理。

 

背负神圣使命(巨大压力)的程序猿心生一计,为何不将单体程序改成分布式:


增加服务ReceiverApp,ReceiverApp只接受数据,WebApp只处理数据。


41af355283d1d767d9c90a0b032e9d7c.png


知识储备


消息队列和订阅发布作为老生常谈的两个知识点被反复提及,按照JMS的规范, 官方称为点对点(point to point, queue)和发布/订阅(publish/subscribe,channel)


240f2477fc4c9d8eec4e0357b07cb7ea.png


点对点


   生产者发送消息到Message Queue中,然后消费者从队列中取出消息并消费。


队列会保留消息,直到他们被消费或超时;

① MQ支持多消费者,但每个消息只能被一个消费者处理

② 发送者和消费者在时间上没有依赖性,当发送者发送消息之后,不管消费者有没有在运行(甚至不管有没有消费者),都不会影响到消息被发送到队列

③ 一般消费者在消费之后需要向队列应答成功

如果希望发送的每个消息都被成功处理,你应该使用p2p模型


发布/订阅


消息生产者将消息发布到Channel,在此之前已有多个消费者订阅该通道。


和点对点方式不同,发布到特定通道的消息会被通道订阅者实时接收。


通道没有队列机制,发布的消息只能被当前收听的订阅者接收到

① 每个消息可以有多个订阅者

② 发布者和消费者有时间上依赖性:某通道的订阅者,必须先创建该通道订阅,才能收到消息

发布消息至通道,不关注订阅者是谁;订阅者可收听自己感兴趣的多个通道(类似于topic),也不关注发布者是谁。

③ 故如果没有订阅者,发布的消息将得不到处理;


头脑风暴


Redis内置的List数据结构能形成轻量级消息队列的效果;Redis原生支持发布/订阅 模型

如上分析, Pub/Sub模型在订阅者宕机的时候,发布的消息得不到处理,故此模型不能用于强业务的数据接收和处理。


本次采用的消息队列模型:


  • 解耦业务:新建ReceiverApp作为生产者,专注于接收并发送到队列;原有的WebApp作为消费者专注数据处理。


  • 起到削峰填谷的作用,若缩放出多个WebApp消费者容器,还能形成负载均衡的效果。


需要关注Redis操作List结构的两个命令( 左进右出,右进左出同理):


  LPUSH  &  RPOP/BRPOP


Brpop中的B 表示"Block",是一个rpop命令的阻塞版本:若指定List没有新元素,在给定时间内,该命令会阻塞当前redis客户端连接,直到超时返回nil


AspNetCore编程实践


本次使用AspNetCore 完成RedisMQ的实践,引入Redis国产第三方开源库CSRedisCore


生产者ReceiverApp


生产者使用LPush命令向Redis List数据结构写入消息。


------------------截取自Startup.cs-------------------------
public void ConfigureServices(IServiceCollection services)
{    
// Redis客户端要定义成单例, 不然在大流量并发收数的时候, 会造成redis client来不及释放。另一方面也确认api控制器不是单例模式,    
var csredis = new CSRedisClient(Configuration.GetConnectionString("redis")+",name=receiver");    
RedisHelper.Initialization(csredis);    
services.AddSingleton(csredis);    
services.AddMvc();
}
------------------截取自数据接收Controller-------------------[Route("batch")]
[HttpPost]
public async Task BatchPutEqidAndProfileIds([FromBody]List<EqidPair> eqidPairs){
    if (!ModelState.IsValid)  
    throw new ArgumentException("Http Body Payload Error.");  
    var redisKey = $"{DateTime.Now.ToString("yyyyMMdd")}";    
    eqidPairs = await EqidExtractor.EqidExtractAsync(eqidPairs);   
    if (eqidPairs != null && eqidPairs.Any())    
    RedisHelper.LPush(redisKey, eqidPairs.ToArray());    
    await Task.CompletedTask; 
    }


消费者WebApp


根据以上RedisMQ思路,事件消费方式是拉取pull,故需要轮询Redis  List数据结构,这里使用AspNetCore内置的BackgroundService后台服务类后台轮询消费:

关注后台Job中的循环接收方法。


    public class BackgroundJob : BackgroundService
    {    
        private readonly IEqidPairHandler _eqidPairHandler;    
        private readonly CSRedisClient[] _cSRedisClients;    
        private readonly I   Configuration _conf;    
        private readonly ILogger _logger;    
        public BackgroundJob(IEqidPairHandler eqidPairHandler, CSRedisClient[] csRedisClients,IConfiguration conf,ILoggerFactory loggerFactory)    
        {        
        _eqidPairHandler = eqidPairHandler;        
        _cSRedisClients = csRedisClients;        
        _conf = conf;        
        _logger = loggerFactory.CreateLogger(nameof(BackgroundJob));    
      }
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)    
        {        
        _logger.LogInformation("Service starting");        
        if (_cSRedisClients[0] == null)        
        {            
          _cSRedisClients[0] = new CSRedisClient(_conf.GetConnectionString("redis") + ",defaultDatabase=" + 0);        
          }        
          RedisHelper.Initialization(_cSRedisClients[0]);
            while (!stoppingToken.IsCancellationRequested)        
            {           
            var key = $"eqidpair:{DateTime.Now.ToString("yyyyMMdd")}";           var eqidpair = RedisHelper.BRPop(5, key);           
            if (eqidpair != null)              
                await_ eqidPairHandler.AcceptEqidParamAsync(JsonConvert.DeserializeObject<EqidPair>(eqidpair));           // 强烈建议无论如何休眠一段时间,防止突发大流量导致WebApp进程CPU满载,自行根据场景设置合理休眠时间           await Task.Delay(10, stoppingToken);        
                }       
                _logger.LogInformation("Service stopping");    
         }
     }


    迭代验证


    使用docker-compose单机部署Nginx,ReceiverApp,WebApp容器。


    docker-compose up指令默认只会重建[Service配置或Image变更]的容器


    If there are existing containers for a service, and the service’s configuration or image was changed after the container’s creation, docker-compose up picks up the changes by stopping and recreating the containers (preserving mounted volumes). To prevent Compose from picking up changes, use the --no-recreate flag.


    做一次迭代验证,更新docke-compose.yml文件WebApp服务的镜像版本


    docker-compose up;


    下图显示仅 数据处理容器 WebApp被Recreate:


    c93e76a74477bdc21c01e8d2560a085c.png


    Nice,分布式改造完成,效果很明显,现在可以放心安全的迭代核心WebApp数据处理程序。

    相关实践学习
    基于Redis实现在线游戏积分排行榜
    本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
    云数据库 Redis 版使用教程
    云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
    相关文章
    |
    3月前
    |
    存储 缓存 NoSQL
    深入理解Django与Redis的集成实践
    深入理解Django与Redis的集成实践
    106 0
    |
    15天前
    |
    消息中间件 存储 监控
    活动实践 | 快速体验云消息队列RocketMQ版
    本方案介绍如何使用阿里云消息队列RocketMQ版Serverless实例进行消息管理。主要步骤包括获取接入点、创建Topic和订阅组、收发消息、查看消息轨迹及仪表盘监控。通过这些操作,用户可以轻松实现消息的全生命周期管理,确保消息收发的高效与可靠。此外,还提供了消费验证、下载消息等功能,方便用户进行详细的消息处理与调试。
    |
    4天前
    |
    缓存 NoSQL JavaScript
    Vue.js应用结合Redis数据库:实践与优化
    将Vue.js应用与Redis结合,可以实现高效的数据管理和快速响应的用户体验。通过合理的实践步骤和优化策略,可以充分发挥两者的优势,提高应用的性能和可靠性。希望本文能为您在实际开发中提供有价值的参考。
    31 11
    |
    1月前
    |
    消息中间件 Java 开发工具
    【实践】快速学会使用云消息队列RabbitMQ版
    本次分享的主题是快速学会使用云消息队列RabbitMQ版的实践。内容包括:如何创建和配置RabbitMQ实例,如Vhost、Exchange、Queue等;如何通过阿里云控制台管理静态用户名密码和AccessKey;以及如何使用RabbitMQ开源客户端进行消息生产和消费测试。最后介绍了实验资源的回收步骤,确保资源合理利用。通过详细的操作指南,帮助用户快速上手并掌握RabbitMQ的使用方法。
    109 10
    |
    2月前
    |
    NoSQL Java 数据处理
    基于Redis海量数据场景分布式ID架构实践
    【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
    90 8
    |
    2月前
    |
    缓存 NoSQL Redis
    Redis 缓存使用的实践
    《Redis缓存最佳实践指南》涵盖缓存更新策略、缓存击穿防护、大key处理和性能优化。包括Cache Aside Pattern、Write Through、分布式锁、大key拆分和批量操作等技术,帮助你在项目中高效使用Redis缓存。
    444 22
    |
    3月前
    |
    消息中间件 安全 Java
    云消息队列RabbitMQ实践解决方案评测
    一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
    123 10
    |
    3月前
    |
    NoSQL 关系型数据库 MySQL
    MySQL与Redis协同作战:百万级数据统计优化实践
    【10月更文挑战第21天】 在处理大规模数据集时,传统的单体数据库解决方案往往力不从心。MySQL和Redis的组合提供了一种高效的解决方案,通过将数据库操作与高速缓存相结合,可以显著提升数据处理的性能。本文将分享一次实际的优化案例,探讨如何利用MySQL和Redis共同实现百万级数据统计的优化。
    166 9
    |
    3月前
    |
    消息中间件
    解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
    云消息队列RabbitMQ实践获奖名单公布!
    |
    3月前
    |
    消息中间件 存储 弹性计算
    云消息队列RabbitMQ实践
    云消息队列RabbitMQ实践