RabbitMQ~消费者实时与消息服务器保持通话

简介:

这个文章主要介绍简单的消费者的实现,rabbitMQ实现的消费者可以对消息服务器进行实时监听,当有消息(生产者把消息推到服务器上之后),消费者可以自动去消费它,这通常是开启一个进程去维护这个对话,它与消息服务器保持一个TCP的长连接,整个这个过程于rabbitMQ为我们提供,程序开发人员只需要实现自己的回调方法即可.

简单的rabbitMQ消费者

    /// <summary>
    /// 消息消费者
    /// </summary>
    public class RabbitMqSubscriber : Lind.DDD.Commons.DisposableBase
    {
        private readonly string exchangeName;
        private readonly string queueName;
        private readonly IConnection connection;
        private readonly IModel channel;
        private bool disposed;

        /// <summary>
        /// 从消息服务器拉到消息后触发
        /// </summary>
        public event EventHandler<MessageReceivedEventArgs> MessageReceived;

        /// <summary>
        /// Initializes a new instance of <c>RabbitMqMessageSubscriber</c> class.
        /// </summary>
        /// <param name="uri"></param>
        /// <param name="exchangeName"></param>
        /// <param name="queueName"></param>
        public RabbitMqSubscriber(string uri, string queueName, string userName = "", string password = "")
        {
            this.exchangeName = exchangeName;
            this.queueName = queueName;
            var factory = new ConnectionFactory() { Uri = uri };
            if (!string.IsNullOrWhiteSpace(userName))
                factory.UserName = userName;
            if (!string.IsNullOrWhiteSpace(password))
                factory.Password = password;
            this.connection = factory.CreateConnection();
            this.channel = connection.CreateModel();
        }

        public void Subscribe()
        {
            channel.QueueDeclare(
                queue: this.queueName, 
                durable: false,//持久化
                exclusive: false, //独占,只能被一个consumer使用
                autoDelete: false,//自己删除,在最后一个consumer完成后删除它
                arguments: null);
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                var body = e.Body;
                var json = Encoding.UTF8.GetString(body);
                var message = JsonConvert.DeserializeObject(json, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });
                this.OnMessageReceived(new MessageReceivedEventArgs(message));
                channel.BasicAck(e.DeliveryTag, multiple: false);
            };
            channel.BasicConsume(queue: queueName,
                                 noAck: false,
                                 consumer: consumer);
        }

        private void OnMessageReceived(MessageReceivedEventArgs e)
        {
            this.MessageReceived?.Invoke(this, e);
        }

        protected override void Finalize(bool disposing)
        {
            if (disposing)
            {
                if (!disposed)
                {
                    this.channel.Dispose();
                    this.connection.Dispose();
                    disposed = true;
                }
            }
        }
    }

简单调用

   class Program
    {
        static void Main(string[] args)
        {
            var subscriber = new Lind.DDD.RabbitMq.RabbitMqSubscriber("amqp://localhost:5672", "zzl");
            subscriber.MessageReceived += Subscriber_MessageReceived;
            subscriber.Subscribe();
            Console.ReadKey();
        }

        private static void Subscriber_MessageReceived(object sender, RabbitMq.MessageReceivedEventArgs e)
        {
            Console.WriteLine("消费者2->消费了一个消息{0}", e.Message);
            Lind.DDD.Logger.LoggerFactory.Instance.Logger_Debug("消费者2->消费了一个消息{0}" + e.Message);
            Thread.Sleep(2000);
        }

    }

实时拉消息

RabbitMQ消息模型

通过上面图我们可以更容易和清晰的去理解rabbitmq的工作流程.

本文转自博客园张占岭(仓储大叔)的博客,原文链接:RabbitMQ~消费者实时与消息服务器保持通话,如需转载请自行联系原博主。

相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
4月前
|
传感器 网络协议 物联网
手把手教你在 Windows 环境中搭建 MQTT 服务器
手把手教你在 Windows 环境中搭建 MQTT 服务器
325 0
|
5月前
|
消息中间件 Java 调度
消息队列 MQ使用问题之消费者自动掉线是什么导致的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
4月前
|
消息中间件 存储 负载均衡
我服了,RocketMQ消费者负载均衡内核是这样设计的
文章为理解RocketMQ的负载均衡机制提供了深入的技术洞察,并对如何在实际应用中扩展和定制负载均衡策略提供了有价值的见解。
我服了,RocketMQ消费者负载均衡内核是这样设计的
|
4月前
|
消息中间件 存储 负载均衡
RocketMQ消费者消费消息核心原理(含长轮询机制)
这篇文章深入探讨了Apache RocketMQ消息队列中消费者消费消息的核心原理,特别是长轮询机制。文章从消费者和Broker的交互流程出发,详细分析了Push和Pull两种消费模式的内部实现,以及它们是如何通过长轮询机制来优化消息消费的效率。文章还对RocketMQ的消费者启动流程、消息拉取请求的发起、Broker端处理消息拉取请求的流程进行了深入的源码分析,并总结了RocketMQ在设计上的优点,如单一职责化和线程池的使用等。
RocketMQ消费者消费消息核心原理(含长轮询机制)
|
4月前
|
消息中间件 缓存 Java
RocketMQ - 消费者消费方式
RocketMQ - 消费者消费方式
112 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者进度保存机制
RocketMQ - 消费者进度保存机制
78 0
|
4月前
|
消息中间件 RocketMQ
RocketMQ - 消费者Rebalance机制
RocketMQ - 消费者Rebalance机制
67 0
|
4月前
|
消息中间件 存储 缓存
RocketMQ - 消费者启动机制
RocketMQ - 消费者启动机制
55 0
|
4月前
|
消息中间件 存储 缓存
RocketMQ - 消费者概述
RocketMQ - 消费者概述
73 0
|
5月前
|
消息中间件 负载均衡 算法
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
142 2