go-mysql-elasticsearch实现mysql 与elasticsearch实时同步深入详解

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 引言:go-mysql-elasticsearch 是国内作者开发的一款插件。测试表明:该插件优点:能实现同步增、删、改、查操作。不足之处(待完善的地方):1、仍处理开发、相对不稳定阶段;2、没有日志,不便于排查问题及查看同步结果。本文深入详解了插件的安装、使用、增删改查同步测试。

1. go-mysql-elasticsearch 插件安装

步骤1:安装go

yum install go


步骤2:安装godep

go get github.com/tools/godep


步骤3:获取go-mysql-elastisearch插件

go get github.com/siddontang/go-mysql-elasticsearch


步骤4:安装go-mysql-elastisearch插件

cd $GOPATH/src/github.com/siddontang/go-mysql-elasticsearch

make


2.go-mysql-elasticsearch 插件使用

2.1修改配置文件

[root@5b9dbaaa148a etc]# cat river.toml

# MySQL address, user and password

# user must have replication privilege in MySQL.

my_addr = "192.168.1.1:3306"

my_user = "root"

my_pass = "password@!"


# Elasticsearch address

es_addr = "192.168.1.1:9200"


# Path to store data, like master.info, and dump MySQL data

data_dir = "./var"


# Inner Http status address

stat_addr = "192.168.1.1:12800"


# pseudo server id like a slave

server_id = 1


# mysql or mariadb

flavor = "mysql"


# mysqldump execution path

mysqldump = "mysqldump"


# MySQL data source

[[source]]

schema = "test"


# Only below tables will be synced into Elasticsearch.

# "test_river_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023

# I don't think it is necessary to sync all tables in a database.

tables = ["cc"]


# Below is for special rule mapping

#[[rule]]

#schema = "test"

#table = "cc"

#index = "go_river"

#type = "go_rivert"


   # title is MySQL test_river field name, es_title is the customized name in Elasticsearch

#   [rule.field]

   # This will map column title to elastic search my_title

 #  title="es_title"

   # This will map column tags to elastic search my_tags and use array type

  # tags="my_tags,list"

   # This will map column keywords to elastic search keywords and use array type

   #keywords=",list"


# wildcard table rule, the wildcard table must be in source tables

[[rule]]

schema = "test"

table = "cc"

index = "gocc"

type = "gocc_t"


   # title is MySQL test_river field name, es_title is the customized name in Elasticsearch

   [[rule.fields]]

   mysql = "mysql101"

   elastic = "es_mysql101"

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

2.2执行同步操作

cd $GOPATH/src/github.com/siddontang/go-mysql-elasticsearch

./bin/go-mysql-elasticsearch -config=./etc/river.toml


3. go-mysql-elasticsearch 插件同步测试结果

3.1插入Insert操作实时同步验证(验证ok)

3.1.1Mysql端插入操作

mysql> insert into cc(id,name) values(12, ‘test12’);

Query OK, 1 row affected (0.06 sec)


3.1.2Mysql执行insert后查询结果

mysql> select * from cc where id =12;

+—-+——–+——–+———————+

| id | name | status | modified_at |

+—-+——–+——–+———————+

| 12 | test12 | ok | 2016-06-24 02:27:29 |

+—-+——–+——–+———————+

1 row in set (0.02 sec)


3.1.3ES端能查询到新增的value字段。

[root@5b9dbaaa148a bin]# curl -XGET http://192.168.1.1:9200/gocc/_search?pretty -d '

> {"query":

> {"term":

> {"id":12}}}'

