mongo-connector实现MongoDB与elasticsearch实时同步深入详解

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: mongo-connector工具创建一个从MongoDB簇到一个或多个目标系统的管道,目标系统包括:Solr,Elasticsearch,或MongoDB簇。

引言:

验证表明:mongo-connector工具支持MongoDB与ES之间的实时增insert、删delete、改update操作。
对于历史数据,mongo-connector工具不能同步到ES中,根因是本身工具不支持(初步界定),还是没有这种场景,待查(进一步研究后再更新)。

1. mongo-connector 地址:

https://github.com/mongodb-labs/mongo-connector

2、 mongo-connector 工具简介

mongo-connector工具创建一个从MongoDB簇到一个或多个目标系统的管道,目标系统包括:Solr,Elasticsearch,或MongoDB簇。
该工具在MongoDB与目标系统间同步数据,并跟踪MongoDB的oplog,保持操作与MongoDB的实时同步。
该工具已经在python2.6,2.7,3.3+下进行验证。
mongo-connector工具是基于python开发的实时同步服务工具。它要求mongo运行在replica-set模式,且需要 elastic2_doc_manager将数据写入ES。
image.png

3、 elastic2-doc-manager 工具简介

这是Elastic2.x版本的文档管理器。对应Elastic1.x版本需要使用 elastic-doc-manager。

4、ES与MongoDB同步步骤:

####(1)安装 mongo-connector。

pip install mongo-connector

(2)安装 elastic2-doc-manager。

pip install elastic2-doc-manager

注意:
如果不安装(2)直接进入(3)、(4)则会报错:

[root@5b9dbaaa148a bin]# mongo-connector -m 10.8.5.99:27017 -t 10.8.5.101:9200 -d elastic2_doc_manager
Logging to mongo-connector.log.
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner
  self.run()

(3)mongo端启动

MongoDB 必须开启复制集,如果已经开启请忽略这一步:

1)通过 –replSet 设定副本集名称。
[root@b48eafd69929 bin]# ./mongod --replSet "rs0"

2)将mongo与副本集成员连接

[root@b48eafd69929 bin]# ./mongo
MongoDB shell version: 3.2.4
connecting to: test
Server has startup warnings:
2016-07-05T09:49:01.330+0100 I CONTROL [initandlisten] ** WARNING: You are running this process as the root user, which is not recommended.
2016-07-05T09:49:01.330+0100 I CONTROL [initandlisten]
2016-07-05T09:49:01.331+0100 I CONTROL [initandlisten]
2016-07-05T09:49:01.331+0100 I CONTROL [initandlisten] ** WARNING: You are running on a NUMA machine.
2016-07-05T09:49:01.331+0100 I CONTROL [initandlisten] ** We suggest launching mongod like this to avoid performance problems:
2016-07-05T09:49:01.332+0100 I CONTROL [initandlisten] ** numactl --interleave=all mongod [other options]
2016-07-05T09:49:01.332+0100 I CONTROL [initandlisten]
2016-07-05T09:49:01.332+0100 I CONTROL [initandlisten] ** WARNING: /sys/kernel/mm/transparent_hugepage/enabled is 'always'.
2016-07-05T09:49:01.332+0100 I CONTROL [initandlisten] ** We suggest setting it to 'never'
2016-07-05T09:49:01.332+0100 I CONTROL [initandlisten]
2016-07-05T09:49:01.332+0100 I CONTROL [initandlisten] ** WARNING: /sys/kernel/mm/transparent_hugepage/defrag is 'always'.
2016-07-05T09:49:01.332+0100 I CONTROL [initandlisten] ** We suggest setting it to 'never'
2016-07-05T09:49:01.332+0100 I CONTROL [initandlisten]

3)初始化副本集

> rs.initiate()
{
  "info2" : "no configuration specified. Using a default configuration for the set",
  "me" : "b48eafd69929:27017",
  "ok" : 1
}

4)【验证】初始化副本集的配置

