spring cloud eureka部分源码分析及微服务管理功能

本文涉及的产品
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
注册配置 MSE Nacos/ZooKeeper,182元/月
云原生网关 MSE Higress,422元/月
简介:

eureka原生的管理页面只有查看服务节点和一些信息,没有动态启用停用服务节点的功能

一. EurekaClient获取所有注册的服务

eureka客户端会加载一个定时任务去获取注册中心的服务,任务的配置在:com.netflix.discovery.DiscoveryClient,刷新的线程是:CacheRefreshThread。
获取的注册中心服务的时候,会把所有服务都拉取下来,但是默认会过滤掉状态不是UP的服务。
获取服务的具体代码在:DiscoveryClient.getAndStoreFullRegistry()方法

/**
 * Gets the full registry information from the eureka server and stores it locally.
 * When applying the full registry, the following flow is observed:
 *
 * if (update generation have not advanced (due to another thread))
 *   atomically set the registry to the new registry
 * fi
 *
 * @return the full registry information.
 * @throws Throwable
 *             on error.
 */
private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    logger.info("Getting all instance registry info from the eureka server");

    Applications apps = null;
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());

    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}

过滤服务状态的代码在:DiscoveryClient.filterAndShuffle()方法

/**
 * Gets the <em>applications</em> after filtering the applications for
 * instances with only UP states and shuffling them.
 *
 * <p>
 * The filtering depends on the option specified by the configuration
 * {@link EurekaClientConfig#shouldFilterOnlyUpInstances()}. Shuffling helps
 * in randomizing the applications list there by avoiding the same instances
 * receiving traffic during start ups.
 * </p>
 *
 * @param apps
 *            The applications that needs to be filtered and shuffled.
 * @return The applications after the filter and the shuffle.
 */
private Applications filterAndShuffle(Applications apps) {
    if (apps != null) {
        if (isFetchingRemoteRegionRegistries()) {
            Map<String, Applications> remoteRegionVsApps = new ConcurrentHashMap<String, Applications>();
            apps.shuffleAndIndexInstances(remoteRegionVsApps, clientConfig, instanceRegionChecker);
            for (Applications applications : remoteRegionVsApps.values()) {
                applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
            }
            this.remoteRegionVsApps = remoteRegionVsApps;
        } else {
            apps.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
        }
    }
    return apps;
}

由此可以看出,是否需要过滤服务状态的配置是在clientConfig中,下一步寻找这个配置具体要怎么写。在类中可以看出,clientConfig对应的是EurekaClientConfig,但这是个接口,有两个实现

  • EurekaClientConfig

    • DefaultEurekaClientConfig
    • EurekaClientConfigBean

仅看名字以为是使用的DefaultEurekaClientConfig,找到对应的配置方法是:

/*
 * (non-Javadoc)
 *
 * @see
 * com.netflix.discovery.EurekaClientConfig#shouldFilterOnlyUpInstances()
 */
@Override
public boolean shouldFilterOnlyUpInstances() {
    return configInstance.getBooleanProperty(
            namespace + SHOULD_FILTER_ONLY_UP_INSTANCES_KEY, true).get();
}

于是找到namespace以及对应的常量,组合起来的配置应该是:eureka.shouldFilterOnlyUpInstances,然而,加上配置后并没有什么用。仔细看看这是eureka包里的,而不是spring包里的,所以这应该是独立使用eureka的时候配置的方法,回过头来看,另外一个配置的实现(EurekaClientConfigBean)是spring cloud包里的,找找在哪里有用到?果然不出所料,在EurekaClientAutoConfiguration自动配置中有初始化此bean,并且EurekaClientConfigBean上有@ConfigurationProperties注解,其实也就是个properties。所以结论出来了,EurekaClientConfigBean实际是将properties配置和对EurekaClientConfig接口的实现放一起了。。。,这样就能找到配置不自动过滤状态为UP的服务的方法了

eureka.client.filterOnlyUpInstances=false

二. 动态更新EurekaClient的状态

首先,spring cloud官方文档中,介绍了一个endpoint(/service-registry/instance-status),提供了get和post方法,get方法用来获取节点状态,post用来修改节点状态

