Adhesive框架系列文章--Mongodb数据服务模块实现(上)

简介: Mongodb数据服务可以直接接受任何类型数据,并且它设计为可以承受大量数据的写入。为了能保存任何类型的数据,并且在后台可以查看任何类型的数据,因此我们必须在收到数据的时候对数据的元数据进行提取,随同主体数据一并保存在数据库中。

Mongodb数据服务可以直接接受任何类型数据,并且它设计为可以承受大量数据的写入。为了能保存任何类型的数据,并且在后台可以查看任何类型的数据,因此我们必须在收到数据的时候对数据的元数据进行提取,随同主体数据一并保存在数据库中。对数据本身也需要重新组织结构,相当于进行一次序列化,然后保存到数据库中。虽然Mongodb是支持Json格式的,但是由于我们在保存数据的时候还有很多逻辑,因此我们必须手动进行这个工作。其实对于提交数据来说,应该是一个非常快的动作,应该以异步方式进行,在一个尽量短的时间内让方法的调用可以返回,之后可以在后台慢慢进行数据的转换和数据发送到远端。因此,开发了一个内存队列服务模块来进行异步队列处理工作,并且提交数据到远端也使用了框架内部的Wcf分布式服务模块。当然,在服务端道理也一样,我们可以通过一个内存队列来批量提交数据,并且让服务的调用尽快返回。Mongodb数据服务提交数据的过程如下:

image

项目的结构如下:

image

1、Mongodb项目是客户端部分的接口

2、Mongodb.Imp项目是客户端部分的实现

3、Mongodb.Server是服务端部分的接口,或者说是服务契约

4、Mongodb.Server.Imp是服务端部分的实现

可以看到Mongodb数据本身依赖应用程序信息中心模块、配置服务模块、内存队列服务模块、Wcf分布式服务模块,对于大部分客户端应用程序来说都应该只依赖Mongodb数据服务的客户端而不是服务端。我们把Mongodb数据服务分成两部分,插入数据的服务和查询服务,后者的使用者一般而言只有Mongodb数据服务的后台。本文主要介绍前者:

   public interface IMongodbInsertService : IDisposable
    {
        void Insert(object item);
    }

从接口本身来看非常简单,只有一个方法。我们来看看它的实现步骤:

1、调用配置服务,查看这个数据类型对应的配置,说到这里,让我们来看一下Mongodb数据服务客户端的配置:

    [ConfigEntity(FriendlyName = "Mongodb客户端配置")]
    public class MongodbServiceConfigurationEntity
    {
        [ConfigItem(FriendlyName = "插入服务配置项列表")]
        public Dictionary<string, MongodbInsertServiceConfigurationItem> MongodbInsertServiceConfigurationItems { get; set; }
    }

每一个类型的配置项如下:

    [ConfigEntity(FriendlyName = "Mongodb客户端针对每个数据类型的配置")]
    public class MongodbInsertServiceConfigurationItem
    {
        [ConfigItem(FriendlyName = "类型完整名")]
        public string TypeFullName { get; set; }

        [ConfigItem(FriendlyName = "是否提交到服务端")]
        public bool SubmitToServer { get; set; }

        [ConfigItem(FriendlyName = "队列最大项数")]
        public int MaxItemCount { get; set; }

        [ConfigItem(FriendlyName = "消费的线程总数")]
        public int ConsumeThreadCount { get; set; }

        [ConfigItem(FriendlyName = "消费数据的时间间隔毫秒")]
        public int ConsumeIntervalMilliseconds { get; set; }

        [ConfigItem(FriendlyName = "遇到错误时消费数据的时间间隔毫秒")]
        public int ConsumeIntervalWhenErrorMilliseconds { get; set; }

        [ConfigItem(FriendlyName = "消费数据的批量项数")]
        public int ConsumeItemCountInOneBatch { get; set; }

        [ConfigItem(FriendlyName = "达到最大项数后的策略")]
        public MemoryQueueServiceReachMaxItemCountAction ReachMaxItemCountAction { get; set; }

        [ConfigItem(FriendlyName = "消费数据时不足批次数的策略")]
        public MemoryQueueServiceNotReachBatchCountConsumeAction NotReachBatchCountConsumeAction { get; set; }

        [ConfigItem(FriendlyName = "消费数据遇到错误的策略")]
        public MemoryQueueServiceConsumeErrorAction ConsumeErrorAction { get; set; }

        public MongodbInsertServiceConfigurationItem()
        {
            TypeFullName = "";
            SubmitToServer = true;
            ReachMaxItemCountAction = MemoryQueueServiceReachMaxItemCountAction.AbandonOldItems
                .Add(MemoryQueueServiceReachMaxItemCountAction.LogExceptionEveryOneSecond);
            ConsumeErrorAction = MemoryQueueServiceConsumeErrorAction.AbandonAndLogException;
            ConsumeThreadCount = 1;
            ConsumeIntervalMilliseconds = 10;
            ConsumeIntervalWhenErrorMilliseconds = 1000;
            ConsumeItemCountInOneBatch = 100;
            NotReachBatchCountConsumeAction = MemoryQueueServiceNotReachBatchCountConsumeAction.ConsumeAllItems;
            MaxItemCount = 10000;
        }
    }

