[.NET领域驱动设计实战系列]专题八:DDD案例:网上书店分布式消息队列和分布式缓存的实现

简介: 原文:[.NET领域驱动设计实战系列]专题八:DDD案例:网上书店分布式消息队列和分布式缓存的实现一、引言    在上一专题中,商家发货和用户确认收货功能引入了消息队列来实现的,引入消息队列的好处可以保证消息的顺序处理,并且具有良好的可扩展性。
原文: [.NET领域驱动设计实战系列]专题八:DDD案例:网上书店分布式消息队列和分布式缓存的实现

一、引言

   在上一专题中,商家发货和用户确认收货功能引入了消息队列来实现的,引入消息队列的好处可以保证消息的顺序处理,并且具有良好的可扩展性。但是上一专题消息队列是基于内存中队列对象来实现,这样实现有一个弊端,就是一旦服务重启或出现故障时,此时消息队列中的消息会丢失,并且也记录不了日志。所以就会出现,商家发货成功后,用户并没有收到邮件通知,并且也没有日志让我们发现是否发送了邮件通知。为了解决这个问题,就需要引入一种可恢复的消息队列。目前有很多开源的消息队列都支持可恢复的,例如TibcoEms.net等。然而,微软的MSMQ也是支持这种特性的。并且MSMQ还支持分布式部署,关于MSMQ更多内容可以参考:http://www.cnblogs.com/zhili/p/MSMQ.html

   在本专题中将介绍为网上书店案例引入分布式消息队列和分布式缓存的实现。

二、分布式消息队列的实现

   MSMQ的实现原理是:消息的发送者把自己想要发送的信息放入一个容器,然后把它保存到一个系统公用空间的消息队列中,本地或异地的消息接收程序再从该队列中取出发给它的消息进行处理。所以,即使服务器突然重启,消息也会存在于系统公用空间的消息队列中,待服务器重新启动后,可以继续接受消息进行处理,从而解决上一专题存在的问题。另外,上一专题的消息队列只能被用在当前服务器中,而MSMQ支持分布式部署,不同机器都可以对MSMQ进行接收消息来处理,此时MSMQ起到一个中间件的作用。

  在为网上书店引入分布式消息队列之前,让我们先理一下实现思路:

  • 上一专题中把发货事件和收货事件发布到EventBus中,而此时需要用MsmqEventBus来替代EventBus。而MsmqEventBus的实现就很简单了,完全可以参考EventBus来实现,只是此时消息并不是进入Queue对象中,而是通过MessageQueue对象发送到系统的消息队列中。
  • 而Commit方法即从系统的消息队列中出队来获得消息。再获得消息的处理器时,与上一专题的实现有点不同,因为把事件对象发送到消息队列时,需要先把事件对象先序列化为Message对象再放入消息队列中,而出队的也是消息对象,而不是上一专题中的发货事件对象。所以此时需要把出队的消息对象反序列化为对应的事件对象。

   有了上面的实现思路,接下来让我们一起看看MsmqEventBus的具体实现代码吧。

