Adhesive框架系列文章--Mongodb数据服务模块实现(下)

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
简介: 在本文中我们会着重介绍一下Mongodb数据服务查询部分的实现,也就是通用后台用到的查询服务。 首先,为了性能考虑,我们的每一个Mongodb集群都可以支持主从分离,也就是查询可以使用Slave的Mongodb: 其中的同步延迟配置,用于配置在查询的时候缓存多久之前的数据,因为同步的延迟问题,缓存太新的数据很可能导致数据不完整。

在本文中我们会着重介绍一下Mongodb数据服务查询部分的实现,也就是通用后台用到的查询服务。

首先,为了性能考虑,我们的每一个Mongodb集群都可以支持主从分离,也就是查询可以使用Slave的Mongodb:

image

其中的同步延迟配置,用于配置在查询的时候缓存多久之前的数据,因为同步的延迟问题,缓存太新的数据很可能导致数据不完整。

并且,我们也可以灵活配置每一个类型保存到的集群,实现数据的手动分区:

image

阅读过之前文章的读者知道,在通用后台的高级数据筛选中,我们会列出一些搜索选项,比如:

image

并且,在显示数据的时候,我们也会需要把元数据和实际存储的数据组合起来返回给客户端。为了效率的考虑,我们把所有的这些索引数据、数据库状态数据、各个类型的元数据等等全部缓存在了Mongodb数据服务服务端的内存中。在获取数据缓存数据的同时,我们也会进行索引构建的操作,所有的这些都是在MongodbServerMaintainceCenter中进行的。先来看一下初始化方法:

        public static void Init()
        {
            if (servers.Count == 0)
            {
                BsonClassMap.RegisterClassMap<MongodbDatabaseDescription>(cm =>
                {
                    cm.AutoMap();
                    cm.SetIgnoreExtraElements(true);
                    cm.SetIdMember(cm.GetMemberMap("TypeFullName"));
                });

                BsonClassMap.RegisterClassMap<MongodbColumnDescription>(cm =>
                {
                    cm.AutoMap();
                    cm.SetIgnoreExtraElements(true);
                });

                foreach (var url in MongodbServerConfiguration.GetConfig().MongodbServerUrls)
                {
                    var server = url.Value;
                    var thread = new Thread(() =>
                    {
                        while (true)
                        {
                            Maintainance(server);
                            Thread.Sleep(MongodbServerConfiguration.GetConfig().MaintainceIntervalMilliSeconds);
                        }
                    })
                    {
                        IsBackground = true,
                        Name = string.Format("MongodbServerMaintainanceCenter_{0}", url.Key)
                    };
                    thread.Start();
                }
            }
        }

我们首先使用代码方式配置了一些类型的主键映射以及忽略额外的元素。使用代码方式配置的好处是可以不造成很强的侵入性,也就是实体类型所在的程序集可以不依赖Mongodb的程序集(如果使用Attribute进行配置就需要引用相关程序集了)。

然后在这个Maintainance方法中,我们根据配置定时进行数据维护:

private static Dictionary<MongodbServerUrl, ServerInfo> servers = new Dictionary<MongodbServerUrl, ServerInfo>();

最后,内存中会根据MongodbServerUrl也就是集群来保存每一个集群的信息:

    [DataContract(Namespace = "Adhesive.Mongodb")]
    public class ServerInfo
    {
        [DataMember]
        public MongodbServerUrl Url { get; set; }

        [DataMember]
        public List<DatabaseInfo> Databases { get; set; }

        [DataMember]
        public List<MongodbDatabaseDescription> Descriptions { get; set; }
    }

这是最顶层的服务器信息,这里保存了:

1)集群地址信息

2)所有数据库的信息

3)元数据描述

需要注意到这里的类型都是WCF的数据契约之所以这么做,是因为这些数据是数据服务后台的管理页面需要用到的,也就是通过WCF服务对外开放的。