这里可以看到,除了是否提交到服务端这个配置,大多数的配置其实是内存队列服务的配置,在之后的文章中我们单独会介绍内存队列服务。之所以需要为Mongodb数据服务的客户端设置这样的配置,一方面是允许修改队列服务的配置,另一方面是为了限制没有经过配置随便什么数据都往服务端发送,只有在后台显式配置的数据类型,才会发生到服务端。

2、如果没获取到配置的话返回,如果获取到配置的话,则为这个类型初始化内存队列服务,设置一系列队列服务的参数,并且把队列的处理委托挂载我们提交数据到服务端的处理方法。换句话说是每一个类型都会有自己的内存队列服务,我们在MongodbInsertService的实现定义了一个静态字典用于保存内存队列服务的实现:

private static Dictionary<string, IMemoryQueueService> submitDataMemoryQueueServices = new Dictionary<string, IMemoryQueueService>();
                if (!submitDataMemoryQueueServices.ContainsKey(typeFullName))
                {
                    lock (submitDataMemoryQueueServices)
                    {
                        if (!submitDataMemoryQueueServices.ContainsKey(typeFullName))
                        {
                            var memoryQueueService = LocalServiceLocator.GetService<IMemoryQueueService>();
                            memoryQueueService.Init(new MemoryQueueServiceConfiguration(string.Format("{0}_{1}", ServiceName, typeFullName), InternalSubmitData)
                            {
                                ConsumeErrorAction = config.ConsumeErrorAction,
                                ConsumeIntervalMilliseconds = config.ConsumeIntervalMilliseconds,
                                ConsumeIntervalWhenErrorMilliseconds = config.ConsumeIntervalWhenErrorMilliseconds,
                                ConsumeItemCountInOneBatch = config.ConsumeItemCountInOneBatch,
                                ConsumeThreadCount = config.ConsumeThreadCount,
                                MaxItemCount = config.MaxItemCount,
                                NotReachBatchCountConsumeAction = config.NotReachBatchCountConsumeAction,
                                ReachMaxItemCountAction = config.ReachMaxItemCountAction,
                            });
                            submitDataMemoryQueueServices.Add(typeFullName, memoryQueueService);
                        }
                    }
                }

3、然后会判断是否已经提取过这个类型元数据了,如果没提取过则尝试提取元数据并加入缓存:

                if (!mongodbDatabaseDescriptionCache.ContainsKey(typeFullName))
                {
                    lock (mongodbDatabaseDescriptionCache)
                    {
                        if (!mongodbDatabaseDescriptionCache.ContainsKey(typeFullName))
                        {
                            MongodbDatabaseDescription mongodbDatabaseDescription = GetMongodbDatabaseDescription(item);
                            CheckMongodbDatabaseDescription(mongodbDatabaseDescription);
                            mongodbDatabaseDescriptionCache.Add(typeFullName, mongodbDatabaseDescription);
                        }
                    }
                }

4、把数据加入队列,等待队列服务在合适的时候调用处理方法(也就是发送到服务端):

 if (config.SubmitToServer)
                {
                    submitDataMemoryQueueServices[typeFullName].Enqueue(item);
                }

 

