数据同步原理

简介: Soul网关通过推拉模式实现配置同步,支持WebSocket、HTTP长轮询和Zookeeper三种策略。管理员在后台变更配置后,事件发布器通知对应处理器,网关实时更新本地缓存,确保数据一致性,其中HTTP长轮询借鉴Apollo与Nacos设计,实现准实时同步。

下图展示了 Soul 数据同步的流程,Soul 网关在启动时,会从从配置服务同步配置数据,并且支持推拉模式获取配置变更信息,并且更新本地缓存。而管理员在管理后台,变更用户、规则、插件、流量配置,通过推拉模式将变更信息同步给 Soul 网关,具体是 push 模式,还是 pull 模式取决于配置。关于配置同步模块,其实是一个简版的配置中心。

1.x 版本中,配置服务依赖 zookeeper 实现,管理后台将变更信息 push 给网关。而 2.x 版本支持 webosockethttpzookeeper,通过 soul.sync.strategy 指定对应的同步策略,默认使用 http 长轮询同步策略,可以做到秒级数据同步。但是,有一点需要注意的是,soul-websoul-admin 必须使用相同的同步机制。

如下图所示,soul-admin 在用户发生配置变更之后,会通过 EventPublisher 发出配置变更通知,由 EventDispatcher 处理该变更通知,然后根据配置的同步策略(http、weboscket、zookeeper),将配置发送给对应的事件处理器

  • 如果是 websocket 同步策略,则将变更后的数据主动推送给 soul-web,并且在网关层,会有对应的 WebsocketCacheHandler 处理器处理来处 admin 的数据推送
  • 如果是 zookeeper 同步策略,将变更数据更新到 zookeeper,而 ZookeeperSyncCache 会监听到 zookeeper 的数据变更,并予以处理
  • 如果是 http 同步策略,soul-web 主动发起长轮询请求,默认有 90s 超时时间,如果 soul-admin 没有数据变更,则会阻塞 http 请求,如果有数据发生变更则响应变更的数据信息,如果超过 60s 仍然没有数据变更则响应空数据,网关层接到响应后,继续发起 http 请求,反复同样的请求

zookeeper同步

基于 zookeeper 的同步原理很简单,主要是依赖 zookeeperwatch 机制,soul-web 会监听配置的节点,soul-admin 在启动的时候,会将数据全量写入 zookeeper,后续数据发生变更时,会增量更新 zookeeper 的节点,与此同时,soul-web 会监听配置信息的节点,一旦有信息变更时,会更新本地缓存。

soul 将配置信息写到zookeeper节点,是通过精细设计的。

websocket同步

websocketzookeeper 机制有点类似,将网关与 admin 建立好 websocket 连接时,admin 会推送一次全量数据,后续如果配置数据发生变更,则将增量数据通过 websocket 主动推送给 soul-web

使用websocket同步的时候,特别要注意断线重连,也叫保持心跳。soul使用java-websocket 这个第三方库来进行websocket连接。

public class WebsocketSyncCache extends WebsocketCacheHandler {
    /**
     * The Client.
     */
    private WebSocketClient client;
    public WebsocketSyncCache(final SoulConfig.WebsocketConfig websocketConfig) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
                SoulThreadFactory.create("websocket-connect", true));
         client = new WebSocketClient(new URI(websocketConfig.getUrl())) {
                @Override
                public void onOpen(final ServerHandshake serverHandshake) {
                  //....
                }
                @Override
                public void onMessage(final String result) {
                  //....
                }    
            };
        //进行连接
        client.connectBlocking();
        //使用调度线程池进行断线重连,30秒进行一次
        executor.scheduleAtFixedRate(() -> {
            if (client != null && client.isClosed()) {
                    client.reconnectBlocking();
            }
        }, 10, 30, TimeUnit.SECONDS);
    }

http长轮询

zookeeper、websocket 数据同步的机制比较简单,而 http 同步会相对复杂一些。Soul 借鉴了 ApolloNacos 的设计思想,取决精华,自己实现了 http 长轮询数据同步功能。注意,这里并非传统的 ajax 长轮询!

http 长轮询机制如上所示,soul-web 网关请求 admin 的配置服务,读取超时时间为 90s,意味着网关层请求配置服务最多会等待 90s,这样便于 admin 配置服务及时响应变更数据,从而实现准实时推送。

