EntityFramework 中支持 BulkInsert 扩展

本文涉及的产品
云数据库 RDS SQL Server,基础系列 2核4GB
大数据开发治理平台DataWorks,资源组抵扣包 750CU*H
简介:

前言

很显然,你应该不至于使用 EntityFramework 直接插入 10W 数据到数据库中,那可能得用上个几分钟。EntityFramework 最被人诟病的地方就是它的性能,处理大量数据时的效率。此种条件下,通常会转回使用 ADO.NET 来完成任务。

但是,如果已经在项目中使用了 EntityFramework,如果碰到需要直接向数据库中插入 10W 的数据的需求,引入 ADO.NET 和 SqlBulkCopy 的组合将打破 EntityFramework 作为 ORM 所带来的优势,我们不得不再次去编写那些 SQL 语句,关注表结构的细节,相应的代码可维护性也在下降。

那么,假设我们将 SqlBulkCopy 的功能封装为 EntityFramework 中的一个扩展方法,通过接口像外暴露 BulkInsert 方法。这样,我们既没有改变使用 EntityFramework 的习惯,同时也隐藏了 SqlBulkCopy 的代码细节,更重要的是,合理的封装演进出复用的可能性,可以在多个 Entity 表中使用。

环境准备

以下测试基于 EntityFramework 6.0.2 版本。

首先定义一个 Customer 类:

复制代码
1   public class Customer
2   {
3     public long Id { get; set; }
4     public string Name { get; set; }
5     public string Address { get; set; }
6     public string Phone { get; set; }
7   }
复制代码

通过 CustomerMap 类将 Entity 映射到数据库表结构:

复制代码
 1   public class CustomerMap : EntityTypeConfiguration<Customer>
 2   {
 3     public CustomerMap()
 4     {
 5       // Primary Key
 6       this.HasKey(t => t.Id);
 7 
 8       // Properties
 9       this.Property(t => t.Name)
10           .IsRequired()
11           .HasMaxLength(256);
12 
13       this.Property(t => t.Phone)
14           .IsRequired()
15           .HasMaxLength(256);
16 
17       // Table & Column Mappings
18       this.ToTable("Customer", "STORE");
19       this.Property(t => t.Id).HasColumnName("Id");
20       this.Property(t => t.Name).HasColumnName("Name");
21       this.Property(t => t.Address).HasColumnName("Address");
22       this.Property(t => t.Phone).HasColumnName("Phone");
23     }
24   }
复制代码

我们定义数据库的名字为 “Retail”,则使用 RetailEntities 类来实现 DbContext :

复制代码
 1   public class RetailEntities : DbContext
 2   {
 3     static RetailEntities()
 4     {
 5       Database.SetInitializer<RetailEntities>(
 6         new DropCreateDatabaseAlways<RetailEntities>());
 7     }
 8 
 9     public RetailEntities()
10       : base("Name=RetailEntities")
11     {
12     }
13 
14     public DbSet<Customer> Customers { get; set; }
15 
16     protected override void OnModelCreating(DbModelBuilder modelBuilder)
17     {
18       modelBuilder.Configurations.Add(new CustomerMap());
19     }
20   }
复制代码

将 DatabaseInitializer 设置为 DropCreateDatabaseAlways,这样我们可以保证每次都针对新表进行测试。

如果需要更复杂的模型,我们将基于如下的模型进行测试:

测试主机

数据库:Microsoft SQL Server 2012 (64-bit)

EntityFramework 插入 10W 数据需要多久

我们先来看下EntityFramework 插入 10W 数据需要多久。

构造 10W 个 Customer 实例:

复制代码
 1       int customerCount = 100000;
 2 
 3       List<Customer> customers = new List<Customer>();
 4       for (int i = 0; i < customerCount; i++)
 5       {
 6         Customer customer = new Customer()
 7         {
 8           Name = "Dennis Gao" + i,
 9           Address = "Beijing" + i,
10           Phone = "18888888888" + i,
11         };
12         customers.Add(customer);
13 
14         Console.Write(".");
15       }
复制代码

使用如下语法来将上面构造的 10W 数据保存到数据库中:

复制代码
1         using (RetailEntities context = new RetailEntities())
2         {
3           foreach (var entity in customers)
4           {
5             context.Customers.Add(entity);
6           }
7           context.SaveChanges();
8         }
复制代码

