一、HealthMonitor是什么
HealthMonitor是一个周期性工作的后台线程,它在一个循环中周期性的同HA服务进行心跳,负责跟踪NameNode服务的健康状况,并在健康状况变化时调用failover控制器的回调方法。
二、HealthMonitor是如何实现的
1、成员变量
HealthMonitor内部有如下主要成员变量:
1)后台工作线程:真正卖力干活的
private Daemon daemon;// 后台工作线程2)一些时间间隔、rpc超时参数等配置信息,见注释
// 重连、周期性检查、失连后的睡眠时间等时间间隔 private long connectRetryInterval; private long checkIntervalMillis; private long sleepAfterDisconnectMillis; // rpc超时时间 private int rpcTimeout;
// 配置信息
private final Configuration conf;
3)连接代理相关
/** The connected proxy */ // 连接代理 private HAServiceProtocol proxy; /** The HA service to monitor */ // HA服务到监视器的对象,通过它来获取连接代理proxy,代表一个客户端HA管理命令的目标。 private final HAServiceTarget targetToMonitor;
4)监视器s,状态发生变化时需要周知监视器s,故它们都实现了相同的接口
/** * Listeners for state changes * 状态变更监视器列表,里面的元素是实现Callback的对象 */ private List<Callback> callbacks = Collections.synchronizedList( new LinkedList<Callback>()); /** * 服务状态监视器列表,里面的元素是实现ServiceStateCallback的对象 */ private List<ServiceStateCallback> serviceStateCallbacks = Collections .synchronizedList(new LinkedList<ServiceStateCallback>());
5)状态
// 服务状态,默认为正在初始化 private State state = State.INITIALIZING;
/** * 最后一次服务状态,默认为正在初始化 */ private HAServiceStatus lastServiceState = new HAServiceStatus( HAServiceState.INITIALIZING);
2、状态&接口
2.1、状态
HealthMonitor定义了几个状态,如下:
@InterfaceAudience.Private public enum State { /** * The health monitor is still starting up. * 健康监视器正在启动 */ INITIALIZING, /** * The service is not responding to health check RPCs. * 健康监测RPCs服务没有响应 */ SERVICE_NOT_RESPONDING, /** * The service is connected and healthy. * 服务已连接且健康 */ SERVICE_HEALTHY, /** * The service is running but unhealthy. * 服务正在运行但是不健康 */ SERVICE_UNHEALTHY, /** * The health monitor itself failed unrecoverably and can * no longer provide accurate information. * 健康监视器自己发生不可恢复故障且不能再提供准确信息 */ HEALTH_MONITOR_FAILED; }分别对应了HealthMonitor监视服务健康状况过程中的一些状态,如下:
1)INITIALIZING:健康监视器正在启动;
2)SERVICE_NOT_RESPONDING:健康监测RPCs服务没有响应,有可能是连接、rpc通讯问题等等;
3)SERVICE_HEALTHY:服务已连接且健康,这是一种比较理想的结果;
4)SERVICE_UNHEALTHY:服务正在运行但是不健康,这个结果也不赖,至少知道了服务不健康,也属于正常的检测结果;
5)HEALTH_MONITOR_FAILED:健康监视器自己发生不可恢复故障且不能再提供准确信息,糟糕透顶,监视线程本身出故障了。
2.2、监视器回调接口
HealthMonitor定义了两个接口,Callback和ServiceStateCallback,分别是在状态发生变更和服务状态发生变更时的,需要周知所有监听器的回调方法,如下:
/** * Callback interface for state change events. * 状态变更事件的回调接口,需要实现enteredState()方法,即进入一种新的状态 * * This interface is called from a single thread which also performs * the health monitoring. If the callback processing takes a long time, * no further health checks will be made during this period, nor will * other registered callbacks be called. * 这个接口被一个执行健康监测的特定线程调用。如果回调处理需要很长时间,在此期间没有进一步的健康检查进行,其他注册回调函数也不会被调用。 * * If the callback itself throws an unchecked exception, no other * callbacks following it will be called, and the health monitor * will terminate, entering HEALTH_MONITOR_FAILED state. * * */ static interface Callback { void enteredState(State newState); } /** * Callback interface for service states. * 服务状态的回调接口 */ static interface ServiceStateCallback { void reportServiceStatus(HAServiceStatus status); }至于上述状态和接口如何调用,在下面会详解。
3、内部工作流程
HealthMonitor的工作流程,主要看内部工作线程的执行,如下:
/** * 监视器后台线程 * */ private class MonitorDaemon extends Daemon { /** * 私有内部类的私有构造函数 */ private MonitorDaemon() { super(); // 设置线程名称 setName("Health Monitor for " + targetToMonitor); // 设置异常处理:调用enterState()方法,并传入健康监控器自己失败的状态 setUncaughtExceptionHandler(new UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { LOG.fatal("Health monitor failed", e); enterState(HealthMonitor.State.HEALTH_MONITOR_FAILED); } }); } /** * 线程核心run()方法 */ @Override public void run() { while (shouldRun) {// 在一个while循环内,循环的条件是shouldRun标志位 try { loopUntilConnected();// 循环直到连接上 doHealthChecks();// 执行健康检查 } catch (InterruptedException ie) { Preconditions.checkState(!shouldRun, "Interrupted but still supposed to run"); } } } }首先,监视后台线程的核心run()方法,在一个while循环内,循环的条件是shouldRun标志位:
1)循环直到连接上:不停的连接,直到连接上;
2)执行健康检查。
如此简单的两步,下面我们需要看下以下问题:
1)如何连接?
2)如何执行健康检查,得到检查结果后如何处理?
1、如何连接?
分析loopUntilConnected()方法如下:
/** * 循环直到连接上 */ private void loopUntilConnected() throws InterruptedException { // 尝试连接 tryConnect(); // tryConnect()成功的话,proxy应该不为null // proxy为null,说明还需要重试,在一个while循环内进行,循环的条件就是proxy为null while (proxy == null) { // 线程休眠一段时间 Thread.sleep(connectRetryInterval); // 再次尝试连接 tryConnect(); } // 最后的assert需要确保proxy不为null assert proxy != null; } private void tryConnect() { // 仅当代理proxy为空时才尝试连接 Preconditions.checkState(proxy == null); try { synchronized (this) {// 同步代码块内创建代理proxy proxy = createProxy(); } } catch (IOException e) { // 代理proxy创建发生异常时,proxy设置为null,方便下次重试,并且调用enterState()方法,确定状态为健康监测RPCs服务没有响应 LOG.warn("Could not connect to local service at " + targetToMonitor + ": " + e.getMessage()); proxy = null; enterState(State.SERVICE_NOT_RESPONDING); } } /** * Connect to the service to be monitored. Stubbed out for easier testing. */ protected HAServiceProtocol createProxy() throws IOException { // 通过目标到监视器的对象targetToMonitor获取代理 return targetToMonitor.getProxy(conf, rpcTimeout); }很简单,通过targetToMonitor获得代理,获取成功的话,直接返回,失败的话线程休眠一段时间,继续重试。
2、如何执行健康检查,得到检查结果后如何处理?
分析doHealthChecks()方法如下:
在一个while循环内,循环的依据同样是shouldRun标志位为true,且线程会周期性休眠:
1)HA服务状态status设置为null;
2)标志位healthy默认为false,即不健康;
3)获取服务代理的服务状态,并由代理执行健康检查;
4)1、如果健康检查能够正确返回,标志位healthy设置为true,表明服务健康;
2、抛出了异常:
2.1、如果是健康检查失败异常,调用enterState()方法,确定状态为服务正在运行但是不健康;
2.1、否则调用enterState()方法,确定状态为健康监测RPCs服务没有响应,停止并清空代理,线程休眠一段时间,避免异常情况下没有必要的重复尝试;
5)设置上次服务状态;
6)如果检测结果为健康,则调用enterState()方法,确定状态为服务已连接且健康;
7)工作线程周期性休眠。
可以看到,健康状况的检查是通过代理完成的,且,如果有检查结果(无论是正常还是不正常,这里的正常是指返回了明确的结果,不正常只通讯、连接或者线程本身出现问题),会通过enterState()方法通知监听器s,如下:
private synchronized void enterState(State newState) { // 如果状态变更,调用所有监听器的enteredState()方法 if (newState != state) { LOG.info("Entering state " + newState); state = newState; synchronized (callbacks) { for (Callback cb : callbacks) { cb.enteredState(newState); } } } }而整体HA服务状态也会通过setLastServiceStatus()方法,设置lastServiceState成员变量,并通知服务状态监听器,如下:
// 设置上次服务状态,同步所有ServiceStateCallback服务状态监听器的服务状态 private synchronized void setLastServiceStatus(HAServiceStatus status) { this.lastServiceState = status; for (ServiceStateCallback cb : serviceStateCallbacks) { cb.reportServiceStatus(lastServiceState); } }这也就是上述状态和接口存在的意义。而监听器的注册和注销则是通过如下实现的:
/** * 以下四个为增减状态、服务状态监视器方法 */ public void addCallback(Callback cb) { this.callbacks.add(cb); } public void removeCallback(Callback cb) { callbacks.remove(cb); } public synchronized void addServiceStateCallback(ServiceStateCallback cb) { this.serviceStateCallbacks.add(cb); } public synchronized void removeServiceStateCallback(ServiceStateCallback cb) { serviceStateCallbacks.remove(cb); }
三、HealthMonitor的初始化及启动
HealthMonitor的构造及启动是在ZKFailoverController的initHM()方法内完成的,它是整个Hadoop HDFS HA中的一个控制组件,初始化及启动如下:
// ------------------------------------------ // Begin actual guts of failover controller // ------------------------------------------ private void initHM() { healthMonitor = new HealthMonitor(conf, localTarget); healthMonitor.addCallback(new HealthCallbacks()); healthMonitor.addServiceStateCallback(new ServiceStateCallBacks()); healthMonitor.start(); }构造一个HealthMonitor实例,传入的代理为localTarget,并注册了一个state change的监听器HealthCallbacks和service state change的监听器ServiceStateCallBacks,然后调用start()方法启动。
至于HealthMonitor如何与其它组件一起工作的,如何获得的代理,代理怎么执行监控检查,监听器如何处理健康检查的结果,请关注后续文章。