5.数据同步原理

简介: Soul网关通过推拉模式实现配置数据同步,支持WebSocket、HTTP长轮询和Zookeeper三种策略。管理后台变更配置后,通过EventPublisher发布事件,依据同步策略将变更推送给网关,实现准实时更新。HTTP长轮询借鉴Apollo、Nacos设计,结合异步Servlet机制,确保高效与低延迟。

下图展示了 Soul 数据同步的流程,Soul 网关在启动时,会从从配置服务同步配置数据,并且支持推拉模式获取配置变更信息,并且更新本地缓存。而管理员在管理后台,变更用户、规则、插件、流量配置,通过推拉模式将变更信息同步给 Soul 网关,具体是 push 模式,还是 pull 模式取决于配置。关于配置同步模块,其实是一个简版的配置中心。

1.x 版本中,配置服务依赖 zookeeper 实现,管理后台将变更信息 push 给网关。而 2.x 版本支持 webosockethttpzookeeper,通过 soul.sync.strategy 指定对应的同步策略,默认使用 http 长轮询同步策略,可以做到秒级数据同步。但是,有一点需要注意的是,soul-websoul-admin 必须使用相同的同步机制。

如下图所示,soul-admin 在用户发生配置变更之后,会通过 EventPublisher 发出配置变更通知,由 EventDispatcher 处理该变更通知,然后根据配置的同步策略(http、weboscket、zookeeper),将配置发送给对应的事件处理器

  • 如果是 websocket 同步策略,则将变更后的数据主动推送给 soul-web,并且在网关层,会有对应的 WebsocketCacheHandler 处理器处理来处 admin 的数据推送
  • 如果是 zookeeper 同步策略,将变更数据更新到 zookeeper,而 ZookeeperSyncCache 会监听到 zookeeper 的数据变更,并予以处理
  • 如果是 http 同步策略,soul-web 主动发起长轮询请求,默认有 90s 超时时间,如果 soul-admin 没有数据变更,则会阻塞 http 请求,如果有数据发生变更则响应变更的数据信息,如果超过 60s 仍然没有数据变更则响应空数据,网关层接到响应后,继续发起 http 请求,反复同样的请求

zookeeper同步

基于 zookeeper 的同步原理很简单,主要是依赖 zookeeperwatch 机制,soul-web 会监听配置的节点,soul-admin 在启动的时候,会将数据全量写入 zookeeper,后续数据发生变更时,会增量更新 zookeeper 的节点,与此同时,soul-web 会监听配置信息的节点,一旦有信息变更时,会更新本地缓存。

soul 将配置信息写到zookeeper节点,是通过精细设计的。

websocket同步

websocketzookeeper 机制有点类似,将网关与 admin 建立好 websocket 连接时,admin 会推送一次全量数据,后续如果配置数据发生变更,则将增量数据通过 websocket 主动推送给 soul-web

使用websocket同步的时候,特别要注意断线重连,也叫保持心跳。soul使用java-websocket 这个第三方库来进行websocket连接。

public class WebsocketSyncCache extends WebsocketCacheHandler {
    /**
     * The Client.
     */
    private WebSocketClient client;
    public WebsocketSyncCache(final SoulConfig.WebsocketConfig websocketConfig) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
                SoulThreadFactory.create("websocket-connect", true));
         client = new WebSocketClient(new URI(websocketConfig.getUrl())) {
                @Override
                public void onOpen(final ServerHandshake serverHandshake) {
                  //....
                }
                @Override
                public void onMessage(final String result) {
                  //....
                }    
            };
        //进行连接
        client.connectBlocking();
        //使用调度线程池进行断线重连,30秒进行一次
        executor.scheduleAtFixedRate(() -> {
            if (client != null && client.isClosed()) {
                    client.reconnectBlocking();
            }
        }, 10, 30, TimeUnit.SECONDS);
    }

http长轮询

zookeeper、websocket 数据同步的机制比较简单,而 http 同步会相对复杂一些。Soul 借鉴了 ApolloNacos 的设计思想,取决精华,自己实现了 http 长轮询数据同步功能。注意,这里并非传统的 ajax 长轮询!

http 长轮询机制如上所示,soul-web 网关请求 admin 的配置服务,读取超时时间为 90s,意味着网关层请求配置服务最多会等待 90s,这样便于 admin 配置服务及时响应变更数据,从而实现准实时推送。