通过 context.SaveChanges() 来保证一次事务提交。

为了计算使用时间,在上面代码的前后加上 Stopwatch 来计算:

复制代码
 1         Stopwatch watch = Stopwatch.StartNew();
 2 
 3         using (RetailEntities context = new RetailEntities())
 4         {
 5           foreach (var entity in customers)
 6           {
 7             context.Customers.Add(entity);
 8           }
 9           context.SaveChanges();
10         }
11 
12         watch.Stop();
13         Console.WriteLine(string.Format(
14           "{0} customers are created, cost {1} milliseconds.", 
15           customerCount, watch.ElapsedMilliseconds));
复制代码

然后运行,

好吧,我应该没有耐心等待它运行完。

现在减少数据量进行测试,将数据数量降低到 500 条,

空表插入500 条数据耗时 5652 毫秒。我多测了几遍,这个数据稳定在 5 秒以上。

将数据量改变到 1000 条,

将数据量改变到 1500 条,

将数据量改变到 10000 条,

那么我们估计下 10W 数据大概需要 10W / 500 * 2 = 至少 400 秒 = 至少 6 分钟。

好吧,慢是毋庸置疑的。

SqlBulkCopy 接口描述

Microsoft SQL Server 提供一个称为 bcp 的流行的命令提示符实用工具,用于将数据从一个表移动到另一个表(表既可以在同一个服务器上,也可以在不同服务器上)。 SqlBulkCopy 类允许编写提供类似功能的托管代码解决方案。 还有其他将数据加载到 SQL Server 表的方法(例如 INSERT 语句),但相比之下 SqlBulkCopy 提供明显的性能优势。

使用 SqlBulkCopy 类只能向 SQL Server 表写入数据。 但是,数据源不限于 SQL Server;可以使用任何数据源,只要数据可加载到 DataTable 实例或可使用 IDataReader 实例读取数据。

WriteToServer(DataRow[]) 将所提供的 DataRow 数组中的所有行复制到 SqlBulkCopy 对象的 DestinationTableName 属性指定的目标表中。
WriteToServer(DataTable) 将所提供的 DataTable 中的所有行复制到 SqlBulkCopy 对象的 DestinationTableName 属性指定的目标表中。
WriteToServer(IDataReader) 将所提供的 IDataReader 中的所有行复制到 SqlBulkCopy 对象的 DestinationTableName 属性指定的目标表中。
WriteToServer(DataTable, DataRowState) 只将与所提供 DataTable 中所提供行状态匹配的行复制到 SqlBulkCopy 对象的 DestinationTableName 属性指定的目标表中。

在 .NET 4.5 中还提供了支持 async 语法的接口。

WriteToServerAsync(DataRow[]) WriteToServer 的异步版本,将 DataRow 数组中提供的所有行都复制到由 SqlBulkCopy 对象的 DestinationTableName 属性指定的目标表中。
WriteToServerAsync(DataTable) WriteToServer 的异步版本,将 DataTable 中提供的所有行都复制到 SqlBulkCopy 对象的 DestinationTableName 属性指定的目标表中。
WriteToServerAsync(IDataReader) WriteToServer 的异步版本,将 IDataReader 中提供的所有行都复制到 SqlBulkCopy 对象的 DestinationTableName 属性指定的目标表中。
WriteToServerAsync(DataRow[], CancellationToken) WriteToServer 的异步版本,将 DataRow 数组中提供的所有行都复制到由 SqlBulkCopy 对象的 DestinationTableName 属性指定的目标表中。取消标记可用于在命令超时超过前请求放弃操作。通过返回的任务对象将报告异常。
WriteToServerAsync(DataTable, DataRowState) WriteToServer 的异步版本,之间与 DataTable 中所提供行状态匹配的行复制到 SqlBulkCopy 对象的 DestinationTableName 属性中指定的目标表中。
WriteToServerAsync(DataTable, CancellationToken) WriteToServer 的异步版本,将 DataTable 中提供的所有行都复制到 SqlBulkCopy 对象的 DestinationTableName 属性指定的目标表中。取消标记可用于在命令超时超过前请求放弃操作。通过返回的任务对象将报告异常。
WriteToServerAsync(IDataReader, CancellationToken) WriteToServer 的异步版本,将 IDataReader 中提供的所有行都复制到 SqlBulkCopy 对象的 DestinationTableName 属性指定的目标表中。取消标记可用于在命令超时超过前请求放弃操作。通过返回的任务对象将报告异常。
WriteToServerAsync(DataTable, DataRowState, CancellationToken) WriteToServer 的异步版本,之间与 DataTable 中所提供行状态匹配的行复制到 SqlBulkCopy 对象的 DestinationTableName 属性中指定的目标表中。取消标记可用于在命令超时超过前请求放弃操作。通过返回的任务对象将报告异常。

