队列工厂之RabbitMQ

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介:

本次和大家分享的是RabbitMQ队列的用法,前一篇文章队列工厂之(MSMQ)中在描述的时候已经搭建了简单工厂,因此本章内容是在其之上扩充的子项不再过多讲解工厂的代码了;RabbitMQ应该是现在互联网公司消息队列用的最多的一种之一吧,看看招聘基本都会有这个单词的出现,她相比前一篇分享的MSMQ来说配置更多样化,安装步骤数两者都差不多吧,最大差别MSMQ是windows捆绑的服务几乎只能在windows上使用,而Rabbit现目前运行支持的系统比较多;在写队列工厂第二篇文章的时候,其实代码已经都完成了目前队列工厂包括有如下队列(msmq,redis,rabbitmq),你可以去下载源码和测试用例:QueueReposity-队列工厂;希望大家能够喜欢,也希望各位多多"扫码支持"和"推荐"谢谢!

 

» RabbitMQ安装和控制台

» 封装RabbitMQ队列的读和写

» 队列工厂之RabbitMQ测试用例

 

下面一步一个脚印的来分享:

» RabbitMQ安装和控制台

要说RabbitMQ的安装,首先我们要下载对应服务器操作系统的RabbitMQ安装文件,因为她有对应不同操作系统的安装版本,这点需要注意;我本地电脑系统是win7(属于windows)所以去官网下载安装包:https://www.rabbitmq.com/,进入官网后选择“Installation”,能看到很多安装版本的下载源,这里可选择从Downloads on rabbitmq.com下载,点击“windows”即可下载:

目前最新版本地址:

https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6.exe;

通常我都是进入这个界面的Installation Guides节点后-》With installer (recommended),这个时候进入的是windows系统所需的帮助文档吧,同样可以选择版本下载;进入该界面的主要目的是需要下载一个Erlang Windows的安装文件(更深层原因:RabbitMq运行依赖于Erlang语言),点击Erlang Windows Binary File进入下载界面,然后选择您操作系统对应的版本,如果您也是windows64位的可以直接用这个地址下载:

http://erlang.org/download/otp_win64_19.2.exe

此刻两个必须的东西已经下载完成,先安装erlang语言的exe,再安装rabbitmq-server-3.6.6.exe;有刚开始接触RabbitMQ的朋友会问为什么需要Erlang的支持,因为她就是Erlang开发出来的,Erlang语言是一种通用的面向并发的编程语言,专门用来编写分布式的一种语言;当你安装前面说的那个erlang安装包后,您电脑开始菜单中就有Erlang开发编辑器,有时间您可以用来练练手,就目前而言这种语言单词一般出现在一流大公司的招聘中,中小型一般没有,可能也因为很少中小型公司会涉及到并发的原因吧;到这里安装就完成了,下面需要通过命令行执行一些指令,由于RabbitMQ配置很多这里我捡一定会用到的几个来示范,其他具体可以参考官方文档:

首先找到安装rabbitmq的目录并进入rabbitmq_server-3.6.6找到sbin文件夹-》按住Shift+鼠标右键sbin文件夹-》在此处打开命令窗体-》参考这个地址https://www.rabbitmq.com/management.html的命令:rabbitmq-plugins enable rabbitmq_management -》录入到刚才打开的cmd窗体中:

这个是开启rabbitmq管理器的指令,这个时候你可以在你浏览器中录入http://localhost:15672/ 通过游客账号进入rabbitmq的监控后台:

url:http://localhost:15672/

Username:guest

Password:guest

此刻如果你看到如下图的界面,那恭喜你成功了,搭建RabbitMQ服务成功了:

因为Rabbit不光有队列,还有其他的路由,交换机等功能,所以能看到很多的统计或描述,这里我们只用到Queues的选项,点击进入Queues界面能看到没有任何的数据,但是有一个Add queue的按钮,这个控制台允许你手动添加一个队列数据,当然这不是我们今天的话题:

上面的guest已经够咋们测试使用了,至于剩余的什么管理员账号或密码等操作的设置可以去看这个:

rabbitmqctl操作的文档:https://www.rabbitmq.com/man/rabbitmqctl.1.man.html#

plugins操作文档:https://www.rabbitmq.com/plugins.html

 

» 封装RabbitMQ队列的读和写

在C#中运用RabbitMQ官网列举了几种方式,这里我选择直接使用其提供的RabbitMQ.Client的nuget包,就目前这个nuget而言4.0.0及以上版本必须要NETFramework 4.5.1及以上版本或netcore版本才允许使用,笔者这里用的是Framework4.5框架所以引用了此版本的nuget包:

Install-Package RabbitMQ.Client -Version 3.6.6

