高可用的MongoDB集群-实战篇

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
简介:

1.概述

  最近有同学和网友私信我,问我MongoDB方面的问题;这里我整理一篇博客来赘述下MongoDB供大家学习参考,博客的目录内容如下:

  • 基本操作
  • CRUD
  • MapReduce

  本篇文章是基于MongoDB集群(Sharding+Replica Sets)上演示的,故操作的内容都是集群层面的,所以有些命令和单独的使用MongoDB库有异样。具体集群搭建可以参考我写的《高可用的MongoDB集群》。

2.基本操作

  常用的 Shell 命令如下所示:

db.help()    # 数据库帮助
db.collections.help()    # 集合帮助
rs.help()    # help on replica set
show dbs    # 展示数据库名
show collections    # 展示collections在当前库
use db_name    # 选择数据库

  查看集合基本信息,内容如下所示:

复制代码
#查看帮助  
db.yourColl.help(); 

#查询当前集合的数据条数  
db.yourColl.count(); 

#查看数据空间大小 
db.userInfo.dataSize(); 

#得到当前聚集集合所在的
db db.userInfo.getDB(); 

#得到当前聚集的状态 
db.userInfo.stats(); 

#得到聚集集合总大小 
db.userInfo.totalSize(); 

#聚集集合储存空间大小 
db.userInfo.storageSize(); 

#Shard版本信息  
db.userInfo.getShardVersion() 

#聚集集合重命名,将userInfo重命名为users
db.userInfo.renameCollection("users"); 
 
#删除当前聚集集合 
db.userInfo.drop();
复制代码

3.CRUD

3.1创建

  在集群中,我们增加一个 friends 库,命令如下所示:

db.runCommand({enablesharding:"friends"});

  在库新建后,我们在该库下创建一个user分片,命令如下:

db.runCommand( { shardcollection : "friends. user"});

3.2新增

  在MongoDB中,save和insert都能达到新增的效果。但是这两者是有区别的,在save函数中,如果原来的对象不存在,那他们都可以向collection里插入数据;如果已经存在,save会调用update更新里面的记录,而insert则会忽略操作。

  另外,在insert中可以一次性插叙一个列表,而不用遍历,效率高,save则需要遍历列表,一个个插入,下面我们可以看下两个函数的原型,通过函数原型我们可以看出,对于远程调用来说,是一次性将整个列表post过来让MongoDB去处理,效率会高些。

  Save函数原型如下所示:

  Insert函数原型(部分代码)如下所示:

3.3查询

3.3.1查询所有记录

db. user.find();

  默认每页显示20条记录,当显示不下的情况下,可以用it迭代命令查询下一页数据。注意:键入it命令不能带“;” 但是你可以设置每页显示数据的大小,用DBQuery.shellBatchSize= 50;这样每页就显示50条记录了。

3.3.2查询去掉后的当前聚集集合中的某列的重复数据

db. user.distinct("name"); 

#会过滤掉name中的相同数据 相当于:
select distict name from user;

3.3.3查询等于条件数据

db.user.find({"age": 24}); 
#相当于:
select * from user where age = 24;

3.3.4查询大于条件数据

db.user.find({age: {$gt: 24}}); 

# 相当于:
select * from user where age >24;

3.3.5查询小于条件数据

db.user.find({age: {$lt: 24}}); 
#相当于:
select * from user where age < 24;

3.3.6查询大于等于条件数据

db.user.find({age: {$gte: 24}}); 
#相当于:
select * from user where age >= 24;

3.3.7查询小于等于条件数据

db.user.find({age: {$lte: 24}}); 
#相当于:
select * from user where age <= 24;

3.3.8查询AND和OR条件数据

  • AND
db.user.find({age: {$gte: 23, $lte: 26}});

#相当于
select * from user where age >=23 and age <= 26;
  • OR

db.user.find({$or: [{age: 22}, {age: 25}]}); 

#相当于:
select * from user where age = 22 or age = 25;

 

3.3.9模糊查询

db.user.find({name: /mongo/}); 

#相当于%% 
select * from user where name like '%mongo%';

3.3.10开头匹配

db.user.find({name: /^mongo/}); 
# 与SQL中得like语法类似
select * from user where name like 'mongo%';

3.3.11指定列查询

db.user.find({}, {name: 1, age: 1}); 

#相当于:
select name, age from user;

  当然name也可以用true或false,当用ture的情况下和name:1效果一样,如果用false就是排除name,显示name以外的列信息。

3.3.12指定列查询+条件查询