这里,我们选用 DataTable 来构建数据源,将 10W 数据导入 DataTable 中。可以看出,我们需要构建出给定 Entity 类型所对应的数据表的 DataTable,将所有的 entities 数据插入到 DataTable 中。

构建 TableMapping 映射

此时,我并不想手工书写表中的各字段名称,同时,我可能甚至都不想关心 Entity 类到底被映射到了数据库中的哪一张表上。

此处,我们定义一个 TableMapping 类,用于存储一张数据库表的映射信息。

在获取和生成 TableMapping 之前,我们需要先定义和获取 DbMapping 类。

复制代码
 1 internal class DbMapping
 2 {
 3     public DbMapping(DbContext context)
 4     {
 5       _context = context;
 6 
 7       var objectContext = ((IObjectContextAdapter)context).ObjectContext;
 8       _metadataWorkspace = objectContext.MetadataWorkspace;
 9 
10       _codeFirstEntityContainer = _metadataWorkspace.GetEntityContainer("CodeFirstDatabase", DataSpace.SSpace);
11 
12       MapDb();
13     }
14 }
复制代码

通过读取 CodeFirstEntityContainer 中的元数据,我们可以获取到指定数据库中的所有表的信息。

复制代码
 1     private void MapDb()
 2     {
 3       ExtractTableColumnEdmMembers();
 4 
 5       List<EntityType> tables =
 6         _metadataWorkspace
 7           .GetItems(DataSpace.OCSpace)
 8           .Select(x => x.GetPrivateFieldValue("EdmItem") as EntityType)
 9           .Where(x => x != null)
10           .ToList();
11 
12       foreach (var table in tables)
13       {
14         MapTable(table);
15       }
16     }
复制代码

进而,根据表映射类型的定义,可以获取到表中字段的映射信息。

复制代码
 1     private void MapTable(EntityType tableEdmType)
 2     {
 3       string identity = tableEdmType.FullName;
 4       EdmType baseEdmType = tableEdmType;
 5       EntitySet storageEntitySet = null;
 6 
 7       while (!_codeFirstEntityContainer.TryGetEntitySetByName(baseEdmType.Name, false, out storageEntitySet))
 8       {
 9         if (baseEdmType.BaseType == null) break;
10         baseEdmType = baseEdmType.BaseType;
11       }
12       if (storageEntitySet == null) return;
13 
14       var tableName = (string)storageEntitySet.MetadataProperties["Table"].Value;
15       var schemaName = (string)storageEntitySet.MetadataProperties["Schema"].Value;
16 
17       var tableMapping = new TableMapping(identity, schemaName, tableName);
18       _tableMappings.Add(identity, tableMapping);
19       _primaryKeysMapping.Add(identity, storageEntitySet.ElementType.KeyMembers.Select(x => x.Name).ToList());
20 
21       foreach (var prop in storageEntitySet.ElementType.Properties)
22       {
23         MapColumn(identity, _tableMappings[identity], prop);
24       }
25     }
复制代码

然后,可以将表信息和字段信息存放到 TableMapping 和 ColumnMapping 当中。

复制代码
internal class TableMapping
{
    public string TableTypeFullName { get; private set; }
    public string SchemaName { get; private set; }
    public string TableName { get; private set; }

    public ColumnMapping[] Columns
    {
      get { return _columnMappings.Values.ToArray(); }
    }
}
复制代码

构建 DataTable 数据

终于,有了 TableMapping 映射之后,我们可以开始创建 DataTable 了。