public class MsmqEventBus : DisposableObject, IEventBus
    {
             public void Publish<TMessage>(TMessage message) where TMessage : class, IEvent
        {
            // 将消息放入Message中Body属性进行序列化发送到消息队列中
            var msmqMessage = new Message(message) { Formatter = new XmlMessageFormatter(new[] { message.GetType() }), Label = message.GetType().ToString()};
            _messageQueue.Send(msmqMessage);
            _committed = false;
        }

        public void Publish<TMessage>(IEnumerable<TMessage> messages) where TMessage : class, IEvent
        {
            messages.ToList().ForEach(m =>
            {
                _messageQueue.Send(m);
                _committed = false;
            });
        }

        public void Commit()
        {
            if (this._useInternalTransaction)
            {
                using (var transaction = new MessageQueueTransaction())
                {
                    try
                    {
                        transaction.Begin();
                        var message = _messageQueue.Receive();
                        if (message != null)
                        {
                            message.Formatter = new XmlMessageFormatter(new[] { typeof(string) });
                            var evntType = ConvertStringToType(message.Body.ToString());
                            var method = _publishMethod.MakeGenericMethod(evntType);
                            var evnt = Activator.CreateInstance(evntType);
                            method.Invoke(_aggregator, new object[] { evnt });

                            transaction.Commit();
                        }
                    }
                    catch
                    {
                        transaction.Abort();
                        throw;
                    }
                }
            }
            else
            {
                // 从msmq消息队列中出队,此时获得的对象是消息对象
                var message = _messageQueue.Receive();
                if (message != null)
                {
                    // 指定反序列化的对象,由于我们之前把对应的事件类型保存在MessageQueue中的Label属性
                    // 所以此时可以通过Label属性来获得目标序列化类型
                    message.Formatter = new XmlMessageFormatter(new[] { ConvertStringToType(message.Label) });
                    
                    // 这样message.Body获得就是对应的事件对象,后面的处理逻辑就和EventBus一样了
                    var evntType =message.Body.GetType();
                    var method = _publishMethod.MakeGenericMethod(evntType);
                    method.Invoke(_aggregator, new object[] { message.Body });
                }
            }

            _committed = true;
        }
    }

  结合上面代码的注释和前面实现思路的介绍,相信理解MsmqEventBus应该没什么问题了。接下来,我们需要在配置文件中指定EventBus为MsmqEventBus类,另外需要在你本地专有队列中创建"OnlineStoreQueue"队列来接受消息。具体的配置文件修改为:

 <!--Event Bus-->
      <!--<register type="OnlineStore.Events.Bus.IEventBus, OnlineStore.Events" mapTo="OnlineStore.Events.Bus.EventBus, OnlineStore.Events">
        <lifetime type="singleton" />
      </register>-->
      
      <!--注入MsmqEventBus-->
      <register type="OnlineStore.Events.Bus.IEventBus, OnlineStore.Events"
                mapTo="OnlineStore.Events.Bus.MsmqEventBus, OnlineStore.Events">
        <lifetime type="singleton" />
        <constructor>
          <param name="path" value=".\Private$\OnlineStoreQueue" />
        </constructor>
      </register>
    </container>

  到此,分布式消息队列的实现就完成了,具体分布式消息队列的实现效果和上一专题使用EventBus的效果是一样的,这里就不再贴图了,大家可以自行下载源码查看。

三、缓存的实现

  在实际开发过程中,缓存的实现是必不可少的,对于已经查询过的数据可以直接从缓存中进行读取返回给调用者,利用缓存不但可以加快响应速度,还能减轻数据库服务器的压力。在大型电子商务网站中,缓存的实现更是必不可少的功能。然而缓存的实现也有两种,一种是分布式缓存,另一种本地缓存。在大型网站中,更多实现的是分布式缓存,对于一些少用户的企业系统,可能才会使用到本地缓存。所以在本专题中,将在网上书店案例中对这两种缓存分别进行实现。

3.1 本地缓存的实现

  首先,我们来介绍本地缓存的实现。由于这里需要实现两种缓存,根据面向接口编程原则,我们自然首先需要定义一个缓存接口,然后这两种具体缓存都需要实现该接口。针对缓存接口,无非是缓存数据的添加,移除,更新等操作,所以缓存接口的定义如下所示:

 // 缓存接口的定义
    public interface ICacheProvider
    {
        /// <summary>
        /// 向缓存中添加一个对象
        /// </summary>
        /// <param name="key">缓存的键值</param>
        /// <param name="valueKey">缓存值的键值</param>
        /// <param name="value">缓存的对象</param>
        void Add(string key, string valueKey, object value);
        void Update(string key, string valueKey, object value);
        object Get(string key, string valueKey);
        void Remove(string key);
        bool Exists(string key);
        bool Exists(string key, string valueKey);
    }

  在介绍本地缓存的实现之前,让我们先来思考下本地缓存的实现思路——就是在本地缓存类中定义一个字典对象,添加缓存就是往该字典插入键值对而已,其中key就是缓存数据对应的键值,value就是真正的缓存数据,如果缓存在字典中存在的话,就直接根据键值查找出缓存数据进行返回。

  然而网上书店的本地缓存是基于Enterprise Library Caching库来实现的,其实现思路和我之前介绍的思路也是一样的,只不过此时字典对象不需要我们在类中定义,此时直接用Enterprise Library Caching库中定义的就好。有了上面的分析,本地缓存的实现理解起来也就不那么难了,具体本地缓存的实现代码如下所示:

// 表示基于Microsoft Patterns & Practices - Enterprise Library Caching Application Block的缓存机制的实现
    // 该类简单理解为对Enterprise Library Caching中的CacheManager封装
    // 该缓存实现不支持分布式缓存,更多信息参考: 
    // http://stackoverflow.com/questions/7799664/enterpriselibrary-caching-in-load-balance 
    public class EntLibCacheProvider : ICacheProvider
    {
        // 获得CacheManager实例,该实例的注册通过cachingConfiguration进行注册进去的,具体看配置文件
        private readonly ICacheManager _cacheManager = CacheFactory.GetCacheManager();

        #region ICahceProvider

        public void Add(string key, string valueKey, object value)
        {
            Dictionary<string, object> dict = null;
            if (_cacheManager.Contains(key))
            {
                dict = (Dictionary<string, object>) _cacheManager[key];
                dict[valueKey] = value;
            }
            else
            {
                dict = new Dictionary<string, object> { { valueKey, value }};
            }

            _cacheManager.Add(key, dict);
        }

        public void Update(string key, string valueKey, object value)
        {
            Add(key, valueKey, value);
        }

        public object Get(string key, string valueKey)
        {
            if (!_cacheManager.Contains(key)) return null;
            var dict = (Dictionary<string, object>)_cacheManager[key];
            if (dict != null && dict.ContainsKey(valueKey))
                return dict[valueKey];
            else
                return null;
        }

        // 从缓存中移除对象
        public void Remove(string key)
        {
            _cacheManager.Remove(key);
        }

        // 判断指定的键值的缓存是否存在
        public bool Exists(string key)
        {
            return _cacheManager.Contains(key);
        }

        // 判断指定的键值和缓存键值的缓存是否存在
        public bool Exists(string key, string valueKey)
        {
            return _cacheManager.Contains(key) &&
               ((Dictionary<string, object>)_cacheManager[key]).ContainsKey(valueKey);
        }
        #endregion 
    }

  到此,网上书店案例中本地缓存的实现就完成了。由于本地缓存不支持分布式部署,所有的缓存都存在于单独缓存服务器中,然而,针对一些大型网站来说,这样的实现并不适合,因为在大型网站中,需要通过多个缓存服务进行集群,需要使得缓存均匀分布在集群中的缓存服务器中。此时就需要引入分布式缓存的实现。下面让我们具体看看分布式缓存如何在该案例中实现。

3.2 分布式缓存的实现

  分布式缓存可以通过具体的算法把缓存均匀地分布在集群中缓存服务器中,从而用户请求的不同数据可以路由到对应的缓存服务器中进行添加、更新或获得。分布式缓存的实现有很多种方式,可以利用Memcached和Redis开源库来实现。然而,微软的Windows Azure也提供了分布式缓存的实现,本案例中分布式缓存就是基于Windows Azure的。在对分布式缓存实现之前,需要先下载对应的dll,然后再在项目中进行引用。需要下载的dll已经包含在项目根目录下的libs文件夹下,具体需要下载的程序集截图如下所示:

 

  然后在基础设施层引入这些程序集,之前就可以去实现基于Windows Azure的分布式缓存了。具体的实现代码如下所示:

// 分布式缓存,该类是对微软分布式缓存服务的封装
    // 在该案例中没用用到该缓存,但是提供在这里让大家明白微软的分布式缓存实现,并不是只有memcached和Redis
    // Redis参考:http://www.cnblogs.com/ceecy/p/3279407.htmlhttp://blog.csdn.net/suifeng3051/article/details/23739295
    // 关于微软分布式缓存更多介绍参考:http://www.cnblogs.com/shanyou/archive/2010/06/29/AppFabricCaching.html 
    // 和http://www.cnblogs.com/mlj322/archive/2010/04/05/1704624.html
    public class AppfabricCacheProvider : ICacheProvider
    {
        private readonly DataCacheFactory _factory = new DataCacheFactory();
        private readonly DataCache _cache;

        public AppfabricCacheProvider()
        {
            this._cache = _factory.GetDefaultCache();
        }

        #region ICacheProvider Members
        public void Add(string key, string valueKey, object value)
        {
            // DataCache中不包含Contain方法,所有用Get方法来判断对应的key值是否在缓存中存在
            var val = (Dictionary<string, object>)_cache.Get(key);
            if (val == null)
            {
                val = new Dictionary<string, object> {{ valueKey, value}};
                _cache.Add(key, val);
            }
            else
            {
                if (!val.ContainsKey(valueKey))
                    val.Add(valueKey, value);
                else
                    val[valueKey] = value;

                _cache.Put(key, val);
            }
        }

        public void Update(string key, string valueKey, object value)
        {
            Add(key, valueKey, value);
        }

        public object Get(string key, string valueKey)
        {
            return Exists(key, valueKey) ? ((Dictionary<string, object>)_cache.Get(key))[valueKey] : null;
        }

        public void Remove(string key)
        {
            _cache.Remove(key);
        }

        public bool Exists(string key)
        {
            return _cache.Get(key) != null;
        }

        public bool Exists(string key, string valueKey)
        {
            var val = _cache.Get(key);
            if (val == null)
                return false;
            return ((Dictionary<string, object>)val).ContainsKey(valueKey);
        }

        #endregion 
    }

  通过上面的步骤,分布式缓存的实现就完成了。其实,分布式缓存和本地缓存不同之处就在于:分布式缓存支持对应的算法可以把缓存存放在不同的服务器上,而本地缓存只能存在于本地,而不能跨机器分布。所以对于大型网站,分布式缓存才是最好的选择,由于分布式缓存的实现和部署,无疑会增加开发和维护成本,所以对于一些小型系统(指定是单数据库服务器系统),可以考虑使用本地缓存。

  在本案例中,由于本人没有Windows Azure环境,所以对于分布式缓存的实现也不能进行测试,所以本案例中使用的还是本地缓存。要使缓存生效,还需要对配置文件进行修改。具体配置文件修改为:

 <unity xmlns="http://schemas.microsoft.com/practices/2010/unity">
    <sectionExtension type="Microsoft.Practices.Unity.InterceptionExtension.Configuration.InterceptionConfigurationExtension, Microsoft.Practices.Unity.Interception.Configuration" />
    <container>
      <extension type="Interception" />
      
       <!--Cache Provider-->
      <register type="OnlineStore.Infrastructure.Caching.ICacheProvider, OnlineStore.Infrastructure" mapTo="OnlineStore.Infrastructure.Caching.EntLibCacheProvider, OnlineStore.Infrastructure" />
<!--........-->
 </container>
</unity>

  其实,通过上面的配置之后,缓存还是不能生效的,因为我们一般把缓存放在获得数据方法之前进行调用,在用户对获得数据方法调用之前,首先从缓存中进行查找,如果存在,则直接返回缓存中的数据给调用者就可以了,如果不存在再调用获得数据方法从数据库中读取,读取成功后添加到缓存中再返回给调用者。既然要在方法调用前来查找缓存,从中你是否想到了什么呢?不错,就是面向切面编程,即AOP。所以要让缓存生效,在该案例中还需要支持AOP。至于AOP的支持,我将会在下一专题进行介绍。

四、总结

   到这里,本专题的内容就结束了,正如前面所说的,在下一专题,我将在网上书店案例中引入对AOP的支持。

  本专题所有源码下载地址:https://github.com/lizhi5753186/OnlineStore_Second/

 

