Spring Cloud Service Governance and Load Balancing Internals

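Summary: Explains from the source code how Eureka service governance works, and how @LoadBalanced works together with its load-balancing strategies.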

As usual, this article focuses on internals rather than usage.

Eureka Internals

1  Service Governance

Overall, Eureka's service governance model looks like this:

[Figure: Eureka service governance model]

Eureka's service governance works as follows:

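 - Start the registry. On startup, the registry launches a scheduled task that detects and evicts expired service instances (implemented by EvictionTask).

 - Start the provider cluster. On startup, each provider registers its service with the registry (via InstanceInfoReplicator, which DiscoveryClient creates) and starts a scheduled task that periodically sends heartbeats to the registry (implemented by DiscoveryClient#HeartbeatThread).

 - Start the consumer cluster. On startup, each consumer fetches the full service registry (implemented by DiscoveryClient#getAndStoreFullRegistry()) and starts a scheduled task that periodically fetches incremental updates (implemented by DiscoveryClient#getAndUpdateDelta()).

 - When the registry runs as a cluster, registrations received by one Eureka Server are replicated to the other Eureka Servers. This is a high-availability measure: if some Eureka Servers fail, the cluster as a whole keeps running.

 - Providers send heartbeats to tell the registry that they are still alive and should not be evicted.

 - After obtaining provider information, a consumer caches it locally. When it needs to call a provider, it takes the provider information from the cached list and calls it with load balancing. Thanks to this local cache, consumers can keep calling services even when the registry cluster is unavailable.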

This is only a brief overview of the Eureka workflow; the following sections analyze it from the source code.

 

2  Service Partitioning

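Eureka partitions services by region and zone. A region can be thought of as a geographic area, such as Asia or North China, with no fixed size. A zone can be thought of as a specific data center inside a region: for example, if the region is Shanghai and Shanghai has two data centers, you can define two zones, zone1 and zone2, under that region.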

Regions and zones make a highly available multi-active deployment possible, as shown below:

[Figure: consumers and providers deployed in zone1 and zone2]

Normally, Consumer1 in zone1 consumes the service provided by Provider1 in the same zone, which keeps the performance overhead to a minimum.

When Provider1 in zone1 becomes unavailable, the consumer can switch to Provider2 in zone2, as shown below:

[Figure: Consumer1 in zone1 failing over to Provider2 in zone2]
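The zone-first fallback described above can be sketched in plain Java. This is a hypothetical illustration, not Ribbon's ZoneAwareLoadBalancer: ZoneInstance and ZonePreferringSelector are invented names, and the `up` flag stands in for whatever health information the real client uses. The consumer prefers healthy providers in its own zone and only falls back to other zones when none are left.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a registered provider instance (not a Eureka class).
final class ZoneInstance {
    final String id;
    final String zone;
    final boolean up;

    ZoneInstance(String id, String zone, boolean up) {
        this.id = id;
        this.zone = zone;
        this.up = up;
    }
}

final class ZonePreferringSelector {
    // Returns the healthy instances in the caller's zone if there are any;
    // otherwise returns the healthy instances from all other zones.
    static List<ZoneInstance> candidates(List<ZoneInstance> all, String myZone) {
        List<ZoneInstance> local = new ArrayList<>();
        List<ZoneInstance> remote = new ArrayList<>();
        for (ZoneInstance i : all) {
            if (!i.up) {
                continue; // skip unavailable instances
            }
            if (i.zone.equals(myZone)) {
                local.add(i);
            } else {
                remote.add(i);
            }
        }
        return local.isEmpty() ? remote : local;
    }
}
```

With Provider1 up, a consumer in zone1 gets Provider1; once Provider1 is marked down, the same call returns Provider2 from zone2.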

 

3  Comparison with Zookeeper

1)   Eureka guarantees AP

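In CAP terms, Eureka guarantees AP. All Eureka nodes are peers, so losing a few nodes does not affect the healthy ones, and the remaining nodes can still serve registrations and queries. As long as one Eureka Server is alive, the registry stays available (availability), although the information it returns may not be the latest (no strong consistency).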

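If the registry receives too few heartbeats (by default, when the renewals received in the last minute drop to 85% or less of the expected number), Eureka assumes a network failure between the clients and the registry and enters self-preservation mode. In this mode:

 - Eureka no longer removes services from the registry even though their heartbeats have been missing long enough for them to expire

 - Eureka still accepts registrations of new services and queries, but they are not replicated to the other nodes (so the current node stays available)

 - Once the network is stable again, the new registrations on the current node are replicated to the other nodes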

2)   Zookeeper guarantees CP

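In CAP terms, Zookeeper guarantees CP. While a Zookeeper cluster is electing a leader, services can neither register with ZK nor be fetched from it; and when more than half of the machines in the cluster are unavailable, no leader can be elected and the cluster is effectively down. In this respect, Zookeeper is not the best choice for a service registry.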

 

 

Service Registration

1  Provider-side Registration

1)   Source Code Analysis

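Service registration consists of two steps: registration and renewal. Registration records the provider's service information on the Eureka Server. Renewal follows registration: the provider keeps sending heartbeats to tell the Eureka Server that it is still alive, so that the server's eviction task does not remove the instance from the service list.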

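For DiscoveryClient, the main logic is: when a DiscoveryClient is created, it registers the current service with the Eureka Server and starts a scheduled task that keeps renewing the lease.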

a)    Registration

The DiscoveryClient constructor calls initScheduledTasks(), which is where service registration is set up. The relevant code is:

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, DiscoveryClientOptionalArgs args,
               Provider<BackupRegistry> backupRegistryProvider) {

    // ... code omitted
    initScheduledTasks();
    // ... code omitted
}

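initScheduledTasks holds the core logic for registration and renewal: it schedules the heartbeat task that keeps renewing the lease, and registers the service through InstanceInfoReplicator. (When registry fetching is enabled, it also schedules the cache refresh task covered under Service Discovery.)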

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh", scheduler, cacheRefreshExecutor,
                        registryFetchIntervalSeconds, TimeUnit.SECONDS,
                        expBackOffBound, new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

        // Heartbeat timer
        scheduler.schedule(
                new TimedSupervisorTask(
                "heartbeat", scheduler, heartbeatExecutor, 
                renewalIntervalInSecs, TimeUnit.SECONDS, 
                expBackOffBound, new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // InstanceInfo replicator
        instanceInfoReplicator = new InstanceInfoReplicator(
                this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSize

        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}
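Both timers above are wrapped in TimedSupervisorTask, which receives a base interval and an expBackOffBound. As a rough sketch of the idea (BackoffDelay is a hypothetical helper, not the TimedSupervisorTask source): when a run times out, the next delay doubles, capped at interval × expBackOffBound; after a successful run the delay resets to the base interval.

```java
// Hypothetical helper illustrating the back-off idea (not Eureka's TimedSupervisorTask).
final class BackoffDelay {
    private final long baseDelayMs;
    private final long maxDelayMs;
    private long currentDelayMs;

    BackoffDelay(long intervalSeconds, int expBackOffBound) {
        this.baseDelayMs = intervalSeconds * 1000L;
        this.maxDelayMs = baseDelayMs * expBackOffBound; // upper bound for the delay
        this.currentDelayMs = baseDelayMs;
    }

    // Returns the delay before the next run, given whether the current run timed out.
    long next(boolean timedOut) {
        if (timedOut) {
            currentDelayMs = Math.min(maxDelayMs, currentDelayMs * 2); // double, but never past the cap
        } else {
            currentDelayMs = baseDelayMs; // healthy again: back to the normal interval
        }
        return currentDelayMs;
    }
}
```

With example values of a 30-second interval and a bound of 10, consecutive timeouts give delays of 60 s, 120 s, 240 s and then 300 s (the cap), and the first successful run brings the delay back to 30 s.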

InstanceInfoReplicator#start triggers the registration, whose code is:

boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}

Below is a capture of the registration payload (the instanceInfo), which contains everything needed to locate the service provider.

[Figure: contents of a registered InstanceInfo]

 

b)    Renewal

Two properties control the renewal interval and the lease expiration time:

eureka.instance.lease-renewal-interval-in-seconds=30

eureka.instance.lease-expiration-duration-in-seconds=90
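To make the two properties concrete, here is a simplified lease model. SimpleLease is a hypothetical sketch, not Eureka's Lease class: each renewal moves the last-renewal time forward, and the instance counts as expired once no renewal has arrived for longer than the expiration duration. With the values above, a provider renews every 30 seconds and is treated as expired after more than 90 seconds of silence, i.e. roughly three missed heartbeats.

```java
// Hypothetical, simplified lease (not com.netflix.eureka.lease.Lease).
final class SimpleLease {
    private final long durationMs;
    private volatile long lastRenewalMs;

    SimpleLease(long durationSeconds, long nowMs) {
        this.durationMs = durationSeconds * 1000L;
        this.lastRenewalMs = nowMs; // registration counts as the first renewal
    }

    // Called for every heartbeat the registry receives.
    void renew(long nowMs) {
        lastRenewalMs = nowMs;
    }

    // Expired when no renewal arrived within the expiration duration.
    boolean isExpired(long nowMs) {
        return nowMs - lastRenewalMs > durationMs;
    }
}
```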

 

DiscoveryClient#initScheduledTasks starts a scheduled task that keeps renewing the lease:

private void initScheduledTasks() {
    // ... code omitted
    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        // Heartbeat timer
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);
        // ... code omitted
    }
}

The renewal task eventually calls DiscoveryClient#renew, which sends a heartbeat to the Eureka Server:

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
            return register();
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
        return false;
    }
}
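Stripped of logging and metrics, the register/renew pair boils down to the control flow below. RegistrationTransport is a hypothetical stand-in for eurekaTransport.registrationClient that returns only HTTP status codes; the rules come from the code above: 204 means the registration succeeded, 200 means the heartbeat was accepted, and 404 means the server no longer knows the instance, so the client registers again.

```java
// Hypothetical stand-in for the registration client: returns HTTP status codes only.
interface RegistrationTransport {
    int register(String appName, String instanceId);

    int sendHeartBeat(String appName, String instanceId);
}

final class RenewFlow {
    private final RegistrationTransport transport;

    RenewFlow(RegistrationTransport transport) {
        this.transport = transport;
    }

    boolean register(String app, String id) {
        return transport.register(app, id) == 204; // 204 No Content = registered
    }

    boolean renew(String app, String id) {
        try {
            int status = transport.sendHeartBeat(app, id);
            if (status == 404) {
                // The server has no lease for this instance (e.g. it was evicted): register again.
                return register(app, id);
            }
            return status == 200;
        } catch (RuntimeException e) {
            return false; // a failed heartbeat is reported as false, not propagated
        }
    }
}
```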

 

2  How the Registry Manages Services

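The registry's entry point for registration requests is ApplicationResource#addInstance, which completes the registration by calling PeerAwareInstanceRegistry#register. PeerAwareInstanceRegistryImpl#register first records the registration locally and then replicates it to the other registry nodes: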

public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

Note: the entry point for renewals is InstanceResource#renewLease.

 

 

1)   Recording the Registration

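Recording a registration is fairly involved. In short, the incoming registration is stored in a ConcurrentHashMap (keyed by application name, then by instance ID), and the instance's timestamps, status and related fields are updated; the registry relies on this information to track whether services are alive. The source is: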

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                if ("true".equals(serverConfig.getExperimental("registry.registration.ignoreIfDirtyTimestampIsOlder"))) {
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                } else {
                    registrant.setLastDirtyTimestamp(existingLastDirtyTimestamp);
                }
            }
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // Since the client wants to cancel it, reduce the threshold
                    // (1
                    // for 30 seconds, 2 for a minute)
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                    this.numberOfRenewsPerMinThreshold =
                            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease);
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                            + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}
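The heart of the method is the two-level map application name → (instance ID → lease) and its race-safe get-or-create step (gMap / putIfAbsent). A stripped-down sketch, assuming a hypothetical TwoLevelRegistry that stores plain strings instead of Lease&lt;InstanceInfo&gt;:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical two-level registry: appName -> (instanceId -> payload).
final class TwoLevelRegistry {
    private final ConcurrentHashMap<String, Map<String, String>> registry = new ConcurrentHashMap<>();

    void register(String appName, String instanceId, String payload) {
        Map<String, String> gMap = registry.get(appName);
        if (gMap == null) {
            ConcurrentHashMap<String, String> gNewMap = new ConcurrentHashMap<>();
            // putIfAbsent returns the existing map if another thread won the race.
            gMap = registry.putIfAbsent(appName, gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        gMap.put(instanceId, payload); // a re-registration overwrites the old entry
    }

    int instanceCount(String appName) {
        Map<String, String> gMap = registry.get(appName);
        return gMap == null ? 0 : gMap.size();
    }
}
```

On Java 8 and later, `registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>())` expresses the same get-or-create step in one call.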

 

 

2)   Replicating Registrations to Other Registry Nodes

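Replicating registrations to the other registry nodes provides high availability: a service registers with only one Eureka Server, yet its instances become visible across the whole Eureka Server cluster, so the cluster stays usable when some of its machines are down.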

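The core of the replication logic is: first decide whether to replicate at all, then get all Eureka Server peer nodes, and copy the information to each of them in turn (skipping the current node).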

private void replicateToPeers(Action action, String appName, String id,
          InstanceInfo info /* optional */,
          InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

replicateInstanceActionsToPeers performs a different operation for each action. During registration, it calls node.register to register the instance with the other registry node.

private void replicateInstanceActionsToPeers(Action action, String appName,
        String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}

 

3)   Provider Shutdown

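When a provider shuts down, it still calls ApplicationResource#addInstance. The difference is that the InstanceInfo parameter carries status=UP at registration and status=DOWN at shutdown. Because the status is not UP, the shutdown call does not update the lease's service-up timestamp (lease.serviceUp() is only called for UP instances).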

 

4)   Evicting Expired Services

When the Eureka Server starts, it creates a timer that periodically checks for expired services and evicts them. AbstractInstanceRegistry#postInit sets up this eviction timer:

protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}

 

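AbstractInstanceRegistry#evict contains the core eviction logic:

 - First check whether lease expiration (eviction) is enabled.

 - If it is, take each lease out of registry (a ConcurrentHashMap) and check whether it has expired.

 - Collect the expired leases into the expiredLeases list.

 - Compute how many to evict with Math.min(expiredLeases.size(), evictionLimit), then evict that many leases chosen at random.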

The source is as follows:

public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");

    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }

    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }

    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;

    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);

            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            internalCancel(appName, id, false);
        }
    }
}
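The two ideas in evict, capping the number of evictions per run and choosing the victims at random, can be isolated in a small sketch. EvictionSketch and its methods are hypothetical names; the ratio parameter corresponds to serverConfig.getRenewalPercentThreshold(). For example, with 100 registered instances and a ratio of 0.85, at most 100 - (int)(100 × 0.85) = 15 instances are evicted in one run, however many have expired.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

final class EvictionSketch {
    // At most (registrySize - threshold) instances may be evicted in one run.
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    // Picks min(expired.size(), limit) random victims with a partial Fisher-Yates (Knuth) shuffle.
    static <T> List<T> pickVictims(List<T> expired, int limit, Random random) {
        List<T> copy = new ArrayList<>(expired);
        int toEvict = Math.min(copy.size(), limit);
        for (int i = 0; i < toEvict; i++) {
            int next = i + random.nextInt(copy.size() - i); // random index in [i, size)
            Collections.swap(copy, i, next);
        }
        return new ArrayList<>(copy.subList(0, toEvict));
    }
}
```

Randomizing the victims spreads the evictions across applications instead of wiping out one application first, which is exactly what the comment in evict explains.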

 

PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled decides whether lease expiration (eviction) is enabled:

public boolean isLeaseExpirationEnabled() {
    if (!isSelfPreservationModeEnabled()) {
        // The self preservation mode is disabled, hence allowing the instances to expire.
        return true;
    }
    return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}

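The property eureka.server.enable-self-preservation controls the registry's self-preservation mechanism and defaults to true; isSelfPreservationModeEnabled() returns its value. With self-preservation enabled, Eureka compares the number of renewals received in the last minute with numberOfRenewsPerMinThreshold, which by default is 85% of the expected renewals per minute (eureka.server.renewal-percent-threshold=0.85; the expected value is recalculated every 15 minutes by default). If renewals are at or below the threshold, self-preservation kicks in and providers are not evicted. If self-preservation is disabled, the registry evicts unavailable instances normally.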
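A back-of-the-envelope version of the check, for the Eureka version quoted above, where each new registration adds 2 to expectedNumberOfRenewsPerMin (one renewal every 30 seconds). SelfPreservationSketch is hypothetical and assumes that 30-second interval: with 10 instances the expected value is 20 renewals per minute and the threshold is (int)(20 × 0.85) = 17, so eviction stays enabled only while more than 17 renewals arrived in the last minute.

```java
// Hypothetical sketch of the self-preservation arithmetic (assumes a 30 s renewal interval).
final class SelfPreservationSketch {
    static int renewsThreshold(int registeredInstances, double renewalPercentThreshold) {
        int expectedNumberOfRenewsPerMin = registeredInstances * 2; // 60 s / 30 s = 2 renewals per instance
        return (int) (expectedNumberOfRenewsPerMin * renewalPercentThreshold);
    }

    // Same shape as isLeaseExpirationEnabled(): eviction is allowed when self-preservation
    // is off, or when the renewals in the last minute exceed the threshold.
    static boolean isLeaseExpirationEnabled(boolean selfPreservationEnabled,
                                            int threshold, long renewsInLastMin) {
        if (!selfPreservationEnabled) {
            return true;
        }
        return threshold > 0 && renewsInLastMin > threshold;
    }
}
```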

 

Service Discovery

1  Fetching Services on the Consumer Side

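When a consumer starts, it sends a REST request to the registry to fetch the list of registered services. For performance, the Eureka Server keeps a read-only cached copy of the service list to return to consumers, and consumers also cache the list locally, refreshing it every 30 seconds by default. Two related properties are: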

# whether to fetch the registered services
eureka.client.fetch-registry=true

# how often to fetch the registered services, in seconds
eureka.client.registry-fetch-interval-seconds=30

1)   Fetching Services at Startup

When DiscoveryClient starts, the following code triggers the registry fetch; if the fetch fails, it falls back to fetchRegistryFromBackup:

if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
    fetchRegistryFromBackup();
}

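Fetching and caching are implemented in fetchRegistry: it obtains provider information via getAndStoreFullRegistry() or getAndUpdateDelta(), then notifies listeners that the cache was refreshed, and finally updates the instance's remote status.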

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            getAndStoreFullRegistry();
        } else {
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}
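The condition at the top of fetchRegistry decides between a full fetch and a delta fetch. The sketch below restates it as a pure function; FetchModeSketch and its parameters are hypothetical flattenings of the clientConfig and Applications calls in the code above.

```java
// Hypothetical restatement of the full-vs-delta condition in fetchRegistry.
final class FetchModeSketch {
    static boolean shouldFetchFull(boolean disableDelta,
                                   String registryRefreshSingleVipAddress,
                                   boolean forceFullRegistryFetch,
                                   Integer registeredApplicationCount, // null = no local cache yet
                                   long applicationsVersion) {
        return disableDelta
                || (registryRefreshSingleVipAddress != null && !registryRefreshSingleVipAddress.isEmpty())
                || forceFullRegistryFetch
                || registeredApplicationCount == null
                || registeredApplicationCount == 0
                || applicationsVersion == -1; // client lacks delta support
    }
}
```

In other words, the first fetch is always a full fetch, and later periodic fetches use deltas unless deltas are disabled or a full fetch is forced.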

 

2)   Periodic Updates

When DiscoveryClient starts, it also starts a scheduled task that periodically refreshes the service information:

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh", scheduler, cacheRefreshExecutor,
                        registryFetchIntervalSeconds, TimeUnit.SECONDS,
                        expBackOffBound, new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    // ... code omitted
}

 

2  How the Registry Handles Fetch Requests

1)   Full Fetch

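On first startup the consumer fetches the full registry. Note that the method below, ApplicationResource#getInstanceInfo, serves requests for a single instance by returning an InstanceResource; full-registry requests (GET /apps) are handled by ApplicationsResource#getContainers, which, like the delta endpoint shown later, answers from the response cache.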

public InstanceResource getInstanceInfo(@PathParam("id") String id) {
    return new InstanceResource(this, id, serverConfig, registry);
}

As the screenshot below shows, the registry answers a service request directly with the cached service list.

[Figure: the registry returning the cached service list]

 

2)   Delta Fetch

The delta endpoint below handles the consumer's getAndUpdateDelta() requests:

@Path("delta")
@GET
public Response getContainerDifferential(
        @PathParam("version") String version,
        @HeaderParam(HEADER_ACCEPT) String acceptHeader,
        @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
        @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
        @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {

    boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();

    // If the delta flag is disabled in discovery or if the lease expiration
    // has been disabled, redirect clients to get all instances
    if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
        return Response.status(Status.FORBIDDEN).build();
    }

    String[] regions = null;
    if (!isRemoteRegionRequested) {
        EurekaMonitors.GET_ALL_DELTA.increment();
    } else {
        regions = regionsStr.toLowerCase().split(",");
        Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
        EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
    }

    CurrentRequestVersion.set(Version.toEnum(version));
    KeyType keyType = Key.KeyType.JSON;
    String returnMediaType = MediaType.APPLICATION_JSON;
    if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
        keyType = Key.KeyType.XML;
        returnMediaType = MediaType.APPLICATION_XML;
    }

    Key cacheKey = new Key(Key.EntityType.Application,
            ResponseCacheImpl.ALL_APPS_DELTA,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );

    if (acceptEncoding != null
            && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        return Response.ok(responseCache.getGZIP(cacheKey))
                .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
    } else {
        return Response.ok(responseCache.get(cacheKey))
                .build();
    }
}
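On the client side, getAndUpdateDelta roughly merges the returned changes into the local cache and then compares a hash code of the merged result with the one sent by the server, falling back to a full fetch if they differ. The sketch below models only the merge step. ActionType mirrors the ADDED/MODIFIED/DELETED actions Eureka attaches to changed instances (compare registrant.setActionType(ActionType.ADDED) in the registration code); DeltaMergeSketch, Change and the map-based cache are hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DeltaMergeSketch {
    enum ActionType { ADDED, MODIFIED, DELETED }

    // One change entry in a delta response (hypothetical shape).
    static final class Change {
        final String instanceId;
        final String info;
        final ActionType action;

        Change(String instanceId, String info, ActionType action) {
            this.instanceId = instanceId;
            this.info = info;
            this.action = action;
        }
    }

    // Applies the delta to a copy of the local cache (instanceId -> info).
    static Map<String, String> apply(Map<String, String> cache, List<Change> delta) {
        Map<String, String> merged = new HashMap<>(cache);
        for (Change c : delta) {
            switch (c.action) {
                case ADDED:
                case MODIFIED:
                    merged.put(c.instanceId, c.info); // insert or overwrite
                    break;
                case DELETED:
                    merged.remove(c.instanceId);
                    break;
            }
        }
        return merged;
    }
}
```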

 

Service Consumption

1  Load Balancing

When a RestTemplate bean is annotated with @LoadBalanced, LoadBalancerAutoConfiguration adds a LoadBalancerInterceptor to it after the bean is created. The source is:

@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
      final LoadBalancerInterceptor loadBalancerInterceptor) {
   return new RestTemplateCustomizer() {
      @Override
      public void customize(RestTemplate restTemplate) {
         List<ClientHttpRequestInterceptor> list = new ArrayList<>(
               restTemplate.getInterceptors());
         list.add(loadBalancerInterceptor);
         restTemplate.setInterceptors(list);
      }
   };
}

 

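LoadBalancerInterceptor#intercept intercepts each request. Its core logic is to call the service provider through the load balancer, using the host part of the request URI as the service name: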

@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
      final ClientHttpRequestExecution execution) throws IOException {
   final URI originalUri = request.getURI();
   String serviceName = originalUri.getHost();
   return this.loadBalancer.execute(serviceName,
         new LoadBalancerRequest<ClientHttpResponse>() {

            @Override
            public ClientHttpResponse apply(final ServiceInstance instance)
                  throws Exception {
               HttpRequest serviceRequest = new ServiceRequestWrapper(request,
                     instance);
               return execution.execute(serviceRequest, body);
            }
         });
}
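The key trick is that the host in the request URI is a service name, not a real host. ServiceRequestWrapper asks the load balancer to rebuild the URI for the chosen instance (in Spring Cloud this goes through LoadBalancerClient#reconstructURI). The sketch below shows only the rewrite, using java.net.URI; UriRewriteSketch and passing host and port directly are hypothetical simplifications.

```java
import java.net.URI;
import java.net.URISyntaxException;

final class UriRewriteSketch {
    // The logical service name the interceptor uses as the load-balancer key.
    static String serviceName(URI original) {
        return original.getHost();
    }

    // Replaces the service name with a concrete instance address, keeping path and query.
    static URI reconstruct(URI original, String host, int port) {
        try {
            return new URI(original.getScheme(), original.getUserInfo(), host, port,
                    original.getPath(), original.getQuery(), original.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot rebuild URI " + original, e);
        }
    }
}
```

For example, a call to http://order-service/orders/42 is sent to whichever instance the load balancer picked, such as 10.0.0.5:8080, while the path and query stay unchanged.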

 

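A RestTemplate request passes through the interceptor and ends up in LoadBalancerClient#execute. The main logic of RibbonLoadBalancerClient#execute is:

 - First, get the ILoadBalancer for the service.

 - Then choose a provider server according to the load balancer's rule; by default the load balancer is ZoneAwareLoadBalancer.

 - Wrap the request context, send the HTTP request, and return the result.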

The code is:

@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
   ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
   Server server = getServer(loadBalancer);
   if (server == null) {
      throw new IllegalStateException("No instances available for " + serviceId);
   }
   RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
         serviceId), serverIntrospector(serviceId).getMetadata(server));

   RibbonLoadBalancerContext context = this.clientFactory
        .getLoadBalancerContext(serviceId);
   RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

   try {
      T returnVal = request.apply(ribbonServer);
      statsRecorder.recordStats(returnVal);
      return returnVal;
   }
   // catch IOException and rethrow so RestTemplate behaves correctly
   catch (IOException ex) {
      statsRecorder.recordStats(ex);
      throw ex;
   }
   catch (Exception ex) {
      statsRecorder.recordStats(ex);
      ReflectionUtils.rethrowRuntimeException(ex);
   }
   return null;
}

 

2  Server Selection Rules

A load-balancing strategy is largely determined by the rule it uses to choose a server.

1)   RandomRule

Chooses a service instance at random, using java.util.Random to pick the server.

2)   RoundRobinRule

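Chooses service instances in rotation. It is the default rule of Ribbon's BaseLoadBalancer (Spring Cloud Netflix configures ZoneAvoidanceRule by default, which also rotates over the filtered list). The implementation keeps a counter that is incremented on every selection, and the chosen instance's index is the counter modulo the total number of instances.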
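A minimal round-robin chooser, assuming a plain list of servers. RoundRobinSketch is a hypothetical illustration of the counter-modulo idea rather than RoundRobinRule's source: it advances an AtomicInteger with compare-and-set, so concurrent callers each get their own index, and wraps it modulo the number of instances.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch<T> {
    private final AtomicInteger counter = new AtomicInteger(0);

    // Returns the next index in [0, modulo) using a CAS loop so concurrent callers don't collide.
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = counter.get();
            int next = (current + 1) % modulo;
            if (counter.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    T choose(List<T> servers) {
        if (servers.isEmpty()) {
            return null; // nothing to choose from
        }
        return servers.get(incrementAndGetModulo(servers.size()));
    }
}
```

Because the counter is incremented before use, the first call returns the second server in the list; over many calls each server is chosen equally often.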

 

3)   RetryRule

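Round-robin plus retry. It first tries to get an instance by round-robin; if that instance is unavailable, it tries again. Retrying is time-bounded: if no instance has been obtained when the deadline passes (500 ms by default), it returns null.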
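The retry-until-deadline loop can be sketched as follows. RetrySketch is hypothetical: the Supplier stands in for the wrapped rule (round-robin by default) and the Predicate is an assumed availability check; the real RetryRule differs in its details.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

final class RetrySketch {
    // Keeps asking the delegate for a server until one passes the check or the deadline passes.
    static <T> T chooseWithRetry(Supplier<T> delegate, Predicate<T> isAlive, long maxRetryMillis) {
        long deadline = System.currentTimeMillis() + maxRetryMillis;
        T candidate = delegate.get();
        while ((candidate == null || !isAlive.test(candidate))
                && System.currentTimeMillis() < deadline) {
            Thread.yield(); // give other threads a chance before retrying
            candidate = delegate.get();
        }
        // Past the deadline without a live server: give up and return null.
        return (candidate != null && isAlive.test(candidate)) ? candidate : null;
    }
}
```

Calling it with a 500 ms budget mirrors the default deadline described above.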

 

4)   WeightedResponseTimeRule

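Computes a weight for each instance from how it has been performing and chooses instances according to those weights, which favors the better-performing instances.

WeightedResponseTimeRule runs a scheduled task named DynamicServerWeightTask that, by default, recomputes every instance's weight every 30 seconds. The rule is simple: the shorter an instance's average response time, the larger its weight, and the more likely it is to be chosen.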

 

5)   ClientConfigEnabledRoundRobinRule

Behaves the same as RoundRobinRule (it delegates to a RoundRobinRule internally).

 

6)   BestAvailableRule

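Uses the per-instance status kept in loadBalancerStats to filter out failed instances (those whose circuit breaker is tripped), then picks the instance with the fewest concurrent requests. If loadBalancerStats is null, it falls back to round-robin.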
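The selection amounts to "skip servers whose circuit breaker is tripped, then take the one with the fewest active requests". A hypothetical sketch, with a plain Stats class standing in for the statistics Ribbon keeps in loadBalancerStats:

```java
import java.util.List;

final class BestAvailableSketch {
    // Hypothetical per-server statistics (stand-in for Ribbon's per-server stats).
    static final class Stats {
        final String server;
        final boolean circuitBreakerTripped;
        final int activeRequests;

        Stats(String server, boolean circuitBreakerTripped, int activeRequests) {
            this.server = server;
            this.circuitBreakerTripped = circuitBreakerTripped;
            this.activeRequests = activeRequests;
        }
    }

    // Returns the healthy server with the fewest active requests, or null if none is healthy
    // (the real rule then falls back to round-robin).
    static String choose(List<Stats> stats) {
        String chosen = null;
        int minimal = Integer.MAX_VALUE;
        for (Stats s : stats) {
            if (!s.circuitBreakerTripped && s.activeRequests < minimal) {
                minimal = s.activeRequests;
                chosen = s.server;
            }
        }
        return chosen;
    }
}
```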

 

7)   PredicateBasedRule

Filters the instance list with an internal predicate, then round-robins over the remaining instances.

 

8)   ZoneAvoidanceRule

ZoneAvoidanceRule filters with a CompositePredicate that combines ZoneAvoidancePredicate as the primary condition and AvailabilityPredicate as the secondary condition, and then round-robins over the result.
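PredicateBasedRule and ZoneAvoidanceRule share one shape: filter the server list with a predicate, then rotate over what is left. The sketch below combines a primary and a secondary condition with Predicate.and. The Server fields and both example predicates are hypothetical placeholders, not the actual logic of ZoneAvoidancePredicate and AvailabilityPredicate, and CompositePredicate's fallback behavior is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class FilterThenRoundRobinSketch {
    // Hypothetical server description.
    static final class Server {
        final String id;
        final String zone;
        final boolean available;

        Server(String id, String zone, boolean available) {
            this.id = id;
            this.zone = zone;
            this.available = available;
        }
    }

    private int position = 0; // single-threaded sketch: not safe for concurrent use

    // Keeps only the servers that pass the filter, then cycles through them.
    Server choose(List<Server> all, Predicate<Server> filter) {
        List<Server> eligible = new ArrayList<>();
        for (Server s : all) {
            if (filter.test(s)) {
                eligible.add(s);
            }
        }
        if (eligible.isEmpty()) {
            return null;
        }
        return eligible.get(Math.floorMod(position++, eligible.size()));
    }
}
```

For example, `zoneOk.and(availabilityOk)` first drops servers in a zone judged unhealthy and then drops unavailable servers; the survivors are used in turn.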

 

 

 
