微服务轮子项目(36) -Canal数据库日志解析消费

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云数据库 RDS SQL Server,独享型 2核4GB
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 微服务轮子项目(36) -Canal数据库日志解析消费

1. Canal概述

早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。

目前内部版本已经支持mysql和oracle部分版本的日志解析

当前的canal开源版本支持5.7及以下的版本(阿里内部mysql

5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)

基于日志增量订阅&消费支持的业务:

  • 数据库镜像
  • 数据库实时备份
  • 多级索引 (卖家和买家各自分库索引)
  • search build
  • 业务cache刷新
  • 价格变化等重要业务消息

1.1 工作原理

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

1.2 架构

说明:

  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列 (1个server对应1…n个instance)

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)

1.3 HA机制设计

canal的HA分为两部分,canal servercanal client分别有对应的HA实现

  • canal server: 为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.
  • canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。

整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定)。

Canal Server:

大致步骤:

  1. canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
  2. 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态
  3. 一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.
  4. canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect.

Canal Client的方式和canal server方式类似,也是利用zokeeper的抢占EPHEMERAL节点的方式进行控制。

1.4 相关资料

2. 安装部署

2.1 创建数据库用户canal

目标数据库先要创建好canal用的用户

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

2.2 远程拉取

  1. 访问docker hub获取最新的版本 访问:https://hub.docker.com/r/canal/canal-server/tags/
  2. 下载对应的版本,比如最新版为1.1.3
docker pull canal/canal-server:v1.1.3

2.3 启动Docker

docker目录下自带了一个run.sh脚本

下载脚本:

wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh 

创建启动脚本:

vim start.sh
docker stop canal-server
docker rm canal-server
sh run.sh -e canal.auto.scan=false \
              -e canal.destinations=zlt-test \
              -e canal.instance.master.address=192.168.28.130:3306  \
              -e canal.instance.dbUsername=canal  \
              -e canal.instance.dbPassword=canal  \
              -e canal.instance.connectionCharset=UTF-8 \
              -e canal.instance.tsdb.enable=true \
              -e canal.instance.gtidon=false  \

destinations:目标数据库名

instance.master.address:目标数据库地址

instance.dbUsername:目标数据库用户名

instance.dbPassword:目标数据库密码

具体其他配置可参考AdminGuide

docker模式下,单docker实例只能运行一个instance,主要为配置问题。如果需要运行多instance时,可以自行制作一份docker镜像即可

2.4 运行

sh start.sh
• 1

运行效果:看到successful之后,就代表canal-server启动成功,可以启动canal-client连接上来进行binlog订阅了

2.5 MQ消息投递

canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:RocketMQ和Kafka

将上面的启动脚本改为以下,增加MQ相关参数:

docker stop canal-server
docker rm canal-server
sh run.sh -e canal.auto.scan=false \
                  -e canal.destinations=zlt-test \
                  -e canal.instance.master.address=192.168.28.130:3306  \
                  -e canal.instance.dbUsername=canal  \
                  -e canal.instance.dbPassword=canal  \
                  -e canal.instance.connectionCharset=UTF-8 \
                  -e canal.instance.tsdb.enable=true \
                  -e canal.instance.gtidon=false  \
                  -e canal.mq.topic=canal-test \
                  -e canal.serverMode=RocketMQ \
                  -e canal.mq.servers=192.168.28.130:9876 \

canal.mq.topic:配置mq的topic

canal.serverMode:tcp(默认), kafka, RocketMQ

canal.mq.servers:mq地址

投递MQ后的消息如下图:

2.6 如果要订阅的是mysql的从库该怎么做?

生产环境中的主库是不能随便重启的,所以订阅的话必须订阅mysql主从的从库,而从库中是默认下只将主库的操作写进中继日志,并写到自己的二进制日志的,所以需要让其成为canal的主库,必须让其将日志也写到自己的二进制日志里面。处理方法:修改my.cnf,增加一行log_slave_updates=1,重启数据库后就可以了。

3. 实时同步数据到ElasticSearch

如今大型的IT系统中,都会使用分布式的方式,作为使用最广泛的数据库,如何将mysql的数据与中间件的数据进行同步,既能确保数据的一致性、及时性,也能做到代码无侵入的方式呢?下面介绍如何实现数据修改后,需要及时的将mysql中的数据更新到elasticsearch中。

