微服务原理篇(Canal-Redis)

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS AI 助手,专业版
简介: 本课程讲解ES索引同步方案,重点掌握Canal+MQ实现MySQL到ES的数据同步机制。学习内容包括:Canal伪装为MySQL slave原理、基于binlog的日志解析、数据同步流程测试、Redis双写一致性、分布式锁应用、缓存三剑客问题及解决方案、Redis持久化与集群模式、过期淘汰策略等核心知识点,全面提升数据同步与缓存架构能力。(238字)

学习目标

  1. 能够说出ES索引同步的常用方案
  2. 能够说出Canal+MQ数据同步的方案
  3. 能够说出Canal是怎么伪装成 MySQL slave
  4. 能够测试通过Canal+MQ数据同步流程
  5. 能够说出MySQL和Redis如何保证双写一致性
  6. 能够说出分布式锁Redis原生、Redisson应用场景
  7. 能够说出缓存三剑客问题和解决方案
  8. 能够说出Redis持久化两种方案
  9. 能够说出Redis集群三种模式、哨兵选举流程
  10. 能够说出Redis过期策略、淘汰策略
    1 多数据源数据同步方案
    1.1. 技术方案分析
    1.1.1 同步方式
    管理员在商城的后台维护商品信息,数据存储在MySQL。
    用户在商城搜索商品信息,从Elasticsearch搜索商品信息。

如果Elasticsearch的索引数据与MySQL的商品数据不一致会导致什么问题?
用户搜索到的商品信息并非商品最新的信息,比如:价格不同,搜索到的商品价格与实际价格不同,商品下架但是用户仍然可以搜索到商品信息,这些问题都会严重影响用户的体验。
我们需要一种方案,当管理员修改商品信息后及时的修改商品索引信息,使MySQL中的商品数据与ES中的商品数据保持一致。
常见的索引数据同步方案有两种:同步方式和异步方式。
首先说同步方式
在修改商品信息的方法中加入操作Elasticsearch索引的代码,即在原有业务方式的基础上添加索引同步的代码,CRUD操作MySQL的同时CRUD操作ES索引。如下代码,是在添加商品信息的时候向ES索引添加文档。
public void insert(Item item){
//向本地数据库Item表添加记录

//向ES的Item索引添加文档

}
此方式会在很多业务方法中加入操作ES索引的代码,增加代码的复杂度不方便维护,扩展性差。
其次,上边的代码存在分布式事务,操作Item表会访问数据库,向索引添加文档会访问ES,使用数据库本地事务是无法控制整个方法的一致性的,比如:向ES写成功了由于网络超时导致异常,最终写数据库操作回滚了而写ES操作没有回滚,数据库的数据和ES中的索引不一致。
1.1.2 异步方式
异步方式是通过引入MQ实现,修改商品信息时向MQ发送修改的商品信息,然后监听MQ的程序请求ES向索引写入,流程如下:

此方案的好处:

  1. 商品服务不用直接访问ES,通过MQ将商品服务和ES解耦合。
    缺点:
  2. 在商品的CRUD方法中仍然需要加入向MQ发送消息的代码,如下:
    public void insert(Item item){
    //向Item表添加记录

    //向MQ发送添加商品消息
    }
    此方式仍然增加代码的复杂度不方便维护,扩展性差
    这种方案不少公司是有采用的,下述Canal方案较重量级,大家自行取舍不以HM为准,以实际业务为准
    有没有一种方法不用对商品的CRUD方法进行侵入,商品的CRUD方法就是对商品的增删改查,不会存在向ES同步数据相关的代码。
    此时我们要借助一个神器就是Canal [kə'næl],先看下Canal在整个流程中的位置,如下图:

从图中可以看出,商品管理的CRUD方式仅仅包括对商品表的CRUD业务操作(下图红色框内部分),不再有操作MQ的相关逻辑。

Canal是和MySQL存在联系,并且Canal负责和MQ交互,这种方案就是借助了Canal和MQ实现的。
1.2 Canal+MQ数据同步
1.2.1. MySQL主从复制
要理解Canal的工作原理需要首先要知道MySQL主从数据同步的原理。
首先我们要知道,平时我们在学习时只用MySQL单机即可,但是生产环境中MySQL部署为主从集群模式,MySQL主从集群由MySQL主服务器(master)和MySQL从服务器(slave)组成,主数据库提供写服务,从数据库提供读服务,主从之间进行数据复制保证数据同步,如下图:

MySQL主从之间是如何同步的呢?
MySQL主从数据同步是一种数据库复制技术,进行写数据会先向主服务器写,写成功后通过binlog日志将数据同步到从数据库。
具体流程如下图:

1、主服务器将所有写操作(INSERT、UPDATE、DELETE)以二进制日志(binlog)的形式记录下来。
2、从服务器连接到主服务器,发送dump 协议,请求获取主服务器上的binlog日志。
MySQL的dump协议是MySQL复制协议中的一部分。
3、MySQL master 收到 dump 请求,开始推送 binary log 给 slave
4、从服务器解析binlog日志,根据日志内容更新从服务器的数据库,完成从服务器的数据保持与主服务器同步。
1.2.1.1 binlog
binlog日志是什么?
MySQL的binlog(二进制日志)是一种记录数据库服务器上所有修改数据的日志文件。它主要用于数据复制和数据恢复。binlog的主要作用是记录数据库的DDL(数据定义语言)操作和DML(数据操作语言)操作,以便在数据库发生故障时进行恢复。
binlog长什么样?
类似下边这样:

binlog的主要特点如下:

  1. 事务级别的记录:
    a. Binlog 以事务为单位记录数据更改,这意味着每个事务的开始和结束都会被记录下来。
    b. 这种记录方式有助于保证数据的一致性和事务的完整性。
  2. 支持多种格式:
    a. STATEMENT:记录每条 SQL 语句,适用于大多数情况,但有些 SQL 语句的结果依赖于会话状态,可能导致复制问题。
    b. ROW:记录每行数据的更改,精确度高,但会增加日志文件的大小。
    c. MIXED:默认模式,结合了 STATEMENT 和 ROW 的优点,大部分情况下采用 STATEMENT 模式,但在 STATEMENT 模式可能引起问题时自动切换到 ROW 模式。
  3. 非阻塞性:
    a. Binlog 的写入操作是非阻塞的,即写入 Binlog 不会阻塞客户端的事务提交。
    b. 这意味着应用程序可以在无需等待日志写入完成的情况下继续运行,提高了性能。
  4. 数据恢复:
    a. Binlog 可以用于数据恢复,允许恢复到特定的时间点或事务。
    b. 这对于灾难恢复非常重要,可以减少数据丢失的风险。
  5. 主从复制:
    a. Binlog 是 MySQL 主从复制的基础。
    b. 通过从主服务器读取并重放 Binlog,从服务器可以保持与主服务器相同的数据状态。
    在 MySQL 中启用 Binlog 需要在配置文件 (my.cnf 或 my.ini) 中进行设置。
    一些关键的配置选项包括:
    ● server-id:用于标识服务器的唯一 ID,这对于多服务器环境非常重要。
    ● log_bin:指定是否启用 Binlog 以及 Binlog 文件的保存位置。
    ● binlog_format:定义 Binlog 的格式,如 STATEMENT, ROW 或 MIXED。
    ● expire_logs_days:定义 Binlog 文件保留的时间,超过这个时间的文件会被自动删除。
    ● max_binlog_size:单个 Binlog 文件的最大大小,达到这个大小后会自动创建新的文件。
    举例:

注意事项:
● Binlog 文件会占用磁盘空间,因此需要定期清理不再需要的旧文件。
● 使用 Binlog 进行数据恢复或复制时,要确保所有相关服务器的时间同步,否则可能会出现问题。
binlog常用命令:查看是否开启binlog日志
show variables like 'log_bin';

使用以下命令查看所有binlog日志列表:
SHOW MASTER LOGS;

要查看MySQL服务器上的binlog状态,可以使用以下命令:
SHOW MASTER STATUS;

要查看所有的binlog文件列表,可以使用以下命令:
SHOW BINARY LOGS;
查看binlog日志保存路径
SHOW VARIABLES LIKE 'datadir';
刷新log日志,立刻产生一个新编号的binlog日志文件,跟重启一个效果,可以执行以下命令:
FLUSH LOGS;
清空所有binlog日志,可以执行以下命令:
RESET MASTER;
1.2.2. Canal+MQ同步流程
Canal是什么呢?
canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,对数据进行同步,如下图:
Canal可与很多数据源进行对接,将数据由MySQL同步到ES、MQ、DB等各个数据源。
Canal的意思是水道/管道/沟渠,它相当于一个数据管道,通过解析MySQL的binlog日志完成数据同步工作。
官方文档:https://github.com/alibaba/canal/wiki

Canal数据同步的工作流程如下:
1、Canal模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
MySQL的dump协议是MySQL复制协议中的一部分。
2、MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
。一旦连接建立成功(长连接),Canal会一直等待并监听来自MySQL主服务器的binlog事件流,当有新的数据库变更发生时MySQL master主服务器发送binlog事件流给Canal。
3、Canal会及时接收并解析这些变更事件并解析 binary log
理解了Canal的工作原理下边再看数据同步流程:

  1. 首先创建一张专门用于向ES同步商品信息的表item_sync,item_sync表的字段内容可能包含item表的字段,一定覆盖所有索引字段。
    方法:复制item表到item_sync表。
    这里为什么要单独创建一张同步表呢?
    因为同步表的字段和索引是对应的,方便进行同步。
  2. 商品服务在对商品进行CRUD时向Item表写数据并且向item_sync写入数据,并产生binlog。
  3. Canal请求MySQL读取binlog,并解析出item_sync表的数据更新日志,并发送至MQ的数据同步队列。
  4. 异步同步程序监听MQ的数据同步队列,收到消息后解析出item_sync表的更新日志。
  5. 异步同步程序根据item_sync表的更新日志请求Elasticsearch添加、更新、删除索引文档。
    最终实现了将MySQL中的Item表的数据同步至Elasticsearch
    1.2.3. 配置数据同步环境
    本节实现将MySQL的变更数据通过Canal写入MQ。
    根据Canal+MQ同步流程,进行如下配置:
  6. 配置Mysql主从同步,开启MySQL主服务器的binlog
  7. 安装Canal并配置,保证Canal连接MySQL主服务器成功
  8. 安装RabbitMQ,并配置同步队列。
  9. 在Canal中配置RabbitMQ的连接信息,保证Canal收到binlog消息写入MQ
    对于异步程序监听MQ通过Java程序中实现。以上四步配置详细参考“配置搜索及数据同步环境”。
    1.2.4. 同步程序
    前边我们实现了Canal读取binlog日志并向MQ发送消息的整个流程,下边我们需要编写同步程序监听MQ,解析出更改的数据更新ES索引数据。
    在search-service工程添加依赖:
    1.1.5


com.alibaba.otter
canal.client
${canal.version}


com.alibaba.otter
canal.protocol
${canal.version}

从课程资料中拷贝"es/canal"目录到search-service工程的com.hmall.search包下。

阅读AbstractCanalRabbitMqMsgListener类parseMsg(Message message) 方法,理解同步程序的执行思路。
parseMsg(Message message) 方法实现了解析canal发送给mq的消息,并调用batchHandle或singleHandle处理数据,在这两个方法中会调用抽象方法void batchSave(List data)和void batchDelete(List ids)去向数据库保存数据、删除数据。
public void parseMsg(Message message) throws Exception {

try {
    // 1.数据格式转换
    CanalMqInfo canalMqInfo = JSONUtil.toBean(new String(message.getBody()), CanalMqInfo.class);
    // 2.过滤数据,没有数据或者非插入、修改、删除的操作均不处理
    if (CollUtils.isEmpty(canalMqInfo.getData()) || !(OperateType.canHandle(canalMqInfo.getType()))) {
        return;
    }

    if (canalMqInfo.getData().size() > 1) {
        // 3.多条数据处理
        batchHandle(canalMqInfo);
    } else {
        // 4.单条数据处理
        singleHandle(canalMqInfo);
    }
} catch (Exception e) {
    //出现错误延迟1秒重试
    Thread.sleep(1000);
    throw new RuntimeException(e);
}

}
如果我们要实现商品信息同步就需要编写商品信息同步类,同步程序做两件事:

  1. 同步类需要监听MQ,接收canal发送给mq的消息
  2. 同步程序需要继承AbstractCanalRabbitMqMsgListener类,并重写void batchSave(List data)和void batchDelete(List ids)这两个方法,这样就实现了将canal发送的商品信息保存或删除ES中对应的数据。
    代码如下:下边的代码能读懂会用即可。
    package com.hmall.search.canal.listeners;

import cn.hutool.core.bean.BeanUtil;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.DeleteByQueryResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import com.hmall.search.domain.po.ItemDoc;
import com.hmall.search.domain.po.ItemSync;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

@Component
public class ItemCanalDataSyncHandler extends AbstractCanalRabbitMqMsgListener {

@Resource
private ElasticsearchClient esClient;

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "canal-mq-hmall-item"),
    exchange = @Exchange(name = "exchange.canal-hmall", type = ExchangeTypes.TOPIC),
    key = "canal-mq-hmall-item"),
                concurrency = "1"
               )