其实到这里为止,方法已经返回了,之后就是队列服务在后台的异步调用了。现在我们来深入一下细节,首先看一下GetMongodbDatabaseDescription是如何提取元数据的,这个方法返回的是MongodbDatabaseDescription,它的定义如下:

    [DataContract(Namespace = "Adhesive.Mongodb")]
    public class MongodbDatabaseDescription
    {
        [DataMember]
        public bool SentToServer { get; set; }

        [DataMember]
        public string TypeFullName { get; set; }

        [DataMember]
        public string DatabasePrefix { get; set; }

        [DataMember]
        public string CategoryName { get; set; }

        [DataMember]
        public string Name { get; set; }

        [DataMember]
        public string DisplayName { get; set; }

        [DataMember]
        public int ExpireDays { get; set; }

        [DataMember]
        public List<MongodbColumnDescription> MongodbColumnDescriptionList { get; set; }

        [DataMember]
        public List<MongodbEnumColumnDescription> MongodbEnumColumnDescriptionList { get; set; }
    }

在这里可以看到,我们主要解析的是MongodbPersistenceEntityAttribute,对于下一级的MongodbColumnDescriptionList ,我们主要是解析每一个列的元数据,而MongodbEnumColumnDescriptionList则提取所有枚举的信息。MongodbColumnDescription的定义如下:

    [DataContract(Namespace = "Adhesive.Mongodb")]
    public class MongodbColumnDescription
    {
        [DataMember]
        public string Name { get; set; }

        [DataMember]
        public string TypeName { get; set; }

        [DataMember]
        public bool IsArrayColumn { get; set; }

        [DataMember]
        public bool IsEntityColumn { get; set; }

        [DataMember]
        public string ColumnName { get; set; }

        [DataMember]
        public string DisplayName { get; set; }

        [DataMember]
        public string Description { get; set; }

        [DataMember]
        public bool ShowInTableView { get; set; }

        [DataMember]
        public bool IsTableColumn { get; set; }

        [DataMember]
        public bool IsTimeColumn { get; set; }

        [DataMember]
        public bool IsContextIdentityColumn { get; set; }

        [DataMember]
        public bool IsPrimaryKey { get; set; }

        [DataMember]
        public MongodbIndexOption MongodbIndexOption { get; set; }

        [DataMember]
        public MongodbFilterOption MongodbFilterOption { get; set; }

        [DataMember]
        public MongodbCascadeFilterOption MongodbCascadeFilterOption { get; set; }

        [DataMember]
        public MongodbSortOption MongodbSortOption { get; set; }
    }

这里很多数据都来自MongodbPersistenceItemAttribute和MongodbPresentationItemAttribute。再来看看MongodbEnumColumnDescription:

    [DataContract(Namespace = "Adhesive.Mongodb")]
    public class MongodbEnumColumnDescription
    {
        [DataMember]
        public string Name { get; set; }

        [DataMember]
        public Dictionary<string, string> EnumItems { get; set; }

    }

它就简单了,只是保存枚举的列名,和枚举每一项的数据。其实这些元数据提取本身没什么复杂的,可以想到是反射提取,并且其中还涉及到递归,需要深入每一个自定义类型,GetMongodbColumnDescription方法其中有一段这样的代码实现了递归:

                if (!type.Assembly.GlobalAssemblyCache && type != pi.DeclaringType)
                {
                    columnDescription.IsEntityColumn = true;
                    var properties = GetPropertyListFromCache(type);
                    if (properties != null)
                    {
                        foreach (var property in properties)
                        {
                            GetMongodbColumnDescription(typeFullName, fullName, columnDescriptionList, enumColumnDescriptionList, property);
                        }
                    }
                }

在提取元数据的时候,另一个重要的工作是缓存一些关键的PropertyInfo的配置,以便后期处理数据的时候使用:

    internal class ProperyInfoConfig
    {
        public bool IsCascadeFilterLevelOne { get; set; }

        public bool IsCascadeFilterLevelTwo { get; set; }

        public bool IsCascadeFilterLevelThree { get; set; }

        public bool IsDateColumn { get; set; }

        public bool IsTableName { get; set; }

        public bool IsIgnore { get; set; }

        public string ColumnName { get; set; }
    }