第二,之所以不把元数据和数据库信息保存在一起是因为我们按照年月分库的。DatabaseInfo中保存的是实际数据库中每一个数据库的信息,而MongodbDatabaseDescription中保存的是每一个类型的元数据,后者是和分库分表没关系的。至于MongodbDatabaseDescription的数据结构,我们在上一篇文章中已经介绍了,在这里继续往下看DatabaseInfo:

    [DataContract(Namespace = "Adhesive.Mongodb")]
    public class DatabaseInfo
    {
        [DataMember]
        public string DatabaseName { get; set; }

        [DataMember]
        public DateTime DatabaseDate { get; set; }

        [DataMember]
        public string DatabasePrefix { get; set; }

        [DataMember]
        public DatabaseStatus DatabaseStatus { get; set; }

        [DataMember]
        public List<CollectionInfo> Collections { get; set; }
    }

除了数据库名、数据库所属年月(分库)、数据库前缀(不带上年月的部分)之外,主要来看一下DatabaseStatus和CollectionInfo。

前者是数据库当前的状态:

    [DataContract(Namespace = "Adhesive.Mongodb")]
    public class DatabaseStatus
    {
        [DataMember]
        public double AverageObjectSize { get; set; }

        [DataMember]
        public int CollectionCount { get; set; }

        [DataMember]
        public long DataSize { get; set; }

        [DataMember]
        public int ExtentCount { get; set; }

        [DataMember]
        public long FileSize { get; set; }

        [DataMember]
        public int IndexCount { get; set; }

        [DataMember]
        public long IndexSize { get; set; }

        [DataMember]
        public long ObjectCount { get; set; }

        [DataMember]
        public long StorageSize { get; set; }
    }

而后者是一个数据库下的集合信息:

    [DataContract(Namespace = "Adhesive.Mongodb")]
    public class CollectionInfo
    {
        [DataMember]
        public string CollectionName { get; set; }

        [DataMember]
        public CollectionStatus CollectionStatus { get; set; }

        [DataMember]
        public List<TextboxFilterColumnInfo> TextboxFilterColumns { get; set; }

        [DataMember]
        public List<ListFilterColumnInfo> ListFilterColumns { get; set; }

        [DataMember]
        public List<CascadeFilterColumnInfo> CascadeFilterColumns { get; set; }
    }

这里包括集合或者说表名、集合状态,以及三种类型过滤列的信息。集合状态CollectionStatus很明显就是表的状态:

    [DataContract(Namespace = "Adhesive.Mongodb")]
    public class CollectionStatus
    {
        [DataMember]
        public DateTime LastEnsureIndexTime { get; set; }

        [DataMember]
        public double AverageObjectSize { get; set; }

        [DataMember]
        public long DataSize { get; set; }

        [DataMember]
        public int ExtentCount { get; set; }

        [DataMember]
        public int Flags { get; set; }

        [DataMember]
        public int IndexCount { get; set; }

        [DataMember]
        public List<IndexStatus> IndexStatusList { get; set; }

        [DataMember]
        public long LastExtentSize { get; set; }

        [DataMember]
        public string Namespace { get; set; }

        [DataMember]
        public long ObjectCount { get; set; }

        [DataMember]
        public double PaddingFactor { get; set; }

        [DataMember]
        public long StorageSize { get; set; }

        [DataMember]
        public long TotalIndexSize { get; set; }
    }

每一个集合下都有若干索引,因此这里还有索引状态:

    [DataContract(Namespace = "Adhesive.Mongodb")]
    public class IndexStatus
    {
        [DataMember]
        public string Name { get; set; }

        [DataMember]
        public string Namespace { get; set; }

        [DataMember]
        public long Size { get; set; }

    }

回到三种过滤列信息的定义:

    [DataContract(Namespace = "Adhesive.Mongodb")]
    public class TextboxFilterColumnInfo
    {
        [DataMember]
        public string ColumnName { get; set; }
    }

    [DataContract(Namespace = "Adhesive.Mongodb")]
    public class ListFilterColumnInfo
    {
        [DataMember]
        public string ColumnName { get; set; }

        [DataMember]
        public List<ItemPair> DistinctValues { get; set; }
    }

    [DataContract(Namespace = "Adhesive.Mongodb")]
    public class CascadeFilterColumnInfo
    {
        [DataMember]
        public string ColumnName { get; set; }

        [DataMember]
        public List<string> DistinctValues { get; set; }
    }

