mysql 5.7同步数据到es 7.6.2(集群)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,高可用系列 2核4GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: mysql 5.7同步数据到es 7.6.2(集群)

正文


一、前提条件


安装mycat 1.6

安装es 7.6.2集群

安装logstash 7.6.2


二、可选方案


在实际项目中,业务数据主流存储在mysql,但是mysql处理海量数据的搜索能力较差,推荐mysql搭配es,为业务提供强大的搜索能力成为业界主流方案,难点在于如何将mysql导入到es。


mysql数据同步到es,有以下几种方案:


更新mysql数据库的同时通过es api实现数据写入es(同步)

通过logstash同步数据到es(异步)

通过mysql binlog,将数据同步到es(异步)


三、es api同步数据


直接调用es api即可


    @PutMapping("/sync")
    @ApiOperation("同步修改ES")
    @CrossOrigin
    public void updateData(@RequestBody final ElasticsearchModel model) {
        String id = model.getId();
        String indexName = model.getIndexName();
        String paramJson = model.getData();
        elasticsearchUtil.updateData(indexName,id,paramJson);
    }


四、logstash同步数据到es


参考博客同步MYSQL数据到Elasticsearch


logstash通过使用jdbc-input实现定时同步mysql数据,因此需要提前下载mysql的jdbc驱动


1.配置logstash.mysql.conf


input {
  # 配置JDBC数据源
  jdbc {
    # mysql jdbc驱动路径
    jdbc_driver_library => "D:/xxxxx/mysql-connector-java-5.1.35.jar"
    # mysql jdbc驱动
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # mysql 连接地址
    jdbc_connection_string => "jdbc:mysql://localhost:8066/TESTDB"
    # mysql 账号
    jdbc_user => "root"
    # mysql 密码
    jdbc_password => "123456"
    # 定时任务配置,一分钟执行一次
    schedule => "* * * * *"
    # sql查询语句
    # 增量更新数据
    statement => "select * from boot_resource where id > :sql_last_value"
    # 允许sql_last_value的值来源于结果的某个字段值
    use_column_value => true
    # sql_last_value的值来自查询结果的最后一个id值
    tracking_column => "id"
    # 开启分页
    jdbc_paging_enabled => true
    # 分页大小
    jdbc_page_size => 50
  }
}
output {
  # 配置数据导入es
  elasticsearch {
    # 索引名称
    index => "laokou-resource"
    # es地址
    hosts => ["192.168.1.1:9200","192.168.1.2:9200","192.168.1.3:9200"]
    # document_id设置mysql表主键
    document_id => "%{id}"
  }
  stdout {
    codec => rubydebug
  }
}


2.jdbc schedule 配置说明


jdbc schedule的配置规则


*   *   *   *   *
分  时  天  月  星期


各字段取值范围


  • 分 - 0 ~ 59
  • 时 - 0 ~ 23
  • 天 - 1 ~ 31
  • 月 - 1 ~ 12
  • 星期 - 0 ~ 6


常见特殊字符含义


  • 星号(*):代表所有值,如第一个星号,表示每分钟
  • 斜线(/):指定时间的间隔频率,如*/5,用在第一个分钟字段,表示每5分钟执行一次


举个栗子


# 每分钟执行一次
* * * * *
# 每5分钟执行一次
*/5 * * * *
# 每小时执行一次
* */1 * * *
# 每天八点执行一次
0 8 * * *


3.增量同步数据


同步数据的sql语句,直接扫描全表的数据,如果数据量较小,问题不大,如果数据量比较大,会直接卡死,logstash OOM挂掉,因此需要增量同步数据,并且每次仅同步新增数据


logstash提供了sql_last_value字段值,可以实现增量同步数据


实现思路:logstash每次执行sql时,会将sql查询结果的最后一条记录某个字段的值保存到sql_last_value字段中,下一次执行sql时,就以sql_last_value的值做条件,从这个值往后查询数据

statement => "select * from boot_resource where id > :sql_last_value"


4.数据分页


增量同步的数据太多,会导致logstash卡死,尤其是首次增量同步时(首次同步数据,其实是全表扫描),为避免一次查询太多数据,可以配置分页


    # 开启分页
    jdbc_paging_enabled => true
    # 分页大小
    jdbc_page_size => 50


5.启动logstash


logstash -f logstash.mysql.conf


五、mysql通过binlog同步数据到es


阿里的开源神器canal333.png


1. canal简介


canal主要用于对mysql的增量日志进行解析(请注意,只支持增量解析,不支持全量解析),提供增量数据的订阅和消费,对mysql增量数据进行实时同步,支持同步到mysql、elasticsearch、hbase等数据源


2.canal常用组件


canal-deployer(canal-server):监听mysql的binlog,把自己伪装成mysql slave,只负责接收数据,不做数据处理

canal-adapter:canal客户端,从canal-server中获取数据,对数据进行同步,可以同步到mysql、elasticsearch、hbase等数据源

canal-admin:提供整体配置管理、节点运维等功能,web界面方便用户快速和安全的操作


3.canal工作原理


