微服务轮子项目(36) -Canal数据库日志解析消费

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 微服务轮子项目(36) -Canal数据库日志解析消费

1. Canal概述

早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。

目前内部版本已经支持mysql和oracle部分版本的日志解析

当前的canal开源版本支持5.7及以下的版本(阿里内部mysql

5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)

基于日志增量订阅&消费支持的业务:

  • 数据库镜像
  • 数据库实时备份
  • 多级索引 (卖家和买家各自分库索引)
  • search build
  • 业务cache刷新
  • 价格变化等重要业务消息

1.1 工作原理

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

1.2 架构

说明:

  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列 (1个server对应1…n个instance)

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)

1.3 HA机制设计

canal的HA分为两部分,canal servercanal client分别有对应的HA实现

  • canal server: 为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.
  • canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。

整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定)。

Canal Server:

大致步骤:

  1. canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
  2. 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态
  3. 一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.
  4. canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect.

Canal Client的方式和canal server方式类似,也是利用zokeeper的抢占EPHEMERAL节点的方式进行控制。

1.4 相关资料

2. 安装部署

2.1 创建数据库用户canal

目标数据库先要创建好canal用的用户

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

2.2 远程拉取

  1. 访问docker hub获取最新的版本 访问:https://hub.docker.com/r/canal/canal-server/tags/
  2. 下载对应的版本,比如最新版为1.1.3
docker pull canal/canal-server:v1.1.3

2.3 启动Docker

docker目录下自带了一个run.sh脚本

下载脚本:

wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh 

创建启动脚本:

vim start.sh
docker stop canal-server
docker rm canal-server
sh run.sh -e canal.auto.scan=false \
              -e canal.destinations=zlt-test \
              -e canal.instance.master.address=192.168.28.130:3306  \
              -e canal.instance.dbUsername=canal  \
              -e canal.instance.dbPassword=canal  \
              -e canal.instance.connectionCharset=UTF-8 \
              -e canal.instance.tsdb.enable=true \
              -e canal.instance.gtidon=false  \

destinations:目标数据库名

instance.master.address:目标数据库地址

instance.dbUsername:目标数据库用户名

instance.dbPassword:目标数据库密码

具体其他配置可参考AdminGuide

docker模式下,单docker实例只能运行一个instance,主要为配置问题。如果需要运行多instance时,可以自行制作一份docker镜像即可

2.4 运行

sh start.sh
• 1

运行效果:看到successful之后,就代表canal-server启动成功,可以启动canal-client连接上来进行binlog订阅了

2.5 MQ消息投递

canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:RocketMQ和Kafka

将上面的启动脚本改为以下,增加MQ相关参数:

docker stop canal-server
docker rm canal-server
sh run.sh -e canal.auto.scan=false \
                  -e canal.destinations=zlt-test \
                  -e canal.instance.master.address=192.168.28.130:3306  \
                  -e canal.instance.dbUsername=canal  \
                  -e canal.instance.dbPassword=canal  \
                  -e canal.instance.connectionCharset=UTF-8 \
                  -e canal.instance.tsdb.enable=true \
                  -e canal.instance.gtidon=false  \
                  -e canal.mq.topic=canal-test \
                  -e canal.serverMode=RocketMQ \
                  -e canal.mq.servers=192.168.28.130:9876 \

canal.mq.topic:配置mq的topic

canal.serverMode:tcp(默认), kafka, RocketMQ

canal.mq.servers:mq地址

投递MQ后的消息如下图:

2.6 如果要订阅的是mysql的从库该怎么做?

生产环境中的主库是不能随便重启的,所以订阅的话必须订阅mysql主从的从库,而从库中是默认下只将主库的操作写进中继日志,并写到自己的二进制日志的,所以需要让其成为canal的主库,必须让其将日志也写到自己的二进制日志里面。处理方法:修改my.cnf,增加一行log_slave_updates=1,重启数据库后就可以了。

3. 实时同步数据到ElasticSearch

如今大型的IT系统中,都会使用分布式的方式,作为使用最广泛的数据库,如何将mysql的数据与中间件的数据进行同步,既能确保数据的一致性、及时性,也能做到代码无侵入的方式呢?下面介绍如何实现数据修改后,需要及时的将mysql中的数据更新到elasticsearch中。

