正文
一、前提条件
安装mycat 1.6
安装es 7.6.2集群
安装logstash 7.6.2
二、可选方案
在实际项目中,业务数据主流存储在mysql,但是mysql处理海量数据的搜索能力较差,推荐mysql搭配es,为业务提供强大的搜索能力成为业界主流方案,难点在于如何将mysql导入到es。
mysql数据同步到es,有以下几种方案:
更新mysql数据库的同时通过es api实现数据写入es(同步)
通过logstash同步数据到es(异步)
通过mysql binlog,将数据同步到es(异步)
三、es api同步数据
直接调用es api即可
@PutMapping("/sync") @ApiOperation("同步修改ES") @CrossOrigin public void updateData(@RequestBody final ElasticsearchModel model) { String id = model.getId(); String indexName = model.getIndexName(); String paramJson = model.getData(); elasticsearchUtil.updateData(indexName,id,paramJson); }
四、logstash同步数据到es
logstash通过使用jdbc-input实现定时同步mysql数据,因此需要提前下载mysql的jdbc驱动
1.配置logstash.mysql.conf
input { # 配置JDBC数据源 jdbc { # mysql jdbc驱动路径 jdbc_driver_library => "D:/xxxxx/mysql-connector-java-5.1.35.jar" # mysql jdbc驱动 jdbc_driver_class => "com.mysql.jdbc.Driver" # mysql 连接地址 jdbc_connection_string => "jdbc:mysql://localhost:8066/TESTDB" # mysql 账号 jdbc_user => "root" # mysql 密码 jdbc_password => "123456" # 定时任务配置,一分钟执行一次 schedule => "* * * * *" # sql查询语句 # 增量更新数据 statement => "select * from boot_resource where id > :sql_last_value" # 允许sql_last_value的值来源于结果的某个字段值 use_column_value => true # sql_last_value的值来自查询结果的最后一个id值 tracking_column => "id" # 开启分页 jdbc_paging_enabled => true # 分页大小 jdbc_page_size => 50 } } output { # 配置数据导入es elasticsearch { # 索引名称 index => "laokou-resource" # es地址 hosts => ["192.168.1.1:9200","192.168.1.2:9200","192.168.1.3:9200"] # document_id设置mysql表主键 document_id => "%{id}" } stdout { codec => rubydebug } }
2.jdbc schedule 配置说明
jdbc schedule的配置规则
* * * * * 分 时 天 月 星期
各字段取值范围
- 分 - 0 ~ 59
- 时 - 0 ~ 23
- 天 - 1 ~ 31
- 月 - 1 ~ 12
- 星期 - 0 ~ 6
常见特殊字符含义
- 星号(*):代表所有值,如第一个星号,表示每分钟
- 斜线(/):指定时间的间隔频率,如*/5,用在第一个分钟字段,表示每5分钟执行一次
举个栗子
# 每分钟执行一次 * * * * * # 每5分钟执行一次 */5 * * * * # 每小时执行一次 * */1 * * * # 每天八点执行一次 0 8 * * *
3.增量同步数据
同步数据的sql语句,直接扫描全表的数据,如果数据量较小,问题不大,如果数据量比较大,会直接卡死,logstash OOM挂掉,因此需要增量同步数据,并且每次仅同步新增数据
logstash提供了sql_last_value字段值,可以实现增量同步数据
实现思路:logstash每次执行sql时,会将sql查询结果的最后一条记录某个字段的值保存到sql_last_value字段中,下一次执行sql时,就以sql_last_value的值做条件,从这个值往后查询数据
statement => "select * from boot_resource where id > :sql_last_value"
4.数据分页
增量同步的数据太多,会导致logstash卡死,尤其是首次增量同步时(首次同步数据,其实是全表扫描),为避免一次查询太多数据,可以配置分页
# 开启分页 jdbc_paging_enabled => true # 分页大小 jdbc_page_size => 50
5.启动logstash
logstash -f logstash.mysql.conf
五、mysql通过binlog同步数据到es
阿里的开源神器canal
1. canal简介
canal主要用于对mysql的增量日志进行解析(请注意,只支持增量解析,不支持全量解析),提供增量数据的订阅和消费,对mysql增量数据进行实时同步,支持同步到mysql、elasticsearch、hbase等数据源
2.canal常用组件
canal-deployer(canal-server):监听mysql的binlog,把自己伪装成mysql slave,只负责接收数据,不做数据处理
canal-adapter:canal客户端,从canal-server中获取数据,对数据进行同步,可以同步到mysql、elasticsearch、hbase等数据源
canal-admin:提供整体配置管理、节点运维等功能,web界面方便用户快速和安全的操作
3.canal工作原理
canal模拟mysql master slave的交互协议,把自己伪装mysql slave,向mysql master发送dump协议(dump用于备份)
mysql master接收到dump请求,向canal推送binlog
canal通过解析binlog,将数据同步到其他的数据源
4.MySQL主从复制原理
mysql master在每个事务更新数据完成之前,将该操作记录串行写入binlog(二进制文件)文件中
mysql slave 开启一个I/O线程,该线程在master打开一个普通连接,主要工作的是binlog dump process,如果读取的进度已经跟上master,就会进入休眠状态并等待master产生新的事件,I/O线程的最终目的就是将这些事件写入到replay log(中继日志)
sql线程会读取中继日志,并按顺序执行该日志的sql事件,从而与mysql master的数据保持一致
5.下载canal 1.1.5并安装
6.mysql配置
master 配置
log-bin=mysql-bin ##开启二进制日志 binlog-format=row ##二进制日志格式(row\mixed\statement)
查看binlog是否启用
show variables like '%log_bin%';
查看binlog模式
show variables like '%binlog_format%';
7.创建账号,用于订阅binlog
CREATE USER canal IDENTIFIED BY '123456'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
8.创建数据库
DROP TABLE IF EXISTS `boot_city`; CREATE TABLE `boot_city` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id', `parent_id` bigint(20) NOT NULL COMMENT '上级编号', `region` varchar(100) NOT NULL COMMENT '地区名称', `region_id` int(11) NOT NULL COMMENT '地区编号', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=1324914279466823682 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC COMMENT='城市';
9.配置canal-server
1.配置conf/example/instance.properties
# 需要同步数据的MySQL地址 canal.instance.master.address=192.168.1.1:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # 用于同步数据的数据库账号 canal.instance.dbUsername=canal # 用于同步数据的数据库密码 canal.instance.dbPassword=123456 # 数据库连接编码 canal.instance.connectionCharset = UTF-8 # 需要订阅binlog的表过滤正则表达式 canal.instance.filter.regex=.*\\..*
2.启动服务只需要双击bin/startup.bat
10.配置canal-adapter
1.修改conf/application.yml
#修改 es\mysql配置 srcDataSources: defaultDS: url: jdbc:mysql://192.168.1.1:3306/kcloud?useUnicode=true username: root password: 123456 canalAdapters: - instance: example # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: logger - name: es7 hosts: 192.168.1.1:9300,192.168.1.2:9300,192.168.1.3:9300 # 127.0.0.1:9200 for rest mode properties: mode: transport # or rest # security.auth: test:123456 # only used for rest mode cluster.name: laokou-elasticsearch
2.修改conf/es7/mytest_user.yml
dataSourceKey: defaultDS destination: example groupId: g1 esMapping: _index: laokou-city _id: id sql: "select a.id,a.parent_id,a.region,a.region_id from boot_city a" etlCondition: "where a.id >= {}" commitBatch: 3000
3.启动服务只需要双击bin/startup.bat
11.启动kibana,创建索引
PUT /laokou-city { "mappings": { "properties": { "id":{ "type": "long" }, "parent_id":{ "type": "long" }, "region":{ "type": "keyword" }, "region_id":{ "type": "integer" } } } }
12.数据库插入数据
INSERT INTO `boot_city` VALUES ('1324912501966925826', '227', '东安县', '2093'); INSERT INTO `boot_city` VALUES ('1324912504676446210', '227', '双牌县', '2094'); INSERT INTO `boot_city` VALUES ('1324912506161229825', '227', '道县', '2095'); INSERT INTO `boot_city` VALUES ('1324912507595681794', '227', '江永县', '2096'); INSERT INTO `boot_city` VALUES ('1324912509306957825', '227', '宁远县', '2097'); INSERT INTO `boot_city` VALUES ('1324912511995506689', '227', '蓝山县', '2098'); INSERT INTO `boot_city` VALUES ('1324912514667278338', '227', '新田县', '2099'); INSERT INTO `boot_city` VALUES ('1324912516038815746', '227', '江华瑶族自治县', '2100'); INSERT INTO `boot_city` VALUES ('1324912517494239234', '228', '鹤城区', '2101'); INSERT INTO `boot_city` VALUES ('1324912520161816578', '228', '中方县', '2102'); INSERT INTO `boot_city` VALUES ('1324912521667571714', '228', '沅陵县', '2103'); INSERT INTO `boot_city` VALUES ('1324912523072663553', '228', '辰溪县', '2104'); INSERT INTO `boot_city` VALUES ('1324912524578418690', '228', '溆浦县', '2105'); INSERT INTO `boot_city` VALUES ('1324912526096756737', '228', '会同县', '2106'); INSERT INTO `boot_city` VALUES ('1324912527514431490', '228', '麻阳苗族自治县', '2107'); INSERT INTO `boot_city` VALUES ('1324912529036963841', '228', '新晃侗族自治县', '2108'); INSERT INTO `boot_city` VALUES ('1324912530588856321', '228', '芷江侗族自治县', '2109'); INSERT INTO `boot_city` VALUES ('1324912531956199426', '228', '靖州苗族侗族自治县', '2110'); INSERT INTO `boot_city` VALUES ('1324912533394845697', '228', '通道侗族自治县', '2111');
canal-server会监听binlog日志数据是否发生更改
13.效果截图
14.问题解决
注:如果能获取canal-sever数据,不能写入es
请替换canal-adapter的plugin目录下的client-adapter.es7x-1.1.5-jar-with-dependencies.jar