[译] MongoDB Java异步驱动快速指南

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
简介:

导读

mongodb-java-driver是mongodb的Java驱动项目。

本文是对MongoDB-java-driver官方文档 MongoDB Async Driver Quick Tour 的翻译(原创翻译)。

mongodb-java-driver 从3.0版本开始同时支持同步异步方式(分别是不同的驱动应用)。异步的好处,众所周知,就是支持快速、非阻塞式的IO操作,可以提高处理速度。

请注意:本文仅介绍异步驱动的使用指南。同步驱动官方文档:mongo-java-driver ,需要了解的朋友,请移驾。

安装

简单提下安装说明。

注:MongoDB 异步驱动需要依赖Netty 或 Java 7。

如果你的项目是maven项目,只需在pom.xml中添加如下依赖:

<dependencies>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongodb-driver-async</artifactId>
        <version>3.3.0</version>
    </dependency>
</dependencies>

你也可以点击链接直接下载jar包: 下载点这里 。

分割线,下面是 MongoDB Async Driver Quick Tour 的译文。


MongoDB 异步驱动快速指南

以下的代码片段来自于 async driver source 的范例代码 QuickTour.java 。

注意

如何安装MongoDB异步驱动请参考 安装指导 。

执行异步回调

MongoDB异步驱动利用Netty或Java7的AsynchronousSocketChannel 来提供一个支持异步的API,以支持快速的、非阻塞式的IO操作。

该API形式和MongoDB同步驱动的新API保持一致,但是任何会导致网络IO的方法都会有一个SingleResponseCallback并且会立即返回,其中T是响应对于该文档的类型的任何方法。

SingleResponseCallback  回调接口需要实现一个简单方法onResult(T result, Throwable t) ,这个方法在操作完成时被调用。其中,如果操作成功, result参数包含着操作结果;如果操作失败,t中包含着抛出的异常信息。

重要

SingleResponseCallback的实现中检查错误并适当处理错误是十分重要的。下面的错误检查仅为简便起见而省略。

创建一个连接

下面的例子展示多种方法去链接本地机器上的mydb数据库。详情参考 MongoClients.create API手册。

// 直接连接默认服务host和端口,即 localhost:27017
MongoClient mongoClient = MongoClients.create();

// 使用一个字符串
MongoClient mongoClient = MongoClients.create("mongodb://localhost");

// 使用一个ConnectionString
MongoClient mongoClient = MongoClients.create(new ConnectionString("mongodb://localhost"));


// 使用MongoClientSettings
ClusterSettings clusterSettings = ClusterSettings.builder().hosts(asList(new ServerAddress("localhost"))).build();
MongoClientSettings settings = MongoClientSettings.builder().clusterSettings(clusterSettings).build();
MongoClient mongoClient = MongoClients.create(settings);

MongoDatabase database = mongoClient.getDatabase("mydb");

此时,database对象是一个MongoDB 服务器中指定数据库的连接。

注意

getDatabase("mydb") 方法并没有回调,因为它没有涉及网络IO操作。一个 MongoDatabase 实例提供了与数据库进行交互的方法,若数据库不存在,它会在插入数据时创建一个新的数据库。例如,创建一个 collection 或插入 document(这些确实需要回调,因为需要涉及网络IO)。

MongoClient

MongoClient 实例实际上代表了一个数据库的连接池;即使要并发执行异步操作,你也仅仅需要一个 MongoClient 实例。

重要

一般情况下,在一个指定的数据库集群中仅需要创建一个MongoClient实例,并通过你的应用使用它。

当创建多个实例时:

  • 所有的资源使用限制(例如最大连接数)适用于每个MongoClient实例
  • 销毁实例时,请确保调用 MongoClient.close() 清理资源。

获得一个 collection

要获得一个 collection ,你需要在 getCollection(String collectionName) 方法中指定 collection 的名字:

下面的例子获得名为 test 的collection :

MongoCollection<Document> collection = database.getCollection("test");

添加一个 document

一旦你有了collection对象,你就可以向collection中插入document。例如,考虑如下的json形式document;document中含包含了一个名为 info 的子document。

{
   "name" : "MongoDB",
   "type" : "database",
   "count" : 1,
   "info" : {
               x : 203,
               y : 102
             }
}

要创建document,需要使用 Document 类。你可以使用这个类来创建嵌入式的document。

Document doc = new Document("name", "MongoDB")
               .append("type", "database")
               .append("count", 1)
               .append("info", new Document("x", 203).append("y", 102));

要向 collection 中插入 document ,需要使用 insertOne() 方法。

collection.insertOne(doc, new SingleResultCallback<Void>() {
    @Override
    public void onResult(final Void result, final Throwable t) {
        System.out.println("Inserted!");
    }
});

SingleResponseCallback 是一个 函数式接口 并且它可以以lambda方式实现(前提是你的APP工作在JDK8):

