Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保证缓存的一致性

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: canal [kə'næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。其诞生的背景是早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

1. 背景

canal [kə'næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。其诞生的背景是早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。所以其核心功能如下:


数据实时备份

异构数据源(elasticsearch、Hbase)与数据库数据增量同步

业务缓存cache 刷新,保证缓存一致性

带业务逻辑的增量数据处理,如监听某个数据的变化做一定的逻辑处理

canal是借助于MySQL主从复制原理实现



master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);

slave将master的binary log events拷贝到它的中继日志(relay log);

slave重做中继日志中的事件,将改变反映它自己的数据。

canal的工作原理:




canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议

mysql master收到dump请求,开始推送binary log给slave(也就是canal)

canal解析binary log对象(原始为byte流)


2. Windows系统安装canal


Github下载链接:Releases · alibaba/canal · GitHub


解压



进入/conf/example,修改instance.properties


# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

之后进入到bin目录下启动startup就可以了。


3.Mysql准备工作

/*
SQLyog Community v13.2.0 (64 bit)
MySQL - 8.0.33 : Database - shop
*********************************************************************
*/
/*!40101 SET NAMES utf8 */;
/*!40101 SET SQL_MODE=''*/;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`shop` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;
USE `shop`;
/*Table structure for table `brand` */
DROP TABLE IF EXISTS `brand`;
CREATE TABLE `brand` (
  `id` INT NOT NULL AUTO_INCREMENT COMMENT '品牌id',
  `name` VARCHAR(100) NOT NULL COMMENT '品牌名称',
  `image` VARCHAR(1000) DEFAULT '' COMMENT '品牌图片地址',
  `initial` VARCHAR(1) DEFAULT '' COMMENT '品牌的首字母',
  `sort` INT DEFAULT NULL COMMENT '排序',
  PRIMARY KEY (`id`)
) ENGINE=INNODB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 COMMENT='品牌表';
/*Data for the table `brand` */
INSERT  INTO `brand`(`id`,`name`,`image`,`initial`,`sort`) VALUES 
(11,'华为','https://sklll.oss-cn-beijing.aliyuncs.com/secby/eed72cc4-a9c1-4010-949a-03cef5b933d6.jpg','',NULL),
(12,'中兴','https://sklll.oss-cn-beijing.aliyuncs.com/secby/4fedb361-5ab3-4ad0-a667-580c1f37dff0.jpg','',NULL),
(13,'大疆','https://sklll.oss-cn-beijing.aliyuncs.com/secby/e8382c48-0487-4a9b-8fd0-a3716c3eea19.jpg','',NULL);
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;



4. 公共依赖包

<dependencies>
        <!--web包-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--MyBatis Plus-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.3.2</version>
        </dependency>
        <!--MySQL-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!--Redis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!--Nacos-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
    </dependencies>


5. Redis缓存设计


这里基于Springcloud Alibaba进行设计,对应的服务名称为mall-goods


bootstrap.yaml代码如下:


server:
  port: 8081
spring:
  application:
    name: mall-goods
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/shop?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    username: root
    password: 123456
  cloud:
    nacos:
      config:
        file-extension: yaml
        server-addr: localhost:8848
      discovery:
        #Nacos的注册地址
        server-addr: localhost:8848
  redis:
    host: xx //改为自己redis ip地址
    port: 6379

导入RedisConfig配置


@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){
        RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(factory);
        GenericJackson2JsonRedisSerializer serializer = new GenericJackson2JsonRedisSerializer();
        // 值采用json序列化
        redisTemplate.setValueSerializer(serializer);
        //使用StringRedisSerializer来序列化和反序列化redis的key值
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        // 设置hash key 和value序列化模式
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(serializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
    @Bean
    public RedisCacheManager redisCacheManager(RedisTemplate redisTemplate) {
        RedisCacheWriter redisCacheWriter = RedisCacheWriter.nonLockingRedisCacheWriter(redisTemplate.getConnectionFactory());
        RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofHours(12))//设置默认缓存时间
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(redisTemplate.getValueSerializer()));
        return new RedisCacheManager(redisCacheWriter, redisCacheConfiguration);
    }
}


对于Brand进行redis缓存设计


在主启动类上添加@EnableCaching开启缓存。


