一、问题起源
高并发请求 NameNode 会遇到什么样的问题?
现在大家都明白每次请求 NameNode 修改一条元数据(比如说申请上传一个文件,那么就需要在内存目录树中加入一个文件),都要写一条 edits log,包括两个步骤:
- 写入本地磁盘。
- 通过网络传输给 JournalNodes 集群。
但是如果并发请求,看见会设计到线程安全的问题!!!
NameNode 在写 edits log 时,必须保证每条 edits log 都有一个全局顺序递增的 transactionId(简称为 txid),这样才可以标识出来一条一条的 edits log 的先后顺序。
那么如何保证每条 edits log 的 txid 都是递增的?
回答:当然是加锁吖!!!
那么问题来了!!!
如果每次都是在一个加锁的代码块里,生成 txid,然后写磁盘文件 edits log,网络请求写入 journalnodes 一条 edits log,会咋样?
试想下结果:NN(NameNode)用多线程接收多个客户端的请求,接着修改完内存的元数据后,排着队写edits log!!!
写磁盘、网络传输非常耗费性能!如果每秒并发很高过来,处理性能是扛不住的。如同老太太坐轮椅。
二、双缓冲机制+分段锁的源码篇
2.1 简单介绍
首先是因为虽然元数据是首先写入内存的,但是你要知道元数据在内存中并不是安全的,所以 namenode 要将元数据刷新到磁盘里面;
但是 namenode 并不是直接写入磁盘的,而是采用双缓冲机制,先将数据写入到内存中,然后在从内存中写入到磁盘里面
模型大概长这个样子:
大概的步骤简单概述一下:
1、首先将元数据写入内存(bufCurrent)中
2、当满足一定条件的时候,我们会将两个内存进行交换
3、我们将 bufCurrent 里面的数据交换到 bufReady 里面,然后 bufCurrent 里面为空,继续接收写入内存的数据
bufReady 里面保存的是写入内存里面的数据,然后偷偷刷到磁盘,刷完后清空内存
4、然后周而复始,bufCurrent 永远去接收数据,然后会把数据传递给 bufReady,bufReady 在继续偷偷刷磁盘
那么通过这个这种双缓冲机制,就将原本写磁盘的操作变成了写内存操作。从而大大提高了效率。
2.2 源码-hadoop 是如何实现双缓冲+分段锁的
public void logSync() { long syncStart = 0;//用来记录事务的最大ID //获取当前线程ID long mytxid = myTransactionId.get().txid; //默认不同步(意思是第二块内存还没有数据,所以不需要刷磁盘,既不需要做同步操作) boolean sync = false; try { EditLogOutputStream logStream = null;//EditLog的输出流 synchronized (this) {//TODO 分段锁开始 try { printStatistics(false);//打印静态信息 /** * 如果当前工作的线程> 最大事务ID && 是同步状态的,那么说明当前线程正处于刷盘状态。 * 说明此事正处于刷盘状态,则等待1s * */ while (mytxid > synctxid && isSyncRunning) { try { wait(1000); } catch (InterruptedException ie) { } } // // If this transaction was already flushed, then nothing to do //如果当前的线程ID < 当前处理事务的最大ID,则说明当前线程的任务已经被其他线程完成了,什么也不用做了 if (mytxid <= synctxid) { numTransactionsBatchedInSync++; if (metrics != null) { // Metrics is non-null only when used inside name node metrics.incrTransactionsBatchedInSync(); } return; } //此事开启同步状态,开始刷盘 syncStart = txid; isSyncRunning = true;//开启同步 sync = true; //TODO 双缓冲区,交换数据 try { if (journalSet.isEmpty()) { throw new IOException("No journals available to flush"); } //双缓冲区交换数据 editLogStream.setReadyToFlush(); } catch (IOException e) { final String msg = "Could not sync enough journals to persistent storage " + "due to " + e.getMessage() + ". " + "Unsynced transactions: " + (txid - synctxid); LOG.fatal(msg, new Exception()); synchronized(journalSetLock) { IOUtils.cleanup(LOG, journalSet); } terminate(1, msg); } } finally { // 防止RuntimeException阻止其他日志编辑写入 doneWithAutoSyncScheduling(); } //editLogStream may become null, //so store a local variable for flush. logStream = editLogStream; }//TODO 分段锁结束 // do the sync long start = monotonicNow(); try { if (logStream != null) { //TODO 将缓冲区数据刷到磁盘(没有上锁) logStream.flush();///tmp/hadoop-angel/dfs/name/current } } catch (IOException ex) { synchronized (this) { final String msg = "Could not sync enough journals to persistent storage. " + "Unsynced transactions: " + (txid - synctxid); LOG.fatal(msg, new Exception()); synchronized(journalSetLock) { IOUtils.cleanup(LOG, journalSet); } //TODO terminate(1, msg); } } long elapsed = monotonicNow() - start; if (metrics != null) { // Metrics non-null only when used inside name node metrics.addSync(elapsed); } } finally { // 持久化完毕之后,第二块内存空了,然后我们在修改下标志位,告诉程序现在没有做刷磁盘操作了 synchronized (this) {//TODO 分段锁开始 if (sync) { synctxid = syncStart; isSyncRunning = false; } this.notifyAll(); } } }
三、总结
第一把锁,主要是判断isAutoSyncScheduled以及对isAutoSyncScheduled的赋值,这个主要是说明bufCurrent和bufReady开始交换内存了。
第二把锁,主要是判断isSyncRunning以及对isSyncRunning和isAutoSyncScheduled的赋值。isSyncRunning是用来判断是否在写磁盘,isAutoSyncScheduled用来判断是否在交换内存,如果在交换,就不能写入bufCurrent,如果在写磁盘,那就不能写磁盘。
第三把锁,赋值isSyncRunning,说明磁盘写入完成。
这期间最耗时的操作并没有加锁,其他内存操作的加锁,但是速度比较快,采用在这种分段加锁的方式和双缓冲机制,大大提高了性能。