因为我们在提交数据之前,需要针对级联下拉的数据进行处理,把第二级的值设置为第一级的值加上第二级的值,第三级的值设置为一加二加三,这样在筛选的时候就会很方便;此外还需要替换列名,计算表名等等,只有缓存了PropertyInfo才能无需重新读取元数据:

 private static Dictionary<string, Dictionary<PropertyInfo, ProperyInfoConfig>> propertyConfigCache = new Dictionary<string, Dictionary<PropertyInfo, ProperyInfoConfig>>();

之前说了元数据提取部分时的逻辑,然后来看一下格式化数据时的逻辑,之前为内存队列服务的提交数据的委托挂载的方法主要实现如下:

  var mongodbDataList = items.Select(_ => ConvertItemToMongodbData(_)).Where(_ => _ != null).ToList();
                var desc = mongodbDatabaseDescriptionCache[typeFullName];
                WcfServiceLocator.GetSafeService<IMongodbServer>().SubmitData(mongodbDataList, desc.SentToServer ? null : desc);

先是获取要提交的数据,然后再获取元数据,如果有的话和主体数据一并提交到服务端。通过Wcf分布式数据服务获取到IMongodbServer,并调用它的SubmitData方法,定义如下:

[OperationContract]
        void SubmitData(IList<MongodbData> dataList, MongodbDatabaseDescription databaseDescription);

MongodbData的定义如下:

    [DataContract(Namespace = "Adhesive.Mongodb")]
    public class MongodbData
    {
        [DataMember]
        public string TypeFullName { get; set; }

        [DataMember]
        public string DatabaseName { get; set; }

        [DataMember]
        public string TableName { get; set; }

        [DataMember]
        public string Data { get; set; }
    }

在这里可以发现Data是字符串类型,那是因为我们把要提交的数据主体转换成了Json,否则我们是无法通过Wcf提交Dictionary<string, object>构成的一颗无限级树的。在这里,我们略去介绍ConvertItemToMongodbData的实现,它其实并不复杂,也是通过递归和反射无限级获取类的所有属性的值,并转换为Dictionary<string, object>,只不过在这里面需要处理列表类型、字典类型以及枚举。

 

至此为止,客户端的部分介绍完了,现在我们来看一下服务端部分。首先,服务端也有根据每一个类型的配置:

    [ConfigEntity(FriendlyName = "Mongodb服务端针对每个数据类型的配置")]
    public class MongodbServerConfigurationItem
    {
        [ConfigItem(FriendlyName = "类型完整名")]
        public string TypeFullName { get; set; }

        [ConfigItem(FriendlyName = "服务器名")]
        public string MongodbServerUrlName { get; set; }

        [ConfigItem(FriendlyName = "是否提交到数据库")]
        public bool SubmitToDatabase { get; set; }

        [ConfigItem(FriendlyName = "队列最大项数")]
        public int MaxItemCount { get; set; }

        [ConfigItem(FriendlyName = "消费的线程总数")]
        public int ConsumeThreadCount { get; set; }

        [ConfigItem(FriendlyName = "消费数据的时间间隔毫秒")]
        public int ConsumeIntervalMilliseconds { get; set; }

        [ConfigItem(FriendlyName = "遇到错误时消费数据的时间间隔毫秒")]
        public int ConsumeIntervalWhenErrorMilliseconds { get; set; }

        [ConfigItem(FriendlyName = "消费数据的批量项数")]
        public int ConsumeItemCountInOneBatch { get; set; }

        [ConfigItem(FriendlyName = "达到最大项数后的策略")]
        public MemoryQueueServiceReachMaxItemCountAction ReachMaxItemCountAction { get; set; }

        [ConfigItem(FriendlyName = "消费数据时不足批次数的策略")]
        public MemoryQueueServiceNotReachBatchCountConsumeAction NotReachBatchCountConsumeAction { get; set; }

        [ConfigItem(FriendlyName = "消费数据遇到错误的策略")]
        public MemoryQueueServiceConsumeErrorAction ConsumeErrorAction { get; set; }

        public MongodbServerConfigurationItem()
        {
            TypeFullName = "";
            SubmitToDatabase = true;
            ReachMaxItemCountAction = MemoryQueueServiceReachMaxItemCountAction.AbandonOldItems
                .Add(MemoryQueueServiceReachMaxItemCountAction.LogExceptionEveryOneSecond);
            ConsumeErrorAction = MemoryQueueServiceConsumeErrorAction.AbandonAndLogException;
            ConsumeThreadCount = Environment.ProcessorCount;
            ConsumeIntervalMilliseconds = 10;
            ConsumeIntervalWhenErrorMilliseconds = 1000;
            ConsumeItemCountInOneBatch = 100;
            NotReachBatchCountConsumeAction = MemoryQueueServiceNotReachBatchCountConsumeAction.ConsumeAllItems;
            MaxItemCount = 100000;
        }
    }