@Cacheable 注解可以标记一个方法需要被缓存。在注解中,可以指定缓存的名称和缓存的键。当方法被执行时,Spring Boot 会先查找缓存,如果缓存中存在相应的数据,则直接从缓存中读取,否则执行方法并将结果缓存到缓存中。


@CachePut 注解可以标记一个方法需要更新缓存。在注解中,可以指定缓存的名称和缓存的键。当方法被执行时,Spring Boot 会更新缓存中的数据。


@CacheEvict 注解可以标记一个方法需要删除缓存。在注解中,可以指定缓存的名称和缓存的键。当方法被执行时,Spring Boot 会删除缓存中对应的数据。


代码如下:

@RestController
@RequestMapping("/brand")
public class BrandController {
    @Autowired
    BrandService brandService;
    @GetMapping("/{id}")
    @Cacheable(value = "brand",key = "#id")
    public Brand search(@PathVariable(value = "id")Integer id){
        return brandService.getById(id);
    }
     /****
     * 添加缓存
     */
    @PostMapping
    @CachePut(value = "brand",key = "#brand.id")
    public Brand add(@RequestBody Brand brand){
        return brand;
    }
    /****
     * 修改缓存
     */
    @PutMapping
    @CachePut(value = "brand",key = "#brand.id")
    public Brand update(@RequestBody Brand brand){
        return brand;
    }
    /****
     * 删除缓存
     */
    @DeleteMapping("/{id}")
    @CacheEvict(value = "brand",key = "#id")
    public void delete(@PathVariable(value = "id")Integer id){
    }
}


6. mall-canal-service


这里微服务名称为mall-canal-service,监控Mysql数据库数据变化进行实时的更新缓存。


除公共依赖外导入依赖

<dependency>
            <groupId>top.javatool</groupId>
            <artifactId>canal-spring-boot-starter</artifactId>
            <version>1.2.1-RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-loadbalancer</artifactId>
        </dependency>

 

bootstrap.yaml设计如下:


server:
  port: 8083
spring:
  application:
    name: mall-canal
  cloud:
    nacos:
      config:
        file-extension: yaml
        server-addr: localhost:8848
      discovery:
        #Nacos的注册地址
        server-addr: localhost:8848
#Canal配置
canal:
  server: localhost:11111
  destination: example



设计Feign接口


@FeignClient(value = "mall-goods",contextId = "brand")//服务名字
public interface BrandFeign {
    @GetMapping("/brand/{id}")
    Brand search(@PathVariable(value = "id")Integer id);
    @PostMapping("/brand")
    Brand add(@RequestBody Brand brand);
    /****
     * 修改方法
     */
    @PutMapping("/brand")
     Brand update(@RequestBody Brand brand);
    /****
     * 删除方法
     */
    @DeleteMapping("/brand/{id}")
    void delete(@PathVariable(value = "id")Integer id);
}


Canal实时监控Mysql数据库变化,代码设计如下:


@Component
@CanalTable(value = "brand")
public class BrandHandler implements EntryHandler<Brand> {
    @Autowired
    BrandFeign brandFeign;
    @Override
    public void insert(Brand brand) {
        System.out.println(brand);
        brandFeign.add(brand);
    }
    @Override
    public void update(Brand before, Brand after) {
        System.out.println(after);
        brandFeign.update(after);
    }
    @Override
    public void delete(Brand brand) {
        System.out.println(brand);
        brandFeign.delete(brand.getId());
    }
}



在MallCanalApplication主启动类上添加@EnableFeignClients(basePackages = {"org.example.feign"}),包名为feign接口路径。


注意:本人在使用canal时,数据库有非varchar类型的null值,在运行时会抛异常错误。


at top.javatool.canal.client.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:52) ~[canal-client-1.2.1-RELEASE.jar:na]
  at top.javatool.canal.client.handler.impl.AsyncMessageHandlerImpl.lambda$handleMessage$0(AsyncMessageHandlerImpl.java:30) ~[canal-client-1.2.1-RELEASE.jar:na]
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_281]
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_281]
  at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_281]
Caused by: java.lang.NumberFormatException: For input string: ""
  at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[na:1.8.0_281]