复制代码
 1     private static DataTable BuildDataTable<T>(TableMapping tableMapping)
 2     {
 3       var entityType = typeof(T);
 4       string tableName = string.Join(@".", tableMapping.SchemaName, tableMapping.TableName);
 5 
 6       var dataTable = new DataTable(tableName);
 7       var primaryKeys = new List<DataColumn>();
 8 
 9       foreach (var columnMapping in tableMapping.Columns)
10       {
11         var propertyInfo = entityType.GetProperty(columnMapping.PropertyName, '.');
12         columnMapping.Type = propertyInfo.PropertyType;
13 
14         var dataColumn = new DataColumn(columnMapping.ColumnName);
15 
16         Type dataType;
17         if (propertyInfo.PropertyType.IsNullable(out dataType))
18         {
19           dataColumn.DataType = dataType;
20           dataColumn.AllowDBNull = true;
21         }
22         else
23         {
24           dataColumn.DataType = propertyInfo.PropertyType;
25           dataColumn.AllowDBNull = columnMapping.Nullable;
26         }
27 
28         if (columnMapping.IsIdentity)
29         {
30           dataColumn.Unique = true;
31           if (propertyInfo.PropertyType == typeof(int)
32             || propertyInfo.PropertyType == typeof(long))
33           {
34             dataColumn.AutoIncrement = true;
35           }
36           else continue;
37         }
38         else
39         {
40           dataColumn.DefaultValue = columnMapping.DefaultValue;
41         }
42 
43         if (propertyInfo.PropertyType == typeof(string))
44         {
45           dataColumn.MaxLength = columnMapping.MaxLength;
46         }
47 
48         if (columnMapping.IsPk)
49         {
50           primaryKeys.Add(dataColumn);
51         }
52 
53         dataTable.Columns.Add(dataColumn);
54       }
55 
56       dataTable.PrimaryKey = primaryKeys.ToArray();
57 
58       return dataTable;
59     }
复制代码

通过 Schema 名称和表名称来构建指定 Entity 类型的 DataTable 对象。

然后将,entities 数据列表中的数据导入到 DataTable 对象之中。

复制代码
 1     private static DataTable CreateDataTable<T>(TableMapping tableMapping, IEnumerable<T> entities)
 2     {
 3       var dataTable = BuildDataTable<T>(tableMapping);
 4 
 5       foreach (var entity in entities)
 6       {
 7         DataRow row = dataTable.NewRow();
 8 
 9         foreach (var columnMapping in tableMapping.Columns)
10         {
11           var @value = entity.GetPropertyValue(columnMapping.PropertyName);
12 
13           if (columnMapping.IsIdentity) continue;
14 
15           if (@value == null)
16           {
17             row[columnMapping.ColumnName] = DBNull.Value;
18           }
19           else
20           {
21             row[columnMapping.ColumnName] = @value;
22           }
23         }
24 
25         dataTable.Rows.Add(row);
26       }
27 
28       return dataTable;
29     }
复制代码

SqlBulkCopy 导入数据

终于,数据源准备好了。然后使用如下代码结构,调用 WriteToServer 方法,将数据写入数据库。

复制代码
1       using (DataTable dataTable = CreateDataTable(tableMapping, entities))
2       {
3         using (SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(transaction.Connection, options, transaction))
4         {
5           sqlBulkCopy.BatchSize = batchSize;
6           sqlBulkCopy.DestinationTableName = dataTable.TableName;
7           sqlBulkCopy.WriteToServer(dataTable);
8         }
9       }
复制代码

看下保存 500 数据的效果,用时 1.9 秒。

看下保存 1W 数据的效果,用时 2.1 秒。

看下保存 10W 数据的效果,用时 7.5 秒。

再试下 100W 数据的效果,用时 27 秒。

封装 BulkInsert 扩展方法

我们可以为 DbContext 添加一个 BulkInsert 扩展方法。

复制代码
 1   internal static class DbContextBulkOperationExtensions
 2   {
 3     public const int DefaultBatchSize = 1000;
 4 
 5     public static void BulkInsert<T>(this DbContext context, IEnumerable<T> entities, int batchSize = DefaultBatchSize)
 6     {
 7       var provider = new BulkOperationProvider(context);
 8       provider.Insert(entities, batchSize);
 9     }
10   }
复制代码

IBulkableRepository 接口