http 请求到达 sou-admin 之后,并非立马响应数据,而是利用 Servlet3.0 的异步机制,异步响应数据。首先,将长轮询请求任务 LongPollingClient 扔到 BlocingQueue 中,并且开启调度任务,60s 后执行,这样做的目的是 60s 后将该长轮询请求移除队列,即便是这段时间内没有发生配置数据变更。因为即便是没有配置变更,也得让网关知道,总不能让其干等吧,而且网关请求配置服务时,也有 90s 的超时时间。

public void doLongPolling(final HttpServletRequest request, final HttpServletResponse response) {
    // 因为soul-web可能未收到某个配置变更的通知,因此MD5值可能不一致,则立即响应
    List<ConfigGroupEnum> changedGroup = compareMD5(request);
    String clientIp = getRemoteIp(request);
    if (CollectionUtils.isNotEmpty(changedGroup)) {
        this.generateResponse(response, changedGroup);
        return;
    }
    // Servlet3.0异步响应http请求
    final AsyncContext asyncContext = request.startAsync();
    asyncContext.setTimeout(0L);
    scheduler.execute(new LongPollingClient(asyncContext, clientIp, 60));
}
    
class LongPollingClient implements Runnable {
    LongPollingClient(final AsyncContext ac, final String ip, final long timeoutTime) {
        // 省略......
    }
    @Override
    public void run() {
        // 加入定时任务,如果60s之内没有配置变更,则60s后执行,响应http请求
        this.asyncTimeoutFuture = scheduler.schedule(() -> {
            // clients是阻塞队列,保存了来处soul-web的请求信息
            clients.remove(LongPollingClient.this);
            List<ConfigGroupEnum> changedGroups = HttpLongPollingDataChangedListener.compareMD5((HttpServletRequest) asyncContext.getRequest());
            sendResponse(changedGroups);
        }, timeoutTime, TimeUnit.MILLISECONDS);
        // 
        clients.add(this);
    }
}

如果这段时间内,管理员变更了配置数据,此时,会挨个移除队列中的长轮询请求,并响应数据,告知是哪个 Group 的数据发生了变更(我们将插件、规则、流量配置、用户配置数据分成不同的组)。网关收到响应信息之后,只知道是哪个 Group 发生了配置变更,还需要再次请求该 Group 的配置数据。有人会问,为什么不是直接将变更的数据写出?我们在开发的时候,也深入讨论过该问题,因为 http 长轮询机制只能保证准实时,如果在网关层处理不及时,或者管理员频繁更新配置,很有可能便错过了某个配置变更的推送,安全起见,我们只告知某个 Group 信息发生了变更。

// soul-admin发生了配置变更,挨个将队列中的请求移除,并予以响应
class DataChangeTask implements Runnable {
    DataChangeTask(final ConfigGroupEnum groupKey) {
        this.groupKey = groupKey;
    }
    @Override
    public void run() {
        try {
            for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext(); ) {
                LongPollingClient client = iter.next();
                iter.remove();
                client.sendResponse(Collections.singletonList(groupKey));
            }
        } catch (Throwable e) {
            LOGGER.error("data change error.", e);
        }
    }
}

soul-web 网关层接收到 http 响应信息之后,拉取变更信息(如果有变更的话),然后再次请求 soul-admin 的配置服务,如此反复循环。

总结

总体继承关系图,如下图所示,核心借助于DataChangedEventDispatcher下面的五个监听器,根据不同的数据同步策略,触发不同的listener。

每个具体实现,都包含下面五个实现方法,用于数据同步。

其中,Http长轮询,借鉴了 `Apollo``Nacos` 的设计思想,取决精华,自己实现了 `http` 长轮询数据同步功能。注意,这里并非传统的 ajax 长轮询!

soul内置依赖 `spring-webflux` 而其底层是使用的netty。这一块只要是使用的netty线程模型