代码运行后,经过测试成功。

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
8天前
|
存储 Oracle 关系型数据库
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
本文介绍了MySQL InnoDB存储引擎中的数据文件和重做日志文件。数据文件包括`.ibd`和`ibdata`文件,用于存放InnoDB数据和索引。重做日志文件(redo log)确保数据的可靠性和事务的持久性,其大小和路径可由相关参数配置。文章还提供了视频讲解和示例代码。
114 11
【赵渝强老师】MySQL InnoDB的数据文件与重做日志文件
|
8天前
|
缓存 NoSQL 关系型数据库
大厂面试高频:如何解决Redis缓存雪崩、缓存穿透、缓存并发等5大难题
本文详解缓存雪崩、缓存穿透、缓存并发及缓存预热等问题,提供高可用解决方案,帮助你在大厂面试和实际工作中应对这些常见并发场景。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:如何解决Redis缓存雪崩、缓存穿透、缓存并发等5大难题
|
9天前
|
存储 缓存 NoSQL
【赵渝强老师】基于Redis的旁路缓存架构
本文介绍了引入缓存后的系统架构,通过缓存可以提升访问性能、降低网络拥堵、减轻服务负载和增强可扩展性。文中提供了相关图片和视频讲解,并讨论了数据库读写分离、分库分表等方法来减轻数据库压力。同时,文章也指出了缓存可能带来的复杂度增加、成本提高和数据一致性问题。
【赵渝强老师】基于Redis的旁路缓存架构
|
8天前
|
缓存 NoSQL 关系型数据库
Redis和Mysql如何保证数据⼀致?
在项目中,为了解决Redis与Mysql的数据一致性问题,我们采用了多种策略:对于低一致性要求的数据,不做特别处理;时效性数据通过设置缓存过期时间来减少不一致风险;高一致性但时效性要求不高的数据,利用MQ异步同步确保最终一致性;而对一致性和时效性都有高要求的数据,则采用分布式事务(如Seata TCC模式)来保障。
42 14
|
11天前
|
SQL 前端开发 关系型数据库
SpringBoot使用mysql查询昨天、今天、过去一周、过去半年、过去一年数据
SpringBoot使用mysql查询昨天、今天、过去一周、过去半年、过去一年数据
43 9
|
11天前
|
NoSQL Java Redis
springCloud中将redis共用到common模块
通过将Redis配置和操作服务提取到Common模块,可以在Spring Cloud微服务架构中实现高效的代码复用和统一管理。这种设计不仅简化了各个服务的配置和依赖管理,还提高了代码的可维护性和可读性。希望本文对你在Spring Cloud项目中集成和使用Redis有所帮助。
13 0
|
2月前
|
SpringCloudAlibaba API 开发者
新版-SpringCloud+SpringCloud Alibaba
新版-SpringCloud+SpringCloud Alibaba
|
3月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
定时任务在企业应用中至关重要,常用于异步数据处理、自动化运维等场景。在单体应用中,利用Java的`java.util.Timer`或Spring的`@Scheduled`即可轻松实现。然而,进入微服务架构后,任务可能因多节点并发执行而重复。Spring Cloud Alibaba为此发布了Scheduling模块,提供轻量级、高可用的分布式定时任务解决方案,支持防重复执行、分片运行等功能,并可通过`spring-cloud-starter-alibaba-schedulerx`快速集成。用户可选择基于阿里云SchedulerX托管服务或采用本地开源方案(如ShedLock)
124 1
|
1月前
|
JSON SpringCloudAlibaba Java
Springcloud Alibaba + jdk17+nacos 项目实践
本文基于 `Springcloud Alibaba + JDK17 + Nacos2.x` 介绍了一个微服务项目的搭建过程,包括项目依赖、配置文件、开发实践中的新特性(如文本块、NPE增强、模式匹配)以及常见的问题和解决方案。通过本文,读者可以了解如何高效地搭建和开发微服务项目,并解决一些常见的开发难题。项目代码已上传至 Gitee,欢迎交流学习。
128 1
Springcloud Alibaba + jdk17+nacos 项目实践
|
24天前
|
消息中间件 自然语言处理 Java
知识科普:Spring Cloud Alibaba基本介绍
知识科普:Spring Cloud Alibaba基本介绍
56 2