一起谈.NET技术,NET 下RabbitMQ实践 [实战篇]

简介:   之前的文章中,介绍了如何将RabbitMQ以WCF方式进行发布。今天就介绍一下我们产品中如何使用RabbitMQ的!  在Discuz!NT企业版中,提供了对HTTP错误日志的记录功能,这一点对企业版非常重要,另外存储错误日志使用了MongoDB,理由很简单,MongoDB的添加操作飞快,即使数量过亿之后插入速度依旧不减。

  之前的文章中,介绍了如何将RabbitMQ以WCF方式进行发布。今天就介绍一下我们产品中如何使用RabbitMQ的!
  在Discuz!NT企业版中,提供了对HTTP错误日志的记录功能,这一点对企业版非常重要,另外存储错误日志使用了MongoDB,理由很简单,MongoDB的添加操作飞快,即使数量过亿之后插入速度依旧不减。    
  在开始正文之前,先说明一下本文的代码分析顺序,即:程序入口==》RabbitMQ客户端===>RabbitMQ服务端。好了,闲话少说,开始正文!    
  首先是程序入口,也就是WCF+RabbitMQ客户端实现:因为Discuz!NT使用了HttpModule方式来接管HTTP链接请求,而在.NET的HttpModule模板中,可以通过如下方法来接管程序运行时发生的ERROR,如下:         

  context.Error += new EventHandler(Application_OnError);   

   而“记录错误日志"的功能入口就在这里:
 
  
public void Application_OnError(Object sender, EventArgs e)
{
string requestUrl = DNTRequest.GetUrl();
HttpApplication application
= (HttpApplication)sender;
HttpContext context
= application.Context; #if EntLib
if (RabbitMQConfigs.GetConfig() != null && RabbitMQConfigs.GetConfig().HttpModuleErrLog.Enable) // 当开启errlog错误日志记录功能时
{
RabbitMQClientHelper.GetHttpModuleErrLogClient().AsyncAddLog(
new HttpModuleErrLogData(LogLevel.High, context.Server.GetLastError().ToString())); // 异步方式
// RabbitMQHelper.GetHttpModuleErrLogClient().AddLog(new HttpModuleErrLogData(LogLevel.High, "wrong message infomation!")); // 同步方式
return ;
}
#endif
...
}

  当然从代码可以看出,记录日志的工作基本是通过配置文件控制的,即“HttpModuleErrLog.Enable”。而RabbitMQClientHelper是一个封装类,主要用于反射生成IHttpModuleErrlogClient接口实例,该实例就是“基于WCF发布的RabbitMQ”的客户端访问对象。

 
  
/// <summary>
/// RabbitMQ
/// </summary>
public class RabbitMQClientHelper
{
static IHttpModuleErrlogClient ihttpModuleErrLogClient;


private static object lockHelper = new object ();


public static IHttpModuleErrlogClient GetHttpModuleErrLogClient()
{
if (ihttpModuleErrLogClient == null )
{
lock (lockHelper)
{
if (ihttpModuleErrLogClient == null )
{
try
{
if (RabbitMQConfigs.GetConfig().HttpModuleErrLog.Enable)
{
ihttpModuleErrLogClient
= (IHttpModuleErrlogClient)Activator.CreateInstance(Type.GetType(
" Discuz.EntLib.RabbitMQ.Client.HttpModuleErrLogClient, Discuz.EntLib.RabbitMQ.Client " , false , true ));
}
}
catch
{
throw new Exception( " 请检查 Discuz.EntLib.RabbitMQ.dll 文件是否被放置到了bin目录下! " );
}
}
}
}
return ihttpModuleErrLogClient;
}
}

  可以看出它反射的是Discuz.EntLib.RabbitMQ.dll文件的HttpModuleErrLogClient对象(注:使用反射的原因主要是解决企业版代码与普遍版代码在项目引用上的相互依赖),下面就是其接口和具体要求实现:

 
    
/// <summary>
/// IHttpModuleErrlogClient 客户端接口类,用于反射实例化绑定
/// </summary>
public interface IHttpModuleErrlogClient
{
void AddLog(HttpModuleErrLogData httpModuleErrLogData);


void AsyncAddLog(HttpModuleErrLogData httpModuleErrLogData);
}