Service Registry Actuator Endpoint

A /service-registry actuator endpoint is provided by Commons. This endpoint relys on a Registration bean in the Spring Application Context. Calling /service-registry/instance-status via a GET will return the status of the Registration. A POST to the same endpoint with a String body will change the status of the current Registration to the new value. Please see the documentation of the ServiceRegistry implementation you are using for the allowed values for updating the status and the values retured for the status.

spring cloud eureka client默认是没有开启endpoint的,需要自己引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

不过,引入依赖后会不止这一个endpoint,并且需要安全认证,当然也可以配置不需要安全认证:

management.security.enabled=false

为了不依赖actutator包,并且了解spring cloud具体是如何更新节点状态的,找到了endpoint中修改状态的具体实现:ServiceRegistryEndpoint。可以看出,主要是通过ServiceRegistry和Registration实现的,而这两个接口并不是actuator包里的,所以尝试自己实(拷)现(贝)一下。自己写一个controller,注入以上两个对象,然后将ServiceRegistryEndpoint中的获取和修改状态的方法复制粘贴,源码如下:

import javax.annotation.Resource;

import org.springframework.cloud.client.serviceregistry.ServiceRegistry;
import org.springframework.cloud.netflix.eureka.serviceregistry.EurekaRegistration;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.itopener.framework.ResultMap;

@RestController
@RequestMapping("eureka-client")
public class ServiceRegistryController {

    @Resource
    private ServiceRegistry<EurekaRegistration> serviceRegistry;
    
    @Resource
    private EurekaRegistration registration;
    
    @RequestMapping(value = "status", method = RequestMethod.GET)
    public ResultMap getStatus(){
        return ResultMap.buildSuccess().put("status", serviceRegistry.getStatus(registration));
    }
    
    @RequestMapping(value = "status", method = RequestMethod.POST)
    public ResultMap setStatus(String status){
        serviceRegistry.setStatus(registration, status);
        return ResultMap.buildSuccess();
    }
}

需要注意的是

  • ServiceRegistry有Registration接口的实现类的泛型,如果不对应会注入失败,这样看具体是哪个实现类?ServiceRegistry只有一个实现类:EurekaServiceRegistry,所以结果就显而易见了
    当然,这两个bean的初始化也会在自动配置类中(EurekaClientAutoConfiguration)
@Bean
public EurekaServiceRegistry eurekaServiceRegistry() {
    return new EurekaServiceRegistry();
}

@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, CloudEurekaInstanceConfig instanceConfig,
    ApplicationInfoManager applicationInfoManager) {
    return EurekaRegistration.builder(instanceConfig)
        .with(applicationInfoManager)
        .with(eurekaClient)
        .with(healthCheckHandler)
        .build();
}

三. 动态管理spring cloud eureka服务

基于以上两点,就可以动态停用或启用eureka中注册的服务节点了。创建一个独立的web应用,与普通的服务一样注册到eureka中心,当然为了还是需要与其他服务有些不一样的配置

#本节点不注册到eureka
eureka.client.register-with-eureka=false
#可以从eureka拉取注册的服务
eureka.client.fetch-registry=true
#不过滤服务节点的UP状态,即需要使用所有的服务节点
eureka.client.filterOnlyUpInstances=false

然后就可以使用EurekaClient获取注册中心的服务了

@Resource
private EurekaClient eurekaClient;

/**
 * @description 获取服务数量和节点数量
 * @author fuwei.deng
 * @date 2017年7月21日 下午3:36:24
 * @version 1.0.0
 * @return
 */
@RequestMapping(value = "home", method = RequestMethod.GET)
public ResultMap home(){
    List<Application> apps = eurekaClient.getApplications().getRegisteredApplications();
    int appCount = apps.size();
    int nodeCount = 0;
    for(Application app : apps){
        nodeCount += app.getInstancesAsIsFromEureka().size();
    }
    return ResultMap.buildSuccess().put("appCount", appCount).put("nodeCount", nodeCount);
}