这个配置和客户端的配置差不多,只不过这里把是否提交到服务端改为了是否提交到数据库。在获取了配置之后,同样把数据提交到内存队列,然后由内存队列提交到数据库。核心代码如下:

            try
            {
                var sw = Stopwatch.StartNew();

                var server = CreateMasterMongoServer(typeFullName);
                if (server != null)
                {
                    var database = server.GetDatabase(item.DatabaseName);
                    var collection = database.GetCollection(item.TableName);
                    var documentList = new List<BsonDocument>();
                    JavaScriptSerializer s = new JavaScriptSerializer();
                    mongodbDataList.ForEach(i =>
                    {
                        var dic = s.DeserializeObject(i.Data) as IDictionary;
                        var document = new BsonDocument().Add(dic);
                        documentList.Add(document);
                    });

                    collection.InsertBatch(documentList);
                    LocalLoggingService.Debug("Mongodb服务端成功服务提交 {0} 条数据到数据库,类型是 '{1}',耗时 {2} 毫秒", documentList.Count, typeFullName, sw.ElapsedMilliseconds);
                }

            }
            catch (Exception ex)
            {
                AppInfoCenterService.ExceptionService.Handle(ex, categoryName: ServiceName, subcategoryName: typeFullName, description: "写入数据出现错误", extraInfo: new ExtraInfo
                {
                    DisplayItems = new Dictionary<string, string>()
                    {
                        {"DatabaseName" , item.DatabaseName}, 
                        {"TableName", item.TableName}
                    }
                });
            }
        }

首先是Json反序列化获取到数据,然后转换为BsonDocument,最后批量提交到数据库中。

本文介绍了Mongodb数据服务的插入数据部分在客户端和服务端之间的逻辑,下一篇将介绍Mongodb数据服务查询数据的部分。