引用过后就是往前面讲的队列工厂填写代码,首先继承统一配置文件读取类 PublicClass.ConfClass<QRabbitMQ> ,然后实现 IQueue 接口,这里封装了RabbitMq常用的几个操作方法,具体代码:

复制代码
复制代码
 1  /// <summary>
 2     /// RabbitMq
 3     /// </summary>
 4     public class QRabbitMQ : PublicClass.ConfClass<QRabbitMQ>, IQueue
 5     {
 6         private IConnection con = null;
 7 
 8         public void Create()
 9         {
10             if (string.IsNullOrWhiteSpace(this.ApiUrl) || string.IsNullOrWhiteSpace(this.ApiKey)) { throw new Exception("创建RabbitMq队列需要指定队列:HostName和Port"); }
11 
12             try
13             {
14                 var factory = new ConnectionFactory() { HostName = this.ApiUrl, Port = Convert.ToInt32(this.ApiKey) };
15                 con = con ?? factory.CreateConnection();
16             }
17             catch (Exception ex)
18             {
19                 throw new Exception(ex.Message);
20             }
21         }
22 
23         public long Total(string name = "Redis_01")
24         {
25             if (con == null) { throw new Exception("请先创建队列连接"); }
26             using (var channel = con.CreateModel())
27             {
28                 return channel.MessageCount(name);
29             }
30         }
31 
32         public Message Read(string name = "RabbitMQ_01")
33         {
34             if (con == null) { throw new Exception("请先创建队列连接"); }
35             if (string.IsNullOrWhiteSpace(name)) { throw new Exception("name不能为空"); }
36 
37             var message = new Message();
38             message.Label = name;
39             message.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
40             using (var channel = con.CreateModel())
41             {
42                 var baseResult = channel.BasicGet(name, true); //true:获取后删除队列   false:不删除  
43                 if (baseResult == null) { return message; }
44                 var body = baseResult.Body;
45                 message.Body = Encoding.UTF8.GetString(body); 
46             }
47             return message;
48         }
49 
50         public bool Write(string content, string name = "RabbitMQ_Queue01")
51         {
52             if (con == null) { throw new Exception("请先创建队列连接"); }
53             if (string.IsNullOrWhiteSpace(content) || string.IsNullOrWhiteSpace(name)) { throw new Exception("content和name不能为空"); }
54 
55             using (var channel = con.CreateModel())
56             {
57                 channel.QueueDeclare(name, false, false, false, null);
58                 var body = Encoding.UTF8.GetBytes(content);
59 
60                 channel.BasicPublish(string.Empty, name, null, body);
61                 return true;
62             }
63         }
64 
65         public void Dispose()
66         {
67             if (con != null)
68             {
69                 con.Close();
70                 con.Dispose();
71                 con = null;
72             }
73         }
74     }
复制代码
复制代码

代码主要使用流程是:创建(Create)-》读(Read)|写(Write)-》释放(Dispose);有了具体的RabbitMq实现类,那么在工厂中直接通过泛型映射来获取该实现类的对象:

复制代码
复制代码
 1 /// <summary>
 2     /// ==================
 3     /// author:神牛步行3
 4     /// des:该列工厂开源,包括队列有MSMQ,RedisMQ,RabbitMQ
 5     /// blogs:http://www.cnblogs.com/wangrudong003/
 6     /// ==================
 7     /// 队列工厂
 8     /// </summary>
 9     public class QueueReposity<T> where T : class,IQueue, new()
10     {
11         public static IQueue Current
12         {
13             get
14             {
15                 return PublicClass.ConfClass<T>.Current;
16             }
17         }
18     }
复制代码
复制代码

 

» 队列工厂之RabbitMQ测试用例

通过上面配置环境和封装自己的方法,这里写了一个简单的测试用例,分为Server(加入消息队列)和Client(获取消息队列),首先来看Server端的代码:

复制代码
复制代码
 1  /// <summary>
 2     /// 队列服务端测试用例
 3     /// </summary>
 4     class Program
 5     {
 6         static void Main(string[] args)
 7         {
 8             //Redis_Server();
 9 
10             RabbitMQ_Server();
11 
12             //MSMQ_Server();
13         }
14         private static void RabbitMQ_Server()
15         {
16             //实例化QMsmq对象
17             var mq = QueueReposity<QRabbitMQ>.Current;
18 
19             try
20             {
21                 Console.WriteLine("Server端创建:RabbitMQ实例");
22                 mq.Create();
23 
24                 var num = 0;
25                 do
26                 {
27                     Console.WriteLine("输入循环数量(数字,0表示结束):");
28                     var readStr = Console.ReadLine();
29                     num = string.IsNullOrWhiteSpace(readStr) ? 0 : Convert.ToInt32(readStr);
30 
31                     Console.WriteLine("插入数据:");
32                     for (int i = 0; i < num; i++)
33                     {
34                         var str = "我的编号是:" + i;
35                         mq.Write(str);
36                         Console.WriteLine(str);
37                     }
38                 } while (num > 0);
39             }
40             catch (Exception ex)
41             {
42             }
43             finally
44             {
45                 Console.WriteLine("释放。");
46                 mq.Dispose();
47             }
48             Console.ReadLine();
49         }
50     }
复制代码
复制代码