/**
 * @description 获取所有服务节点
 * @author fuwei.deng
 * @date 2017年7月21日 下午3:36:38
 * @version 1.0.0
 * @return
 */
@RequestMapping(value = "apps", method = RequestMethod.GET)
public ResultMap apps(){
    List<Application> apps = eurekaClient.getApplications().getRegisteredApplications();
    Collections.sort(apps, new Comparator<Application>() {
        public int compare(Application l, Application r) {
            return l.getName().compareTo(r.getName());
        }
    });
    return ResultMap.buildSuccess().put("list", apps);
}

如果需要动态修改节点的状态,以达到停用和启用服务节点的目的,可以使用http调用对应节点的接口

@RequestMapping(value = "status/{appName}", method = RequestMethod.POST)
public ResultMap status(@PathVariable String appName, String instanceId, String status){
    Application application = eurekaClient.getApplication(appName);
    InstanceInfo instanceInfo = application.getByInstanceId(instanceId);
    HttpUtil.post(instanceInfo.getHomePageUrl() + "eureka-client/status", "status=" + status);
    return ResultMap.buildSuccess();
}

当然如果是使用服务节点的actuator endpoint接口,调用接口的地址不一样(还有安全认证,此处代码未涉及),需要注意的是,endpoint接收的参数是@RequestBody(并且使用的jackson转换,fastjson转换是会出现异常的)

@RequestMapping(value = "status/{appName}", method = RequestMethod.POST)
public ResultMap status(@PathVariable String appName, String instanceId, String status){
    Application application = eurekaClient.getApplication(appName);
    InstanceInfo instanceInfo = application.getByInstanceId(instanceId);
    HttpUtil.post(instanceInfo.getHomePageUrl() + "service-registry/instance-status", status);
    return ResultMap.buildSuccess();
}

由于eureka注册中心没有通知的功能,只能由节点自己发起刷新请求,所以修改状态后,需要等到相关节点下一次刷新后才会生效。节点刷新是通过定时任务实现的,源码在com.netflix.discovery.DiscoveryClient中,并且任务是在构造方法中初始化的,还不能自己手动触发,主要代码如下:

//任务调度器,私有属性
private final ScheduledExecutorService scheduler;
//刷新注册中心节点的线程池,私有属性
private final ThreadPoolExecutor cacheRefreshExecutor

//1224行,私有方法中,如果允许拉取注册中心的节点,则初始化调度任务,从源码中可以看出能配置任务执行的间隔时间
if (clientConfig.shouldFetchRegistry()) {
    // registry cache refresh timer
    int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
    int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
    scheduler.schedule(
        new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
        ),
        registryFetchIntervalSeconds, TimeUnit.SECONDS);
}

配置在EurekaClientAutoConfiguration-->RefreshableEurekaClientConfiguration,使用DiscoveryClient的子类CloudEurekaClient实例化

@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager,
        EurekaClientConfig config, EurekaInstanceConfig instance) {
    manager.getInfo(); // force initialization
    return new CloudEurekaClient(manager, config, this.optionalArgs,
            this.context);
}

CloudEurekaClient中有一个刷新的方法,发布一个心跳事件,但这个方法是protected,没法通过实例调用,并且依赖于心跳事件。应用节点默认刷新事件是60秒一次,时间也不算太长,所以动态停用节点后再60秒内生效,应该是在能接受的范围吧,并且这个时间还能配置

