数据同步原理

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 数据同步原理

下图展示了 Soul 数据同步的流程,Soul 网关在启动时,会从从配置服务同步配置数据,并且支持推拉模式获取配置变更信息,并且更新本地缓存。而管理员在管理后台,变更用户、规则、插件、流量配置,通过推拉模式将变更信息同步给 Soul 网关,具体是 push 模式,还是 pull 模式取决于配置。关于配置同步模块,其实是一个简版的配置中心。

1.x 版本中,配置服务依赖 zookeeper 实现,管理后台将变更信息 push 给网关。而 2.x 版本支持 webosockethttpzookeeper,通过 soul.sync.strategy 指定对应的同步策略,默认使用 http 长轮询同步策略,可以做到秒级数据同步。但是,有一点需要注意的是,soul-websoul-admin 必须使用相同的同步机制。

如下图所示,soul-admin 在用户发生配置变更之后,会通过 EventPublisher 发出配置变更通知,由 EventDispatcher 处理该变更通知,然后根据配置的同步策略(http、weboscket、zookeeper),将配置发送给对应的事件处理器

  • 如果是 websocket 同步策略,则将变更后的数据主动推送给 soul-web,并且在网关层,会有对应的 WebsocketCacheHandler 处理器处理来处 admin 的数据推送
  • 如果是 zookeeper 同步策略,将变更数据更新到 zookeeper,而 ZookeeperSyncCache 会监听到 zookeeper 的数据变更,并予以处理
  • 如果是 http 同步策略,soul-web 主动发起长轮询请求,默认有 90s 超时时间,如果 soul-admin 没有数据变更,则会阻塞 http 请求,如果有数据发生变更则响应变更的数据信息,如果超过 60s 仍然没有数据变更则响应空数据,网关层接到响应后,继续发起 http 请求,反复同样的请求

zookeeper同步

基于 zookeeper 的同步原理很简单,主要是依赖 zookeeperwatch 机制,soul-web 会监听配置的节点,soul-admin 在启动的时候,会将数据全量写入 zookeeper,后续数据发生变更时,会增量更新 zookeeper 的节点,与此同时,soul-web 会监听配置信息的节点,一旦有信息变更时,会更新本地缓存。

soul 将配置信息写到zookeeper节点,是通过精细设计的。

websocket同步

websocketzookeeper 机制有点类似,将网关与 admin 建立好 websocket 连接时,admin 会推送一次全量数据,后续如果配置数据发生变更,则将增量数据通过 websocket 主动推送给 soul-web

使用websocket同步的时候,特别要注意断线重连,也叫保持心跳。soul使用java-websocket 这个第三方库来进行websocket连接。

public class WebsocketSyncCache extends WebsocketCacheHandler {
    /**
     * The Client.
     */
    private WebSocketClient client;
    public WebsocketSyncCache(final SoulConfig.WebsocketConfig websocketConfig) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
                SoulThreadFactory.create("websocket-connect", true));
         client = new WebSocketClient(new URI(websocketConfig.getUrl())) {
                @Override
                public void onOpen(final ServerHandshake serverHandshake) {
                  //....
                }
                @Override
                public void onMessage(final String result) {
                  //....
                }    
            };
        //进行连接
        client.connectBlocking();
        //使用调度线程池进行断线重连,30秒进行一次
        executor.scheduleAtFixedRate(() -> {
            if (client != null && client.isClosed()) {
                    client.reconnectBlocking();
            }
        }, 10, 30, TimeUnit.SECONDS);
    }

http长轮询

zookeeper、websocket 数据同步的机制比较简单,而 http 同步会相对复杂一些。Soul 借鉴了 ApolloNacos 的设计思想,取决精华,自己实现了 http 长轮询数据同步功能。注意,这里并非传统的 ajax 长轮询!

http 长轮询机制如上所示,soul-web 网关请求 admin 的配置服务,读取超时时间为 90s,意味着网关层请求配置服务最多会等待 90s,这样便于 admin 配置服务及时响应变更数据,从而实现准实时推送。