collection.insertOne(doc, (Void result, final Throwable t) -> System.out.println("Inserted!"));

一旦document成功插入,onResult 回调方法会被调用并打印“Inserted!”。记住,在一个普通应用中,你应该总是检查 t 变量中是否有错误信息。

添加多个 document

要添加多个 documents,你可以使用 insertMany() 方法。

接下来的例子会添多个document,document形式如下:

{ "i" : value }

循环创建多个 documents 。

List<Document> documents = new ArrayList<Document>();
for (int i = 0; i < 100; i++) {
    documents.add(new Document("i", i));
}

要插入多个 document 到 collection,传递 documents 列表到 insertMany() 方法.

collection.insertMany(documents, new SingleResultCallback<Void>() {
    @Override
    public void onResult(final Void result, final Throwable t) {
        System.out.println("Documents inserted!");
    }
});

统计一个 collection的document数量

既然前面的多个例子中我们已经插入了 101 个 document,我们可以检查一下插入数量,使用 count() 方法。下面的代码应该打印 101

collection.count(
  new SingleResultCallback<Long>() {
      @Override
      public void onResult(final Long count, final Throwable t) {
          System.out.println(count);
      }
  });

查询 collection

使用 find() 方法来查询 collection。

在一个 collection 中找到第一个 document

要获得 collection 中的第一个 document ,需要调用 first() 方法。collection.find().first() 返回第一个 document 或 null 值,而不是一个游标。这种查询适用于匹配一个单一的 document,,或你仅对第一个 document 有兴趣。

注意

有时你需要多次使用相同或相似的回调方法。在这种情况下,合理的做法是DRY(不要重复自己):把回调保存为一个具体的类或分配给一个变量。

SingleResultCallback<Document> printDocument = new SingleResultCallback<Document>() {
    @Override
    public void onResult(final Document document, final Throwable t) {
        System.out.println(document.toJson());
    }
};

下面的例子传递 printDocument 回调给 first 方法:

collection.find().first(printDocument);

范例会打印下面的 document:

{ "_id" : { "$oid" : "551582c558c7b4fbacf16735" },
  "name" : "MongoDB", "type" : "database", "count" : 1,
  "info" : { "x" : 203, "y" : 102 } }

注意

 _id 元素会被MongoDB动态的添加到你的 document 上,并且值也会与展示的不同。“_” 和 “$”开头的域是MongoDB 预留给内部使用的。

遍历查找一个collection中所有的 document

要检索 collection 中所有的 document,需要使用 find() 方法。find() 方法返回一个 FindIterable 实例,它提供了一个接口来链接和控制查找操作。使用 forEach() 方法可以提供一个 Block 作用于每个 document 并且迭代结束时执行回调一次。下面的代码遍历 collection 中所有的 document 并逐一打印,最后打印 “Operation Finished!”。

Block<Document> printDocumentBlock = new Block<Document>() {
    @Override
    public void apply(final Document document) {
        System.out.println(document.toJson());
    }
};
SingleResultCallback<Void> callbackWhenFinished = new SingleResultCallback<Void>() {
    @Override
    public void onResult(final Void result, final Throwable t) {
        System.out.println("Operation Finished!");
    }
};

collection.find().forEach(printDocumentBlock, callbackWhenFinished);

通过查询条件获得一个 document

我们可以创建一个过滤器传递给 find() 方法,以获得我们 collection 中的一组子集。例如,如果我们想查找 key为“i” ,value为71 的 document,我们要按下面的方法做(重用 printDocument 回调)。

import static com.mongodb.client.model.Filters.*;

collection.find(eq("i", 71)).first(printDocument);

最终会只印一个 document:

{ "_id" : { "$oid" : "5515836e58c7b4fbc756320b" }, "i" : 71 }

重要

请使用 FiltersSortsProjections 和 Updates API手册来找到简单、清晰的方法构建查询。

通过查询获得一组 documents

我们可以使用查询来从我们的 collection 中获得一组 document 集合。例如,如果我们想获得所有 key 为“i”,value 大于50 的 document ,我们应该按下面方式做(重用 printDocumentBlock 阻塞和 callbackWhenFinished 回调):

// 使用范围查询获取子集
collection.find(gt("i", 50)).forEach(printDocumentBlock, callbackWhenFinished);

范例应该会打印所有 i > 50 的document。

我们也可以增加上限范围,如  50 < i <= 100

collection.find(and(gt("i", 50), lte("i", 100))).forEach(printDocumentBlock, callbackWhenFinished);

document 排序

我们可以对 document 进行排序。通过在 FindIterable 上调用 sort() 方法,我们可以在一个查询上进行一次排序。

下面的例子中,我们使用  exists()  和 降序排序 descending("i") 来为我们的 document 排序。