http 请求到达 sou-admin 之后,并非立马响应数据,而是利用 Servlet3.0 的异步机制,异步响应数据。首先,将长轮询请求任务 LongPollingClient 扔到 BlocingQueue 中,并且开启调度任务,60s 后执行,这样做的目的是 60s 后将该长轮询请求移除队列,即便是这段时间内没有发生配置数据变更。因为即便是没有配置变更,也得让网关知道,总不能让其干等吧,而且网关请求配置服务时,也有 90s 的超时时间。

public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
    // 因为soul-web可能未收到某个配置变更的通知,因此MD5值可能不一致,则立即响应
    List<ConfigGroupEnum> changedGroup = compareMD5(request);
    String clientIp = getRemoteIp(request);
    if (CollectionUtils.isNotEmpty(changedGroup)) {
        this.generateResponse(response, changedGroup);
        return;
    }
    // Servlet3.0异步响应http请求
    final AsyncContext asyncContext = request.startAsync();
    asyncContext.setTimeout(0L);
    scheduler.execute(new LongPollingClient(asyncContext, clientIp, 60));
}
    
class LongPollingClient implements Runnable {
    LongPollingClient(final AsyncContext ac, final String ip, final long timeoutTime) {
        // 省略......
    }
    @Override
    public void run() {
        // 加入定时任务,如果60s之内没有配置变更,则60s后执行,响应http请求
        this.asyncTimeoutFuture = scheduler.schedule(() -> {
            // clients是阻塞队列,保存了来处soul-web的请求信息
            clients.remove(LongPollingClient.this);
            List<ConfigGroupEnum> changedGroups = HttpLongPollingDataChangedListener.compareMD5((HttpServletRequest) asyncContext.getRequest());
            sendResponse(changedGroups);
        }, timeoutTime, TimeUnit.MILLISECONDS);
        // 
        clients.add(this);
    }
}

如果这段时间内,管理员变更了配置数据,此时,会挨个移除队列中的长轮询请求,并响应数据,告知是哪个 Group 的数据发生了变更(我们将插件、规则、流量配置、用户配置数据分成不同的组)。网关收到响应信息之后,只知道是哪个 Group 发生了配置变更,还需要再次请求该 Group 的配置数据。有人会问,为什么不是直接将变更的数据写出?我们在开发的时候,也深入讨论过该问题,因为 http 长轮询机制只能保证准实时,如果在网关层处理不及时,或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,我们只告知某个 Group 信息发生了变更。

// soul-admin发生了配置变更,挨个将队列中的请求移除,并予以响应
class DataChangeTask implements Runnable {
    DataChangeTask(final ConfigGroupEnum groupKey) {
        this.groupKey = groupKey;
    }
    @Override
    public void run() {
        try {
            for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext(); ) {
                LongPollingClient client = iter.next();
                iter.remove();
                client.sendResponse(Collections.singletonList(groupKey));
            }
        } catch (Throwable e) {
            LOGGER.error("data change error.", e);
        }
    }
}

soul-web 网关层接收到 http 响应信息之后,拉取变更信息(如果有变更的话),然后再次请求 soul-admin 的配置服务,如此反复循环。

总结

总体继承关系图,如下图所示,核心借助于DataChangedEventDispatcher下面的五个监听器,根据不同的数据同步策略,触发不同的listener。

每个具体实现,都包含下面五个实现方法,用于数据同步。

其中,Http长轮询,借鉴了 `Apollo``Nacos` 的设计思想,取决精华,自己实现了 `http` 长轮询数据同步功能。注意,这里并非传统的 ajax 长轮询!

soul内置依赖 `spring-webflux` 而其底层是使用的netty。这一块只要是使用的netty线程模型