public class HttpModuleErrLogClient : IHttpModuleErrlogClient
{
public void AddLog(HttpModuleErrLogData httpModuleErrLogData)
{
try
{
// ((RabbitMQBinding)binding).OneWayOnly = true;
ChannelFactory < IHttpModuleErrLogService > m_factory = new ChannelFactory < IHttpModuleErrLogService > (GetBinding(), " soap.amqp:///HttpModuleErrLogService " );
m_factory.Open();
IHttpModuleErrLogService m_client
= m_factory.CreateChannel();
m_client.AddLog(httpModuleErrLogData);
((IClientChannel)m_client).Close();
m_factory.Close();
}
catch (System.Exception e)
{
string msg = e.Message;
}
}


private delegate void delegateAddLog(HttpModuleErrLogData httpModuleErrLogData);


public void AsyncAddLog(HttpModuleErrLogData httpModuleErrLogData)
{
delegateAddLog AddLog_aysncallback
= new delegateAddLog(AddLog);
AddLog_aysncallback.BeginInvoke(httpModuleErrLogData,
null , null );
}


public Binding GetBinding()
{
return new RabbitMQBinding(RabbitMQConfigs.GetConfig().HttpModuleErrLog.RabbitMQAddress);
}
}

  可以看出,AddLog方法与上一篇中的客户端内容基本上没什么太大差别,只不过它提供了同步和异步访问两种方式,这样做的目的主要是用户可根据生产环境来灵活配置。    

  下面就来看一下RabbitMQ的服务端实现,首先看一下其运行效果,如下图:

  接着看一下启动rabbitmq服务的代码:

 
    
public void StartService(System.ServiceModel.Channels.Binding binding)
{
m_host
= new ServiceHost( typeof (HttpModuleErrLogService), new Uri( " soap.amqp:/// " ));
// ((RabbitMQBinding)binding).OneWayOnly = true;
m_host.AddServiceEndpoint( typeof (IHttpModuleErrLogService), binding, " HttpModuleErrLogService " );
m_host.Open();
m_serviceStarted
= true ;
}

  上面代码会添加IHttpModuleErrLogService接口实现类HttpModuleErrLogService 的Endpoint,并启动它,下面就是该接口声明:

 
     
/// <summary>
/// IHttpModuleErrLogService 接口类
/// </summary>
[ServiceContract]
public interface IHttpModuleErrLogService
{
/// <summary>
/// 添加 httpModuleErrLogData日志信息
/// </summary>
/// <param name="httpModuleErrLogData"></param>
[OperationContract]
void AddLog(HttpModuleErrLogData httpModuleErrLogData);
}

  代码很简单,就是定义了一个添加日志的方法:void AddLog(HttpModuleErrLogData httpModuleErrLogData)       

  下面就是接口的具体实现,首先是类声明及初始化代码: 

 
     
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)] // Single - 为所有客户端调用分配一个服务实例。
public class HttpModuleErrLogService : IHttpModuleErrLogService
{
/// <summary>
/// 获取 HttpModuleErrLogInfo配置文件对象实例
/// </summary>
private static HttpModuleErrLogInfo httpModuleErrorLogInfo = RabbitMQConfigs.GetConfig().HttpModuleErrLog;
/// <summary>
/// 定时器对象
/// </summary>
private static System.Timers.Timer _timer;
/// <summary>
/// 定时器的时间
/// </summary>
private static int _elapsed = 0 ;


public static void Initial(System.Windows.Forms.RichTextBox msgBox, int elapsed)
{
_msgBox
= msgBox;
_elapsed
= elapsed;


// 初始定时器
if (_elapsed > 0 )
{
_timer
= new System.Timers.Timer() { Interval = elapsed * 1000 , Enabled = true , AutoReset = true };
_timer.Elapsed
+= new System.Timers.ElapsedEventHandler(Timer_Elapsed);
_timer.Start();
}
}


/// <summary>
/// 时间到时执行出队操作
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private static void Timer_Elapsed( object sender, System.Timers.ElapsedEventArgs e)
{
Dequeue();
}

  可以看出,这里使用了静态定时器对象,来进行定时访问队列信息功能(“非同步出队”操作),这样设计的原因主要是为用户提供适合的配置方式,即如果不使用定时器(为0时),则系统会在日志入队后,就立即启动出队(“同步出队”)操作获取日志信息并插入到MongoDB数据库中。

