.net平台的rabbitmq使用封装(二)

简介: .net平台的rabbitmq使用封装(二)

Pull(拉)的封装


直接上代码:


/// <summary>
        /// 获取消息
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="routingKey"></param>
        /// <param name="handler">消费处理</param>
        private void Poll<T>(string exchange, string queue, string routingKey, Action<T> handler) where T : class
        {
            var channel = GetModel(exchange, queue, routingKey);
            var result = channel.BasicGet(queue, false);
            if (result.IsNull())
                return;
            var msg = result.Body.DeserializeUtf8().FromJson<T>();
            try
            {
                handler(msg);
            }
            catch (Exception ex)
            {
                ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
            }
            finally
            {
                channel.BasicAck(result.DeliveryTag, false);
            }
        }


image.png

快的时候有1.8K/s,稳定是1.5K/S

 

Rpc(远程调用)的封装


首先说明下,RabbitMq只是提供了这个RPC的功能,但是并不是真正的RPC,为什么这么说:


1、传统Rpc隐藏了调用细节,像调用本地方法一样传参、抛出异常

2、RabbitMq的Rpc是基于消息的,消费者消费后,通过新队列返回响应结果。


/// <summary>
        /// RPC客户端
        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="routingKey"></param>
        /// <param name="body"></param>
        /// <param name="isProperties"></param>
        /// <returns></returns>
        public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false)
        {
            var channel = GetModel(exchange, queue, routingKey, isProperties);
            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queue, true, consumer);
            try
            {
                var correlationId = Guid.NewGuid().ToString();
                var basicProperties = channel.CreateBasicProperties();
                basicProperties.ReplyTo = queue;
                basicProperties.CorrelationId = correlationId;
                channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8());
                var sw = Stopwatch.StartNew();
                while (true)
                {
                    var ea = consumer.Queue.Dequeue();
                    if (ea.BasicProperties.CorrelationId == correlationId)
                    {
                        return ea.Body.DeserializeUtf8();
                    }
                    if (sw.ElapsedMilliseconds > 30000)
                        throw new Exception("等待响应超时");
                }
            }
            catch (Exception ex)
            {
                throw ex.GetInnestException();
            }
        }    
        /// <summary>
        /// RPC服务端
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="isProperties"></param>
        /// <param name="handler"></param>
        /// <param name="isDeadLetter"></param>
        public void RpcService<T>(string exchange, string queue, bool isProperties, Func<T, T> handler, bool isDeadLetter)
        {
            //队列声明
            var channel = GetModel(queue, isProperties);
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var msgStr = body.DeserializeUtf8();
                var msg = msgStr.FromJson<T>();
                var props = ea.BasicProperties;
                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;
                try
                {
                    msg = handler(msg);
                }
                catch (Exception ex)
                {
                    ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
                }
                finally
                {
                    channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8());
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queue, false, consumer);
        }


可以用,但不建议去用。可以考虑其他的RPC框架。grpc、thrift等。


结尾


本篇文章,没有过多的写RabbitMq的知识点,因为园子的学习笔记实在太多了。


下面把我的代码奉上 https://github.com/SkyChenSky/Sikiro.Mq.Rabbit。如果有发现写得不对的地方麻烦在评论指出,我会及时修改以免误导别人。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7天前
|
存储 NoSQL MongoDB
.NET MongoDB数据仓储和工作单元模式封装
.NET MongoDB数据仓储和工作单元模式封装
41 15
|
5天前
|
Linux API C#
基于 .NET 开发的多功能流媒体管理控制平台
基于 .NET 开发的多功能流媒体管理控制平台
|
30天前
|
消息中间件 运维 安全
C5GAME 游戏饰品交易平台借助 RocketMQ Serverless 保障千万级玩家流畅体验
游戏行业蓬勃发展,作为国内领先的 STEAM 游戏饰品交易的服务平台,看 C5GAME 如何利用 RocketMQ Serverless 技术,为千万级玩家提供流畅的游戏体验,同时降低成本并提升运维效率。
107 10
C5GAME 游戏饰品交易平台借助 RocketMQ Serverless 保障千万级玩家流畅体验
|
24天前
|
消息中间件 存储 JSON
Net使用EasyNetQ简化与RabbitMQ的交互
EasyNetQ是专为.NET环境设计的RabbitMQ客户端API,简化了与RabbitMQ的交互过程。通过NuGet安装EasyNetQ,可轻松实现消息的发布与订阅,支持多种消息模式及高级特性。文中提供了详细的安装步骤、代码示例及基础知识介绍,帮助开发者快速上手。关注公众号“Net分享”获取更多技术文章。
34 1
Net使用EasyNetQ简化与RabbitMQ的交互
|
10天前
|
JSON 数据格式
.net HTTP请求类封装
`HttpRequestHelper` 是一个用于简化 HTTP 请求的辅助类,支持发送 GET 和 POST 请求。它使用 `HttpClient` 发起请求,并通过 `Newtonsoft.Json` 处理 JSON 数据。示例展示了如何使用该类发送请求并处理响应。注意事项包括:简单的错误处理、需安装 `Newtonsoft.Json` 依赖,以及建议重用 `HttpClient` 实例以优化性能。
54 2
|
2月前
|
机器学习/深度学习 人工智能 Cloud Native
在数字化时代,.NET 技术凭借其跨平台兼容性、丰富的类库和工具集以及卓越的性能与效率,成为软件开发的重要平台
在数字化时代,.NET 技术凭借其跨平台兼容性、丰富的类库和工具集以及卓越的性能与效率,成为软件开发的重要平台。本文深入解析 .NET 的核心优势,探讨其在企业级应用、Web 开发及移动应用等领域的应用案例,并展望未来在人工智能、云原生等方面的发展趋势。
45 3
|
6月前
|
消息中间件 Java 测试技术
消息队列 MQ使用问题之数据流出规则是否支持平台的云RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
存储 设计模式 编解码
.NET 8.0 通用管理平台,支持模块化、WinForms 和 WPF
【11月更文挑战第5天】本文分析了.NET 8.0 通用管理平台在模块化、WinForms 和 WPF 方面的优势。模块化设计提升了系统的可维护性和可扩展性,提高了代码复用性;WinForms 提供了丰富的控件库和简单易用的开发模式,技术成熟稳定;WPF 支持强大的数据绑定和 MVVM 模式,具备丰富的图形和动画功能,以及灵活的布局系统。
EMQ
|
6月前
|
物联网 Linux C语言
在 Windows 平台搭建 MQTT 服务
NanoMQ 有着强大的跨平台和可兼容能力,不仅可以用于以 Linux 为基础的各类平台,也为 Windows 平台提供了 MQTT 服务的新选择。
EMQ
125 9
在 Windows 平台搭建 MQTT 服务
|
5月前
|
缓存 程序员
封装一个给 .NET Framework 用的内存缓存帮助类
封装一个给 .NET Framework 用的内存缓存帮助类