通过:创建(Create)-》读(Read)|写(Write)-》释放(Dispose) 的流程来使用我们的队列工厂,感觉挺简单的,此时我们运行下这个Server端,然后录入参数:

这个时候就往RabbitMq队列中加入了11条数据,我们通过她的后台去找刚才添加的队列:

能够看到我们刚刚插入的队列总数和名称,如果你想看里面具体内容,可以点击名字“mq_01”进入某一个队列的界面,往下面拉滚动条找到“Get messages”选项,默认查看Messages是1我们修改为10,再点击get messages就能够看到如下图我们刚才插入的具体内容了:

截图有点长哦,不知道dudu会不会怪我哈哈,到这里能看到队列插入是成功的,然后我们来通过client端消费队列,具体代码:

复制代码
复制代码
 1  /// <summary>
 2     /// 队列客户端测试用例
 3     /// </summary>
 4     class Program
 5     {
 6         static void Main(string[] args)
 7         {
 8             //RedisMQ_Client();
 9 
10             RabbitMQ_Client();
11 
12             //MSMQ_Client();
13         }
14 
15         private static void RabbitMQ_Client()
16         {
17             //实例化QMsmq对象
18             var mq = QueueReposity<QRabbitMQ>.Current;
19             try
20             {
21                 Console.WriteLine("Client端创建:RabbitMQ实例");
22                 mq.Create();
23 
24                 while (true)
25                 {
26                     try
27                     {
28                         var total = mq.Total();
29                         if (total > 0) { Console.WriteLine("队列条数:" + total); }
30 
31                         var result = mq.Read();
32                         if (result.Body == null) { continue; }
33                         Console.WriteLine(string.Format("接受队列{0}:{1}", result.Label, result.Body));
34                     }
35                     catch (Exception ex)
36                     { Console.WriteLine("异常信息:" + ex.Message); }
37                 }
38             }
39             catch (Exception ex)
40             {
41                 throw ex;
42             }
43             finally
44             {
45                 Console.WriteLine("释放。");
46                 mq.Dispose();
47             }
48         }
49     }
复制代码
复制代码

再来咋们运行exe看下效果:

此刻刚刚加入队列中的数据就读取出来了,这个时候我们再看Rabbitmq控制台,get messages已经获取不出来具体的内容信息了,因为这个客户端消费了数据,队列中的数据自动清除了,至于是否想清除数据这个设置在代码:

1 var baseResult = channel.BasicGet(name, true); //true:获取后删除队列   false:不删除  

以上对封装RabbitMQ的代码分享和环境搭建讲解,希望能给您带来好的帮助,谢谢阅读;

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
188 6
|
3月前
|
消息中间件 JSON Java
|
3月前
|
消息中间件
rabbitmq,&队列
rabbitmq,&队列
|
3月前
|
消息中间件 JSON Java
玩转RabbitMQ声明队列交换机、消息转换器
玩转RabbitMQ声明队列交换机、消息转换器
99 0
|
4月前
|
消息中间件 存储 NoSQL
MQ的顺序性保证:顺序队列、消息编号、分布式锁,一文全掌握!
【8月更文挑战第24天】消息队列(MQ)是分布式系统的关键组件,用于实现系统解耦、提升可扩展性和可用性。保证消息顺序性是其重要挑战之一。本文介绍三种常用策略:顺序队列、消息编号与分布式锁,通过示例展示如何确保消息按需排序。这些方法各有优势,可根据实际场景灵活选用。提供的Java示例有助于加深理解与实践应用。
116 2
|
5月前
|
消息中间件 RocketMQ
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
MetaQ/RocketMQ 原理问题之当消费集群规模较大时,处理分配不到队列的Consumer的问题如何解决
|
5月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
74 0
说说RabbitMQ延迟队列实现原理?
|
5月前
|
消息中间件 NoSQL 关系型数据库
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
【RocketMQ系列十三】RocketMQ的集群核心概念之消费重试&死信队列&幂等消息的出现以及处理
148 1
|
6月前
|
消息中间件
RabbitMQ配置单活模式队列
RabbitMQ配置单活模式队列
157 0
|
6月前
|
消息中间件 数据库
03.RabbitMQ延迟队列
03.RabbitMQ延迟队列
50 0