Spring Cloud Eureka 全解 (4) - 核心流程-服务与实例列表获取详解

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
网络型负载均衡 NLB,每月750个小时 15LCU
应用型负载均衡 ALB,每月750个小时 15LCU
简介: Spring Cloud Eureka 全解 (4) - 核心流程-服务与实例列表获取详解

系列目录:

本文基于SpringCloud-Dalston.SR5


关于服务与实例列表获取


EurekaClient端

我们从Ribbon说起:EurekaClient也存在缓存,应用服务实例列表信息在每个EurekaClient服务消费端都有缓存。一般的,Ribbon的LoadBalancer会读取这个缓存,来知道当前有哪些实例可以调用,从而进行负载均衡。这个loadbalancer同样也有缓存。

首先看这个LoadBalancer的缓存更新机制,相关类是PollingServerListUpdater:

final Runnable wrapperRunnable = new Runnable() {
    @Override
    public void run() {
        if (!isActive.get()) {
            if (scheduledFuture != null) {
                scheduledFuture.cancel(true);
            }
            return;
        }
        try {
            //从EurekaClient缓存中获取服务实例列表,保存在本地缓存
            updateAction.doUpdate();
            lastUpdated = System.currentTimeMillis();
        } catch (Exception e) {
            logger.warn("Failed one update cycle", e);
        }
    }
};
//定时调度
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
        wrapperRunnable,
        initialDelayMs,
        refreshIntervalMs,
        TimeUnit.MILLISECONDS
);


这个updateAction.doUpdate();就是从EurekaClient缓存中获取服务实例列表,保存在BaseLoadBalancer的本地缓存:

protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
public void setServersList(List lsrv) {
    //写入allServerList的代码,这里略
}
@Override
public List<Server> getAllServers() {
    return Collections.unmodifiableList(allServerList);
}


这里的getAllServers会在每个负载均衡规则中被调用,例如RoundRobinRule:

public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        log.warn("no load balancer");
        return null;
    }
    Server server = null;
    int count = 0;
    while (server == null && count++ < 10) {
        List<Server> reachableServers = lb.getReachableServers();
        //获取服务实例列表,调用的就是刚刚提到的getAllServers
        List<Server> allServers = lb.getAllServers();
        int upCount = reachableServers.size();
        int serverCount = allServers.size();
        if ((upCount == 0) || (serverCount == 0)) {
            log.warn("No up servers available from load balancer: " + lb);
            return null;
        }
        int nextServerIndex = incrementAndGetModulo(serverCount);
        server = allServers.get(nextServerIndex);
        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }
        if (server.isAlive() && (server.isReadyToServe())) {
            return (server);
        }
        // Next.
        server = null;
    }
    if (count >= 10) {
        log.warn("No available alive servers after 10 tries from load balancer: "
                + lb);
    }
    return server;
}

这个缓存需要注意下,有时候我们只修改了EurekaClient缓存的更新时间,但是没有修改这个LoadBalancer的刷新本地缓存时间,就是ribbon.ServerListRefreshInterval,这个参数可以设置的很小,因为没有从网络读取,就是从一个本地缓存刷到另一个本地缓存(如何配置缓存配置来实现服务实例快速下线快速感知快速刷新,可以参考我的另一篇文章)。


然后我们来看一下EurekaClient本身的缓存,直接看关键类DiscoveryClient的相关源码,我们这里只关心本地Region的,多Region配置我们先忽略:

//本地缓存,可以理解为是一个软链接
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
private void initScheduledTasks() {
    //如果配置为需要拉取服务列表,则设置定时拉取任务,这个配置默认是需要拉取服务列表
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    //其他定时任务初始化的代码,忽略
}
//定时从EurekaServer拉取服务列表的任务
class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
}
void refreshRegistry() {
    try {
        //多Region配置处理代码,忽略
        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }
        //日志代码,忽略
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }        
}
//定时从EurekaServer拉取服务列表的核心方法
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    try {
        Applications applications = getApplications();
        //判断,如果是第一次拉取,或者app列表为空,就进行全量拉取,否则就会进行增量拉取
        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            getAndStoreFullRegistry();
        } else {
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
    //缓存更新完成,发送个event给观察者,目前没啥用 
    onCacheRefreshed();
    // 检查下远端的服务实例列表里面包括自己,并且状态是否对,这里我们不关心
    updateInstanceRemoteStatus();
    // registry was fetched successfully, so return true
    return true;
}
//全量拉取代码
private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();
    Applications apps = null;
    //访问/eureka/apps接口,拉取所有服务实例信息
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());
    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}
//增量拉取代码
private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();
    Applications delta = null;
    //访问/eureka/delta接口,拉取所有服务实例增量信息
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }
    if (delta == null) {
        //如果delta为空,拉取增量失败,就全量拉取
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        //这里设置原子锁的原因是怕某次调度网络请求时间过长,导致同一时间有多线程拉取到增量信息并发修改
        //拉取增量成功,检查hashcode是否一样,不一样的话也会全量拉取
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                updateDelta(delta);
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // There is a diff in number of instances for some reason
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}