  下面介绍一下入队操作实现:

 
     
/// <summary>
/// 添加 httpModuleErrLogData日志信息
/// </summary>
/// <param name="httpModuleErrLogData"></param>
public void AddLog(HttpModuleErrLogData httpModuleErrLogData)
{
Enqueue(httpModuleErrLogData);


if (_elapsed <= 0 ) // 如果使用定时器(为0 时),则立即执行出队操作
Dequeue();
}


/// <summary>
/// 交换机名称
/// </summary>
private const string EXCHANGE = " ex1 " ;
/// <summary>
/// 交换方法,更多内容参见: http://melin.javaeye.com/blog/691265
/// </summary>
private const string EXCHANGE_TYPE = " direct " ;
/// <summary>
/// 路由key,更多内容参见: http://sunjun041640.blog.163.com/blog/static/256268322010328102029919/
/// </summary>
private const string ROUTING_KEY = " m1 " ;


/// <summary>
/// 日志入队
/// </summary>
/// <param name="httpModuleErrLogData"></param>
public static void Enqueue(HttpModuleErrLogData httpModuleErrLogData)
{
Uri uri
= new Uri(httpModuleErrorLogInfo.RabbitMQAddress);
ConnectionFactory cf
= new ConnectionFactory()
{
UserName
= httpModuleErrorLogInfo.UserName,
Password
= httpModuleErrorLogInfo.PassWord,
VirtualHost
= " dnt_mq " ,
RequestedHeartbeat
= 0 ,
Endpoint
= new AmqpTcpEndpoint(uri)
};
using (IConnection conn = cf.CreateConnection())
{
using (IModel ch = conn.CreateModel())
{
if (EXCHANGE_TYPE != null )
{
ch.ExchangeDeclare(EXCHANGE, EXCHANGE_TYPE);
// ,true,true,false,false, true,null);
ch.QueueDeclare(httpModuleErrorLogInfo.QueueName, true ); // true, true, true, false, false, null);
ch.QueueBind(httpModuleErrorLogInfo.QueueName, EXCHANGE, ROUTING_KEY, false , null );
}
IMapMessageBuilder b
= new MapMessageBuilder(ch);
IDictionary target
= b.Headers;
target[
" header " ] = " HttpErrLog " ;
IDictionary targetBody
= b.Body;
targetBody[
" body " ] = SerializationHelper.Serialize(httpModuleErrLogData);
((IBasicProperties)b.GetContentHeader()).DeliveryMode
= 2 ; // persistMode
ch.BasicPublish(EXCHANGE, ROUTING_KEY,
(IBasicProperties)b.GetContentHeader(),
b.GetContentBody());
}
}
}

  代码很简单,主要构造rabbitmq链接(ConnectionFactory)并初始化相应参数如用户名,密码,ROUTING_KEY等。