目录
相关文章
|
16天前
|
数据可视化 Java BI
将 Spring 微服务与 BI 工具集成:最佳实践
本文探讨了 Spring 微服务与商业智能(BI)工具集成的潜力与实践。随着微服务架构和数据分析需求的增长,Spring Boot 和 Spring Cloud 提供了构建可扩展、弹性服务的框架,而 BI 工具则增强了数据可视化与实时分析能力。文章介绍了 Spring 微服务的核心概念、BI 工具在企业中的作用,并深入分析了两者集成带来的优势,如实时数据处理、个性化报告、数据聚合与安全保障。同时,文中还总结了集成过程中的最佳实践,包括事件驱动架构、集中配置管理、数据安全控制、模块化设计与持续优化策略,旨在帮助企业构建高效、智能的数据驱动系统。
将 Spring 微服务与 BI 工具集成:最佳实践
|
21天前
|
Java 数据库 数据安全/隐私保护
Spring 微服务和多租户:处理多个客户端
本文介绍了如何在 Spring Boot 微服务架构中实现多租户。多租户允许单个应用实例为多个客户提供独立服务,尤其适用于 SaaS 应用。文章探讨了多租户的类型、优势与挑战,并详细说明了如何通过 Spring Boot 的灵活配置实现租户隔离、动态租户管理及数据源路由,同时确保数据安全与系统可扩展性。结合微服务的优势,开发者可以构建高效、可维护的多租户系统。
249 127
|
16天前
|
存储 安全 Java
管理 Spring 微服务中的分布式会话
在微服务架构中,管理分布式会话是确保用户体验一致性和系统可扩展性的关键挑战。本文探讨了在 Spring 框架下实现分布式会话管理的多种方法,包括集中式会话存储和客户端会话存储(如 Cookie),并分析了它们的优缺点。同时,文章还涵盖了与分布式会话相关的安全考虑,如数据加密、令牌验证、安全 Cookie 政策以及服务间身份验证。此外,文中强调了分布式会话在提升系统可扩展性、增强可用性、实现数据一致性及优化资源利用方面的显著优势。通过合理选择会话管理策略,结合 Spring 提供的强大工具,开发人员可以在保证系统鲁棒性的同时,提供无缝的用户体验。
|
16天前
|
消息中间件 Java 数据库
Spring 微服务中的数据一致性:最终一致性与强一致性
本文探讨了在Spring微服务中实现数据一致性的策略,重点分析了最终一致性和强一致性的定义、优缺点及适用场景。结合Spring Boot与Spring Cloud框架,介绍了如何根据业务需求选择合适的一致性模型,并提供了实现建议,帮助开发者在分布式系统中确保数据的可靠性与同步性。
|
1月前
|
监控 Java API
Spring Boot 3.2 结合 Spring Cloud 微服务架构实操指南 现代分布式应用系统构建实战教程
Spring Boot 3.2 + Spring Cloud 2023.0 微服务架构实践摘要 本文基于Spring Boot 3.2.5和Spring Cloud 2023.0.1最新稳定版本,演示现代微服务架构的构建过程。主要内容包括: 技术栈选择:采用Spring Cloud Netflix Eureka 4.1.0作为服务注册中心,Resilience4j 2.1.0替代Hystrix实现熔断机制,配合OpenFeign和Gateway等组件。 核心实操步骤: 搭建Eureka注册中心服务 构建商品
352 3
|
3月前
|
JavaScript 前端开发 Java
垃圾分类管理系统基于 Spring Boot Vue 3 微服务架构实操指南
本文介绍了基于Java技术的垃圾分类管理系统开发方案与实施案例。系统采用前后端分离架构,后端使用Spring Boot框架搭配MySQL数据库,前端可选择Vue.js或Java Swing实现。核心功能模块包括垃圾分类查询、科普教育、回收预约等。文中提供了两个典型应用案例:彭湖花园小区使用的Swing桌面系统和基于Spring Boot+Vue的城市管理系统,分别满足不同场景需求。最新技术方案升级为微服务架构,整合Spring Cloud、Redis、Elasticsearch等技术,并采用Docker容器
209 0
|
17天前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
17天前
|
Prometheus 监控 Java
日志收集和Spring 微服务监控的最佳实践
在微服务架构中,日志记录与监控对系统稳定性、问题排查和性能优化至关重要。本文介绍了在 Spring 微服务中实现高效日志记录与监控的最佳实践,涵盖日志级别选择、结构化日志、集中记录、服务ID跟踪、上下文信息添加、日志轮转,以及使用 Spring Boot Actuator、Micrometer、Prometheus、Grafana、ELK 堆栈等工具进行监控与可视化。通过这些方法,可提升系统的可观测性与运维效率。
日志收集和Spring 微服务监控的最佳实践