collection.find(exists("i")).sort(descending("i")).first(printDocument);

投射域

有时我们不需要将所有的数据都存在一个 document 中。Projections 可以用来为查询操作构建投射参数并限制返回的字段。

下面的例子中,我们会对collection进行排序,排除  _id 字段,并输出第一个匹配的 document。

collection.find().projection(excludeId()).first(printDocument);

聚合

有时,我们需要将存储在 MongoDB 中的数据聚合。 Aggregates  支持对每种类型的聚合阶段进行构建。

下面的例子,我们执行一个两步骤的转换来计算  i * 10 的值。首先我们使用 Aggregates.match 查找所有  i > 0  的document 。接着,我们使用 Aggregates.project 结合  $multiply  操作来计算 “ITimes10” 的值。

collection.aggregate(asList(
    match(gt("i", 0)),
    project(Document.parse("{ITimes10: {$multiply: ['$i', 10]}}")))
).forEach(printDocumentBlock, callbackWhenFinished);

For $group operations use the Accumulators helper for any accumulator operations.

对于 $group 操作使用 Accumulators  来处理任何 累加操作 。

下面的例子中,我们使用 Aggregates.group  结合 Accumulators.sum 来累加所有 i 的和。

collection.aggregate(singletonList(group(null, sum("total", "$i")))).first(printDocument);

注意

当前,还没有专门用于 聚合表达式 的工具类。可以使用 Document.parse() 来快速构建来自于JSON的聚合表达式。

更新 document

MongoDB 支持许多的 更新操作 。

要更新至多一个 document (可能没有匹配的document),使用 updateOne 方法指定过滤器并更新 document 。这里,我们使用 Updates.set  来更新匹配过滤器 i 等于 10 的第一个 document 并设置 i 的值为 110。

collection.updateOne(eq("i", 10), set("i", 110),
    new SingleResultCallback<UpdateResult>() {
        @Override
        public void onResult(final UpdateResult result, final Throwable t) {
            System.out.println(result.getModifiedCount());
        }
    });

使用 updateMany 方法可以更新所有匹配过滤器的 document 。这里我们使用 Updates.inc 来为所有 i 小于 100 的document 增加 100 。

collection.updateMany(lt("i", 100), inc("i", 100),
    new SingleResultCallback<UpdateResult>() {
        @Override
        public void onResult(final UpdateResult result, final Throwable t) {
            System.out.println(result.getModifiedCount());
        }
    });

更新方法返回一个  UpdateResult,其中包含了操作的信息(被修改的 document 的数量)。

删除 document

要删除至多一个 document (可能没有匹配的document)可以使用 deleteOne 方法。

collection.deleteOne(eq("i", 110), new SingleResultCallback<DeleteResult>() {
    @Override
    public void onResult(final DeleteResult result, final Throwable t) {
        System.out.println(result.getDeletedCount());
    }
});

使用 deleteMany 方法可以删除所有匹配过滤器的 document 。这里我们删除所有 i 大于等于的 document。

collection.deleteMany(gte("i", 100), new SingleResultCallback<DeleteResult>() {
    @Override
    public void onResult(final DeleteResult result, final Throwable t) {
        System.out.println(result.getDeletedCount());
    }
});

删除方法返回一个 DeleteResult,其中包含了操作的信息(被删除的 document 的数量)。

批量操作

批量操作允许批量的执行 插入、更新、删除操作。批量操作有两种类型:

  1. 有序的批量操作

有序的执行所有操作并在第一个写操作的错误处报告错误。

  1. 无序的批量操作

执行所有的操作并报告任何错误。

无序的批量操作不保证执行顺序。

我们来围观一下两个分别使用有序和无序操作的简单例子:

SingleResultCallback<BulkWriteResult> printBatchResult = new SingleResultCallback<BulkWriteResult>() {
    @Override
    public void onResult(final BulkWriteResult result, final Throwable t) {
        System.out.println(result);
    }
};

// 2. 有序批量操作
collection.bulkWrite(
  Arrays.asList(new InsertOneModel<>(new Document("_id", 4)),
                new InsertOneModel<>(new Document("_id", 5)),
                new InsertOneModel<>(new Document("_id", 6)),
                new UpdateOneModel<>(new Document("_id", 1),
                                     new Document("$set", new Document("x", 2))),
                new DeleteOneModel<>(new Document("_id", 2)),
                new ReplaceOneModel<>(new Document("_id", 3),
                                      new Document("_id", 3).append("x", 4))),
  printBatchResult
);


 // 2. 无序批量操作
collection.bulkWrite(
  Arrays.asList(new InsertOneModel<>(new Document("_id", 4)),
                new InsertOneModel<>(new Document("_id", 5)),
                new InsertOneModel<>(new Document("_id", 6)),
                new UpdateOneModel<>(new Document("_id", 1),
                                     new Document("$set", new Document("x", 2))),
                new DeleteOneModel<>(new Document("_id", 2)),
                new ReplaceOneModel<>(new Document("_id", 3),
                                      new Document("_id", 3).append("x", 4))),
  new BulkWriteOptions().ordered(false),
  printBatchResult
);