http 请求到达 sou-admin 之后,并非立马响应数据,而是利用 Servlet3.0 的异步机制,异步响应数据。首先,将长轮询请求任务 LongPollingClient 扔到 BlocingQueue 中,并且开启调度任务,60s 后执行,这样做的目的是 60s 后将该长轮询请求移除队列,即便是这段时间内没有发生配置数据变更。因为即便是没有配置变更,也得让网关知道,总不能让其干等吧,而且网关请求配置服务时,也有 90s 的超时时间。

public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
    // 因为soul-web可能未收到某个配置变更的通知,因此MD5值可能不一致,则立即响应
    List<ConfigGroupEnum> changedGroup = compareMD5(request);
    String clientIp = getRemoteIp(request);
    if (CollectionUtils.isNotEmpty(changedGroup)) {
        this.generateResponse(response, changedGroup);
        return;
    }
    // Servlet3.0异步响应http请求
    final AsyncContext asyncContext = request.startAsync();
    asyncContext.setTimeout(0L);
    scheduler.execute(new LongPollingClient(asyncContext, clientIp, 60));
}
    
class LongPollingClient implements Runnable {
    LongPollingClient(final AsyncContext ac, final String ip, final long timeoutTime) {
        // 省略......
    }
    @Override
    public void run() {
        // 加入定时任务,如果60s之内没有配置变更,则60s后执行,响应http请求
        this.asyncTimeoutFuture = scheduler.schedule(() -> {
            // clients是阻塞队列,保存了来处soul-web的请求信息
            clients.remove(LongPollingClient.this);
            List<ConfigGroupEnum> changedGroups = HttpLongPollingDataChangedListener.compareMD5((HttpServletRequest) asyncContext.getRequest());
            sendResponse(changedGroups);
        }, timeoutTime, TimeUnit.MILLISECONDS);
        // 
        clients.add(this);
    }
}

如果这段时间内,管理员变更了配置数据,此时,会挨个移除队列中的长轮询请求,并响应数据,告知是哪个 Group 的数据发生了变更(我们将插件、规则、流量配置、用户配置数据分成不同的组)。网关收到响应信息之后,只知道是哪个 Group 发生了配置变更,还需要再次请求该 Group 的配置数据。有人会问,为什么不是直接将变更的数据写出?我们在开发的时候,也深入讨论过该问题,因为 http 长轮询机制只能保证准实时,如果在网关层处理不及时,或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,我们只告知某个 Group 信息发生了变更。

// soul-admin发生了配置变更,挨个将队列中的请求移除,并予以响应
class DataChangeTask implements Runnable {
    DataChangeTask(final ConfigGroupEnum groupKey) {
        this.groupKey = groupKey;
    }
    @Override
    public void run() {
        try {
            for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext(); ) {
                LongPollingClient client = iter.next();
                iter.remove();
                client.sendResponse(Collections.singletonList(groupKey));
            }
        } catch (Throwable e) {
            LOGGER.error("data change error.", e);
        }
    }
}

soul-web 网关层接收到 http 响应信息之后,拉取变更信息(如果有变更的话),然后再次请求 soul-admin 的配置服务,如此反复循环。

总结

总体继承关系图,如下图所示,核心借助于DataChangedEventDispatcher下面的五个监听器,根据不同的数据同步策略,触发不同的listener。

每个具体实现,都包含下面五个实现方法,用于数据同步。

其中,Http长轮询,借鉴了 `Apollo``Nacos` 的设计思想,取决精华,自己实现了 `http` 长轮询数据同步功能。注意,这里并非传统的 ajax 长轮询!

soul内置依赖 `spring-webflux` 而其底层是使用的netty。这一块只要是使用的netty线程模型