rs0:SECONDARY> rs.conf()
{
  "_id" : "rs0",
  "version" : 1,
  "protocolVersion" : NumberLong(1),
  "members" : [
  {
  "_id" : 0,
  "host" : "b48eafd69929:27017",
  "arbiterOnly" : false,
  "buildIndexes" : true,
  "hidden" : false,
  "priority" : 1,
  "tags" : {

  },
  "slaveDelay" : NumberLong(0),
  "votes" : 1
  }
  ],
  "settings" : {
  "chainingAllowed" : true,
  "heartbeatIntervalMillis" : 2000,
  "heartbeatTimeoutSecs" : 10,
  "electionTimeoutMillis" : 10000,
  "getLastErrorModes" : {

  },
  "getLastErrorDefaults" : {
  "w" : 1,
  "wtimeout" : 0
  },
  "replicaSetId" : ObjectId("577b74bd0ba41a313110ad62")
  }
}

5)【验证】副本集的状态。

rs0:PRIMARY> rs.status()
{
  "set" : "rs0",
  "date" : ISODate("2016-07-05T08:50:55.272Z"),
  "myState" : 1,
  "term" : NumberLong(1),
  "heartbeatIntervalMillis" : NumberLong(2000),
  "members" : [
  {
  "_id" : 0,
  "name" : "b48eafd69929:27017",
  "health" : 1,
  "state" : 1,
  "stateStr" : "PRIMARY",
  "uptime" : 115,
  "optime" : {
  "ts" : Timestamp(1467708606, 1),
  "t" : NumberLong(1)
  },
  "optimeDate" : ISODate("2016-07-05T08:50:06Z"),
  "infoMessage" : "could not find member to sync from",
  "electionTime" : Timestamp(1467708605, 2),
  "electionDate" : ISODate("2016-07-05T08:50:05Z"),
  "configVersion" : 1,
  "self" : true
  }
  ],
  "ok" : 1
}

(4)ES端同步操作

[root@5b9dbaaa148a bin]# mongo-connector -m 10.8.5.99:27017 -t 10.8.5.101:9200 -d elastic2_doc_manager
Logging to mongo-connector.log.

参数含义:
-m: mongodb的地址与端口,端口默认为27017。
-t:ES的地址与端口,端口默认为9200。
-d:doc manager的名称,2.x版本为: elastic2-doc-manager。

5、ES与MongoDB Insert插入操作的同步验证

(1)Mongo端插入数据操作:

#Mongo创建数据库(对应ES的Index)
rs0:PRIMARY> use zhang_index
switched to db zhang_index

#Mongo中插入数据(其中col_02对应ES中的Type)
rs0:PRIMARY> db.col_02.insert({name:"laoluo", birth:"1964-03-21", sex:"man", company:"chuizi"});
WriteResult({ "nInserted" : 1 })
rs0:PRIMARY> db.col_02.insert({name:"renzhengfei", birth:"1954-03-21", sex:"man", company:"huawei"});

(2)Es端检索验证

[root@5b9dbaaa148a test_log]# curl -XGET http://10.8.5.101:9200/zhang_index/col_02/_search?pretty
{
"took" : 4,
"timed_out" : false,
"_shards" : {
"total" : 8,
"successful" : 8,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [ {
"_index" : "zhang_index",
"_type" : "col_02",
"_id" : "577b7d8ceb8e3dc2d1db12a9",
"_score" : 1.0,
"_source" : {
"company" : "huawei",
"name" : "renzhengfei",
"birth" : "1954-03-21",
"sex" : "man"
}
}, {
"_index" : "zhang_index",
"_type" : "col_02",
"_id" : "577b7d4aeb8e3dc2d1db12a7",
"_score" : 1.0,
"_source" : {
"company" : "chuizi",
"name" : "laoluo",
"birth" : "1964-03-21",
"sex" : "man"
}
} ]
}
}


##6、 ES与MongoDB Update更新操作的同步验证

(1)MongoDB的更新update操作
rs0:PRIMARY> db.col_02.update({'name':'laoluo'}, {$set:{'name':'luoyonghao'}})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
rs0:PRIMARY>
rs0:PRIMARY> db.col_02.find().pretty()
{
  "_id" : ObjectId("577b7d4aeb8e3dc2d1db12a7"),
  "name" : "luoyonghao",
  "birth" : "1964-03-21",
  "sex" : "man",
  "company" : "chuizi"
}
{
  "_id" : ObjectId("577b7d8ceb8e3dc2d1db12a9"),
  "name" : "renzhengfei",
  "birth" : "1954-03-21",
  "sex" : "man",
  "company" : "huawei"
}

