本文主要通过批量与非批量对比操作的方式介绍MongoDB的bulkWrite()方法的使用。顺带与关系型数据库MySQL进行对比,比较这两种不同类型数据库的效率。如果只是想学习bulkWrite()的使用的看第一部分就行。
测试环境:win7旗舰版、16G内存、i3处理器、MongoDB3.0.2、mysql5.0
一、MongoDB批量操作
MongoDB对数据的操作分为Read Operations和Write Operations,Read Operations包含查询操作,Write Operations包含删除、插入、替换、更新几种操作。MongoDB提供客户端用bulk方式执行Write Operations,也就是批量写操作。在java driver中,对应MongoCollection的bulkWrite()方法,先来看下这个方法签名:
BulkWriteResult com.mongodb.client.MongoCollection.bulkWrite(List<? extends WriteModel<? extends Document>> requests)
这个方法要求传入一个List集合,集合中的元素类型为WriteModel,它表示一个可用于批量写操作的基类模型,它有以下几个子类DeleteManyModel、DeleteOneModel、 InsertOneModel、ReplaceOneModel、 UpdateManyModel、UpdateOneModel,从名字可以看出来它对应了删除、插入、替换、更新几种操作。该方法返回一个BulkWriteResult对象,代表一个成功的批量写操作结果,封装了操作结果的状态信息,如插入、更新、删除记录数等。
1、插入操作
(1)、批量插入
代码如下,该方法接收一个包含要进行插入的Document对象的集合参数,遍历集合,使用Document构造InsertOneModel对象,每个InsertOneModel实例代表一个插入单个Document的操作,然后将该实例添加List集合中,调用bulkWrite()方法,传入存储所有插入操作的List集合完成批量插入。
3、更新操作
(1)、批量更新
再来看看批量更新,分UpdateOneModel和UpdateManyModel两种,区别是前者更新匹配条件的一条记录,后者更新匹配条件的所有记录。对于ReplaceOneModel,表示替换操作,这里也归为更新,现在以UpdateOneModel为例进行讲解。UpdateOneModel构造方法接收3个参数,第一个是查询条件,第二个参数是要更新的内容,第三个参数是可选的UpdateOptions,不填也会自动帮你new一个,代表批量更新操作未匹配到查询条件时的动作,它的upser属性值默认false,什么都不干,true时表示将一个新的Document插入数据库,这个新的Document是查询Document和更新Document的结合,但如果是替换操作,这个新的Document就是这个替换Document。
这里会有个疑惑:这和匹配到查询条件后执行替换操作结果不一样吗?区别在于_id字段,未匹配查询条件时插入的新的Document的_id是新的,而成功执行替换操作,_id是原先旧的。
public void bulkWriteUpdate(List<Document> documents){ List<WriteModel<Document>> requests = new ArrayList<WriteModel<Document>>(); for (Document document : documents) { //更新条件 Document queryDocument = new Document("_id",document.get("_id")); //更新内容,改下书的价格 Document updateDocument = new Document("$set",new Document("price","30.6")); //构造更新单个文档的操作模型 UpdateOneModel<Document> uom = new UpdateOneModel<Document>(queryDocument,updateDocument,new UpdateOptions().upsert(false)); //UpdateOptions代表批量更新操作未匹配到查询条件时的动作,默认false,什么都不干,true时表示将一个新的Document插入数据库,他是查询部分和更新部分的结合 requests.add(uom); } BulkWriteResult bulkWriteResult = collection.bulkWrite(requests); System.out.println(bulkWriteResult.toString()); }
测试:10万条数据
public void insertBatch(ArrayList<Product> list) throws Exception{ Connection conn = DBUtil.getConnection(); try { PreparedStatement pst = conn.prepareStatement("insert into t_product value(?,?,?,?)"); int count = 1; for (Product product : list) { pst.setInt(1, product.getProductId()); pst.setString(2, product.getCategory()); pst.setString(3, product.getName()); pst.setDouble(4, product.getPrice()); pst.addBatch(); if(count % 1000 == 0){ pst.executeBatch(); pst.clearBatch();//每1000条sql批处理一次,然后置空PreparedStatement中的参数,这样也能提高效率,防止参数积累过多事务超时,但实际测试效果不明显 } count++; } conn.commit(); } catch (SQLException e) { e.printStackTrace(); } DBUtil.closeConnection(conn); }
public void updateBatch(ArrayList<Product> list) throws Exception{ Connection conn = DBUtil.getConnection(); try { PreparedStatement pst = conn.prepareStatement("update t_product set price=31.5 where id=?"); int count = 1; for (Product product : list) { pst.setInt(1, product.getProductId()); pst.addBatch(); if(count % 1000 == 0){ pst.executeBatch(); pst.clearBatch();//每1000条sql批处理一次,然后置空PreparedStatement中的参数,这样也能提高效率,防止参数积累过多事务超时,但实际测试效果不明显 } count++; } conn.commit(); } catch (SQLException e) { e.printStackTrace(); } DBUtil.closeConnection(conn); }
测试:10万条数据