在下面两篇文章中,我介绍了精炼的 IRepository 接口。

复制代码
1 public interface IRepository<T>
2     where T : class
3   {
4     IQueryable<T> Query();
5     void Insert(T entity);
6     void Update(T entity);
7     void Delete(T entity);
8   }
复制代码

当我们需要扩展 BulkInsert 功能时,可以通过继承来完成功能扩展。

1 public interface IBulkableRepository<T> : IRepository<T>
2     where T : class
3   {
4     void BulkInsert(IEnumberable<T> entities);
5   }

这样,我们就可以使用很自然的方式直接使用 BulkInsert 功能了。

代码在哪里

代码在这里:

参考资料







本文转自匠心十年博客园博客,原文链接:http://www.cnblogs.com/gaochundong/p/entity_framework_bulk_insert_extension.html,如需转载请自行联系原作者
相关实践学习
基于Hologres轻量实时的高性能OLAP分析
本教程基于GitHub Archive公开数据集,通过DataWorks将GitHub中的项⽬、行为等20多种事件类型数据实时采集至Hologres进行分析,同时使用DataV内置模板,快速搭建实时可视化数据大屏,从开发者、项⽬、编程语⾔等多个维度了解GitHub实时数据变化情况。
目录
相关文章
|
关系型数据库 MySQL Docker
6. 修改docker端口 (映射宿主机和docker容器中的端口)
6. 修改docker端口 (映射宿主机和docker容器中的端口)
3281 0
6. 修改docker端口 (映射宿主机和docker容器中的端口)
|
10月前
|
传感器 监控 前端开发
zabbix中IPMI (Intelligent Platform Management Interface)
zabbix中IPMI (Intelligent Platform Management Interface)
541 68
|
11月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
329 2
|
开发框架 自然语言处理 .NET
C#一分钟浅谈:LINQ 查询表达式的使用技巧
【9月更文挑战第6天】LINQ(Language Integrated Query)是C#开发中的强大工具,使查询数据集合变得简单且接近自然语言。本文从基础入手,通过具体示例讲解LINQ查询表达式的使用技巧,包括过滤、排序和分组等操作。同时,文章还探讨了常见问题及解决方法,如性能优化、过早枚举和类型转换等,帮助开发者写出更高效、易维护的代码。
252 16
|
10月前
|
XML 移动开发 开发者
京东商品详情数据接口(H5、APP 端)
京东商品详情数据接口是为H5和APP开发者提供的工具,支持获取商品名称、价格、库存、销量、评价、图片等详细信息,优化应用功能。接口返回JSON或XML格式数据,方便解析处理。适用于电商导购、社交媒体分享、活动推广、价格监控等场景,提升用户体验和购物决策效率。示例代码展示了使用Python发送GET请求的方法。
|
Linux 程序员 Perl
老程序员分享:Linux查看系统开机时间
老程序员分享:Linux查看系统开机时间
626 0
|
图形学
【unity小技巧】unity读excel配置表操作,excel转txt文本,并读取txt文本内容,实例说明
【unity小技巧】unity读excel配置表操作,excel转txt文本,并读取txt文本内容,实例说明
682 0
|
存储 NoSQL 数据管理
如何借助Redis巧妙的管理用户签到?——Bitmap篇
Redis位操作用于高效存储分析,如用户签到。通过位操作,每个用户签到只需1位,节省空间。使用`setbit`设置签到状态,`getbit`查询,`bitcount`统计签到天数。适用于用户特征标记、系统功能开关和在线状态追踪。高效率、低空间占用,适合大数据场景。
233 0
精准记录:使用 Playwright 实现屏幕截图
Playwright是用于自动化浏览器的库,提供简洁的API进行屏幕截图。要截屏,调用`page.screenshot()`方法,指定保存路径。若需全页截图,设置`full_page=True`。还能捕获图片数据流,不写入文件,以及截取特定元素,如`.header`。Playwright的截图功能比Selenium更强大,便于开发和测试人员分析应用界面。
|
安全 网络安全 开发工具
git配置密钥及提交代码到仓库
Git是一种分布式版本控制系统,用于管理和跟踪软件开发项目的源代码。它是由Linus Torvalds于2005年创建的,被广泛用于开源和商业项目。
309 0