目录
相关文章
|
1天前
|
物联网 调度 vr&ar
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
鸿蒙技术分享:HarmonyOS Next 深度解析 随着万物互联时代的到来,华为发布的 HarmonyOS Next 在技术架构和生态体验上实现了重大升级。本文从技术架构、生态优势和开发实践三方面深入探讨其特点,并通过跨设备笔记应用实战案例,展示其强大的分布式能力和多设备协作功能。核心亮点包括新一代微内核架构、统一开发语言 ArkTS 和多模态交互支持。开发者可借助 DevEco Studio 4.0 快速上手,体验高效、灵活的开发过程。 239个字符
130 13
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
|
8天前
|
NoSQL Java Redis
秒杀抢购场景下实战JVM级别锁与分布式锁
在电商系统中,秒杀抢购活动是一种常见的营销手段。它通过设定极低的价格和有限的商品数量,吸引大量用户在特定时间点抢购,从而迅速增加销量、提升品牌曝光度和用户活跃度。然而,这种活动也对系统的性能和稳定性提出了极高的要求。特别是在秒杀开始的瞬间,系统需要处理海量的并发请求,同时确保数据的准确性和一致性。 为了解决这些问题,系统开发者们引入了锁机制。锁机制是一种用于控制对共享资源的并发访问的技术,它能够确保在同一时间只有一个进程或线程能够操作某个资源,从而避免数据不一致或冲突。在秒杀抢购场景下,锁机制显得尤为重要,它能够保证商品库存的扣减操作是原子性的,避免出现超卖或数据不一致的情况。
41 10
|
1天前
|
数据管理 API 调度
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
HarmonyOS Next 是华为新一代操作系统,专注于分布式技术的深度应用与生态融合。本文通过技术特点、应用场景及实战案例,全面解析其核心技术架构与开发流程。重点介绍分布式软总线2.0、数据管理、任务调度等升级特性,并提供基于 ArkTS 的原生开发支持。通过开发跨设备协同音乐播放应用,展示分布式能力的实际应用,涵盖项目配置、主界面设计、分布式服务实现及部署调试步骤。此外,深入分析分布式数据同步原理、任务调度优化及常见问题解决方案,帮助开发者掌握 HarmonyOS Next 的核心技术和实战技巧。
102 74
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
|
1月前
|
消息中间件 开发框架 .NET
.NET 8 强大功能 IHostedService 与 BackgroundService 实战
【11月更文挑战第7天】本文介绍了 ASP.NET Core 中的 `IHostedService` 和 `BackgroundService` 接口及其用途。`IHostedService` 定义了 `StartAsync` 和 `StopAsync` 方法,用于在应用启动和停止时执行异步操作,适用于资源初始化和清理等任务。`BackgroundService` 是 `IHostedService` 的抽象实现,简化了后台任务的编写,通过 `ExecuteAsync` 方法实现长时间运行的任务逻辑。文章还提供了创建和注册这两个服务的实战步骤,帮助开发者在实际项目中应用这些功能。
|
2月前
|
NoSQL Java Redis
开发实战:使用Redisson实现分布式延时消息,订单30分钟关闭的另外一种实现!
本文详细介绍了 Redisson 延迟队列(DelayedQueue)的实现原理,包括基本使用、内部数据结构、基本流程、发送和获取延时消息以及初始化延时队列等内容。文章通过代码示例和流程图,逐步解析了延迟消息的发送、接收及处理机制,帮助读者深入了解 Redisson 延迟队列的工作原理。
|
2月前
|
开发框架 NoSQL MongoDB
C#/.NET/.NET Core开发实战教程集合
C#/.NET/.NET Core开发实战教程集合
|
2月前
|
消息中间件 存储 监控
【10月更文挑战第2天】消息队列系统中的确认机制在分布式系统中如何实现
【10月更文挑战第2天】消息队列系统中的确认机制在分布式系统中如何实现
|
2月前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现?
消息队列系统中的确认机制在分布式系统中如何实现?
|
5月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
下一篇
DataWorks