Canal+Kafka实现MySQL与Redis数据同步(二)

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS AI 助手,专业版
简介: Canal+Kafka实现MySQL与Redis数据同步

创建MQ消费者进行同步

在application.yml配置文件加上kafka的配置信息:

spring:
  kafka:
      # Kafka服务地址
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      # 指定一个默认的组名
      group-id: consumer-group1
      #序列化反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.StringDeserializer
      # 批量抓取
      batch-size: 65536
      # 缓存容量
      buffer-memory: 524288

根据上面Kafka消费命令那里,我们知道了json数据的结构,可以创建一个CanalBean对象进行接收:

public class CanalBean {
    //数据
    private List<TbCommodityInfo> data;
    //数据库名称
    private String database;
    private long es;
    //递增,从1开始
    private int id;
    //是否是DDL语句
    private boolean isDdl;
    //表结构的字段类型
    private MysqlType mysqlType;
    //UPDATE语句,旧数据
    private String old;
    //主键名称
    private List<String> pkNames;
    //sql语句
    private String sql;
    private SqlType sqlType;
    //表名
    private String table;
    private long ts;
    //(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等
    private String type;
    //getter、setter方法
}
public class MysqlType {
    private String id;
    private String commodity_name;
    private String commodity_price;
    private String number;
    private String description;
    //getter、setter方法
}
public class SqlType {
    private int id;
    private int commodity_name;
    private int commodity_price;
    private int number;
    private int description;
}

最后就可以创建一个消费者CanalConsumer进行消费:

@Component
public class CanalConsumer {
    //日志记录
    private static Logger log = LoggerFactory.getLogger(CanalConsumer.class);
    //redis操作工具类
    @Resource
    private RedisClient redisClient;
    //监听的队列名称为:canaltopic
    @KafkaListener(topics = "canaltopic")
    public void receive(ConsumerRecord<?, ?> consumer) {
        String value = (String) consumer.value();
        log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),consumer.partition(), consumer.offset(), value);
        //转换为javaBean
        CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);
        //获取是否是DDL语句
        boolean isDdl = canalBean.getIsDdl();
        //获取类型
        String type = canalBean.getType();
        //不是DDL语句
        if (!isDdl) {
            List<TbCommodityInfo> tbCommodityInfos = canalBean.getData();
            //过期时间
            long TIME_OUT = 600L;
            if ("INSERT".equals(type)) {
                //新增语句
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    //新增到redis中,过期时间是10分钟
                    redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);
                }
            } else if ("UPDATE".equals(type)) {
                //更新语句
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    //更新到redis中,过期时间是10分钟
                    redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);
                }
            } else {
                //删除语句
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    //从redis中删除
                    redisClient.deleteKey(id);
                }
            }
        }
    }
}

测试MySQL与Redis同步

mysql对应的表结构如下:

CREATE TABLE `tb_commodity_info` (
  `id` varchar(32) NOT NULL,
  `commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',
  `commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',
  `number` int(10) DEFAULT '0' COMMENT '商品数量',
  `description` varchar(2048) DEFAULT '' COMMENT '商品描述',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';

首先在MySQL创建表。然后启动项目,接着新增一条数据:

INSERT INTO `canaldb`.`tb_commodity_info` (`id`, `commodity_name`, `commodity_price`, `number`, `description`) VALUES ('3e71a81fd80711eaaed600163e046cc3', '叉包', '3.99', '3', '大叉包,老喜欢');

tb_commodity_info表查到新增的数据:

Redis也查到了对应的数据,证明同步成功!

如果更新呢?试一下Update语句:

UPDATE `canaldb`.`tb_commodity_info` SET `commodity_name`='青菜包',`description`='便宜的青菜包' WHERE `id`='3e71a81fd80711eaaed600163e046cc3';

没有问题!

总结

canal的缺点:

  1. canal只能同步增量数据。
  2. 不是实时同步,是准实时同步。
  3. 存在一些bug,不过社区活跃度较高,对于提出的bug能及时修复。
  4. MQ顺序性问题。
    网的回答,大家参考一下

尽管有一些缺点,毕竟没有一样技术(产品)是完美的,合适最重要。

相关文章
|
4月前
|
缓存 NoSQL 关系型数据库
MySQL 与 Redis 如何保证双写一致性?
我是小假 期待与你的下一次相遇 ~
507 7
|
7月前
|
关系型数据库 应用服务中间件 nginx
Docker一键安装中间件(RocketMq、Nginx、MySql、Minio、Jenkins、Redis)
本系列脚本提供RocketMQ、Nginx、MySQL、MinIO、Jenkins和Redis的Docker一键安装与配置方案,适用于快速部署微服务基础环境。
|
9月前
|
缓存 NoSQL 关系型数据库
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
|
4月前
|
NoSQL 算法 Redis
【Docker】(3)学习Docker中 镜像与容器数据卷、映射关系!手把手带你安装 MySql主从同步 和 Redis三主三从集群!并且进行主从切换与扩容操作,还有分析 哈希分区 等知识点!
Union文件系统(UnionFS)是一种**分层、轻量级并且高性能的文件系统**,它支持对文件系统的修改作为一次提交来一层层的叠加,同时可以将不同目录挂载到同一个虚拟文件系统下(unite several directories into a single virtual filesystem) Union 文件系统是 Docker 镜像的基础。 镜像可以通过分层来进行继承,基于基础镜像(没有父镜像),可以制作各种具体的应用镜像。
614 5
|
9月前
|
消息中间件 缓存 NoSQL
基于Spring Data Redis与RabbitMQ实现字符串缓存和计数功能(数据同步)
总的来说,借助Spring Data Redis和RabbitMQ,我们可以轻松实现字符串缓存和计数的功能。而关键的部分不过是一些"厨房的套路",一旦你掌握了这些套路,那么你就像厨师一样可以准备出一道道饕餮美食了。通过这种方式促进数据处理效率无疑将大大提高我们的生产力。
302 32
|
11月前
|
缓存 NoSQL 关系型数据库
Redis和Mysql如何保证数据⼀致?
1. 先更新Mysql,再更新Redis,如果更新Redis失败,可能仍然不⼀致 2. 先删除Redis缓存数据,再更新Mysql,再次查询的时候在将数据添加到缓存中 这种⽅案能解决1 ⽅案的问题,但是在⾼并发下性能较低,⽽且仍然会出现数据不⼀致的问题,⽐如线程1删除了 Redis缓存数据,正在更新Mysql,此时另外⼀个查询再查询,那么就会把Mysql中⽼数据⼜查到 Redis中 1. 使用MQ异步同步, 保证数据的最终一致性 我们项目中会根据业务情况 , 使用不同的方案来解决Redis和Mysql的一致性问题 : 1. 对于一些一致性要求不高的场景 , 不做处理例如 : 用户行为数据 ,
|
11月前
|
消息中间件 缓存 NoSQL
缓存与数据库的一致性方案,Redis与Mysql一致性方案,大厂P8的终极方案(图解+秒懂+史上最全)
缓存与数据库的一致性方案,Redis与Mysql一致性方案,大厂P8的终极方案(图解+秒懂+史上最全)
|
缓存 NoSQL 关系型数据库
Redis与MySQL的数据一致性
在高并发环境下,保持 Redis 和 MySQL 的数据一致性是一个复杂但重要的问题。通过采用读写穿透、写穿透、分布式锁、双写一致性保障和延时双删策略,可以有效地减少数据不一致的风险,确保系统的稳定性和可靠性。通过合理的缓存策略和数据同步机制,可以显著提升系统的性能和用户体验。
625 22
|
NoSQL 关系型数据库 Redis
《docker高级篇(大厂进阶):1.Docker复杂安装详说》包括:安装mysql主从复制、安装redis集群
《docker高级篇(大厂进阶):1.Docker复杂安装详说》包括:安装mysql主从复制、安装redis集群
371 14
|
关系型数据库 MySQL 应用服务中间件
《docker基础篇:8.Docker常规安装简介》包括:docker常规安装总体步骤、安装tomcat、安装mysql、安装redis
《docker基础篇:8.Docker常规安装简介》包括:docker常规安装总体步骤、安装tomcat、安装mysql、安装redis
423 7