很明显,我们这里缓存了每一个需要用于过滤列的所有Distinct的值,这样后台在打开高级数据过滤的时候就不需要再从数据库去获取这些值了。对于文本列是不需要枚举所有Distinct值的,对于列表列值的类型可以是枚举,而不仅仅是字符串,所以我们还额外定义了ItemPair:

    [DataContract(Namespace = "Adhesive.Mongodb")]
    [Serializable]
    public class ItemPair
    {
        [DataMember]
        public string Name { get; set; }

        [DataMember]
        public object Value { get; set; }
    }

看到这里就很清楚了,Maintainance的工作是获取下列数据并缓存在内存中:

1)和实际表实际库没关系的元数据

2)每一个服务器中实际数据库实际数据表的信息,包括表中各个需要过滤的列的Distinct值

3)数据表数据库的实际状态,包括表中索引的状态

由于需要数据具有一定的实时性,我们的Maintainance工作是基于Master的。现在来看一下Maintainance的流程:

1)先获取所有的元数据

2)根据元数据获取所有数据库

3)并行方式获取每一个数据库的状态,以及数据库下的所有表,

4)为表中的所有索引列进行索引,对过滤列进行Distinct数据获取

5)对过期数据进行删除,必要的时候删除表以及库

6)把DatabaseInfo存放到缓存中去,缓存时间为Maintainance周期

7)更新本地的ServerInfo

 

MongodbServerMaintainceCenter存储了大部分必要的元数据和状态信息,因此在后台页面的主页面中基本都是直接访问内存来获取到这些数据的。只有具体的表视图、状态视图、统计视图、分组统计视图等才会去进行数据库的访问,提高了不少性能。在源代码中也根据不同的需求把相关的服务实现放在了独立的文件中:

image

其中大部分的实现都是差不多的,在这里我们仅仅把列表视图拿过来作为一个例子,也就是下面的服务:

public List<TableData> GetTableData(string databasePrefix, List<string> tableNames, DateTime beginTime, DateTime endTime, int pageIndex, int pageSize, Dictionary<string, object> filters)

首先,按照时间和数据库前缀获取所有的数据库,获取类型全名,获取列元数据,获取枚举元数据,获取时间列:

  var databaseInfos = MongodbServerMaintainceCenter.GetDatabaseInfos(databasePrefix, beginTime, endTime);
                if (databaseInfos.Count == 0) return null;

                var typeFullName = MongodbServerMaintainceCenter.GetTypeFullName(databasePrefix);
                var columnDescriptions = MongodbServerMaintainceCenter.GetMongodbColumnDescriptions(databasePrefix);
                var enumColumnDescriptions = MongodbServerMaintainceCenter.GetMongodbEnumColumnDescriptions(databasePrefix);
                var statTimeColumn = columnDescriptions.SingleOrDefault(c => c.IsTimeColumn);
                if (statTimeColumn == null) return null;

获取查询条件,获取排序列,获取显示的列,获取主键列,然后把主键列和时间列强制作为需要在列表视图中显示的列:

   var filterquery = Query.Null;
                if (filters != null)
                {
                    foreach (var item in filters)
                    {
                        if (item.Value != null)
                        {
                            if (item.Value is string && item.Value.ToString().Split(',').Length > 1)
                            {
                                var values = item.Value.ToString().Split(',');
                                filterquery = Query.And(filterquery, Query.In(item.Key, values.Select(val => BsonValue.Create(val)).ToArray()));
                            }
                            else
                            {
                                filterquery = Query.And(filterquery, Query.EQ(item.Key, BsonValue.Create(item.Value)));
                            }
                        }
                    }
                }
                var query = Query.And(Query.LT(statTimeColumn.ColumnName, endTime).GTE(beginTime), filterquery);

                var sortColumn = columnDescriptions.Where(desc => desc.MongodbSortOption != MongodbSortOption.None).SingleOrDefault();
                var sort = SortBy.Null;
                if (sortColumn != null)
                {
                    if (sortColumn.MongodbSortOption == MongodbSortOption.Descending)
                        sort = SortBy.Descending(sortColumn.ColumnName);
                    else
                        sort = SortBy.Ascending(sortColumn.ColumnName);
                }

                var showColumns = columnDescriptions.Where(desc => desc.ShowInTableView).ToList();
                var pkColumn = columnDescriptions.SingleOrDefault(desc => desc.IsPrimaryKey);
                if (pkColumn == null) return null;
                var fields = showColumns.Select(c => c.ColumnName).ToList();

                if (fields.Contains(statTimeColumn.ColumnName))
                {
                    fields.Remove(statTimeColumn.ColumnName);
                }
                fields.Insert(0, statTimeColumn.ColumnName);

                if (fields.Contains(pkColumn.ColumnName))
                {
                    fields.Remove(pkColumn.ColumnName);
                }
                fields.Insert(0, pkColumn.ColumnName);