3.1 数据同步方案选择

  • 代码实现(双写):针对代码中进行数据库的增删改操作时,同时进行elasticsearch的增删改操作。
  • mybatis实现:通过mybatis plugin进行实现,截取sql语句进行分析, 针对insert、update、delete的语句进行处理。显然,这些操作如果都是单条数据的操作,是很容易处理的。但是,实际开发中,总是会有一些批量的更新或者删除操作,这时候,就很难进行处理了。
  • Aop实现:不管是通过哪种Aop方式,根据制定的规则,如规范方法名,注解等进行切面处理,但依然还是会出现无法处理批量操作数据的问题。
  • logstash:logstash类似的同步组件提供的文件和数据同步的功能,可以进行数据的同步,只需要简单的配置就能将mysql数据同步到elasticsearch,但是logstash的原理是每分钟进行一次增量数据查询,将结果同步到elasticsearch,实时性要求特别高的,可能无法满足要求。且此方案的性能不是很好,造成资源的浪费。
实现方式 优缺点
代码实现 技术难度低,侵入性强,实时性高
基于mybatis 有一定的技术难度,但是无法覆盖所有的场景
Aop实现 技术难度低,半侵入性,需要规范代码,依然无法覆盖所有的场景
logstash 技术难度低,无侵入性,无需开发,但会造成资源浪费,实时性也不高

那么是否有什么更好的方式进行处理吗?

  • mysql binlog同步,实时性强,对于应用无任何侵入性,且性能更好,不会造成资源浪费。Canal安装部署通过数据库binlog实时抓取数据更新信息推送到消息队列MQ里,然后就可以通过消费MQ消息把数据实时同步到不同的异构数据源里了

3.2 增量同步ES

1.创建索引:在同步之前需要先创建号索引,下面是创建sys_user索引的例子

curl -X PUT "http://192.168.28.130:9200/sys_user/" -H 'Content-Type: application/json' -d'
{
    "settings" : {
      "number_of_shards" : 1,
        "number_of_replicas" : 0
    },
    "mappings":{
      "properties":{
                "id": {
                    "type": "long"
                },
                "username": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    },
                    "analyzer": "ik_max_word"
                },
                "nickname": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    },
                    "analyzer": "ik_max_word"
                },
                "mobile": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    },
                    "analyzer": "ik_max_word"
                },
                "sex": {
                    "type": "keyword"
                },
                "type": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    },
                    "analyzer": "ik_max_word"
                },
                "createTime": {
                    "type": "date"
                },
                "updateTime": {
                    "type": "date"
                },
                "company": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    },
                    "analyzer": "ik_max_word"
                },
                "openId": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    }
                },
                "isDel": {
                    "type": "keyword"
                }
      }
    }
}

地址192.168.28.130:9200为es的ip地址

sys_user为索引名

2.安装配置Adapter,下载adapter:https://github.com/alibaba/canal/releases

详细的配置说明请参考官方wiki:https://github.com/alibaba/canal/wiki/Sync-ES

3.同步表sys_user的配置样例:

canal.adapter-xxx\conf\application.yml:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: rocketMQ # kafka rocketMQ
  mqServers: 192.168.28.130:9876 #or rocketmq
  flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.28.130:3306/user-center?useUnicode=true
      username: canal
      password: canal
  canalAdapters:
  - instance: canal-sys-user # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es
        hosts: 192.168.28.130:9300
        properties:
          cluster.name: my-es

mode:消费的类型有3种选择tcp、kafka和rocketMQ

mqServers: mq的地址

defaultDS:配置源数据库的地址

instance:配置mq的topic名称

es:配置es的地址和集群名

canal.adapter-xxx\conf\es\sys_user.yml:

dataSourceKey: defaultDS
destination: canal-sys-user
groupId: g1
esMapping:
  _index: sys_user
  _type: search_data
  _id: id
  upsert: true
  sql: "select id, username, nickname, mobile
          , case when sex = 0 then '男' when sex = 1 then '女' end sex
          , case when type = 'app' then '移动用户' when type = 'BACKEND' then '后台用户' end type
          , create_time createTime, update_time updateTime, company, open_id openId
          , case when is_del = 0 then '否' when is_del = 1 then '是' end isDel
        from sys_user"
  etlCondition: "where update_time >= '{0}'"
  commitBatch: 3000

dataSourceKey:配置application.yml中源数据库的key

destination:配置mq的topic名称

_index:插入es中的索引名

_type:插入es中mappings的type属性

_id:配置id字段

upsert:配置插入数据正常时写入,主键冲突时更新

sql:配置具体要同步es的数据

etlCondition:条件判断,通过更新日期实现增量同步

3.3 历史数据全量同步ES

如果在搭建增量同步之前mysql数据库已经存在历史数据,就需要做初始化同步,全量同步可以使用Canal-Adapterrest-api来实现

全量同步初始化,例子如下:

curl -X POST http://192.168.28.130:8081/etl/es/sys_user.yml