重要

不推荐在pre-2.6的MongoDB 服务器上使用 bulkWrite 方法。因为这是第一个支持批量写操作(插入、更新、删除)的服务器版本,它允许驱动去实现 BulkWriteResult 和 BulkWriteException 的语义。这个方法虽然仍然可以在pre-2.6服务器上工作,但是性能不好,一次只能执行一个写操作。


本文转自静默虚空博客园博客,原文链接:http://www.cnblogs.com/jingmoxukong/p/6054756.html,如需转载请自行联系原作者

相关实践学习
MongoDB数据库入门
MongoDB数据库入门实验。
快速掌握 MongoDB 数据库
本课程主要讲解MongoDB数据库的基本知识,包括MongoDB数据库的安装、配置、服务的启动、数据的CRUD操作函数使用、MongoDB索引的使用(唯一索引、地理索引、过期索引、全文索引等)、MapReduce操作实现、用户管理、Java对MongoDB的操作支持(基于2.x驱动与3.x驱动的完全讲解)。 通过学习此课程,读者将具备MongoDB数据库的开发能力,并且能够使用MongoDB进行项目开发。 &nbsp; 相关的阿里云产品:云数据库 MongoDB版 云数据库MongoDB版支持ReplicaSet和Sharding两种部署架构,具备安全审计,时间点备份等多项企业能力。在互联网、物联网、游戏、金融等领域被广泛采用。 云数据库MongoDB版(ApsaraDB for MongoDB)完全兼容MongoDB协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份回滚、性能优化等解决方案。 产品详情: https://www.aliyun.com/product/mongodb
相关文章
|
1月前
|
关系型数据库 MySQL Java
【IDEA】java后台操作mysql数据库驱动常见错误解决方案
【IDEA】java后台操作mysql数据库驱动常见错误解决方案
65 0
|
3月前
|
Java
探索Java新境界!异步+事件驱动,打造响应式编程热潮,未来已来!
【8月更文挑战第30天】在现代软件开发中,系统响应性和可扩展性至关重要。Java作为主流编程语言,提供了多种机制如Future、CompletableFuture及事件驱动编程,有效提升应用性能。本文探讨Java异步编程模型与事件驱动编程,并介绍响应式模式,助您构建高效、灵活的应用程序。
59 3
|
26天前
|
NoSQL Java 数据库连接
MongoDB Java
10月更文挑战第18天
17 3
|
2月前
|
人工智能 开发框架 Java
重磅发布!AI 驱动的 Java 开发框架:Spring AI Alibaba
随着生成式 AI 的快速发展,基于 AI 开发框架构建 AI 应用的诉求迅速增长,涌现出了包括 LangChain、LlamaIndex 等开发框架,但大部分框架只提供了 Python 语言的实现。但这些开发框架对于国内习惯了 Spring 开发范式的 Java 开发者而言,并非十分友好和丝滑。因此,我们基于 Spring AI 发布并快速演进 Spring AI Alibaba,通过提供一种方便的 API 抽象,帮助 Java 开发者简化 AI 应用的开发。同时,提供了完整的开源配套,包括可观测、网关、消息队列、配置中心等。
2149 16
|
1月前
|
消息中间件 Java Kafka
Flink-08 Flink Java 3分钟上手 滑动窗口 SlidingWindow 时间驱动 事件驱动 TimeWindow CountWindow GlobalWindow
Flink-08 Flink Java 3分钟上手 滑动窗口 SlidingWindow 时间驱动 事件驱动 TimeWindow CountWindow GlobalWindow
67 7
|
1月前
|
存储 JSON NoSQL
Java 中MongoDB的使用
Java 中MongoDB的使用
16 2
|
1月前
|
消息中间件 NoSQL Java
Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing
Flink-06 Flink Java 3分钟上手 滚动窗口 时间驱动 Kafka TumblingWindow TimeWindowFunction TumblingProcessing
37 0
|
2月前
|
NoSQL JavaScript Java
Java Python访问MongoDB
Java Python访问MongoDB
22 4
|
2月前
|
Java
JAVA并发编程系列(13)Future、FutureTask异步小王子
本文详细解析了Future及其相关类FutureTask的工作原理与应用场景。首先介绍了Future的基本概念和接口方法,强调其异步计算特性。接着通过FutureTask实现了一个模拟外卖订单处理的示例,展示了如何并发查询外卖信息并汇总结果。最后深入分析了FutureTask的源码,包括其内部状态转换机制及关键方法的实现原理。通过本文,读者可以全面理解Future在并发编程中的作用及其实现细节。