复制代码
db.user.find({age: {$gt: 25}}, {name: 1, age: 1}); 

#相当于:
select name, age from user where age > 25;

 db.user.find({name: 'zhangsan', age: 22});

 #相当于:

 select * from user where name = 'zhangsan' and age = 22;

复制代码

3.3.13排序

#升序:
db.user.find().sort({age: 1});
#降序:
db.
user.find().sort({age: -1});

3.3.14查询5条数据

db.user.find().limit(5); 

#相当于:
select * from user limit 5;

3.3.15N条以后数据

db.user.find().skip(10); 

#相当于:
select * from user where id not in ( select * from user limit 5 );

3.3.16在一定区域内查询记录

#查询在5~10之间的数据
db.user.find().limit(10).skip(5);

  可用于分页,limit是pageSize,skip是第几页*pageSize。

3.3.17COUNT

db.user.find({age: {$gte: 25}}).count(); 

#相当于:
select count(*) from user where age >= 20;

3.3.18安装结果集排序

db.userInfo.find({sex: {$exists: true}}).sort(); 

3.3.19不等于NULL

db.user.find({sex: {$ne: null}}) 

#相当于:
select * from user where sex not null;

3.4索引

  创建索引,并指定主键字段,命令内容如下所示:

db.epd_favorites_folder.ensureIndex({"id":1},{"unique":true,"dropDups":true})
db.epd_focus.ensureIndex({"id":1},{"unique":true,"dropDups":true})

3.5更新

  update命令格式,如下所示:

db.collection.update(criteria,objNew,upsert,multi) 

  参数说明: criteria:

  查询条件 objNew:update对象和一些更新操作符

  upsert:如果不存在update的记录,是否插入objNew这个新的文档,true为插入,默认为false,不插入。

  multi:默认是false,只更新找到的第一条记录。如果为true,把按条件查询出来的记录全部更新。

  下面给出一个示例,更新id为 1 中 price 的值,内容如下所示:

db. user.update({id: 1},{$set:{price:2}});  

#相当于:
update user set price=2 where id=1;

3.6删除

3.6.1删除指定记录

db. user. remove( { id:1 } );  

#相当于:
delete from user where id=1;

3.6.2删除所有记录

db. user. remove( { } );  

#相当于:
delete from user;

3.6.3DROP

db. user. drop();  

#相当于:
drop table user;

4.MapReduce

  MongoDB中的 MapReduce 是编写JavaScript脚本,然后由MongoDB去解析执行对应的脚本,下面给出 Java API 操作MR。代码如下所示:

  MongdbManager类,用来初始化MongoDB:

复制代码
package cn.mongo.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mongodb.DB;
import com.mongodb.Mongo;
import com.mongodb.MongoOptions;

/**
 * @Date Mar 3, 2015
 * 
 * @author dengjie
 * 
 * @Note mongodb manager
 */
public class MongdbManager {

    private static final Logger logger = LoggerFactory.getLogger(MongdbManager.class);
    private static Mongo mongo = null;
    private static String tag = SystemConfig.getProperty("dev.tag");

    private MongdbManager() {
    }

    static {
        initClient();
    }

    // get DB object
    public static DB getDB(String dbName) {
        return mongo.getDB(dbName);
    }

    // get DB object without param
    public static DB getDB() {
        String dbName = SystemConfig.getProperty(String.format("%s.mongodb.dbname", tag));
        return mongo.getDB(dbName);
    }

