Eureka Server 服务下线
在《Eureka Client取消注册》中我们有分析到,当Eureka Client 客户端当服务关闭,触发客户端服务下线方法,客户端执行一系列下线逻辑后会向Eureka Server服务端发送服务下线请求,服务端处理下线的请求是在com.netflix.eureka.resources.InstanceResource#cancelLease方法中
/**处理此特定实例的租赁取消* Handles cancellation of leases for this particular instance.** @param isReplication* a header parameter containing information whether this is* replicated from other nodes.* @return response indicating whether the operation was a success or* failure.*/publicResponsecancelLease( PeerEurekaNode.HEADER_REPLICATION) StringisReplication) { (try { //调用 InstanceRegistry的 cancel 方法下线服务booleanisSuccess=registry.cancel(app.getName(), id, "true".equals(isReplication)); if (isSuccess) { logger.debug("Found (Cancel): {} - {}", app.getName(), id); returnResponse.ok().build(); } else { logger.info("Not Found (Cancel): {} - {}", app.getName(), id); returnResponse.status(Status.NOT_FOUND).build(); } } catch (Throwablee) { logger.error("Error (cancel): {} - {}", app.getName(), id, e); returnResponse.serverError().build(); } }
InstanceRegistry的 cancel 方法
publicclassInstanceRegistryextendsPeerAwareInstanceRegistryImplimplementsApplicationContextAware { publicbooleancancel(StringappName, StringserverId, booleanisReplication) { //抛出EurekaInstanceCanceledEvent取消事件handleCancelation(appName, serverId, isReplication); //调用PeerAwareInstanceRegistryImpl#cancelreturnsuper.cancel(appName, serverId, isReplication); } //抛出EurekaInstanceCanceledEvent取消事件privatevoidhandleCancelation(StringappName, Stringid, booleanisReplication) { log("cancel "+appName+", serverId "+id+", isReplication "+isReplication); publishEvent(newEurekaInstanceCanceledEvent(this, appName, id, isReplication)); }
这里比较简单,抛出EurekaInstanceCanceledEvent时间后调用super(PeerAwareInstanceRegistryImpl)的下线方法
/** (non-Javadoc)** @see com.netflix.eureka.registry.InstanceRegistry#cancel(java.lang.String,* java.lang.String, long, boolean)*/publicbooleancancel(finalStringappName, finalStringid, finalbooleanisReplication) { //调用super.cancel下线方法:com.netflix.eureka.registry.AbstractInstanceRegistry#cancelif (super.cancel(appName, id, isReplication)) { //复制到其他的Eureka节点replicateToPeers(Action.Cancel, appName, id, null, null, isReplication); synchronized (lock) { if (this.expectedNumberOfRenewsPerMin>0) { //如果每分钟的预期续订次数大于0//由于客户想取消该阈值,因此降低阈值(1次 30秒,2次 1分钟)// Since the client wants to cancel it, reduce the threshold (1 for 30 seconds, 2 for a minute)//预期续订次数 = 预期续订次数 -2 (2次 1分钟 , 1次 30)this.expectedNumberOfRenewsPerMin=this.expectedNumberOfRenewsPerMin-2; //续订次数阈值 = 预期续订次数 * 0.85this.numberOfRenewsPerMinThreshold= (int) (this.expectedNumberOfRenewsPerMin*serverConfig.getRenewalPercentThreshold()); } } returntrue; } returnfalse; }
这里继续调用super.cancel(AbstractInstanceRegistry#cancel),完成下线后重新计算了续约阈值,继续看AbstractInstanceRegistry#cancel方法
publicabstractclassAbstractInstanceRegistryimplementsInstanceRegistry { /**//取消实例注册//这通常是由客户端调用时关闭通知服务器从流量中除去实例* Cancels the registration of an instance.** <p>* This is normally invoked by a client when it shuts down informing the* server to remove the instance from traffic.* </p>** @param appName the application name of the application.* @param id the unique identifier of the instance.* @param isReplication true if this is a replication event from other nodes, false* otherwise.* @return true if the instance was removed from the {@link AbstractInstanceRegistry} successfully, false otherwise.*/publicbooleancancel(StringappName, Stringid, booleanisReplication) { returninternalCancel(appName, id, isReplication); } /**内部取消服务注册* {@link #cancel(String, String, boolean)} method is overridden by {@link PeerAwareInstanceRegistry}, so each* cancel request is replicated to the peers. This is however not desired for expires which would be counted* in the remote peers as valid cancellations, so self preservation mode would not kick-in.*/protectedbooleaninternalCancel(StringappName, Stringid, booleanisReplication) { try { //上锁read.lock(); //服务取消数增加CANCEL.increment(isReplication); //获取当前提出的服务Map<String, Lease<InstanceInfo>>gMap=registry.get(appName); Lease<InstanceInfo>leaseToCancel=null; if (gMap!=null) { //从服务注册的map中移除掉当前服务leaseToCancel=gMap.remove(id); } //添加到最近取消队列synchronized (recentCanceledQueue) { recentCanceledQueue.add(newPair<Long, String>(System.currentTimeMillis(), appName+"("+id+")")); } //overriddenInstanceStatusMap 服务状态map中移除当前服务InstanceStatusinstanceStatus=overriddenInstanceStatusMap.remove(id); if (instanceStatus!=null) { logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name()); } if (leaseToCancel==null) { //没找到服务CANCEL_NOT_FOUND.increment(isReplication); logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id); returnfalse; } else { //调用Lease.cancel方法leaseToCancel.cancel(); //获取服务实例信息InstanceInfoinstanceInfo=leaseToCancel.getHolder(); Stringvip=null; Stringsvip=null; if (instanceInfo!=null) { //实例状态修改为删除instanceInfo.setActionType(ActionType.DELETED); //添加最近修改队列recentlyChangedQueue.add(newRecentlyChangedItem(leaseToCancel)); //实例信息对象修改最后修改时间instanceInfo.setLastUpdatedTimestamp(); vip=instanceInfo.getVIPAddress(); svip=instanceInfo.getSecureVipAddress(); } //使缓存无效,调用responseCache.invalidate让服务在缓存中失效invalidateCache(appName, vip, svip); logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication); returntrue; } } finally { read.unlock(); } } }
AbstractInstanceRegistry的cancel方法中调用了internalCancel去取消服务的注册,其实在之间分析服务剔除的时候也嗲用这个方法,方法的逻辑大致是:
- 1.从registry中移除服务,
- 2.从overriddenInstanceStatusMap状态map中移除服务状态
- 3.添加到最近取消队列
- 4.调用Lease.cancel方法,将租约对象中的逐出时间修改为当前时间
- 5.修改服务的InstanceInfo的状态为DELETE
- 6.添加到最近修改队列
- 7.更新服务最后修改时间
- 8.使ReponseCache缓存无效
总结
我们发现,Eureka Server服务剔除和服务下线的流程后半截很相似,一个是通过定时任务来实现服务端过期的服务的剔除,一个是客户端发送请求到服务端剔除指定的服务,都是走服务下线逻辑