作者: lovecindywang
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
相关文章
|
NoSQL MongoDB 微服务
微服务——MongoDB实战演练——文章评论的基本增删改查
本节介绍了文章评论的基本增删改查功能实现。首先,在`cn.itcast.article.dao`包下创建数据访问接口`CommentRepository`,继承`MongoRepository`以支持MongoDB操作。接着,在`cn.itcast.article.service`包下创建业务逻辑类`CommentService`,通过注入`CommentRepository`实现保存、更新、删除及查询评论的功能。最后,新建Junit测试类`CommentServiceTest`,对保存和查询功能进行测试,并展示测试结果截图,验证功能的正确性。
285 2
|
NoSQL Java MongoDB
微服务——MongoDB实战演练——文章评论实体类的编写
本节主要介绍文章评论实体类的编写,创建了包`cn.itcast.article.po`用于存放实体类。具体实现中,`Comment`类通过`@Document`注解映射到MongoDB的`comment`集合,包含主键、内容、发布时间、用户ID、昵称等属性,并通过`@Indexed`和`@CompoundIndex`注解添加单字段及复合索引,以提升查询效率。同时提供了Mongo命令示例,便于理解和操作。
224 2
|
NoSQL 测试技术 MongoDB
微服务——MongoDB实战演练——根据上级ID查询文章评论的分页列表
本节介绍如何根据上级ID查询文章评论的分页列表,主要包括以下内容:(1)在CommentRepository中新增`findByParentid`方法,用于按父ID查询子评论分页列表;(2)在CommentService中新增`findCommentListPageByParentid`方法,封装分页逻辑;(3)提供JUnit测试用例,验证功能正确性;(4)使用Compass插入测试数据并执行测试,展示查询结果。通过这些步骤,实现对评论的高效分页查询。
219 0
|
NoSQL MongoDB 微服务
微服务——MongoDB实战演练——文章微服务模块搭建
本节介绍文章微服务模块的搭建过程,主要包括以下步骤:(1)创建项目工程 *article*,并在 *pom.xml* 中引入依赖;(2)配置 *application.yml* 文件;(3)创建启动类 *cn.itcast.article.ArticleApplication*;(4)启动项目,确保控制台无错误提示。通过以上步骤,完成文章微服务模块的基础构建与验证。
185 0
|
NoSQL MongoDB 数据库
使用 docker 快速搭建开发环境的 mongodb 服务
本指南介绍如何使用 Docker 和 Docker Compose 部署 MongoDB 和 Mongo Express。首先,通过 Docker 命令分别启动 MongoDB(镜像 `mongo:7.0.14`)和 Mongo Express(镜像 `mongo-express:1.0.2-20-alpine3.19`),并配置环境变量确保两者能正确连接。接着,提供了一个 `docker-compose.yaml` 文件示例,包含 MongoDB 数据卷、健康检查及服务依赖配置,简化多容器管理。
2169 2
|
存储 NoSQL MongoDB
基于阿里云数据库MongoDB版,微财数科“又快又稳”服务超7000万客户
选择MongoDB主要基于其灵活的数据模型、高性能、高可用性、可扩展性、安全性和强大的分析能力。
|
持续交付 jenkins Devops
WPF与DevOps的完美邂逅:从Jenkins配置到自动化部署,全流程解析持续集成与持续交付的最佳实践
【8月更文挑战第31天】WPF与DevOps的结合开启了软件生命周期管理的新篇章。通过Jenkins等CI/CD工具,实现从代码提交到自动构建、测试及部署的全流程自动化。本文详细介绍了如何配置Jenkins来管理WPF项目的构建任务,确保每次代码提交都能触发自动化流程,提升开发效率和代码质量。这一方法不仅简化了开发流程,还加强了团队协作,是WPF开发者拥抱DevOps文化的理想指南。
374 1
|
NoSQL BI 数据处理
【超实用攻略】MongoDB 聚合框架:从入门到精通,带你解锁数据处理新姿势!
【8月更文挑战第24天】MongoDB是一款以其灵活性和高性能闻名的NoSQL数据库。其强大的聚合框架采用管道式处理,允许用户定义多个数据处理阶段如过滤、分组等。本文通过示例数据库`orders`和`products`,演示如何利用聚合框架计算各产品的总销售额。示例代码展示了使用`$lookup`连接两集合、`$unwind`打平数组及`$group`按产品ID分组并计算总销售额的过程。这突显了聚合框架处理复杂查询的强大能力,是进行数据分析和报表生成的理想选择。
312 3
|
存储 NoSQL JavaScript
MongoDB存储过程实战:聚合框架、脚本、最佳实践,一文全掌握!
【8月更文挑战第24天】MongoDB是一款备受欢迎的文档型NoSQL数据库,以灵活的数据模型和强大功能著称。尽管其存储过程支持不如传统关系型数据库,本文深入探讨了MongoDB在此方面的最佳实践。包括利用聚合框架处理复杂业务逻辑、封装业务逻辑提高复用性、运用JavaScript脚本实现类似存储过程的功能以及考虑集成其他工具提升数据处理能力。通过示例代码展示如何创建订单处理集合并定义验证规则,虽未直接实现存储过程,但有效地演示了如何借助JavaScript脚本处理业务逻辑,为开发者提供更多实用指导。
334 2
|
存储 NoSQL 数据处理
【MongoDB大神级操作】揭秘聚合框架,让你的数据处理能力瞬间飙升,秒变数据界的超级英雄!
【8月更文挑战第24天】MongoDB是一款备受欢迎的非关系型数据库,以其灵活的文档模型和出色的可扩展性著称。其聚合框架尤其亮眼,能高效地对数据库中的数据执行复杂的转换与聚合操作,无需将数据导出到应用端处理,极大提升了数据处理的效率与灵活性。例如,在一个大型电商数据库中,聚合框架能轻松分析出最热卖的商品或特定时段内某类别商品的销售总额。通过一系列管道操作,如$unwind、$group等,可以对数据进行逐步处理并得到最终结果,同时还支持过滤、排序、分页等多种操作,极大地丰富了数据处理的能力,成为进行数据分析、报表生成及复杂业务逻辑实现的强大工具。
264 2

推荐镜像

更多