然后,使用并行方式同时获取多个表的信息:

Parallel.ForEach(tableNames, tableName =>
                {
                    var tables = new List<Table>();
                    var remainRows = pageSize;
                    var totalCount = 0;
                    foreach (var databaseInfo in databaseInfos)
                    {
                        var table = new List<Dictionary<string, string>>();
                        var databaseName = databaseInfo.DatabaseName;
                        var database = server.GetDatabase(databaseName);
                        var collection = database.GetCollection(tableName);
                        var count = collection.Count(query);
                        totalCount += count;
                        if (remainRows > 0)
                        {
                            var q = collection.Find(query).SetFields(fields.ToArray())
                                .SetLimit(remainRows).SetSkip(pageIndex * pageSize);
                            if (sort != SortBy.Null)
                                q = q.SetSortOrder(sort);
                            var result = q.ToList();
                            foreach (var item in result)
                            {
                                var row = new Dictionary<string, string>();
                                foreach (var element in item)
                                {
                                    var details = new List<Detail>();
                                    InternalGetDetailData(string.Empty, details, element, columnDescriptions, enumColumnDescriptions, pkColumn.ColumnName);
                                    foreach (var detail in details)
                                    {
                                        InternalGetTableData(detail, row);
                                    }
                                }
                                table.Add(row);
                            }
                            var t = new Table
                            {
                                Data = table,
                                DatabaseName = databaseName,
                                DatabasePrefix = databasePrefix,
                                TotalCount = 0,
                            };
                            tables.Add(t);
                            remainRows -= table.Count;
                        }
                    }

                    tables.ForEach(t => t.TotalCount = totalCount);
                    data.Add(new TableData
                    {
                        TableName = tableName,
                        Tables = tables,
                        PkColumnName = pkColumn.ColumnName,
                        PkColumnDisplayName = pkColumn.DisplayName,
                    });
                });

在获取到了数据也就是BsonElement之后,通过InternalGetDetailData来把实际数据和元数据进行组合:

       private void InternalGetDetailData(string prefix, List<Detail> data, BsonElement element, List<MongodbColumnDescription> descriptions, List<MongodbEnumColumnDescription> enumDescriptions, string pkColumnName)
        {
            try
            {
                if (element.Value.IsObjectId || element.Value.IsBsonNull) return;

                var columnName = string.IsNullOrEmpty(prefix) ? element.Name : string.Format("{0}.{1}", prefix, element.Name);
                var entityColumnName = columnName;
                if (columnName.Contains("__") && columnName.Contains(".") && columnName.Split('.').Length > 2)
                {
                    var parts = columnName.Split('.');
                    entityColumnName = "";
                    foreach (var part in parts)
                    {
                        if (part.IndexOf("__") >= 0)
                            entityColumnName += part.Substring(0, part.IndexOf("__"));
                        else
                            entityColumnName += part;
                        entityColumnName += ".";
                    }
                    entityColumnName = entityColumnName.TrimEnd('.');
                }

                var description = descriptions.SingleOrDefault(d => d.ColumnName == entityColumnName) ?? new MongodbColumnDescription
                {
                    Description = "",
                    DisplayName = entityColumnName,
                    ColumnName = entityColumnName,
                };

                if (element.Value.IsBsonDocument)
                {
                    var subData = new List<Detail>();
                    foreach (var subDocument in element.Value.AsBsonDocument.Elements)
                    {
                        InternalGetDetailData(columnName, subData, subDocument, descriptions, enumDescriptions, pkColumnName);
                    }
                    data.Add(new Detail
                    {
                        Description = description.Description,
                        DisplayName = description.DisplayName.Replace('.', '_'),
                        ColumnName = columnName,
                        Value = "",
                        SubDetails = subData,
                    });
                }
                else
                {
                    string value = "";

                    if (element.Value.IsBsonDateTime)
                    {
                        TimeZone ze = TimeZone.CurrentTimeZone;
                        value = (element.Value.AsDateTime + ze.GetUtcOffset(DateTime.Now)).ToString();
                    }
                    else
                    {
                        var enumColumnDescription = enumDescriptions.SingleOrDefault(e => e.Name == columnName);
                        if (enumColumnDescription == null)
                        {
                            value = element.Value.RawValue.ToString();
                        }
                        else
                        {
                            var enumValue = 0;
                            if (int.TryParse(element.Value.RawValue.ToString(), out enumValue) && enumColumnDescription.EnumItems.ContainsKey(enumValue.ToString()))
                            {
                                value = enumColumnDescription.EnumItems[enumValue.ToString()];
                            }
                            else
                            {
                                value = element.Value.RawValue.ToString();
                            }
                        }
                    }

                    var detail = new Detail
                    {
                        Description = description.Description,
                        DisplayName = description.DisplayName.Replace('.', '_'),
                        ColumnName = columnName,
                        Value = value,
                    };
                    data.Add(detail);
                }
            }
            catch (Exception ex)
            {
                LocalLoggingService.Error(ex.ToString());
                throw;
            }
        }