333.png


canal模拟mysql master slave的交互协议,把自己伪装mysql slave,向mysql master发送dump协议(dump用于备份)

mysql master接收到dump请求,向canal推送binlog

canal通过解析binlog,将数据同步到其他的数据源


4.MySQL主从复制原理


111.png


mysql master在每个事务更新数据完成之前,将该操作记录串行写入binlog(二进制文件)文件中

mysql slave 开启一个I/O线程,该线程在master打开一个普通连接,主要工作的是binlog dump process,如果读取的进度已经跟上master,就会进入休眠状态并等待master产生新的事件,I/O线程的最终目的就是将这些事件写入到replay log(中继日志)

sql线程会读取中继日志,并按顺序执行该日志的sql事件,从而与mysql master的数据保持一致


5.下载canal 1.1.5并安装


6.mysql配置


master 配置


log-bin=mysql-bin ##开启二进制日志
binlog-format=row ##二进制日志格式(row\mixed\statement)


查看binlog是否启用


show variables like '%log_bin%';


222.png


查看binlog模式


show variables like '%binlog_format%';


111.png


7.创建账号,用于订阅binlog


CREATE USER canal IDENTIFIED BY '123456';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;


8.创建数据库


DROP TABLE IF EXISTS `boot_city`;
CREATE TABLE `boot_city` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `parent_id` bigint(20) NOT NULL COMMENT '上级编号',
  `region` varchar(100) NOT NULL COMMENT '地区名称',
  `region_id` int(11) NOT NULL COMMENT '地区编号',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1324914279466823682 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC COMMENT='城市';


9.配置canal-server


1.配置conf/example/instance.properties


# 需要同步数据的MySQL地址
canal.instance.master.address=192.168.1.1:3306 
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# 用于同步数据的数据库账号
canal.instance.dbUsername=canal
# 用于同步数据的数据库密码
canal.instance.dbPassword=123456
# 数据库连接编码
canal.instance.connectionCharset = UTF-8
# 需要订阅binlog的表过滤正则表达式
canal.instance.filter.regex=.*\\..*


2.启动服务只需要双击bin/startup.bat


10.配置canal-adapter


1.修改conf/application.yml


#修改 es\mysql配置 
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.1.1:3306/kcloud?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: 192.168.1.1:9300,192.168.1.2:9300,192.168.1.3:9300 # 127.0.0.1:9200 for rest mode
        properties:
          mode: transport # or rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: laokou-elasticsearch


2.修改conf/es7/mytest_user.yml


dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: laokou-city
  _id: id
  sql: "select a.id,a.parent_id,a.region,a.region_id from boot_city a"
  etlCondition: "where a.id >= {}"
  commitBatch: 3000


3.启动服务只需要双击bin/startup.bat


11.启动kibana,创建索引


PUT /laokou-city
{
  "mappings": {
    "properties": {
      "id":{
        "type": "long"
      },
      "parent_id":{
        "type": "long"
      },
      "region":{
        "type": "keyword"
      },
      "region_id":{
        "type": "integer"
      }
    }
  }
}


12.数据库插入数据


INSERT INTO `boot_city` VALUES ('1324912501966925826', '227', '东安县', '2093');
INSERT INTO `boot_city` VALUES ('1324912504676446210', '227', '双牌县', '2094');
INSERT INTO `boot_city` VALUES ('1324912506161229825', '227', '道县', '2095');
INSERT INTO `boot_city` VALUES ('1324912507595681794', '227', '江永县', '2096');
INSERT INTO `boot_city` VALUES ('1324912509306957825', '227', '宁远县', '2097');
INSERT INTO `boot_city` VALUES ('1324912511995506689', '227', '蓝山县', '2098');
INSERT INTO `boot_city` VALUES ('1324912514667278338', '227', '新田县', '2099');
INSERT INTO `boot_city` VALUES ('1324912516038815746', '227', '江华瑶族自治县', '2100');
INSERT INTO `boot_city` VALUES ('1324912517494239234', '228', '鹤城区', '2101');
INSERT INTO `boot_city` VALUES ('1324912520161816578', '228', '中方县', '2102');
INSERT INTO `boot_city` VALUES ('1324912521667571714', '228', '沅陵县', '2103');
INSERT INTO `boot_city` VALUES ('1324912523072663553', '228', '辰溪县', '2104');
INSERT INTO `boot_city` VALUES ('1324912524578418690', '228', '溆浦县', '2105');
INSERT INTO `boot_city` VALUES ('1324912526096756737', '228', '会同县', '2106');
INSERT INTO `boot_city` VALUES ('1324912527514431490', '228', '麻阳苗族自治县', '2107');
INSERT INTO `boot_city` VALUES ('1324912529036963841', '228', '新晃侗族自治县', '2108');
INSERT INTO `boot_city` VALUES ('1324912530588856321', '228', '芷江侗族自治县', '2109');
INSERT INTO `boot_city` VALUES ('1324912531956199426', '228', '靖州苗族侗族自治县', '2110');
INSERT INTO `boot_city` VALUES ('1324912533394845697', '228', '通道侗族自治县', '2111');