{

 "took" : 402,

 "timed_out" : false,

 "_shards" : {

   "total" : 8,

   "successful" : 8,

   "failed" : 0

 },

 "hits" : {

   "total" : 1,

   "max_score" : 1.0,

   "hits" : [ {

     "_index" : "gocc",

     "_type" : "gocc_t",

     "_id" : "12",

     "_score" : 1.0,

     "_source" : {

       "id" : 12,

       "modified_at" : "2016-06-24T02:27:29+01:00",

       "name" : "test12",

       "status" : "ok"

     }

   } ]

 }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

3.2修改Update操作实时同步验证(验证ok)

3.2.1mysql执行更新操作

mysql> update cc set name = 'test12_001' where id = 12;

Query OK, 1 row affected (0.05 sec)

Rows matched: 1  Changed: 1  Warnings: 0

1

2

3

3.2.2mysql执行修改后查询

Mysql查询修改后结果:


mysql> select * from cc where id = 12;

+----+------------+--------+---------------------+

| id | name       | status | modified_at         |

+----+------------+--------+---------------------+

| 12 | test12_001 | ok     | 2016-06-24 02:27:29 |

+----+------------+--------+---------------------+

1 row in set (0.00 sec)

1

2

3

4

5

6

7

3.2.3 ES查询修改结果

[root@5b9dbaaa148a bin]# curl -XGET http://192.168.1.1:9200/gocc/_search?pretty -d '

{"query":

{"term":

{"id":12}}}'

{

 "took" : 59,

 "timed_out" : false,

 "_shards" : {

   "total" : 8,

   "successful" : 8,

   "failed" : 0

 },

 "hits" : {

   "total" : 1,

   "max_score" : 1.0,

   "hits" : [ {

     "_index" : "gocc",

     "_type" : "gocc_t",

     "_id" : "12",

     "_score" : 1.0,

     "_source" : {

       "id" : 12,

       "modified_at" : "2016-06-24T02:27:29+01:00",

       "name" : "test12_001",

       "status" : "ok"

     }

   } ]

 }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

3.3删除操作实时同步验证

3.3.1Mysql执行删除操作

mysql> delete from cc where id = 12;

Query OK, 1 row affected (0.04 sec)

1

2

3.3.2删除后查询表

mysql> select * from cc;

+----+--------------------+--------+---------------------+

| id | name               | status | modified_at         |

+----+--------------------+--------+---------------------+

|  1 | laoyang360         | ok     | 0000-00-00 00:00:00 |

|  2 | test002            | ok     | 2016-06-23 06:16:42 |

|  3 | dlllaoyang360      | ok     | 0000-00-00 00:00:00 |

| 11 | test11             | ok     | 2016-06-24 02:09:15 |

|  5 | jdbc_test_update08 | ok     | 0000-00-00 00:00:00 |

|  7 | test7              | ok     | 0000-00-00 00:00:00 |

|  8 | test008            | ok     | 0000-00-00 00:00:00 |

|  9 | test009            | ok     | 0000-00-00 00:00:00 |

| 10 | test10             | ok     | 2016-06-24 02:08:14 |

+----+--------------------+--------+---------------------+

9 rows in set (0.02 sec)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

3.3.3ES查询删除后结果

[root@5b9dbaaa148a bin]# curl -XGET http://192.168.1.1:9200/gocc/_search?pretty -d '

{"query":

{"term":

{"id":12}}}'

{

 "took" : 40,

 "timed_out" : false,

 "_shards" : {

   "total" : 8,

   "successful" : 8,

   "failed" : 0

 },

 "hits" : {

   "total" : 0,

   "max_score" : null,

   "hits" : [ ]

 }

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

4小结

验证发现:

(1)go-mysql-elasticsearch插件可以实现同步insert、update、delete操作。

(2)可视化做的不好,没有打印日志。

(3)go-mysql-elasticsearch尚不大稳定,出现过无法同步成功的情况,但没有报错。不便于排查。

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
9天前
|
Java 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何实现MySQL数据库之间的实时同步
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
Kubernetes 关系型数据库 MySQL
实时计算 Flink版产品使用合集之在Kubernetes(k8s)中同步MySQL变更到Elasticsearch该怎么操作
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
29天前
|
消息中间件 存储 关系型数据库
【微服务】mysql + elasticsearch数据双写设计与实现
【微服务】mysql + elasticsearch数据双写设计与实现
|
1月前
|
存储 关系型数据库 MySQL
实时计算 Flink版产品使用合集之如何配置可以实现实时同步多张MySQL源表时只读取一次binlog
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
关系型数据库 MySQL 数据库
实时计算 Flink版产品使用合集之将MySQL中的数据实时同步到Vertica如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用合集之源MySQL表新增字段后,要同步这个改变到Elasticsearch的步骤是什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
分布式计算 DataWorks 调度
DataWorks产品使用合集之在DataWorks中,查看ODPS表的OSS对象如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
43 1
|
1天前
|
存储 关系型数据库 MySQL
关系型数据库MySQL的MyISAM
【6月更文挑战第17天】
18 11
|
3天前
|
存储 关系型数据库 MySQL
|
3天前
|
SQL 关系型数据库 MySQL