前言
本片文章的目的是分析Eureka Server的注册流程,您可以结合《Eureka Client服务注册》更容易理解
Eureka Server服务注册流程
在《Eureka Server初始化流程》 文章中我们知道,在EurekaServerAutoConfiguration中注册了JerseyFilter用来处理所有的/eureka开头的请求,当Eureka Client客户端发起注册请求,请求被该Filter接待
/*** Register the Jersey filter* 注册Jersey filter*/publicFilterRegistrationBeanjerseyFilterRegistration( javax.ws.rs.core.ApplicationeurekaJerseyApp) { FilterRegistrationBeanbean=newFilterRegistrationBean(); //ServletContainer 是核心处理类bean.setFilter(newServletContainer(eurekaJerseyApp)); bean.setOrder(Ordered.LOWEST_PRECEDENCE); //处理的请求/eureka/**bean.setUrlPatterns( Collections.singletonList(EurekaConstants.DEFAULT_PREFIX+"/*")); returnbean; }
具体的实现是在ServletContainer中完成
//处理请求privatevoiddoFilter(HttpServletRequestrequest, HttpServletResponseresponse, FilterChainchain, StringrequestURI, StringservletPath, StringqueryString) throwsIOException, ServletException { Patternp=this.getStaticContentPattern(); //处理 ContextPath 上下文路径if (p!=null&&p.matcher(servletPath).matches()) { chain.doFilter(request, response); } else { if (this.filterContextPath!=null) { if (!servletPath.startsWith(this.filterContextPath)) { thrownewContainerException("The servlet path, \""+servletPath+"\", does not start with the filter context path, \""+this.filterContextPath+"\""); } if (servletPath.length() ==this.filterContextPath.length()) { if (this.webComponent.getResourceConfig().getFeature("com.sun.jersey.config.feature.Redirect")) { URIl=UriBuilder.fromUri(request.getRequestURL().toString()).path("/").replaceQuery(queryString).build(newObject[0]); response.setStatus(307); response.setHeader("Location", l.toASCIIString()); return; } requestURI=requestURI+"/"; } } UriBuilderabsoluteUriBuilder=UriBuilder.fromUri(request.getRequestURL().toString()); URIbaseUri=this.filterContextPath==null?absoluteUriBuilder.replacePath(request.getContextPath()).path("/").build(newObject[0]) : absoluteUriBuilder.replacePath(request.getContextPath()).path(this.filterContextPath).path("/").build(newObject[0]); URIrequestUri=absoluteUriBuilder.replacePath(requestURI).replaceQuery(queryString).build(newObject[0]); //这里调用service方法intstatus=this.service(baseUri, requestUri, request, response); if (this.forwardOn404&&status==404&&!response.isCommitted()) { response.setStatus(200); chain.doFilter(request, response); } } } // 处理请求,调用WebComponent的service方法publicintservice(URIbaseUri, URIrequestUri, HttpServletRequestrequest, HttpServletResponseresponse) throwsServletException, IOException { returnthis.webComponent.service(baseUri, requestUri, request, response); }
最终会调用ApplicationResource实现服务注册 , Eureka Server对于Eureka client注册服务实例,获取服务实例的的REST请求的都交给ApplicationResource处理,其中用来服务注册的方法是addInstance,我们来看一下他的源码
/**服务实例注册,InstanceInfo是服务注册信息,isReplicationd为true代表是从其他Eureka Server节点复制实例,如果是isReplication为false,代表是Eureka Client 注册的* Registers information about a particular instance for an* {@link com.netflix.discovery.shared.Application}.** @param info* {@link InstanceInfo} information of the instance.* @param isReplication* a header parameter containing information whether this is* replicated from other nodes.*/"application/json", "application/xml"}) ({publicResponseaddInstance(InstanceInfoinfo, PeerEurekaNode.HEADER_REPLICATION) StringisReplication) { (logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); //验证instanceinfo包含所有必需的必填字段// validate that the instanceinfo contains all the necessary required fieldsif (isBlank(info.getId())) { returnResponse.status(400).entity("Missing instanceId").build(); } elseif (isBlank(info.getHostName())) { returnResponse.status(400).entity("Missing hostname").build(); } elseif (isBlank(info.getIPAddr())) { returnResponse.status(400).entity("Missing ip address").build(); } elseif (isBlank(info.getAppName())) { returnResponse.status(400).entity("Missing appName").build(); } elseif (!appName.equals(info.getAppName())) { returnResponse.status(400).entity("Mismatched appName, expecting "+appName+" but was "+info.getAppName()).build(); } elseif (info.getDataCenterInfo() ==null) { returnResponse.status(400).entity("Missing dataCenterInfo").build(); } elseif (info.getDataCenterInfo().getName() ==null) { returnResponse.status(400).entity("Missing dataCenterInfo Name").build(); } //处理客户端可能在数据缺失的情况下向错误的DataCenterInfo注册的情况// handle cases where clients may be registering with bad DataCenterInfo with missing dataDataCenterInfodataCenterInfo=info.getDataCenterInfo(); if (dataCenterInfoinstanceofUniqueIdentifier) { StringdataCenterInfoId= ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) { booleanexperimental="true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { Stringentity="DataCenterInfo of type "+dataCenterInfo.getClass() +" must contain a valid id"; returnResponse.status(400).entity(entity).build(); } elseif (dataCenterInfoinstanceofAmazonInfo) { AmazonInfoamazonInfo= (AmazonInfo) dataCenterInfo; StringeffectiveId=amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId==null) { amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } //【重要】:这里在调用PeerAwareInstanceRegistry的register注册服务,使用的是实现类:InstanceRegistryregistry.register(info, "true".equals(isReplication)); returnResponse.status(204).build(); // 204 to be backwards compatible}
ApplicationResource.addInstance看方法名就能推测出他是用来注册实例的方法,其中参数InstanceInfo 是客户端提交的注册信息,请求头中isReplicationd为true代表是从其他Eureka Server节点复制实例,如果是isReplication为false,代表是Eureka Client 注册的
在做了一些列参数判断之后,这里在调用PeerAwareInstanceRegistry的register注册服务,使用的是实现类:InstanceRegistry,这个类在之前有介绍过,就是Eureak Server用来实现服务注册,服务发现,服务续约,取消注册等的具体实现,他的继承关系如下:
跟踪下去,InstanceRegistry .register方法源码如下
publicclassInstanceRegistryextendsPeerAwareInstanceRegistryImplimplementsApplicationContextAware { publicvoidregister(finalInstanceInfoinfo, finalbooleanisReplication) { //调用handleRegistration方法,抛出事件:EurekaInstanceRegisteredEventthis.handleRegistration(info, this.resolveInstanceLeaseDuration(info), isReplication); //调用父类PeerAwareInstanceRegistryImpl的register方法super.register(info, isReplication); } //服务注册,抛出EurekaInstanceRegisteredEvent事件privatevoidhandleRegistration(InstanceInfoinfo, intleaseDuration, booleanisReplication) { this.log("register "+info.getAppName() +", vip "+info.getVIPAddress() +", leaseDuration "+leaseDuration+", isReplication "+isReplication); this.publishEvent(newEurekaInstanceRegisteredEvent(this, info, leaseDuration, isReplication)); } ...省略... }
这里在调用handleRegistration方法,抛出事件:EurekaInstanceRegisteredEvent后继续调用了super.register方法,即:PeerAwareInstanceRegistryImpl.register,继续跟踪下去:
publicclassPeerAwareInstanceRegistryImplextendsAbstractInstanceRegistryimplementsPeerAwareInstanceRegistry { /**注册服务信息 InstanceInfo,并将此信息InstanceInfo复制到所有对等的eureka server节点。如果这是来自其他副本节点的复制事件,则不会复制它。* Registers the information about the {@link InstanceInfo} and replicates* this information to all peer eureka nodes. If this is replication event* from other replica nodes then it is not replicated.** @param info* the {@link InstanceInfo} to be registered and replicated.* @param isReplication* true if this is a replication event from other replica nodes,* false otherwise.*/publicvoidregister(finalInstanceInfoinfo, finalbooleanisReplication) { //租期失效时间 90 sintleaseDuration=Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() !=null&&info.getLeaseInfo().getDurationInSecs() >0) { //如果服务的租期失效时间大于默认的90s,则重新赋值租期时间leaseDuration=info.getLeaseInfo().getDurationInSecs(); } //服务注册super.register(info, leaseDuration, isReplication); //把注册的服务信息复制到其他的Eureka Server 节点,注意这里是Action.RegisterreplicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); } }
该方法做了3个事情,
- 1是更新租期失效时间,
- 2是调用super.register服务注册(AbstractInstanceRegistry.register)
- 3是调用replicateToPeers把服务实例拷贝到其他的Eureak Server节点
我们先看下super.register方法即:AbstractInstanceRegistry.register方法的源码
/*** Registers a new instance with a given duration.** @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)*/publicvoidregister(InstanceInforegistrant, intleaseDuration, booleanisReplication) { try { //获取锁:ReentrantReadWriteLock.lock()read.lock(); //根据注册的服务的名字取本地服务注册表中获取服务注册信息,如果该服务已经被注册了,那么registry中将会存在它Map<String, Lease<InstanceInfo>>gMap=registry.get(registrant.getAppName()); REGISTER.increment(isReplication); if (gMap==null) { //如果该服务实例没被注册,就把服务实例注册到本地的registry中,本质是一个MapfinalConcurrentHashMap<String, Lease<InstanceInfo>>gNewMap=newConcurrentHashMap<String, Lease<InstanceInfo>>(); //没有的话就添加到registry中gMap=registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap==null) { gMap=gNewMap; } } //根据服务实例id获取服务的租约对象Lease<InstanceInfo>existingLease=gMap.get(registrant.getId()); // Retain the last dirty timestamp without overwriting it, if there is already a lease//如果已经有租约,则保留最后的脏时间戳而不覆盖它if (existingLease!=null&& (existingLease.getHolder() !=null)) { //registry中已经存在的当前服务的最后修改时间的时间戳LongexistingLastDirtyTimestamp=existingLease.getHolder().getLastDirtyTimestamp(); //提交注册的当前服务的最后修改时间LongregistrationLastDirtyTimestamp=registrant.getLastDirtyTimestamp(); logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); //因为如果时间戳相等,我们仍然采用远程传输的InstanceInfo而不是服务器本地副本// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted// InstanceInfo instead of the server local copy.//如果已存在的该服务的修改时间 大于 当前提交注册的该服务的最后修改时间,//则采用registy中已存在的服务为准,因为要选择修改时间靠后的if (existingLastDirtyTimestamp>registrationLastDirtyTimestamp) { logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater"+" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); //使用现有的instanceInfo代替新的instanceInfo作为注册者registrant=existingLease.getHolder(); } } else { //执行到这里,说明该服务是新注册// The lease does not exist and hence it is a new registrationsynchronized (lock) { //这里在计算服务的续约频率值if (this.expectedNumberOfRenewsPerMin>0) { // Since the client wants to cancel it, reduce the threshold// (1// for 30 seconds, 2 for a minute)//(expectedNumberOfRenewsPerMin)期待的每分钟续订次数,默认是30s/个,给他增加到2,每分钟2个请求this.expectedNumberOfRenewsPerMin=this.expectedNumberOfRenewsPerMin+2; //修改numberOfRenewsPerMinThreshold每分钟续约阀值 = 2 *(85%),//RenewalPercentThreshold是获取续订阈值百分比 this.numberOfRenewsPerMinThreshold= (int) (this.expectedNumberOfRenewsPerMin*serverConfig.getRenewalPercentThreshold()); } } logger.debug("No previous lease information found; it is new registration"); } //创建租约对象,把注册实例和租期放进去Lease<InstanceInfo>lease=newLease<InstanceInfo>(registrant, leaseDuration); if (existingLease!=null) { //设置服务上线时间lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } //以注册的实例的ID为key把服务实例存封装到 MapgMap.put(registrant.getId(), lease); //添加到注册队列synchronized (recentRegisteredQueue) { recentRegisteredQueue.add(newPair<Long, String>( System.currentTimeMillis(), registrant.getAppName() +"("+registrant.getId() +")")); } // This is where the initial state transfer of overridden status happensif (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "+"overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); //添加服务的OverriddenStatusoverriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatusoverriddenStatusFromMap=overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap!=null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } //根据覆盖的状态规则设置状态// Set the status based on the overridden status rulesInstanceStatusoverriddenInstanceStatus=getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); // If the lease is registered with UP status, set lease service up timestamp//如果租约已注册为UP状态,请设置租约服务启动时间戳记if (InstanceStatus.UP.equals(registrant.getStatus())) { //更新服务上线时间戳lease.serviceUp(); } //服务动作:ADDED添加,MODIFIED修改,DELETED删除registrant.setActionType(ActionType.ADDED); //添加到最近更改的队列recentlyChangedQueue.add(newRecentlyChangedItem(lease)); //最后更新时间registrant.setLastUpdatedTimestamp(); //使当前应用的ResponseCache失效invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { read.unlock(); } }
总结一下,这个方法做了什么呢
- 判断当前服务是否已经被注册,如果是,则以最后更新时间为准,选择更新时间靠后的服务实例进行注册
- 维护实例的租约信息Lease,并放到Eureka Server本地维护维护的registry注册表中,本质是一个Map(ConcurrentHashMap<String, Map<String, Lease>>)
- 如果是服务是新注册的,把注册的实例封装成Leaset存储到registry注册表中,并更新每分钟续约阀值numberOfRenewsPerMinThreshold
- 维护了两个队列,recentRegisteredQueue最近注册队列,recentlyChangedQueue最近更改队列,这个队列可以用来获取最近操作的信息。
- 维护当前实例的OverriddenStatus
- 更新服务实例的最后更新时间戳
- 使ResponseCache缓存失效
该方法结束,我们回到PeerAwareInstanceRegistryImpl.register方法中,继续跟踪replicateToPeers 方法
publicclassPeerAwareInstanceRegistryImplextendsAbstractInstanceRegistryimplementsPeerAwareInstanceRegistry { /*** Replicates all eureka actions to peer eureka nodes except for replication* traffic to this node.* 将所有eureka操作复制到对等eureka节点*/privatevoidreplicateToPeers(Actionaction, StringappName, Stringid, InstanceInfoinfo/* optional */, InstanceStatusnewStatus/* optional */, booleanisReplication) { //开始计时Stopwatchtracer=action.getTimer().start(); try { //是否是其他节点复制过来的if (isReplication) { //最后一分钟的复制次数+1numberOfReplicationsLastMin.increment(); } // If it is a replication already, do not replicate again as this will create a poison replication//如果已经是复制,则不要再次复制if (peerEurekaNodes==Collections.EMPTY_LIST||isReplication) { return; } //遍历集群所有节点for (finalPeerEurekaNodenode : peerEurekaNodes.getPeerEurekaNodes()) { // If the url represents this host, do not replicate to yourself.//如果该URL代表此主机,请不要复制到您自己,当前节点不复制if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } //复制实例到其他某个EurekareplicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } }
继续跟踪replicateInstanceActionsToPeers的源码
/*** Replicates all instance changes to peer eureka nodes except for* replication traffic to this node.**/privatevoidreplicateInstanceActionsToPeers(Actionaction, StringappName, Stringid, InstanceInfoinfo, InstanceStatusnewStatus, PeerEurekaNodenode) { try { InstanceInfoinfoFromRegistry=null; CurrentRequestVersion.set(Version.V2); //判断请求的是什么操作switch (action) { //取消注册,调用 PeerEurekaNode.cancelcaseCancel: node.cancel(appName, id); break; //心跳请求,调用PeerEurekaNode.heartbeatcaseHeartbeat: InstanceStatusoverriddenStatus=overriddenInstanceStatusMap.get(id); infoFromRegistry=getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; //服务注册调用PeerEurekaNode.registercaseRegister: node.register(info); break; //状态修改调用PeerEurekaNode.statusUpdatecaseStatusUpdate: infoFromRegistry=getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; //状态删除调用PeerEurekaNode.deleteStatusOverridecaseDeleteStatusOverride: infoFromRegistry=getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwablet) { logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } }
这里在根据请求的动作类型选择PeerEurekaNode的不同方法,我们这里是服务注册.register,调用的是 PeerEurekaNode.register ,源码如下:
/*** Sends the registration information of {@link InstanceInfo} receiving by* this node to the peer node represented by this class.** @param info* the instance information {@link InstanceInfo} of any instance* that is send to this instance.* @throws Exception*/publicvoidregister(finalInstanceInfoinfo) throwsException { //到期时间当,前时间加上30s过期longexpiryTime=System.currentTimeMillis() +getLeaseRenewalOf(info); //封装InstanceReplicationTask 实例赋值任务到调度器中batchingDispatcher.process( taskId("register", info), newInstanceReplicationTask(targetHost, Action.Register, info, null, true) { publicEurekaHttpResponse<Void>execute() { //复制器客户端 HttpReplicationClient(JerseyReplicationClient),执行注册//调用AbstractJerseyEurekaHttpClient#registerreturnreplicationClient.register(info); } }, expiryTime ); }
把InstanceInfo注册实例封装成InstanceReplicationTask实例复制任务,交给batchingDispatcher批量任务调度器去执行,replicationClient是HttpReplicationClient它的默认实现是JerseyEurekaHttpClient,底层会调用AbstractJerseyEurekaHttpClient#register的方法完成实例的注册,这里其实就和当时我们分析Eureka Client 服务注册的最后注册请求一样了
publicabstractclassAbstractJerseyEurekaHttpClientimplementsEurekaHttpClient { publicEurekaHttpResponse<Void>register(InstanceInfoinfo) { //请求地址StringurlPath="apps/"+info.getAppName(); ClientResponseresponse=null; EurekaHttpResponsevar5; try { BuilderresourceBuilder=this.jerseyClient.resource(this.serviceUrl).path(urlPath).getRequestBuilder(); this.addExtraHeaders(resourceBuilder); //把InstanceInfo作为参数,发送post请求提交服务注册response= (ClientResponse)((Builder)((Builder)((Builder)resourceBuilder.header("Accept-Encoding", "gzip")).type(MediaType.APPLICATION_JSON_TYPE)).accept(newString[]{"application/json"})).post(ClientResponse.class, info); var5=EurekaHttpResponse.anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", newObject[]{this.serviceUrl, urlPath, info.getId(), response==null?"N/A" : response.getStatus()}); } if (response!=null) { response.close(); } } returnvar5; }
到这里就结束了,在PeerEurekaNode中封装InstanceReplicationTask实例服务任务,通过EurekaHttpClient去发起请求(JerseyEurekaHttpClient),最终通过JerseyEurekaHttpClient父类AbstractJerseyEurekaHttpClient#register方法注册,方法中把InstanceInfo作为参数使用POST提交请求。
总结