canal-server会监听binlog日志数据是否发生更改


13.效果截图


111.png


14.问题解决


注:如果能获取canal-sever数据,不能写入es


请替换canal-adapter的plugin目录下的client-adapter.es7x-1.1.5-jar-with-dependencies.jar

相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://www.aliyun.com/product/rds/mysql 
目录
相关文章
|
5月前
|
缓存 NoSQL 关系型数据库
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
|
3月前
|
SQL 人工智能 关系型数据库
如何实现MySQL百万级数据的查询?
本文探讨了在MySQL中对百万级数据进行排序分页查询的优化策略。面对五百万条数据,传统的浅分页和深分页查询效率较低,尤其深分页因偏移量大导致性能显著下降。通过为排序字段添加索引、使用联合索引、手动回表等方法,有效提升了查询速度。最终建议根据业务需求选择合适方案:浅分页可加单列索引,深分页推荐联合索引或子查询优化,同时结合前端传递最后一条数据ID的方式实现高效翻页。
142 0
|
5月前
|
负载均衡 算法 关系型数据库
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
本文聚焦 MySQL 集群架构中的负载均衡算法,阐述其重要性。详细介绍轮询、加权轮询、最少连接、加权最少连接、随机、源地址哈希等常用算法,分析各自优缺点及适用场景。并提供 Java 语言代码实现示例,助力直观理解。文章结构清晰,语言通俗易懂,对理解和应用负载均衡算法具有实用价值和参考价值。
大数据大厂之MySQL数据库课程设计:揭秘MySQL集群架构负载均衡核心算法:从理论到Java代码实战,让你的数据库性能飙升!
|
2月前
|
存储 关系型数据库 MySQL
在CentOS 8.x上安装Percona Xtrabackup工具备份MySQL数据步骤。
以上就是在CentOS8.x上通过Perconaxtabbackup工具对Mysql进行高效率、高可靠性、无锁定影响地实现在线快速全量及增加式数据库资料保存与恢复流程。通过以上流程可以有效地将Mysql相关资料按需求完成定期或不定期地保存与灾难恢复需求。
153 10
|
7月前
|
关系型数据库 MySQL Shell
MySQL 备份 Shell 脚本:支持远程同步与阿里云 OSS 备份
一款自动化 MySQL 备份 Shell 脚本,支持本地存储、远程服务器同步(SSH+rsync)、阿里云 OSS 备份,并自动清理过期备份。适用于数据库管理员和开发者,帮助确保数据安全。
|
3月前
|
SQL 存储 缓存
MySQL 如何高效可靠处理持久化数据
本文详细解析了 MySQL 的 SQL 执行流程、crash-safe 机制及性能优化策略。内容涵盖连接器、分析器、优化器、执行器与存储引擎的工作原理,深入探讨 redolog 与 binlog 的两阶段提交机制,并分析日志策略、组提交、脏页刷盘等关键性能优化手段,帮助提升数据库稳定性与执行效率。
|
6月前
|
关系型数据库 MySQL Linux
在Linux环境下备份Docker中的MySQL数据并传输到其他服务器以实现数据级别的容灾
以上就是在Linux环境下备份Docker中的MySQL数据并传输到其他服务器以实现数据级别的容灾的步骤。这个过程就像是一场接力赛,数据从MySQL数据库中接力棒一样传递到备份文件,再从备份文件传递到其他服务器,最后再传递回MySQL数据库。这样,即使在灾难发生时,我们也可以快速恢复数据,保证业务的正常运行。
276 28
|
6月前
|
负载均衡 算法 关系型数据库
大数据新视界--大数据大厂之MySQL数据库课程设计:MySQL集群架构负载均衡故障排除与解决方案
本文深入探讨 MySQL 集群架构负载均衡的常见故障及排除方法。涵盖请求分配不均、节点无法响应、负载均衡器故障等现象,介绍多种负载均衡算法及故障排除步骤,包括检查负载均衡器状态、调整算法、诊断修复节点故障等。还阐述了预防措施与确保系统稳定性的方法,如定期监控维护、备份恢复策略、团队协作与知识管理等。为确保 MySQL 数据库系统高可用性提供全面指导。
|
5月前
|
存储 SQL 缓存
mysql数据引擎有哪些
MySQL 提供了多种存储引擎,每种引擎都有其独特的特点和适用场景。以下是一些常见的 MySQL 存储引擎及其特点:
140 0
|
7月前
|
存储 SQL 关系型数据库
【YashanDB知识库】MySQL迁移至崖山char类型数据自动补空格问题
**简介**:在MySQL迁移到崖山环境时,若字段类型为char(2),而应用存储的数据仅为'0'或'1',查询时崖山会自动补空格。原因是mysql的sql_mode可能启用了PAD_CHAR_TO_FULL_LENGTH模式,导致保留CHAR类型尾随空格。解决方法是与应用确认数据需求,可将崖山环境中的char类型改为varchar类型以规避补空格问题,适用于所有版本。

推荐镜像

更多