(2)Es端检索更新后结果

[root@5b9dbaaa148a test_log]# curl -XGET http://10.8.5.101:9200/zhang_index/col_02/_search?pretty
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
  "total" : 8,
  "successful" : 8,
  "failed" : 0
  },
  "hits" : {
  "total" : 2,
  "max_score" : 1.0,
  "hits" : [ {
  "_index" : "zhang_index",
  "_type" : "col_02",
  "_id" : "577b7d8ceb8e3dc2d1db12a9",
  "_score" : 1.0,
  "_source" : {
  "company" : "huawei",
  "name" : "renzhengfei",
  "birth" : "1954-03-21",
  "sex" : "man"
  }
  }, {
  "_index" : "zhang_index",
  "_type" : "col_02",
  "_id" : "577b7d4aeb8e3dc2d1db12a7",
  "_score" : 1.0,
  "_source" : {
  "company" : "chuizi",
  "name" : "luoyonghao",
  "birth" : "1964-03-21",
  "sex" : "man"
  }
  } ]
  }
}

7、 ES与MongoDB delete删除操作的同步验证

(1) MongoDB的删除delete操作

rs0:PRIMARY> db.col_02.remove({'name':'renzhengfei'})
WriteResult({ "nRemoved" : 1 })
rs0:PRIMARY> db.col_02.find()
{ "_id" : ObjectId("577b7d4aeb8e3dc2d1db12a7"), "name" : "luoyonghao", "birth" : "1964-03-21", "sex" : "man", "company" : "chuizi" }
rs0:PRIMARY> db.col_02.find().pretty()
{
  "_id" : ObjectId("577b7d4aeb8e3dc2d1db12a7"),
  "name" : "luoyonghao",
  "birth" : "1964-03-21",
  "sex" : "man",
  "company" : "chuizi"
}

(2)ES端检索删除后结果

结果表明,MongoDB删除的内容,ES端已经同步删除。

[root@5b9dbaaa148a test_log]# curl -XGET http://10.8.5.101:9200/zhang_index/col_02/_search?pretty
{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
  "total" : 8,
  "successful" : 8,
  "failed" : 0
  },
  "hits" : {
  "total" : 1,
  "max_score" : 1.0,
  "hits" : [ {
  "_index" : "zhang_index",
  "_type" : "col_02",
  "_id" : "577b7d4aeb8e3dc2d1db12a7",
  "_score" : 1.0,
  "_source" : {
  "company" : "chuizi",
  "name" : "luoyonghao",
  "birth" : "1964-03-21",
  "sex" : "man"
  }
  } ]
  }
}

image.png

参见详细介绍:

https://docs.mongodb.com/manual/tutorial/deploy-replica-set/

Mongo与ES同步的5种方式:

https://www.linkedin.com/pulse/5-way-sync-data-from-mongodb-es-kai-hao

常见Bug:

How to setup a MongoDB replica set for the connector?
https://docs.mongodb.com/manual/tutorial/deploy-replica-set/


作者:铭毅天下
转载请标明出处,原文地址:
http://blog.csdn.net/laoyang360/article/details/51842822