  然后将传入的日志对象序列化成字符串对象,赋值给targetBody["body"],这样做主要是因为我没找到更好的方法来赋值(之前尝试直接绑定httpModuleErrLogData到targetBody["body"],但在出队操作中找不到合适方法将httpModuleErrLogData对象解析出来)。下面就是出队操作:  

 
     
/// <summary>
/// 日志出队
/// </summary>
public static void Dequeue()
{
string serverAddress = httpModuleErrorLogInfo.RabbitMQAddress.Replace( " amqp:// " , "" ).TrimEnd( ' / ' ); // "10.0.4.85:5672";
ConnectionFactory cf = new ConnectionFactory()
{
UserName
= httpModuleErrorLogInfo.UserName,
Password
= httpModuleErrorLogInfo.PassWord,
VirtualHost
= " dnt_mq " ,
RequestedHeartbeat
= 0 ,
Address
= serverAddress
};

using (IConnection conn = cf.CreateConnection())
{
using (IModel ch = conn.CreateModel())
{
while ( true )
{
BasicGetResult res
= ch.BasicGet(httpModuleErrorLogInfo.QueueName, false );
if (res != null )
{
try
{
string objstr = System.Text.UTF8Encoding.UTF8.GetString(res.Body).Replace( " \0\0\0
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 Java 数据库
RocketMQ实战—9.营销系统代码初版
本文主要介绍了实现营销系统四大促销场景的代码初版:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。
RocketMQ实战—9.营销系统代码初版
|
消息中间件 搜索推荐 调度
RocketMQ实战—8.营销系统业务和方案介绍
本文详细介绍了电商营销系统的业务流程、技术架构及挑战解决方案。涵盖核心交易与支付后履约流程,优惠券和促销活动的发券、领券、用券、销券机制,以及会员与推送的数据库设计。技术架构基于Nacos服务注册中心、Dubbo RPC框架、RocketMQ消息中间件和XXLJob分布式调度工具,实现系统间高效通信与任务管理。针对千万级用户量下的推送和发券场景,提出异步化、分片处理与惰性发券等优化方案,解决高并发压力。同时,通过RocketMQ实现系统解耦,提升扩展性,并利用XXLJob完成爆款商品推荐的分布式调度推送。整体设计确保系统在大规模用户场景下的性能与稳定性。
RocketMQ实战—8.营销系统业务和方案介绍
|
消息中间件 存储 NoSQL
RocketMQ实战—6.生产优化及运维方案
本文围绕RocketMQ集群的使用与优化,详细探讨了六个关键问题。首先,介绍了如何通过ACL配置实现RocketMQ集群的权限控制,防止不同团队间误用Topic。其次,讲解了消息轨迹功能的开启与追踪流程,帮助定位和排查问题。接着,分析了百万消息积压的处理方法,包括直接丢弃、扩容消费者或通过新Topic间接扩容等策略。此外,提出了针对RocketMQ集群崩溃的金融级高可用方案,确保消息不丢失。同时,讨论了为RocketMQ增加限流功能的重要性及实现方式,以提升系统稳定性。最后,分享了从Kafka迁移到RocketMQ的双写双读方案,确保数据一致性与平稳过渡。
|
8月前
|
监控 Cloud Native 测试技术
.NET技术深度解析:现代企业级开发指南
每日激励:“不要一直责怪过去的自己,他曾经站在雾里也很迷茫”。我是蒋星熠Jaxonic,一名在代码宇宙中探索的极客旅人。从.NET Framework到.NET 8,我深耕跨平台、高性能、云原生开发,践行领域驱动设计与微服务架构,用代码书写技术诗篇。分享架构演进、性能优化与AI融合前沿,助力开发者在二进制星河中逐光前行。关注我,共探技术无限可能!
.NET技术深度解析:现代企业级开发指南
|
9月前
|
消息中间件 Ubuntu Java
SpringBoot整合MQTT实战:基于EMQX实现双向设备通信
本教程指导在Ubuntu上部署EMQX 5.9.0并集成Spring Boot实现MQTT双向通信,涵盖服务器搭建、客户端配置及生产实践,助您快速构建企业级物联网消息系统。
2929 1
|
9月前
|
数据采集 传感器 监控
Modbus 与 MQTT 协议兼容:MyEMS 的泛在能源数据采集技术实现
MyEMS深度融合Modbus与MQTT协议,破解能源数据采集中协议碎片化、网络异构、数据孤岛等难题。通过Modbus接入95%以上工业设备,实现现场数据精准“拉取”;依托MQTT构建高效物联网传输通道,支持多源数据主动“推送”与云端集成。边缘侧采集规整,中心侧汇聚分析,形成统一、可靠、低延迟的数据流。该架构兼具高兼容性、强扩展性与低运维成本,广泛应用于工业园区、商业楼宇及集团型企业,支撑实时监控、AI分析与跨系统融合,打造泛在互联的能源数据底座,助力企业实现全面智慧能源管理。
584 6
|
消息中间件 NoSQL 大数据
RocketMQ实战—5.消息重复+乱序+延迟的处理
本文围绕RocketMQ的使用与优化展开,分析了优惠券重复发放的原因及解决方案。首先,通过案例说明了优惠券系统因消息重复、数据库宕机或消费失败等原因导致重复发券的问题,并提出引入幂等性机制(如业务判断法、Redis状态判断法)来保证数据唯一性。其次,探讨了死信队列在处理消费失败时的作用,以及如何通过重试和死信队列解决消息处理异常。接着,分析了订单库同步中消息乱序的原因,提出了基于顺序消息机制的代码实现方案,确保消息按序处理。此外,介绍了利用Tag和属性过滤数据提升效率的方法,以及延迟消息机制优化定时退款扫描的功能。最后,总结了RocketMQ生产实践中的经验.
RocketMQ实战—5.消息重复+乱序+延迟的处理
|
消息中间件 Java 测试技术
RocketMQ实战—7.生产集群部署和生产参数
本文详细介绍了RocketMQ生产集群的部署与调优过程,包括集群规划、环境搭建、参数配置和优化策略。
RocketMQ实战—7.生产集群部署和生产参数
|
消息中间件 NoSQL Java
RocketMQ实战—10.营销系统代码优化
本文主要介绍了如何对营销系统的四大促销场景的代码进行优化,包括:全量用户推送促销活动、全量用户发放优惠券、特定用户推送领取优惠券消息、热门商品定时推送。