ip为Canal-Adapter所在服务器ip

路径/es/sys_user.yml为conf目录下配置文件的路径,会自动忽略where条件进行全量同步

目录
相关文章
|
2月前
|
监控 安全 测试技术
从开发到测试再到发布,全方位解析项目上线的完美路程!
从开发到测试再到发布,全方位解析项目上线的完美路程!
|
3月前
|
Dubbo Java 应用服务中间件
阿里巴巴资深架构师深度解析微服务架构设计之SpringCloud+Dubbo
软件架构是一个包含各种组织的系统组织,这些组件包括Web服务器,应用服务器,数据库,存储,通讯层),它们彼此或和环境存在关系。系统架构的目标是解决利益相关者的关注点。
|
1天前
|
SQL NoSQL 数据库
深入浅出:微服务架构下的数据库事务管理
【2月更文挑战第12天】 在当今微服务架构日益流行的背景下,如何有效地管理跨服务的数据库事务成为了开发与维护中的一大挑战。本文旨在探讨微服务环境下数据库事务管理的关键技术和策略,包括但不限于分布式事务的基本概念、常见的解决方案(如两阶段提交、补偿事务等),以及这些方案在实际应用中的优缺点比较。通过深入浅出的方式,本文希望能够帮助读者更好地理解并应对微服务架构下的数据库事务管理问题,进而提升系统的稳定性和可靠性。
|
9天前
|
设计模式 测试技术 Go
Go 项目必备:Wire 依赖注入工具的深度解析与实战应用
在现代软件开发中,依赖注入(Dependency Injection,简称 DI)已经成为一种广泛采用的设计模式。它的核心思想是通过外部定义的方式,将组件之间的依赖关系解耦,从而提高代码的可维护性、可扩展性和可测试性。然而,随着项目规模的增长,手动管理复杂的依赖关系变得日益困难。这时,依赖注入代码生成工具就显得尤为重要。在众多工具中,Wire 以其简洁、强大和易用性脱颖而出,成为 Go 语言项目中的宠儿。本文将带你深入了解 Wire 的安装、基本使用、核心概念以及高级用法,并通过一个实际的 web 博客项目示例,展示如何利用 Wire 简化依赖注入的实现。准备好了吗?让我们开始这场代码解耦的奇
|
28天前
|
SQL 监控 关系型数据库
数据库日志解析:深入了解MySQL中的各类日志
数据库日志解析:深入了解MySQL中的各类日志
53 0
|
28天前
|
NoSQL 数据管理 数据库
浅谈微服务架构下的数据库设计策略
在当今快速发展的软件工程领域,微服务架构以其灵活性和可扩展性成为了众多企业和开发者的首选。然而,随着服务的细分,数据管理和存储面临着前所未有的挑战。本文将探讨微服务架构下的数据库设计策略,包括服务间数据的独立性、事务一致性问题的处理、以及数据迁移和备份的最佳实践。我们将通过对比传统单体架构与微服务架构下的数据库设计差异,提出几种有效的数据库设计方案,旨在为开发者提供在微服务环境下处理复杂数据问题的思路和方法。
15 0
|
1月前
|
敏捷开发 弹性计算 架构师
浅谈微服务架构下的数据库设计与实践
在当今快速发展的软件工程领域,微服务架构因其高度的模块化和灵活性而受到广泛欢迎。然而,随之而来的是对数据库设计和管理提出了新的挑战。本文将探讨在微服务架构下,如何有效地设计和实践数据库以支持服务的独立性、数据的一致性和系统的扩展性。我们将从微服务的数据库隔离策略谈起,深入分析数据库的分库分表、事务管理、数据一致性解决方案等关键技术,并通过实例说明如何在实际项目中应用这些原则和技术。本文旨在为软件开发者和架构师提供一份指南,帮助他们在微服务架构的环境下,更好地进行数据库设计和管理。
145 1
|
2月前
|
存储 JSON NoSQL
cJSON项目解析
cJSON项目解析
|
3月前
|
Java Maven
maven 项目配置日志打印以及异常日志打印问题
maven 项目配置日志打印以及异常日志打印问题
24 0
|
3月前
|
NoSQL Java 应用服务中间件
2024年面试复盘大全500道:Redis+ZK+Nginx+数据库+分布式+微服务
今天分享给大家的都是目前主流企业使用最高频的面试题库,也都是 Java 版本升级之后,重新整理归纳的最新答案,会让面试者少走很多不必要的弯路。同时每个专题都做到了详尽的面试解析文档,以确保每个阶段的读者都能看得懂,同时这部分面试文档也是可以免费的提供给有需要的同学们学习的

推荐镜像

更多