@Bean
public NettyReactiveWebServerFactory nettyReactiveWebServerFactory() {
    NettyReactiveWebServerFactory webServerFactory = new NettyReactiveWebServerFactory();
    webServerFactory.addServerCustomizers(new EventLoopNettyCustomizer());
    return webServerFactory;
}
private static class EventLoopNettyCustomizer implements NettyServerCustomizer {
    @Override
    public HttpServer apply(final HttpServer httpServer) {
        return httpServer
            .tcpConfiguration(tcpServer -> tcpServer
                              .runOn(LoopResources.create("soul-netty", 1, DEFAULT_IO_WORKER_COUNT, true), false)
                              .selectorOption(ChannelOption.SO_REUSEADDR, true)
                              .selectorOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                              .option(ChannelOption.TCP_NODELAY, true)
                              .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT));
    }
}
目录
相关文章
|
存储 NoSQL 大数据
mongodb数据库的优缺点
MongoDB 是一个流行的 NoSQL 数据库,主要用于大规模数据存储和处理。下面是 MongoDB 数据库的一些优点和缺点: ### 优点: 1. **灵活的模式设计**:MongoDB 是一个文档数据库,支持动态的模式设计,允许您存储不同结构和格式的数据。 2. **水平扩展**:MongoDB 支持水平扩展,可以轻松地在多个节点和服务器之间分布数据,以满足高并发和大规模数据处理的需求。 3. **丰富的查询功能**:MongoDB 提供了强大的查询语言和索引支持,允许您在大数据集上进行高效的数据检索和分析。 4. **高性能**:通过使用内存映射(mmap)和其他优化技术,M
1460 0
|
3月前
|
机器学习/深度学习 人工智能 自然语言处理
AI大模型分词器详解
分词器是将文本转为模型可处理数字序列的关键组件。本文详解BPE、WordPiece、SentencePiece三大主流算法原理与优劣,对比其在多语言支持、分词粒度等方面的差异,并提供中英文实战代码示例,助你掌握词汇表构建流程、特殊标记处理及常见面试问题应对策略。
361 1
|
3月前
|
机器学习/深度学习 人工智能
AI大模型位置编码详解
位置编码为Transformer提供序列顺序信息,弥补注意力机制无位置感知的缺陷。主要分为绝对编码(如可学习、Sinusoidal)和相对编码(如RoPE、ALiBi)。RoPE通过旋转矩阵支持长序列,ALiBi以线性偏置增强外推能力。不同方法在长度外推、效率等方面各有优劣,广泛应用于LLaMA、BLOOM等大模型中。
246 0
AI大模型位置编码详解
|
3月前
|
IDE 开发工具 Swift
Xcode 26.2 (17C52) 发布 - Apple 平台 IDE
IDE for iOS/iPadOS/macOS/watchOS/tvOS/visonOS
1007 1
|
4月前
|
监控 测试技术 API
避免人为漏测:Dify工作流成为你的“测试策略大脑”,全天候在线排查
本文介绍如何利用Dify工作流构建自动化测试分析系统,通过解析代码变更智能生成测试策略。该系统可集成至CI/CD流程,实现7x24小时风险识别与测试重点推荐,有效提升测试覆盖率和问题预防能力。
|
3月前
|
机器学习/深度学习 算法 关系型数据库
🎮 强化学习
强化学习通过智能体与环境交互,基于状态、动作和奖励学习最优策略。核心方法包括价值迭代、Q-learning、策略梯度及Actor-Critic框架,结合在线/离线学习与同/异策略优化,实现高效决策。
380 0
 🎮 强化学习
|
3月前
|
机器学习/深度学习 人工智能 缓存
AI大模型注意力机制详解
注意力机制是Transformer的核心,实现序列间动态关注。包括自注意力、交叉注意力、多头(MHA)、分组(GQA)、多查询(MQA)及低秩压缩的MLA等变体,平衡效率与性能,广泛应用于大模型优化与推理加速。
234 0
AI大模型注意力机制详解
|
3月前
|
自然语言处理
模型架构篇🏗️ 主流大模型结构
本文系统梳理主流大模型架构:Encoder-Decoder、Decoder-Only、Encoder-Only及Prefix-Decoder,解析GPT、LLaMA、BERT等代表模型特点与应用,并对比参数、上下文长度与优势场景,涵盖中英文大模型发展及面试核心要点。
316 0
|
机器学习/深度学习 人工智能 自然语言处理
[大语言模型-论文精读] ACL2024-长尾知识在检索增强型大型语言模型中的作用
[大语言模型-论文精读] ACL2024-长尾知识在检索增强型大型语言模型中的作用
|
机器学习/深度学习 人工智能 数据处理
AI计算机视觉笔记六:mediapipe测试
MediaPipe是由Google Research开发并开源的多媒体机器学习框架,已被集成到YouTube、Google Lens等重要产品中。该框架支持多种功能,如物体检测、自拍分割、头发分割、人脸检测、手部检测及运动追踪等。本文档将指导你通过Python环境搭建与测试MediaPipe,包括创建虚拟环境、安装依赖库,并进行手指骨骼识别测试。具体步骤涵盖环境配置、摄像头数据处理及结果显示。
1382 3

热门文章

最新文章