一起谈.NET技术,NET下RabbitMQ实践 [示例篇]

简介: 在上一篇文章中,介绍了在window环境下安装erlang,rabbitmq-server,以免配置用户,权限,虚拟机等内容。今天将会介绍如果使用rabbitmq进行简单的消息入队,出队操作,因为本文演示的环境要用到上文中配置的环境,所以要运行本文sample,请先按上一篇中完成相应环境配置。

      在上一篇文章中,介绍了在window环境下安装erlang,rabbitmq-server,以免配置用户,权限,虚拟机等内容。今天将会介绍如果使用rabbitmq进行简单的消息入队,出队操作,因为本文演示的环境要用到上文中配置的环境,所以要运行本文sample,请先按上一篇中完成相应环境配置。  
      首先,我们下载官方的.net客户端软件,链接:http://www.rabbitmq.com/dotnet.html。下载并安装之后,将安装目录下的这两个DLL文件复制到我们示例项目中,并添加引用:

RabbitMQ.Client.dll //基于的发布订阅消息的功能类   
RabbitMQ.ServiceModel.dll //包括基于WCF方式发布订阅服务模型类

       如下图:          接着,我们创建两个类,一个是ProducerMQ.cs(用于产生消息),一个是CustmerMq.cs(用于消费消息),代码如下:  
       首先是ProducerMQ:

public   class  ProducerMQ
{
    
public   static    void  InitProducerMQ()
    {
        Uri uri 
=   new  Uri( " amqp://10.0.4.85:5672/ " );
        
string  exchange  =   " ex1 " ;
        
string  exchangeType  =   " direct " ;
        
string  routingKey  =   " m1 " ;
        
bool  persistMode  =   true ;
        ConnectionFactory cf 
=   new  ConnectionFactory();
      
        cf.UserName 
=   " daizhj " ;
        cf.Password 
=   " 617595 " ;
        cf.VirtualHost 
=   " dnt_mq " ;
        cf.RequestedHeartbeat 
=   0 ;
        cf.Endpoint 
=   new  AmqpTcpEndpoint(uri);
        
using  (IConnection conn  =  cf.CreateConnection())
        {
            
using  (IModel ch  =  conn.CreateModel())
            {
                
if  (exchangeType  !=   null )
                {
                    ch.ExchangeDeclare(exchange, exchangeType);
// ,true,true,false,false, true,null);
                    ch.QueueDeclare( " q1 " true ); // true, true, true, false, false, null);
                    ch.QueueBind( " q1 " " ex1 " " m1 " false null ); 
                }
                IMapMessageBuilder b 
=   new  MapMessageBuilder(ch);
                IDictionary target 
=  b.Headers;
                target[
" header " =   " hello world " ;
                IDictionary targetBody 
=  b.Body;
                targetBody[
" body " =   " daizhj " ;
                
if  (persistMode)
                {
                    ((IBasicProperties)b.GetContentHeader()).DeliveryMode 
=   2 ;
                }
             
                ch.BasicPublish(exchange, routingKey,
                                           (IBasicProperties)b.GetContentHeader(),
                                           b.GetContentBody());

            }
        }
    }
}

  下面对上面代码进行说明:
  1.  定义要链接的rabbitmq-server地址(基于amqp协议):

Uri uri = new Uri("amqp://10.0.4.85:5672/");

  2.  定义交换方式       

string  exchange  =   " ex1 " ;
string  exchangeType  =   " direct " ;
string  routingKey  =   " m1 " ;

  说明:rabbitmq交换方式分为三种,分别是:
  Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
  Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
  Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
  更多内容参见:RabbitMQ 三种Exchange  
  3. 是否对消息队列持久化保存       

bool  persistMode  =   true ;

  4. 使用ConnectionFactory创建连接,虽然创建时指定了多个server address,但每个connection只与一个物理的server进行连接。

       ConnectionFactory cf  =   new  ConnectionFactory();    
        
// 使用前文的配置环境信息  
        cf.UserName  =   " daizhj "
        cf.Password 
=   " 617595 " ;
        cf.VirtualHost 
=   " dnt_mq " ;
        cf.RequestedHeartbeat 
=   0 ;
        cf.Endpoint 
=   new  AmqpTcpEndpoint(uri);

     5. 实例化IConnection对象,并设置交换方式:

  using  (IConnection conn  =  cf.CreateConnection())
            {
                
using  (IModel ch  =  conn.CreateModel())
                {
                    
if  (exchangeType  !=   null )
                    {
                        ch.ExchangeDeclare(exchange, exchangeType);
// ,true,true,false,false, true,null);
                        ch.QueueDeclare( " q1 " true ); // true, true, true, false, false, null);
                        ch.QueueBind( " q1 " " ex1 " " m1 " false null ); 
                    }
        ....

  6. 构造消息实体对象并发布到消息队列上:     

  IMapMessageBuilder b  =   new  MapMessageBuilder(ch);
  IDictionary target 
=  b.Headers;
  target[
" header " =   " hello world " ;
  IDictionary targetBody 
=  b.Body;
  targetBody[
" body " =   " daizhj " ;
  
if  (persistMode)
  {
    ((IBasicProperties)b.GetContentHeader()).DeliveryMode 
=   2 ;
  }
  
// 简单发布方式
  ch.BasicPublish(exchange, routingKey,
          (IBasicProperties)b.GetContentHeader(),
          b.GetContentBody());

  这样就完成了单条消息的发布。 下面是CustmerMq.cs(用于消费消息)实例代码:    

public   class  CustmerMq
    {
        
public   static   int  InitCustmerMq()
        {
            
string  exchange  =   " ex1 " ;
            
string  exchangeType  =   " direct " ;
            
string  routingKey  =   " m1 " ;

            
string  serverAddress  =   " 10.0.4.85:5672 " ;
            ConnectionFactory cf 
=   new  ConnectionFactory();
            cf.Address 
=  serverAddress;
            cf.UserName 
=   " daizhj " ;
            cf.Password 
=   " 617595 " ;
            cf.VirtualHost 
=   " dnt_mq " ;
            cf.RequestedHeartbeat 
=   0 ;

  可以看出上面的代码与 ProducerMQ的开头代码类似,下面使用ConnectionFactory来构造链接并接收队列消息:           

  using  (IConnection conn  =  cf.CreateConnection())
            {
                
using  (IModel ch  =  conn.CreateModel())
                {
                    
// 普通使用方式BasicGet
                    
// noAck = true,不需要回复,接收到消息后,queue上的消息就会清除
                    
// noAck = false,需要回复,接收到消息后,queue上的消息不会被清除,直到调用channel.basicAck(deliveryTag, false); queue上的消息才会被清除 而且,在当前连接断开以前,其它客户端将不能收到此queue上的消息
                    BasicGetResult res  =  ch.BasicGet( " q1 " false /* noAck */ );
                    
if  (res  !=   null )
                    {
                        
bool  t  =  res.Redelivered;
                        t 
=   true ;
                        Console.WriteLine(System.Text.UTF8Encoding.UTF8.GetString(res.Body));
                        ch.BasicAck(res.DeliveryTag, 
false );
                    }
                    
else
                    {
                        Console.WriteLine(
" No message! " );
                    }  

  上面代码比较简单,主要是使用BasicGetResult来进行简单的消息接收,并使用BasicAck方式来告之是否从队列中移除该条消息。这一点很重要,因为在某些应用场景下,比如从队列中获取消息并用它来操作数据库或日志文件时,如果出现操作失败时,则该条消息应该保留在队列中,只到操作成功时才从队列中移除。 
  当然上面操作只是用于单条数据操作,如果要遍历队列中所有消息,则需要使用如下方式:

while  ( true )
  {
      BasicGetResult res 
=  ch.BasicGet( " q1 " false /* noAck */ );
      
if  (res  !=   null )
      {
          
try
          {
               
bool  t  =  res.Redelivered;
                        t 
=   true ;
                        Console.WriteLine(System.Text.UTF8Encoding.UTF8.GetString(res.Body));
                        ch.BasicAck(res.DeliveryTag, 
false );
          }
          
catch  { }
      }
      
else
          
break ;
  }

  另外,在rabbitmq中,获取消息可以使用两种方式,一种是上面提到的主动获取,另一种是基于订阅模式,即让当前获取消息的线程阻塞,用于绑定到指定的队列上,当有新的消息入队之后,该阻塞线程会被运行,从队列中获取新入队的消息,形如:    

  // 第二种取法QueueingBasicConsumer基于订阅模式
 QueueingBasicConsumer consumer  =   new  QueueingBasicConsumer(ch);
 ch.BasicConsume(
" q1 " false null , consumer);
 
while  ( true )
 {
     
try
     {
         BasicDeliverEventArgs e 
=  (BasicDeliverEventArgs)consumer.Queue.Dequeue();
         IBasicProperties props 
=  e.BasicProperties;
         
byte [] body  =  e.Body;
         Console.WriteLine(System.Text.Encoding.UTF8.GetString(body));
         
// ch.BasicAck(e.DeliveryTag, true);
         ProcessRemainMessage();                          
     }
     
catch  (EndOfStreamException ex) 
     {
         
// The consumer was removed, either through channel or connection closure, or through the action of IModel.BasicCancel(). 
         Console.WriteLine(ex.ToString());
         
break ;
     }
 }

  这样,就完成了一个简单的发布,消费消息的示例。在接下来的文章中,将会介绍如果基于WCF来发布RABBITMQ服务,敬请关注:) 

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2天前
|
人工智能 开发框架 量子技术
【专栏】.NET 技术:驱动创新的力量
【4月更文挑战第29天】.NET技术,作为微软的开发框架,以其跨平台、开源和语言多样性驱动软件创新。它在云计算、AI/ML、混合现实等领域发挥关键作用,通过Azure、ML.NET等工具促进新兴技术发展。未来,.NET将涉足量子计算、微服务和无服务器计算,持续拓宽软件开发边界,成为创新的重要推动力。掌握.NET技术,对于开发者而言,意味着握有开启创新的钥匙。
|
2天前
|
开发框架 .NET C#
【专栏】理解.NET 技术,提升开发水平
【4月更文挑战第29天】本文介绍了.NET技术的核心概念和应用,包括其跨平台能力、性能优化、现代编程语言支持及Web开发等特性。文章强调了深入学习.NET技术、关注社区动态、实践经验及学习现代编程理念对提升开发水平的重要性。通过这些,开发者能更好地利用.NET构建高效、可维护的多平台应用。
|
2天前
|
机器学习/深度学习 vr&ar 开发者
【专栏】.NET 技术:引领开发新方向
【4月更文挑战第29天】本文探讨了.NET技术如何引领软件开发新方向,主要体现在三方面:1) 作为跨平台开发的先锋,.NET Core支持多操作系统和移动设备,借助.NET MAUI创建统一UI,适应物联网需求;2) 提升性能和开发者生产力,采用先进技术和优化策略,同时更新C#语言特性,提高代码效率和可维护性;3) 支持现代化应用架构,包括微服务、容器化,集成Kubernetes和ASP.NET Core,保障安全性。此外,.NET还不断探索AI、ML和AR/VR技术,为软件开发带来更多创新可能。
|
2天前
|
开发框架 Cloud Native 开发者
【专栏】剖析.NET 技术的核心竞争力
【4月更文挑战第29天】本文探讨了.NET框架在软件开发中的核心竞争力:1) .NET Core实现跨平台与云原生技术的融合,支持多操作系统和容器化;2) 提升性能和开发者生产力,采用JIT、AOT优化,提供C#新特性和Roslyn编译器平台;3) 支持现代化应用架构,包括微服务和容器化,内置安全机制;4) 丰富的生态系统和社区支持,拥有庞大的开发者社区和微软的持续投入。这些优势使.NET在竞争激烈的市场中保持领先地位。
|
2天前
|
开发框架 .NET 开发者
【专栏】领略.NET 技术的创新力量
【4月更文挑战第29天】.NET技术自ASP.NET起历经创新,现以.NET Core为核心,展现跨平台能力,提升性能与生产力,支持现代化应用架构。.NET Core使开发者能用同一代码库在不同操作系统上构建应用,扩展至移动和物联网领域。性能提升,C#新特性简化编程,Roslyn编译器优化代码。拥抱微服务、容器化,内置安全机制,支持OAuth等标准。未来.NET 6将引入更快性能、Hot Reload等功能,预示着.NET将持续引领软件开发潮流,为开发者创造更多机会。
|
2天前
|
物联网 vr&ar 开发者
【专栏】.NET 技术:为开发注入活力
【4月更文挑战第29天】本文探讨了.NET技术的创新,主要体现在三个方面:1) .NET Core实现跨平台开发革命,支持多种操作系统和硬件,如.NET MAUI用于多平台UI;2) 性能提升与生产力飞跃,C#新特性简化编程,JIT和AOT优化提升性能,Roslyn提供代码分析工具;3) 引领现代化应用架构,支持微服务、容器化,内置安全机制。未来,.NET 7将带来更多新特性和前沿技术整合,如量子计算、AI,持续推动软件开发创新。开发者掌握.NET技术将赢得竞争优势。
|
2天前
|
人工智能 前端开发 Cloud Native
【专栏】洞察.NET 技术的开发趋势
【4月更文挑战第29天】本文探讨了.NET技术的三大发展趋势:1) 跨平台与云原生技术融合,通过.NET Core支持轻量级、高性能应用,适应云计算和微服务;2) 人工智能与机器学习的集成,如ML.NET框架,使开发者能用C#构建AI模型;3) 引入现代化前端开发技术,如Blazor,实现前后端一致性。随着.NET 8等新版本的发布,期待更多创新技术如量子计算、AR/VR的融合,.NET将持续推动软件开发的创新与进步。
|
2天前
|
人工智能 前端开发 Devops
【专栏】洞察.NET 技术在现代开发中的作用
【4月更文挑战第29天】本文探讨了.NET技术在现代软件开发中的核心价值、应用及挑战。.NET提供语言统一性与多样性,强大的Visual Studio工具,丰富的类库,跨平台能力及活跃的开发者社区。实际应用包括企业级应用、Web、移动、云服务和游戏开发。未来面临性能优化、容器化、AI集成等挑战,需持续创新。开发者应深入理解.NET,把握技术趋势,参与社区,共创美好未来。
|
2天前
|
开发工具 C# 开发者
【专栏】理解.NET 技术,开创美好未来
【4月更文挑战第29天】本文探讨了.NET技术在软件开发中的关键作用,强调其核心优势,如语言多样性、丰富类库、强大的开发工具和跨平台能力。.NET在现代应用开发中涉及企业级应用、云服务集成、微服务、移动应用和游戏开发。未来,.NET将持续创新,提升性能,拓展应用场景,并促进更紧密的社区合作,通过跨平台框架扩大应用范围。开发者应深入学习.NET,抓住技术趋势,共创美好未来。
|
2天前
|
机器学习/深度学习 人工智能 开发者
【专栏】.NET 技术:为开发带来新机遇
【4月更文挑战第29天】本文探讨了.NET技术如何为软件开发带来新机遇,分为三个部分:首先,.NET的跨平台革命,包括.NET Core的兴起、Xamarin与.NET MAUI的移动应用开发、开源社区的推动及性能优化;其次,介绍了云服务与微服务架构的集成,如Azure云服务、微服务支持、DevOps与CI/CD,以及Docker容器化;最后,讨论了AI与机器学习集成,如ML.NET、认知服务、TensorFlow和ONNX,使开发者能构建智能应用。面对这些机遇,开发者应不断学习和适应新技术,以创造更多价值。

热门文章

最新文章