Detail的定义如下:

    public class Detail
    {
        public string ColumnName { get; set; }

        public string DisplayName { get; set; }

        public string Description { get; set; }

        public string Value { get; set; }

        public List<Detail> SubDetails { get; set; }
    }

这很明显是一棵树。在这里我们要把元数据和实际数据进行对接,包括对枚举数据进行解析。

最后,还需要通过InternalGetTableData把树进一步进行转换成平面的数据:

 if (detail.SubDetails != null)
                {
                    foreach (var sub in detail.SubDetails)
                    {
                        var subdic = new Dictionary<string, string>();
                        InternalGetTableData(sub, subdic);
                        foreach (var item in subdic)
                        {
                            row.Add(item.Key, item.Value);
                        }
                    }
                }
                else
                {
                    row.Add(detail.DisplayName.Replace('.','_'), detail.Value == null ? "" : detail.Value.ToString());
                }

整个过程结束,总结一下获取列表视图数据的过程:

1)MongodbServerMaintainceCenter会尽量多构建和缓存一些元数据、实时状态、索引数据

2)必要的数据从Mongodb中获取后,再把缓存中的元数据和实际数据进行“对接”(主要是递归树为每一个层次附上DisplayName)

3)对于列表视图数据应该是平面的,最后“压”成平面返回,成为我们需要的结果:

image