@Bean
public NettyReactiveWebServerFactory nettyReactiveWebServerFactory() {
    NettyReactiveWebServerFactory webServerFactory = new NettyReactiveWebServerFactory();
    webServerFactory.addServerCustomizers(new EventLoopNettyCustomizer());
    return webServerFactory;
}
private static class EventLoopNettyCustomizer implements NettyServerCustomizer {
    @Override
    public HttpServer apply(final HttpServer httpServer) {
        return httpServer
            .tcpConfiguration(tcpServer -> tcpServer
                              .runOn(LoopResources.create("soul-netty", 1, DEFAULT_IO_WORKER_COUNT, true), false)
                              .selectorOption(ChannelOption.SO_REUSEADDR, true)
                              .selectorOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                              .option(ChannelOption.TCP_NODELAY, true)
                              .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT));
    }
}
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
5月前
|
存储 关系型数据库 MySQL
【TiDB原理与实战详解】5、BR 物理备份恢复与Binlog 数据同步~学不会? 不存在的!
BR(Backup & Restore)是 TiDB 分布式备份恢复的命令行工具,适用于大数据量场景,支持常规备份恢复及大规模数据迁移。BR 通过向各 TiKV 节点下发命令执行备份或恢复操作,生成 SST 文件存储数据信息与 `backupmeta` 文件存储元信息。推荐部署配置包括在 PD 节点部署 BR 工具,使用万兆网卡等。本文介绍 BR 的工作原理、部署配置、使用限制及多种备份恢复方式,如全量备份、单库/单表备份、过滤备份及增量备份等。
|
5月前
|
SQL canal 关系型数据库
(二十四)全解MySQL之主从篇:死磕主从复制中数据同步原理与优化
兜兜转转,经过《全解MySQL专栏》前面二十多篇的内容讲解后,基本对MySQL单机模式下的各方面进阶知识做了详细阐述,同时在前面的《分库分表概念篇》、《分库分表隐患篇》两章中也首次提到了数据库的一些高可用方案,但前两章大多属于方法论,并未涵盖真正的实操过程。接下来的内容,会以目前这章作为分割点,开启MySQL高可用方案的落地实践分享的新章程!
2243 1
|
5月前
|
关系型数据库 MySQL 调度
【TiDB原理与实战详解】4、DM 迁移和TiCDC数据同步~学不会? 不存在的!
TiDB Data Migration (DM) 和 TiCDC 是两款用于数据库迁移和同步的强大工具。DM 支持将兼容 MySQL 协议的数据库(如 MySQL、MariaDB)的数据异步迁移到 TiDB 中,具备全量和增量数据传输能力,并能合并分库分表的数据。TiCDC 则专注于 TiDB 的增量同步,利用 TiKV 日志实现高可用性和水平扩展,支持多种下游系统和输出格式。两者均可通过 TiUP 工具进行部署与管理,简化了集群的安装、配置及任务管理过程。
|
5月前
|
canal 关系型数据库 MySQL
"揭秘阿里数据同步黑科技Canal:从原理到实战,手把手教你玩转MySQL数据秒级同步,让你的数据处理能力瞬间飙升,成为技术界的新晋网红!"
【8月更文挑战第18天】Canal是一款由阿里巴巴开源的高性能数据同步系统,它通过解析MySQL的增量日志(Binlog),提供低延迟、可靠的数据订阅和消费功能。Canal模拟MySQL Slave与Master间的交互协议来接收并解析Binary Log,支持数据的增量同步。配置简单直观,包括Server和Instance两层配置。在实战中,Canal可用于数据库镜像、实时备份等多种场景,通过集成Canal Client可实现数据的消费和处理,如更新缓存或写入消息队列。
895 0
|
6月前
|
消息中间件 存储 RocketMQ
MetaQ/RocketMQ 原理问题之MetaQ中Broker的数据同步方式的问题如何解决
MetaQ/RocketMQ 原理问题之MetaQ中Broker的数据同步方式的问题如何解决
|
8月前
|
NoSQL Redis
Redis入门到通关之Redis主从数据同步原理
Redis入门到通关之Redis主从数据同步原理
128 0
|
8月前
|
存储 负载均衡 NoSQL
Redis 高可用篇:你管这叫主从架构数据同步原理?
Redis 高可用篇:你管这叫主从架构数据同步原理?
342 5
|
8月前
|
存储 NoSQL 数据库连接
Redis主从模式以及数据同步原理:全量数据同步、增量数据同步
Redis主从模式以及数据同步原理:全量数据同步、增量数据同步
846 0
|
数据库 流计算
Flink CDC的工作原理是通过监听数据库的binlog来实现实时数据同步的
Flink CDC的工作原理是通过监听数据库的binlog来实现实时数据同步的
487 1
深入浅出阿里数据同步神器:Canal原理+配置+实战全网最全解析!
canal 翻译为管道,主要用途是基于 MySQL 数据库的增量日志 Binlog 解析,提供增量数据订阅和消费。 早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

热门文章

最新文章