MQ单一消息完整流程

简介: public class QueueManger { private static string QueuePath = @".\private$\{0}"; /// /// 创建MSMQ队列 ...
public class QueueManger
    {
        private static string QueuePath = @".\private$\{0}";
            
        /// <summary>
        /// 创建MSMQ队列
        /// </summary>
        /// <param name="queueName">队列路径</param>
        /// <param name="transactional">是否事务队列</param>
        public static void Createqueue(string queueName, bool transactional = false)
        {
            try
            {
                QueuePath = string.Format(QueuePath, queueName);
                //判断队列是否存在
                if (!MessageQueue.Exists(QueuePath))
                {
                    MessageQueue.Create(QueuePath);
                    LoggerFile.Write(QueuePath + "已成功创建!"); 
                }
                else
                {
                    LoggerFile.Write(QueuePath + "已经存在!"); 
                }
            }
            catch (MessageQueueException e)
            {
                LoggerFile.Write(e.Message); 
            }
        }
        /// <summary>
        /// 删除队列
        /// </summary>
        /// <param name="queueName"></param>
        public static void Deletequeue(string queueName)
        {
            try
            {
                QueuePath = string.Format(QueuePath, queueName);
                //判断队列是否存在
                if (MessageQueue.Exists(QueuePath))
                {
                    MessageQueue.Delete(QueuePath);
                    LoggerFile.Write(QueuePath + "已删除!");
                }
                else
                {
                    LoggerFile.Write(QueuePath + "不存在!");
                }
            }
            catch (MessageQueueException e)
            {
                LoggerFile.Write(e.Message);
            }
        }
        /// <summary>
        /// 发送消息
        /// </summary>
        /// <typeparam name="T">用户数据类型</typeparam>
        /// <param name="target">用户数据</param>
        /// <param name="queueName">队列名称</param>
        /// <param name="tran"></param>
        /// <returns></returns>
        public static bool SendMessage<T>(T target, string queueName, MessageQueueTransaction tran = null)
        {
            try
            {
                QueuePath = string.Format(QueuePath, queueName);
                //连接到本地的队列
                MessageQueue myQueue = new MessageQueue(QueuePath);
                System.Messaging.Message myMessage = new System.Messaging.Message();
                myMessage.Body = target;
                myMessage.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
                //发送消息到队列中
                if (tran == null)
                {
                    myQueue.Send(myMessage);
                }
                else
                {
                    myQueue.Send(myMessage, tran);
                }
                LoggerFile.Write("消息已成功发送到" + queueName + "队列!");
                return true;
            }
            catch (ArgumentException e)
            {
                LoggerFile.Write(e.Message);
                return false;
            }
        }
        /// <summary>
        /// 接收消息
        /// </summary>
        /// <typeparam name="T">用户的数据类型</typeparam>
        /// <param name="queueName">消息路径</param>
        /// <returns>用户填充在消息当中的数据</returns>
        public static T ReceiveMessage<T>(string queueName, MessageQueueTransaction tran = null)
        {
            QueuePath = string.Format(QueuePath, queueName);
            //连接到本地队列
            MessageQueue myQueue = new MessageQueue(QueuePath);
            myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
            try
            {
                //从队列中接收消息
                System.Messaging.Message myMessage = tran == null ? myQueue.Receive() : myQueue.Receive(tran);
                return (T)myMessage.Body; //获取消息的内容
            }
            catch (MessageQueueException e)
            {
                LoggerFile.Write(e.Message);
            }
            catch (InvalidCastException e)
            {
                LoggerFile.Write(e.Message);
            }
            return default(T);
        }
        /// <summary>
        /// 采用Peek方法接收消息
        /// </summary>
        /// <typeparam name="T">用户数据类型</typeparam>
        /// <param name="queueName">队列路径</param>
        /// <returns>用户数据</returns>
        public static T ReceiveMessageByPeek<T>(string queueName)
        {
            QueuePath = string.Format(QueuePath, queueName);
            //连接到本地队列
            MessageQueue myQueue = new MessageQueue(QueuePath);
            myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
            try
            {
                //从队列中接收消息
                System.Messaging.Message myMessage = myQueue.Peek();
                return (T)myMessage.Body; //获取消息的内容
            }
            catch (MessageQueueException e)
            {
                LoggerFile.Write(e.Message);
            }
            catch (InvalidCastException e)
            {
                LoggerFile.Write(e.Message);
            }
            return default(T);
        }
        /// <summary>
        /// 获取队列中的所有消息
        /// </summary>
        /// <typeparam name="T">用户数据类型</typeparam>
        /// <param name="queueName">队列路径</param>
        /// <returns>用户数据集合</returns>
        public static List<T> GetAllMessage<T>(string queueName)
        {
            QueuePath = string.Format(QueuePath, queueName);
            MessageQueue myQueue = new MessageQueue(QueuePath);
            myQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(T) });
            try
            {
                Message[] msgArr = myQueue.GetAllMessages();
                List<T> list = new List<T>();
                msgArr.ToList().ForEach((o) =>
                {
                    list.Add((T)o.Body);
                });
                return list;
            }
            catch (Exception e)
            {
                LoggerFile.Write(e.Message);
            }
            return null;
        }
    }

  

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 RocketMQ
这个RocketMQ节点似乎是在正常工作,但是它不能接收或者处理消息
这个RocketMQ节点似乎是在正常工作,但是它不能接收或者处理消息
200 0
|
6月前
|
消息中间件 存储 安全
01为什么需要MQ及其好处
01为什么需要MQ及其好处
30 0
|
4月前
|
消息中间件 存储 RocketMQ
大白话-设计RocketMQ延迟消息
RocketMQ的延迟消息使用上非常便捷,但是不支持任意时间的延迟,这一点对于有强迫症的朋友来说就比较难受,但是搞明白为什么这么设计后,就自然释怀了。
|
消息中间件 存储 运维
RocketMQ 消息集成:多类型业务消息-普通消息
本篇将从业务集成场景的诉求开始,介绍 RocketMQ 作为业务消息集成方案的核心能力和优势,通过功能场景、应用案例以及最佳实践等角度介绍 RocketMQ 普通消息类型的使用。
224 0
RocketMQ  消息集成:多类型业务消息-普通消息
|
消息中间件 存储 算法
RocketMQ 消息集成:多类型业务消息——定时消息
本篇将继续业务消息集成的场景,从使用场景、应用案例、功能原理以及最佳实践等角度介绍 RocketMQ 的定时消息功能。
427 0
RocketMQ  消息集成:多类型业务消息——定时消息
|
消息中间件 存储 Apache
解析 RocketMQ 业务消息--“顺序消息”
本篇将继续业务消息集成的场景,从功能原理、应用案例、最佳实践以及实战等角度介绍 RocketMQ 的顺序消息功能。
238 0
解析 RocketMQ  业务消息--“顺序消息”
|
存储 消息中间件 RocketMQ
RocketMQ学习Broker流程、生产者和存储流程联系
放入消息之后,进行操作体现在asyncSendMessage中。将消息以异步方式存储到存储器中,处理器可以处理下一个请求,而不是在结果完成后等待结果,以异步方式通知客户端。此时可以看到asyncPutMessage的操作中会进入到CommitLog中,此时进行提交日志操作,此时会执行写入到ByteBuffer中,然后刷盘到硬盘中。同时执行统计操作,进行HA同步。
119 0
RocketMQ学习Broker流程、生产者和存储流程联系
|
消息中间件 缓存 数据库
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
382 0
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
|
消息中间件 运维 监控
多类型业务消息专题-普通消息 | 学习笔记(一)
快速学习多类型业务消息专题-普通消息
139 0
 多类型业务消息专题-普通消息 | 学习笔记(一)
|
消息中间件 存储 运维
多类型业务消息专题-普通消息 | 学习笔记(二)
快速学习多类型业务消息专题-普通消息
95 0
多类型业务消息专题-普通消息 | 学习笔记(二)