3.1 数据同步方案选择

  • 代码实现(双写):针对代码中进行数据库的增删改操作时,同时进行elasticsearch的增删改操作。
  • mybatis实现:通过mybatis plugin进行实现,截取sql语句进行分析, 针对insert、update、delete的语句进行处理。显然,这些操作如果都是单条数据的操作,是很容易处理的。但是,实际开发中,总是会有一些批量的更新或者删除操作,这时候,就很难进行处理了。
  • Aop实现:不管是通过哪种Aop方式,根据制定的规则,如规范方法名,注解等进行切面处理,但依然还是会出现无法处理批量操作数据的问题。
  • logstash:logstash类似的同步组件提供的文件和数据同步的功能,可以进行数据的同步,只需要简单的配置就能将mysql数据同步到elasticsearch,但是logstash的原理是每分钟进行一次增量数据查询,将结果同步到elasticsearch,实时性要求特别高的,可能无法满足要求。且此方案的性能不是很好,造成资源的浪费。
实现方式 优缺点
代码实现 技术难度低,侵入性强,实时性高
基于mybatis 有一定的技术难度,但是无法覆盖所有的场景
Aop实现 技术难度低,半侵入性,需要规范代码,依然无法覆盖所有的场景
logstash 技术难度低,无侵入性,无需开发,但会造成资源浪费,实时性也不高

那么是否有什么更好的方式进行处理吗?

  • mysql binlog同步,实时性强,对于应用无任何侵入性,且性能更好,不会造成资源浪费。Canal安装部署通过数据库binlog实时抓取数据更新信息推送到消息队列MQ里,然后就可以通过消费MQ消息把数据实时同步到不同的异构数据源里了

3.2 增量同步ES

1.创建索引:在同步之前需要先创建号索引,下面是创建sys_user索引的例子

curl -X PUT "http://192.168.28.130:9200/sys_user/" -H 'Content-Type: application/json' -d'
{
    "settings" : {
      "number_of_shards" : 1,
        "number_of_replicas" : 0
    },
    "mappings":{
      "properties":{
                "id": {
                    "type": "long"
                },
                "username": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    },
                    "analyzer": "ik_max_word"
                },
                "nickname": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    },
                    "analyzer": "ik_max_word"
                },
                "mobile": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    },
                    "analyzer": "ik_max_word"
                },
                "sex": {
                    "type": "keyword"
                },
                "type": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    },
                    "analyzer": "ik_max_word"
                },
                "createTime": {
                    "type": "date"
                },
                "updateTime": {
                    "type": "date"
                },
                "company": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    },
                    "analyzer": "ik_max_word"
                },
                "openId": {
                    "type": "text",
                    "fields":{
                        "keyword":{
                            "type":"keyword",
                            "ignore_above":256
                        }
                    }
                },
                "isDel": {
                    "type": "keyword"
                }
      }
    }
}

地址192.168.28.130:9200为es的ip地址

sys_user为索引名

2.安装配置Adapter,下载adapter:https://github.com/alibaba/canal/releases

详细的配置说明请参考官方wiki:https://github.com/alibaba/canal/wiki/Sync-ES

3.同步表sys_user的配置样例:

canal.adapter-xxx\conf\application.yml:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: rocketMQ # kafka rocketMQ
  mqServers: 192.168.28.130:9876 #or rocketmq
  flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.28.130:3306/user-center?useUnicode=true
      username: canal
      password: canal
  canalAdapters:
  - instance: canal-sys-user # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es
        hosts: 192.168.28.130:9300
        properties:
          cluster.name: my-es

mode:消费的类型有3种选择tcp、kafka和rocketMQ

mqServers: mq的地址

defaultDS:配置源数据库的地址

instance:配置mq的topic名称

es:配置es的地址和集群名

canal.adapter-xxx\conf\es\sys_user.yml:

dataSourceKey: defaultDS
destination: canal-sys-user
groupId: g1
esMapping:
  _index: sys_user
  _type: search_data
  _id: id
  upsert: true
  sql: "select id, username, nickname, mobile
          , case when sex = 0 then '男' when sex = 1 then '女' end sex
          , case when type = 'app' then '移动用户' when type = 'BACKEND' then '后台用户' end type
          , create_time createTime, update_time updateTime, company, open_id openId
          , case when is_del = 0 then '否' when is_del = 1 then '是' end isDel
        from sys_user"
  etlCondition: "where update_time >= '{0}'"
  commitBatch: 3000

dataSourceKey:配置application.yml中源数据库的key

destination:配置mq的topic名称

_index:插入es中的索引名

_type:插入es中mappings的type属性