作者: lovecindywang
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
相关实践学习
MongoDB数据库入门
MongoDB数据库入门实验。
快速掌握 MongoDB 数据库
本课程主要讲解MongoDB数据库的基本知识,包括MongoDB数据库的安装、配置、服务的启动、数据的CRUD操作函数使用、MongoDB索引的使用(唯一索引、地理索引、过期索引、全文索引等)、MapReduce操作实现、用户管理、Java对MongoDB的操作支持(基于2.x驱动与3.x驱动的完全讲解)。 通过学习此课程,读者将具备MongoDB数据库的开发能力,并且能够使用MongoDB进行项目开发。 &nbsp; 相关的阿里云产品:云数据库 MongoDB版 云数据库MongoDB版支持ReplicaSet和Sharding两种部署架构,具备安全审计,时间点备份等多项企业能力。在互联网、物联网、游戏、金融等领域被广泛采用。 云数据库MongoDB版(ApsaraDB for MongoDB)完全兼容MongoDB协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份回滚、性能优化等解决方案。 产品详情: https://www.aliyun.com/product/mongodb
相关文章
|
1月前
|
存储 NoSQL MongoDB
数据的存储--MongoDB文档存储(二)
数据的存储--MongoDB文档存储(二)
|
29天前
|
存储 NoSQL MongoDB
基于阿里云数据库MongoDB版,微财数科“又快又稳”服务超7000万客户
选择MongoDB主要基于其灵活的数据模型、高性能、高可用性、可扩展性、安全性和强大的分析能力。
|
1月前
|
NoSQL MongoDB 数据库
使用NimoShake将数据从AWS DynamoDB迁移至阿里云MongoDB
使用NimoShake将数据从AWS DynamoDB迁移至阿里云MongoDB
|
1月前
|
存储 NoSQL 关系型数据库
数据的存储--MongoDB文档存储(一)
数据的存储--MongoDB文档存储(一)
|
3月前
|
持续交付 jenkins Devops
WPF与DevOps的完美邂逅:从Jenkins配置到自动化部署,全流程解析持续集成与持续交付的最佳实践
【8月更文挑战第31天】WPF与DevOps的结合开启了软件生命周期管理的新篇章。通过Jenkins等CI/CD工具,实现从代码提交到自动构建、测试及部署的全流程自动化。本文详细介绍了如何配置Jenkins来管理WPF项目的构建任务,确保每次代码提交都能触发自动化流程,提升开发效率和代码质量。这一方法不仅简化了开发流程,还加强了团队协作,是WPF开发者拥抱DevOps文化的理想指南。
82 1
|
3月前
|
存储 NoSQL JavaScript
MongoDB存储过程实战:聚合框架、脚本、最佳实践,一文全掌握!
【8月更文挑战第24天】MongoDB是一款备受欢迎的文档型NoSQL数据库,以灵活的数据模型和强大功能著称。尽管其存储过程支持不如传统关系型数据库,本文深入探讨了MongoDB在此方面的最佳实践。包括利用聚合框架处理复杂业务逻辑、封装业务逻辑提高复用性、运用JavaScript脚本实现类似存储过程的功能以及考虑集成其他工具提升数据处理能力。通过示例代码展示如何创建订单处理集合并定义验证规则,虽未直接实现存储过程,但有效地演示了如何借助JavaScript脚本处理业务逻辑,为开发者提供更多实用指导。
68 2
|
3月前
|
NoSQL 安全 MongoDB
【MongoDB深度揭秘】你的更新操作真的安全了吗?MongoDB fsync机制大起底,数据持久化不再是谜!
【8月更文挑战第24天】MongoDB是一款备受欢迎的NoSQL数据库,以其灵活的文档模型和强大的查询能力著称。处理关键业务数据时,数据持久化至关重要。本文深入探讨MongoDB的写入机制,特别是更新操作时的fsync行为。MongoDB先将数据更新至内存以提升性能,而非直接写入磁盘。fsync的作用是确保数据从内存同步到磁盘,但MongoDB并非每次更新后都立即执行fsync。通过设置不同的写入关注级别(如w:0、w:1和w:majority),可以平衡数据持久性和性能。
48 1
|
3月前
|
持续交付 jenkins C#
“WPF与DevOps深度融合:从Jenkins配置到自动化部署全流程解析,助你实现持续集成与持续交付的无缝衔接”
【8月更文挑战第31天】本文详细介绍如何在Windows Presentation Foundation(WPF)项目中应用DevOps实践,实现自动化部署与持续集成。通过具体代码示例和步骤指导,介绍选择Jenkins作为CI/CD工具,结合Git进行源码管理,配置构建任务、触发器、环境、构建步骤、测试及部署等环节,显著提升开发效率和代码质量。
73 0
|
3月前
|
持续交付 C# 敏捷开发
“敏捷之道:揭秘WPF项目中的快速迭代与持续交付——从需求管理到自动化测试,打造高效开发流程的全方位指南”
【8月更文挑战第31天】敏捷开发是一种注重快速迭代和持续交付的软件开发方法,通过短周期开发提高产品质量并快速响应变化。本文通过问题解答形式,探讨在Windows Presentation Foundation(WPF)项目中应用敏捷开发的最佳实践,涵盖需求管理、版本控制、自动化测试及持续集成等方面,并通过具体示例代码展示其实施过程,帮助团队提升代码质量和开发效率。
72 0
|
1月前
|
存储 关系型数据库 MySQL
一个项目用5款数据库?MySQL、PostgreSQL、ClickHouse、MongoDB区别,适用场景
一个项目用5款数据库?MySQL、PostgreSQL、ClickHouse、MongoDB——特点、性能、扩展性、安全性、适用场景比较