    // init mongodb pool
    private static void initClient() {
        try {
            String[] hosts = SystemConfig.getProperty(String.format("%s.mongodb.host", tag)).split(",");
            for (int i = 0; i < hosts.length; i++) {
                try {
                    String host = hosts[i].split(":")[0];
                    int port = Integer.parseInt(hosts[i].split(":")[1]);
                    mongo = new Mongo(host, port);
                    if (mongo.getDatabaseNames().size() > 0) {
                        logger.info(String.format("connection success,host=[%s],port=[%d]", host, port));
                        break;
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                    logger.error(String.format("create connection has error,msg is %s", ex.getMessage()));
                }
            }

            // 设置连接池的信息
            MongoOptions opt = mongo.getMongoOptions();
            opt.connectionsPerHost = SystemConfig.getIntProperty(String.format("%s.mongodb.poolsize", tag));// poolsize
            opt.threadsAllowedToBlockForConnectionMultiplier = SystemConfig.getIntProperty(String.format(
                    "%s.mongodb.blocksize", tag));// blocksize
            opt.socketKeepAlive = true;
            opt.autoConnectRetry = true;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
复制代码

  MongoDBFactory类,用来封装操作业务代码,具体内容如下所示:

复制代码
package cn.mongo.util;

import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.diexun.domain.MGDCustomerSchema;

import com.mongodb.BasicDBList;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.util.JSON;

/**
 * @Date Mar 3, 2015
 *
 * @Author dengjie
 */
public class MongoDBFactory {

    private static Logger logger = LoggerFactory.getLogger(MongoDBFactory.class);

    // save data to mongodb
    public static void save(MGDCustomerSchema mgs, String collName) {
        DB db = null;
        try {
            db = MongdbManager.getDB();
            DBCollection coll = db.getCollection(collName);
            DBObject dbo = (DBObject) JSON.parse(mgs.toString());
            coll.insert(dbo);
        } catch (Exception ex) {
            ex.printStackTrace();
            logger.error(String.format("save object to mongodb has error,msg is %s", ex.getMessage()));
        } finally {
            if (db != null) {
                db.requestDone();
                db = null;
            }
        }
    }

    // batch insert
    public static void save(List<?> mgsList, String collName) {
        DB db = null;
        try {
            db = MongdbManager.getDB();
            DBCollection coll = db.getCollection(collName);
            BasicDBList data = (BasicDBList) JSON.parse(mgsList.toString());
            List<DBObject> list = new ArrayList<DBObject>();
            int commitSize = SystemConfig.getIntProperty("mongo.commit.size");
            int rowCount = 0;
            long start = System.currentTimeMillis();
            for (Object dbo : data) {
                rowCount++;
                list.add((DBObject) dbo);
                if (rowCount % commitSize == 0) {
                    try {
                        coll.insert(list);
                        list.clear();
                        logger.info(String.format("current commit rowCount = [%d],commit spent time = [%s]s", rowCount,
                                (System.currentTimeMillis() - start) / 1000.0));
                    } catch (Exception ex) {
                        ex.printStackTrace();
                        logger.error(String.format("batch commit data to mongodb has error,msg is %s", ex.getMessage()));
                    }
                }
            }
            if (rowCount % commitSize != 0) {
                try {
                    coll.insert(list);
                    logger.info(String.format("insert data to mongo has spent total time = [%s]s",
                            (System.currentTimeMillis() - start) / 1000.0));
                } catch (Exception ex) {
                    ex.printStackTrace();
                    logger.error(String.format("commit end has error,msg is %s", ex.getMessage()));
                }
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            logger.error(String.format("save object list to mongodb has error,msg is %s", ex.getMessage()));
        } finally {
            if (db != null) {
                db.requestDone();
                db = null;
            }
        }
    }
}
复制代码

  LoginerAmountMR类,这是一个统计登录用户数的MapReduce计算类,代码如下:

复制代码
package cn.mongo.mapreduce;

import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.bson.BSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import cn.diexun.conf.ConfigureAPI.MR;
import cn.diexun.conf.ConfigureAPI.PRECISION;
import cn.diexun.domain.Kpi;
import cn.diexun.util.CalendarUtil;
import cn.diexun.util.MongdbManager;
import cn.diexun.util.MysqlFactory;

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MapReduceOutput;
import com.mongodb.ReadPreference;

/**
 * @Date Mar 13, 2015
 * 
 * @Author dengjie
 * 
 * @Note use mr jobs stats user login amount
 */
public class LoginerAmountMR {
    private static Logger logger = LoggerFactory.getLogger(LoginerAmountMR.class);

// map 函数JS字符串拼接
private static String map() { String map = "function(){"; map += "if(this.userName != \"\"){"; map += "emit({" + "kpi_code:'login_times',username:this.userName," + "district_id:this.districtId,product_style:this.product_style," + "customer_property:this.customer_property},{count:1});"; map += "}"; map += "}"; return map; }
private static String reduce() { String reduce = "function(key,values){"; reduce += "var total = 0;"; reduce += "for(var i=0;i<values.length;i++){"; reduce += "total += values[i].count;}"; reduce += "return {count:total};"; reduce += "}"; return reduce; }
// reduce 函数字符串拼接
public static void main(String[] args) { loginNumbers("t_login_20150312"); } /** * login user amount * * @param collName */ public static void loginNumbers(String collName) { DB db = null; try { db = MongdbManager.getDB(); db.setReadPreference(ReadPreference.secondaryPreferred()); DBCollection coll = db.getCollection(collName); String result = MR.COLLNAME_TMP; long start = System.currentTimeMillis(); MapReduceOutput mapRed = coll.mapReduce(map(), reduce(), result, null); logger.info(String.format("mr run spent time=%ss", (System.currentTimeMillis() - start) / 1000.0)); start = System.currentTimeMillis(); DBCursor cursor = mapRed.getOutputCollection().find(); List<Kpi> list = new ArrayList<Kpi>(); while (cursor.hasNext()) { DBObject obj = cursor.next(); BSONObject key = (BSONObject) obj.get("_id"); BSONObject value = (BSONObject) obj.get("value"); Object kpiValue = value.get("count"); Object userName = key.get("username"); Object districtId = key.get("district_id"); Object customerProperty = key.get("customer_property"); Object productStyle = key.get("product_style"); Kpi kpi = new Kpi(); try { kpi.setUserName(userName == null ? "" : userName.toString()); kpi.setKpiCode(key.get("kpi_code").toString()); kpi.setKpiValue(Math.round(Double.parseDouble(kpiValue.toString()))); kpi.setCustomerProperty(customerProperty == null ? "" : customerProperty.toString()); kpi.setDistrictId(districtId == "" ? 0 : Integer.parseInt(districtId.toString())); kpi.setProductStyle(productStyle == null ? "" : productStyle.toString()); kpi.setCreateDate(collName.split("_")[2]); kpi.setUpdateDate(Timestamp.valueOf(CalendarUtil.formatMap.get(PRECISION.HOUR).format(new Date()))); list.add(kpi); } catch (Exception exx) { exx.printStackTrace(); logger.error(String.format("parse type or get value has error,msg is %s", exx.getMessage())); } } MysqlFactory.insert(list); logger.info(String.format("store mysql spent time is %ss", (System.currentTimeMillis() - start) / 1000.0)); } catch (Exception ex) { ex.printStackTrace(); logger.error(String.format("run map-reduce jobs has error,msg is %s", ex.getMessage())); } finally { if (db != null) { db.requestDone(); db = null; } } } }
复制代码

5.总结

  在计算 MongoDB 的MapReduce计算的时候,拼接JavaScript字符串时需要谨慎小心,很容易出错,上面给出的代码只是一部分代码,供参考学习使用;另外,若是要做MapReduce任务计算,推荐使用Hadoop的MapReduce计算框架,MongoDB的MapReduce框架这里仅做介绍学习了解。

6.结束语

  这篇博客就和大家分享到这里,若是大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

联系方式: 
邮箱:smartloli.org@gmail.com 
Twitter: https://twitter.com/smartloli 
QQ群(Hadoop - 交流社区1): 424769183 
温馨提示:请大家加群的时候写上加群理由(姓名+公司/学校),方便管理员审核,谢谢! 

热爱生活,享受编程,与君共勉!



本文转自哥不是小萝莉博客园博客,原文链接:http://www.cnblogs.com/smartloli/,如需转载请自行联系原作者

相关实践学习
MongoDB数据库入门
MongoDB数据库入门实验。
快速掌握 MongoDB 数据库
本课程主要讲解MongoDB数据库的基本知识,包括MongoDB数据库的安装、配置、服务的启动、数据的CRUD操作函数使用、MongoDB索引的使用(唯一索引、地理索引、过期索引、全文索引等)、MapReduce操作实现、用户管理、Java对MongoDB的操作支持(基于2.x驱动与3.x驱动的完全讲解)。 通过学习此课程,读者将具备MongoDB数据库的开发能力,并且能够使用MongoDB进行项目开发。 &nbsp; 相关的阿里云产品:云数据库 MongoDB版 云数据库MongoDB版支持ReplicaSet和Sharding两种部署架构,具备安全审计,时间点备份等多项企业能力。在互联网、物联网、游戏、金融等领域被广泛采用。 云数据库MongoDB版(ApsaraDB for MongoDB)完全兼容MongoDB协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份回滚、性能优化等解决方案。 产品详情: https://www.aliyun.com/product/mongodb
相关文章
|
3月前
|
存储 NoSQL JavaScript
MongoDB存储过程实战:聚合框架、脚本、最佳实践,一文全掌握!
【8月更文挑战第24天】MongoDB是一款备受欢迎的文档型NoSQL数据库,以灵活的数据模型和强大功能著称。尽管其存储过程支持不如传统关系型数据库,本文深入探讨了MongoDB在此方面的最佳实践。包括利用聚合框架处理复杂业务逻辑、封装业务逻辑提高复用性、运用JavaScript脚本实现类似存储过程的功能以及考虑集成其他工具提升数据处理能力。通过示例代码展示如何创建订单处理集合并定义验证规则,虽未直接实现存储过程,但有效地演示了如何借助JavaScript脚本处理业务逻辑,为开发者提供更多实用指导。
69 2
|
3月前
|
存储 NoSQL 算法
MongoDB保姆级指南(中):从副本集群、分片集群起航,探索分布式存储的趋势!
本文一起来聊聊MongoDB集群,顺带以MongoDB集群为起点,共同探讨一下分布式存储的发展趋势~
233 15
|
3月前
|
存储 监控 NoSQL
震撼!揭秘高可用 MongoDB 分片集群搭建的神秘魔法,开启数据存储的无敌模式!
【8月更文挑战第9天】在数字化时代,数据至关重要。MongoDB作为流行非关系型数据库,通过搭建高可用分片集群确保系统稳定性和性能。分片技术将大数据集分布于多服务器以实现水平扩展。搭建集群需准备服务器资源,配置环境,启动配置服务器、路由服务器及分片服务器,并设置分片策略。例如,对特定数据库和集合启用分片。此架构适用于高流量应用如大型电商平台,确保数据高效处理和高可用性。搭建过程需持续监控和优化,合理规划分片策略以维持系统稳定运行。
40 3
|
3月前
|
NoSQL Java 测试技术
5-MongoDB实战演练
本文档详细介绍了如何使用MongoDB实现头条文章的评论系统。主要功能包括基本的增删改查API、根据文章ID查询评论、以及评论的点赞功能。文章分析了表结构设计,明确了各字段的意义,并给出了具体的字段类型。技术选型方面,文档推荐使用mongodb-driver作为Java连接MongoDB的驱动包,同时介绍了Spring Data MongoDB这一更高层次的持久层框架。此外,文档还提供了搭建文章微服务模块的具体步骤,包括项目工程的搭建、实体类的编写、索引的添加方式等,并展示了如何使用MongoTemplate实现评论点赞功能。
|
4月前
|
自然语言处理 运维 NoSQL
MongoDB集群同步
实现 MongoDB Cluster-to-Cluster 即集群同步的工具是:mongosync 详情可参考如下官方文档: https://www.mongodb.com/zh-cn/docs/cluster-to-cluster-sync/current/quickstart/ 以上这个地址的文档一看就是机器翻译的,可能有不恰当的地方,但基本可参考使用。 以下是本次在某项目地配置集群同步的简要步骤,可参考使用。
86 6
|
3月前
|
存储 运维 NoSQL
轻松上手:逐步搭建你的高可用MongoDB集群(分片)
【8月更文挑战第13天】在数据激增的背景下,传统单机数据库难以胜任。MongoDB作为流行NoSQL数据库,采用分片技术实现水平扩展,有效处理海量数据。分片将数据分散存储,提高并发处理能力和容错性,是高可用架构基石。构建MongoDB集群需理解shard、config server和router三组件协同工作原理。通过具体实例演示集群搭建流程,包括各组件的启动及配置,确保数据高可用性和系统稳定性。合理规划与实践可构建高效稳定的MongoDB集群,满足业务需求并支持未来扩展。
87 0
|
5月前
|
存储 NoSQL MongoDB
MongoDB实战面试指南:常见问题一网打尽
MongoDB实战面试指南:常见问题一网打尽
|
5月前
|
存储 负载均衡 NoSQL
MongoDB的架构设计基于三种集群模式
【6月更文挑战第5天】MongoDB的架构设计基于三种集群模式
220 3
|
1月前
|
存储 关系型数据库 MySQL
一个项目用5款数据库?MySQL、PostgreSQL、ClickHouse、MongoDB区别,适用场景
一个项目用5款数据库?MySQL、PostgreSQL、ClickHouse、MongoDB——特点、性能、扩展性、安全性、适用场景比较
|
21天前
|
NoSQL Cloud Native atlas
探索云原生数据库:MongoDB Atlas 的实践与思考
【10月更文挑战第21天】本文探讨了MongoDB Atlas的核心特性、实践应用及对云原生数据库未来的思考。MongoDB Atlas作为MongoDB的云原生版本,提供全球分布式、完全托管、弹性伸缩和安全合规等优势,支持快速部署、数据全球化、自动化运维和灵活定价。文章还讨论了云原生数据库的未来趋势,如架构灵活性、智能化运维和混合云支持,并分享了实施MongoDB Atlas的最佳实践。