public void onMessage(Message message) throws Exception {
    parseMsg(message);
}

@Override
public void batchSave(List<ItemSync> data) {

    BulkRequest.Builder br = new BulkRequest.Builder();

    for (ItemSync itemSync : data) {
        br.operations(op -> op
                      .index(idx -> idx
                             .index("items")
                             .id(itemSync.getId().toString())
                             .document(itemSync)
                            )
                     );
    }

    BulkResponse result = null;
    try {
        result = esClient.bulk(br.build());
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    Boolean aBoolean = result.errors();
    if(aBoolean) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        throw new RuntimeException("同步失败");
    }

}

@Override
public void batchDelete(List<Long> ids) {
    List<String> idList = ids.stream().map(id -> id.toString()).collect(Collectors.toList());
    DeleteByQueryResponse response = null;
    try {
        response = esClient.deleteByQuery(dq -> dq
                                          .query(t -> t.ids(t1 -> t1.values(idList)))
                                          .index("items"));
        boolean hasFailures = response.failures().size() > 0;
        if(hasFailures) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            throw new RuntimeException("同步失败");
        }
    } catch (IOException e) {
        throw new RuntimeException("同步失败");
    }



}

}
接下来测试:
● 手动修改Item_sync表的数据,断点跟踪onMessage(Message message)方法,当插入、修改数据时执行踪onMessage(Message message)方法,当删除数据时执行batchDelete(List ids)。
● 手动对Item_sync表增、删、改,观察ES中item索引的数据是否正常增、删、改。
1.2.5. 保证消息的顺序性
如何保证Canal+MQ同步消息的顺序性?场景如下图:
首先明确Canal解析binlog日志信息按顺序发到MQ的队列中,现在是要保证消费端如何按顺序消费队列中的消息。生产中同一个服务会启动多个jvm进程,每个进程作为同一个队列的消费者,如下图:

现在对商品价格先修改为100再修改为200,在MQ中的有两个消息:
修改价格为100
修改价格为200
预期:最终将价格修改为200
此时两条消息会被分发给两个jvm进程,假设“修改价格为100”的消息发给jvm进程1,“修改价格为200”的消息发给jvm进程2,两个进程分别去消费,此时无法控制两个消息的先后顺序,可能导致价格最终并非修改200。
解决方法:
多个jvm进程监听同一个队列保证只有一个消费者活跃,即只有一个消费者接收消息。
消费队列中的数据使用单线程。
如何保证只有一个消费者接收消息?
队列需要增加x-single-active-consumer参数,表示否启用单一活动消费者模式。

配置完成查保证队列上存在SAC标识,如下图:

当有多个jvm进程都去监听该队列时,只有一个为活跃状态

如果使用x-single-active-consumer参数需要修改为如下代码:
在Queue中添加:arguments={@Argument(name="x-single-active-consumer", value = "true", type = "java.lang.Boolean") }
如下所示:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "canal-mq-hmall-item",arguments={@Argument(name="x-single-active-consumer", value = "true", type = "java.lang.Boolean") }),
exchange = @Exchange(name = "exchange.canal-hmall", type = ExchangeTypes.TOPIC),
key = "canal-mq-hmall-item"),
concurrency = "1"
)
public void onMessage(Message message) throws Exception {
parseMsg(message);
}
concurrency=”1“表示 指定消费线程为1。
1.2.6 面试题
Canal是怎么伪装成 MySQL slave?
Canal数据同步异常了怎么处理?
项目中如何进行索引同步的?
如何保证Canal+MQ同步消息的顺序性?