@Bean
public NettyReactiveWebServerFactory nettyReactiveWebServerFactory() {
    NettyReactiveWebServerFactory webServerFactory = new NettyReactiveWebServerFactory();
    webServerFactory.addServerCustomizers(new EventLoopNettyCustomizer());
    return webServerFactory;
}
private static class EventLoopNettyCustomizer implements NettyServerCustomizer {
    @Override
    public HttpServer apply(final HttpServer httpServer) {
        return httpServer
            .tcpConfiguration(tcpServer -> tcpServer
                              .runOn(LoopResources.create("soul-netty", 1, DEFAULT_IO_WORKER_COUNT, true), false)
                              .selectorOption(ChannelOption.SO_REUSEADDR, true)
                              .selectorOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                              .option(ChannelOption.TCP_NODELAY, true)
                              .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT));
    }
}
相关文章
|
2月前
|
Java 数据库连接 调度
xxljob执行源码分析
本文深入解析XXL-JOB分布式任务调度框架源码,涵盖架构设计、核心执行流程与关键线程池机制。内容包括任务触发、注册、失败重试、日志报告及时间轮调度原理,结合带中文注释的源码包与分析导图,全面剖析其高性能设计实现。
 xxljob执行源码分析
|
7月前
|
存储 安全 数据库
抖音封号能注销吗?请问
一、封号与注销的底层逻辑关系 账号状态机模型
|
2月前
|
敏捷开发 Dubbo Java
需求开发人日评估
本文介绍敏捷开发中工时评估的关键方法,重点讲解“人日”概念及开发、自测、联调、测试等各阶段的参考周期。结合常见需求如Excel导入导出、增删改查、跨服务调用等,提供实用的人日估算标准,并附天机学堂案例与拆分模板,助力团队科学排期。
需求开发人日评估
|
2月前
|
安全 数据库 数据安全/隐私保护
1.RememberMe简介及用法
RememberMe功能并非简单保存用户名密码,而是通过服务端生成令牌(Token),借助Cookie实现用户关闭浏览器后仍保持登录状态。勾选“记住我”后,系统在响应头设置remember-me令牌,后续请求自动携带该令牌验证身份。为提升安全性,可将Token持久化至数据库并增加二次校验机制,防止令牌泄露带来的安全风险。
1.RememberMe简介及用法
|
2月前
|
安全 C++ Perl
南京观海微电子----LED发光二极管限流电阻计算
LED是开源项目中常用的输出设备,用于状态指示。为防止烧坏,需串联限流电阻。通过公式 \( R = \frac{V_{\text{supply}} - V_F}{I_F} \) 计算电阻值,并根据功率选择合适封装,确保电路安全稳定运行。
|
2月前
|
JSON 前端开发 Java
Spring Boot 中常用的 MVC 注解
Spring Boot 开发中,核心注解如 `@RestController`、`@RequestMapping`、`@PathVariable`、`@RequestParam` 和 `@RequestBody` 是构建 RESTful 接口的基础。它们分别用于定义控制器、映射请求路径、获取路径参数、查询参数及请求体数据,掌握后可高效开发 Web 接口,适用于前后端分离架构。
|
2月前
|
安全 Java Spring
2.过滤器链加载原理
本文深入解析Spring Security过滤器加载机制,通过源码分析DelegatingFilterProxy、FilterChainProxy与SecurityFilterChain的协作流程,揭示十五个安全过滤器如何自动装配并执行,帮助理解框架底层原理,为自定义认证页面打下基础。
|
2月前
|
存储 缓存 安全
1-常用过滤器介绍
本文介绍了Spring Security中的核心过滤器链,涵盖SecurityContextPersistenceFilter、CsrfFilter、LogoutFilter等15个关键过滤器的作用与执行逻辑,解析其如何实现认证、授权、会话管理及异常处理。实际加载的过滤器取决于配置,并非固定不变。
|
网络安全 数据安全/隐私保护 网络架构
ABCDE类网络的划分及保留网段
ABCDE类网络的划分及保留网段
5290 7
|
SQL 存储 分布式计算
Kylin使用心得:从入门到进阶的探索之旅
【5月更文挑战第2天】Apache Kylin是开源大数据分析平台,提供亚秒级OLAP查询。本文深入解析Kylin的工作原理,包括预计算模型Cube、构建过程和查询引擎。常见问题涉及Cube设计、查询性能和资源管理,解决方案涵盖合理设计、性能监控和测试验证。文中还分享了Cube创建的JSON示例,并探讨了Cube构建优化、查询优化、与其他组件集成、监控维护及生产环境问题解决。通过学习和实践,读者能有效提升数据洞察力和决策效率。
792 5