相关实践学习
MongoDB数据库入门
MongoDB数据库入门实验。
快速掌握 MongoDB 数据库
本课程主要讲解MongoDB数据库的基本知识,包括MongoDB数据库的安装、配置、服务的启动、数据的CRUD操作函数使用、MongoDB索引的使用(唯一索引、地理索引、过期索引、全文索引等)、MapReduce操作实现、用户管理、Java对MongoDB的操作支持(基于2.x驱动与3.x驱动的完全讲解)。 通过学习此课程,读者将具备MongoDB数据库的开发能力,并且能够使用MongoDB进行项目开发。   相关的阿里云产品:云数据库 MongoDB版 云数据库MongoDB版支持ReplicaSet和Sharding两种部署架构,具备安全审计,时间点备份等多项企业能力。在互联网、物联网、游戏、金融等领域被广泛采用。 云数据库MongoDB版(ApsaraDB for MongoDB)完全兼容MongoDB协议,基于飞天分布式系统和高可靠存储引擎,提供多节点高可用架构、弹性扩容、容灾、备份回滚、性能优化等解决方案。 产品详情: https://www.aliyun.com/product/mongodb
相关文章
|
3月前
|
消息中间件 监控 关系型数据库
MySQL数据实时同步到Elasticsearch:技术深度解析与实践分享
在当今的数据驱动时代,实时数据同步成为许多应用系统的核心需求之一。MySQL作为关系型数据库的代表,以其强大的事务处理能力和数据完整性保障,广泛应用于各种业务场景中。然而,随着数据量的增长和查询复杂度的提升,单一依赖MySQL进行高效的数据检索和分析变得日益困难。这时,Elasticsearch(简称ES)以其卓越的搜索性能、灵活的数据模式以及强大的可扩展性,成为处理复杂查询需求的理想选择。本文将深入探讨MySQL数据实时同步到Elasticsearch的技术实现与最佳实践。
241 0
|
8月前
|
canal 缓存 SpringCloudAlibaba
Springcloud Alibaba 使用Canal将MySql数据实时同步到Elasticsearch
本篇文章在Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保证缓存的一致性-CSDN博客 基础上使用canal将mysql数据实时同步到Elasticsearch。
161 0
|
NoSQL 前端开发 Java
基于 Mongodb 实现商品管理系统之准备工作讲解|学习笔记
快速学习基于 Mongodb 实现商品管理系统之准备工作讲解
基于 Mongodb 实现商品管理系统之准备工作讲解|学习笔记
|
NoSQL 数据库连接 MongoDB
基于 Mongodb 实现商品管理系统之向数据库中添加商品编写讲解|学习笔记
快速学习基于 Mongodb 实现商品管理系统之向数据库中添加商品编写讲解
基于 Mongodb 实现商品管理系统之向数据库中添加商品编写讲解|学习笔记
|
NoSQL 数据库连接 MongoDB
基于Mongodb实现商品管理系统之根据商品编号删除商品编写讲解|学习笔记
快速学习基于Mongodb实现商品管理系统之根据商品编号删除商品编写讲解
基于Mongodb实现商品管理系统之根据商品编号删除商品编写讲解|学习笔记
|
NoSQL MongoDB Android开发
基于 Mongodb 实现商品管理系统之 Web 层编写讲解|学习笔记
快速学习基于 Mongodb 实现商品管理系统之 Web 层编写讲解
基于 Mongodb 实现商品管理系统之 Web 层编写讲解|学习笔记
|
关系型数据库 MySQL 开发工具
Python:使用 mysqlsmom 模块实时同步MySQL数据到ElasticSearch
Python:使用 mysqlsmom 模块实时同步MySQL数据到ElasticSearch
162 0
|
存储 消息中间件 NoSQL
博客数据库要连接Elasticsearch,使用MySQL还是MongoDB更合理
博客数据库要连接Elasticsearch,使用MySQL还是MongoDB更合理
博客数据库要连接Elasticsearch,使用MySQL还是MongoDB更合理
|
存储 缓存 NoSQL
分布式服务器框架之Servers.Core中 实现Log模块设计 写入MongoDB数据库
游戏服务器中都需要用到Log模块,log模块存在的意义第一个是将log输出到控制台又或者是写入到log文件中,出了BUG方便定位;第二是常用于将用户的数据(例如玩家登录、道具购买量)将这种log统计到数据库中,方便统计用户留存信息、数据分析等。
|
NoSQL MongoDB 数据库
分布式服务器框架之Server.Core库中实现YFUniqueEntity、YFUniqueIDBase 管理MongoDB 自定义Id的自增
YFUniqueEntity是数据库中的结构,GetUniqueID函数中会根据Type和自增步长去数据库中寻找该类型的当前ID是多少,然后会用当前的Id去加上步长,把更新后的新ID插入到MongoDB中记录着ID的那张表里。