_id:配置id字段

upsert:配置插入数据正常时写入,主键冲突时更新

sql:配置具体要同步es的数据

etlCondition:条件判断,通过更新日期实现增量同步

3.3 历史数据全量同步ES

如果在搭建增量同步之前mysql数据库已经存在历史数据,就需要做初始化同步,全量同步可以使用Canal-Adapterrest-api来实现

全量同步初始化,例子如下:

curl -X POST http://192.168.28.130:8081/etl/es/sys_user.yml

ip为Canal-Adapter所在服务器ip

路径/es/sys_user.yml为conf目录下配置文件的路径,会自动忽略where条件进行全量同步

目录
相关文章
|
3天前
|
人工智能 监控 算法
3D-Speaker:阿里通义开源的多模态说话人识别项目,支持说话人识别、语种识别、多模态识别、说话人重叠检测和日志记录
3D-Speaker是阿里巴巴通义实验室推出的多模态说话人识别开源项目,结合声学、语义和视觉信息,提供高精度的说话人识别和语种识别功能。项目包含工业级模型、训练和推理代码,以及大规模多设备、多距离、多方言的数据集,适用于多种应用场景。
71 18
3D-Speaker:阿里通义开源的多模态说话人识别项目,支持说话人识别、语种识别、多模态识别、说话人重叠检测和日志记录
|
3月前
|
消息中间件 监控 开发工具
微服务(三)-实现自动刷新配置(不重启项目情况下)
微服务(三)-实现自动刷新配置(不重启项目情况下)
|
24天前
|
存储 运维 数据可视化
如何为微服务实现分布式日志记录
如何为微服务实现分布式日志记录
43 1
|
1月前
|
JSON Java 数据库
SpringBoot项目使用AOP及自定义注解保存操作日志
SpringBoot项目使用AOP及自定义注解保存操作日志
53 1
|
1月前
|
消息中间件 存储 监控
微服务日志监控的挑战及应对方案
【10月更文挑战第23天】微服务化带来模块独立与快速扩展,但也使得日志监控复杂。日志作用包括业务记录、异常追踪和性能定位。
|
2月前
|
存储 监控 安全
深入解析Sysmon日志:增强网络安全与威胁应对的关键一环
在不断演进的网络安全领域中,保持对威胁的及时了解至关重要。Sysmon日志在这方面发挥了至关重要的作用,通过提供有价值的见解,使组织能够加强其安全姿态。Windows在企业环境中是主导的操作系统,因此深入了解Windows事件日志、它们的独特特性和局限性,并通过Sysmon进行增强,变得至关重要。
|
2月前
|
存储 关系型数据库 MySQL
MySQL中的Redo Log、Undo Log和Binlog:深入解析
【10月更文挑战第21天】在数据库管理系统中,日志是保障数据一致性和完整性的关键机制。MySQL作为一种广泛使用的关系型数据库管理系统,提供了多种日志类型来满足不同的需求。本文将详细介绍MySQL中的Redo Log、Undo Log和Binlog,从背景、业务场景、功能、底层实现原理、使用措施等方面进行详细分析,并通过Java代码示例展示如何与这些日志进行交互。
247 0
|
3月前
|
存储 缓存 关系型数据库
redo log 原理解析
redo log 原理解析
57 0
redo log 原理解析
|
4月前
|
开发框架 .NET Docker
【Azure 应用服务】App Service .NET Core项目在Program.cs中自定义添加的logger.LogInformation,部署到App Service上后日志不显示Log Stream中的问题
【Azure 应用服务】App Service .NET Core项目在Program.cs中自定义添加的logger.LogInformation,部署到App Service上后日志不显示Log Stream中的问题
|
4月前
|
数据库 Java 监控
Struts 2 日志管理化身神秘魔法师,洞察应用运行乾坤,演绎奇幻篇章!
【8月更文挑战第31天】在软件开发中,了解应用运行状况至关重要。日志管理作为 Struts 2 应用的关键组件,记录着每个动作和决策,如同监控摄像头,帮助我们迅速定位问题、分析性能和使用情况,为优化提供依据。Struts 2 支持多种日志框架(如 Log4j、Logback),便于配置日志级别、格式和输出位置。通过在 Action 类中添加日志记录,我们能在开发过程中获取详细信息,及时发现并解决问题。合理配置日志不仅有助于调试,还能分析用户行为,提升应用性能和稳定性。
66 0

热门文章

最新文章

推荐镜像

更多