相关文章
|
12天前
|
人工智能 自然语言处理 Shell
🦞 如何在 OpenClaw (Clawdbot/Moltbot) 配置阿里云百炼 API
本教程指导用户在开源AI助手Clawdbot中集成阿里云百炼API,涵盖安装Clawdbot、获取百炼API Key、配置环境变量与模型参数、验证调用等完整流程,支持Qwen3-max thinking (Qwen3-Max-2026-01-23)/Qwen - Plus等主流模型,助力本地化智能自动化。
🦞 如何在 OpenClaw (Clawdbot/Moltbot) 配置阿里云百炼 API
|
7天前
|
人工智能 安全 机器人
OpenClaw(原 Clawdbot)钉钉对接保姆级教程 手把手教你打造自己的 AI 助手
OpenClaw(原Clawdbot)是一款开源本地AI助手,支持钉钉、飞书等多平台接入。本教程手把手指导Linux下部署与钉钉机器人对接,涵盖环境配置、模型选择(如Qwen)、权限设置及调试,助你快速打造私有、安全、高权限的专属AI助理。(239字)
4335 12
OpenClaw(原 Clawdbot)钉钉对接保姆级教程 手把手教你打造自己的 AI 助手
|
8天前
|
人工智能 机器人 Linux
保姆级 OpenClaw (原 Clawdbot)飞书对接教程 手把手教你搭建 AI 助手
OpenClaw(原Clawdbot)是一款开源本地AI智能体,支持飞书等多平台对接。本教程手把手教你Linux下部署,实现数据私有、系统控制、网页浏览与代码编写,全程保姆级操作,240字内搞定专属AI助手搭建!
4697 17
保姆级 OpenClaw (原 Clawdbot)飞书对接教程 手把手教你搭建 AI 助手
|
6天前
|
人工智能 机器人 Linux
OpenClaw(Clawdbot、Moltbot)汉化版部署教程指南(零门槛)
OpenClaw作为2026年GitHub上增长最快的开源项目之一,一周内Stars从7800飙升至12万+,其核心优势在于打破传统聊天机器人的局限,能真正执行读写文件、运行脚本、浏览器自动化等实操任务。但原版全英文界面对中文用户存在上手门槛,汉化版通过覆盖命令行(CLI)与网页控制台(Dashboard)核心模块,解决了语言障碍,同时保持与官方版本的实时同步,确保新功能最快1小时内可用。本文将详细拆解汉化版OpenClaw的搭建流程,涵盖本地安装、Docker部署、服务器远程访问等场景,同时提供环境适配、问题排查与国内应用集成方案,助力中文用户高效搭建专属AI助手。
3035 8
|
10天前
|
人工智能 JavaScript 应用服务中间件
零门槛部署本地AI助手:Windows系统Moltbot(Clawdbot)保姆级教程
Moltbot(原Clawdbot)是一款功能全面的智能体AI助手,不仅能通过聊天互动响应需求,还具备“动手”和“跑腿”能力——“手”可读写本地文件、执行代码、操控命令行,“脚”能联网搜索、访问网页并分析内容,“大脑”则可接入Qwen、OpenAI等云端API,或利用本地GPU运行模型。本教程专为Windows系统用户打造,从环境搭建到问题排查,详细拆解全流程,即使无技术基础也能顺利部署本地AI助理。
7210 16
|
8天前
|
存储 人工智能 机器人
OpenClaw是什么?阿里云OpenClaw(原Clawdbot/Moltbot)一键部署官方教程参考
OpenClaw是什么?OpenClaw(原Clawdbot/Moltbot)是一款实用的个人AI助理,能够24小时响应指令并执行任务,如处理文件、查询信息、自动化协同等。阿里云推出的OpenClaw一键部署方案,简化了复杂配置流程,用户无需专业技术储备,即可快速在轻量应用服务器上启用该服务,打造专属AI助理。本文将详细拆解部署全流程、进阶功能配置及常见问题解决方案,确保不改变原意且无营销表述。
4944 5
|
10天前
|
人工智能 JavaScript API
零门槛部署本地 AI 助手:Clawdbot/Meltbot 部署深度保姆级教程
Clawdbot(Moltbot)是一款智能体AI助手,具备“手”(读写文件、执行代码)、“脚”(联网搜索、分析网页)和“脑”(接入Qwen/OpenAI等API或本地GPU模型)。本指南详解Windows下从Node.js环境搭建、一键安装到Token配置的全流程,助你快速部署本地AI助理。(239字)
4824 23
|
16天前
|
人工智能 API 开发者
Claude Code 国内保姆级使用指南:实测 GLM-4.7 与 Claude Opus 4.5 全方案解
Claude Code是Anthropic推出的编程AI代理工具。2026年国内开发者可通过配置`ANTHROPIC_BASE_URL`实现本地化接入:①极速平替——用Qwen Code v0.5.0或GLM-4.7,毫秒响应,适合日常编码;②满血原版——经灵芽API中转调用Claude Opus 4.5,胜任复杂架构与深度推理。
8983 13