以上就是对于EurekaClient拉取服务实例信息的源代码分析,总结EurekaClient 重要缓存如下:


  1. EurekaClient第一次全量拉取,定时增量拉取应用服务实例信息,保存在缓存中。
  2. EurekaClient增量拉取失败,或者增量拉取之后对比hashcode发现不一致,就会执行全量拉取,这样避免了网络某时段分片带来的问题。
  3. 同时对于服务调用,如果涉及到ribbon负载均衡,那么ribbon对于这个实例列表也有自己的缓存,这个缓存定时从EurekaClient的缓存更新


EurekaServer端


在EurekaServer端,所有的读取请求都是读的ReadOnlyMap(这个可以配置)

有定时任务会定时从ReadWriteMap同步到ReadOnlyMap这个时间配置是:

#eureka server刷新readCacheMap的时间,注意,client读取的是readCacheMap,这个时间决定了多久会把readWriteCacheMap的缓存更新到readCacheMap上
#默认30s
eureka.server.responseCacheUpdateInvervalMs=3000

相关代码:

if (shouldUseReadOnlyResponseCache) {
            timer.schedule(getCacheUpdateTask(),
                    new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                            + responseCacheUpdateIntervalMs),
                    responseCacheUpdateIntervalMs);
        }
private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            for (Key key : readOnlyCacheMap.keySet()) {
                if (logger.isDebugEnabled()) {
                    Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache", th);
                }
            }
        }
    };
}

ReadWriteMap是一个LoadingCache,将Registry中的服务实例信息封装成要返回的http响应(分别是经过gzip压缩和非压缩的),同时还有两个特殊key,ALLAPPS和ALLAPPSDELTA
ALL
APPS就是所有服务实例信息

ALLAPPSDELTA就是之前讲注册说的RecentlyChangedQueue里面的实例列表封装的http响应信息

相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
相关文章
|
15天前
|
消息中间件 监控 Java
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + RabbitMQ应用程序部署到Pivotal Cloud Foundry (PCF)
31 6
|
15天前
|
Java 关系型数据库 MySQL
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot + MySQL应用程序部署到Pivotal Cloud Foundry (PCF)
35 5
|
15天前
|
缓存 监控 Java
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
如何将Spring Boot应用程序部署到Pivotal Cloud Foundry (PCF)
27 5
|
4月前
|
缓存 NoSQL Java
【Azure Redis 缓存】示例使用 redisson-spring-boot-starter 连接/使用 Azure Redis 服务
【Azure Redis 缓存】示例使用 redisson-spring-boot-starter 连接/使用 Azure Redis 服务
|
3月前
|
Java API 对象存储
微服务魔法启动!Spring Cloud与Netflix OSS联手,零基础也能创造服务奇迹!
这段内容介绍了如何使用Spring Cloud和Netflix OSS构建微服务架构。首先,基于Spring Boot创建项目并添加Spring Cloud依赖项。接着配置Eureka服务器实现服务发现,然后创建REST控制器作为API入口。为提高服务稳定性,利用Hystrix实现断路器模式。最后,在启动类中启用Eureka客户端功能。此外,还可集成其他Netflix OSS组件以增强系统功能。通过这些步骤,开发者可以更高效地构建稳定且可扩展的微服务系统。
60 1
|
2月前
|
负载均衡 算法 Nacos
SpringCloud 微服务nacos和eureka
SpringCloud 微服务nacos和eureka
69 0
|
3月前
|
负载均衡 Java Nacos
SpringCloud基础1——远程调用、Eureka,Nacos注册中心、Ribbon负载均衡
微服务介绍、SpringCloud、服务拆分和远程调用、Eureka注册中心、Ribbon负载均衡、Nacos注册中心
SpringCloud基础1——远程调用、Eureka,Nacos注册中心、Ribbon负载均衡
|
4月前
|
负载均衡 监控 Java
SpringCloud常见面试题(一):SpringCloud 5大组件,服务注册和发现,nacos与eureka区别,服务雪崩、服务熔断、服务降级,微服务监控
SpringCloud常见面试题(一):SpringCloud 5大组件,服务注册和发现,nacos与eureka区别,服务雪崩、服务熔断、服务降级,微服务监控
SpringCloud常见面试题(一):SpringCloud 5大组件,服务注册和发现,nacos与eureka区别,服务雪崩、服务熔断、服务降级,微服务监控
|
4月前
|
存储 Java Spring
【Azure Spring Cloud】Azure Spring Cloud服务,如何获取应用程序日志文件呢?
【Azure Spring Cloud】Azure Spring Cloud服务,如何获取应用程序日志文件呢?
|
4月前
|
Java Spring
【Azure 服务总线】Spring Cloud 的应用 使用Service Bus 引起 org.springframework.beans.BeanInstantiationException 异常,无法启动
【Azure 服务总线】Spring Cloud 的应用 使用Service Bus 引起 org.springframework.beans.BeanInstantiationException 异常,无法启动