大规模集群下Hadoop NameNode如何承载每秒上千次的高并发访问

简介: 大规模集群下Hadoop NameNode如何承载每秒上千次的高并发访问

一、问题起源

高并发请求 NameNode 会遇到什么样的问题?

现在大家都明白每次请求 NameNode 修改一条元数据(比如说申请上传一个文件,那么就需要在内存目录树中加入一个文件),都要写一条 edits log,包括两个步骤:

  • 写入本地磁盘。
  • 通过网络传输给 JournalNodes 集群。

但是如果并发请求,看见会设计到线程安全的问题!!!

NameNode 在写 edits log 时,必须保证每条 edits log 都有一个全局顺序递增的 transactionId(简称为 txid),这样才可以标识出来一条一条的 edits log 的先后顺序。

那么如何保证每条 edits log 的 txid 都是递增的?

回答:当然是加锁吖!!!

那么问题来了!!!

如果每次都是在一个加锁的代码块里,生成 txid,然后写磁盘文件 edits log,网络请求写入 journalnodes 一条 edits log,会咋样?

640.png

试想下结果:NN(NameNode)用多线程接收多个客户端的请求,接着修改完内存的元数据后,排着队写edits log!!

写磁盘、网络传输非常耗费性能!如果每秒并发很高过来,处理性能是扛不住的。如同老太太坐轮椅。

二、双缓冲机制+分段锁的源码篇

2.1 简单介绍

首先是因为虽然元数据是首先写入内存的,但是你要知道元数据在内存中并不是安全的,所以 namenode 要将元数据刷新到磁盘里面;

但是 namenode 并不是直接写入磁盘的,而是采用双缓冲机制,先将数据写入到内存中,然后在从内存中写入到磁盘里面

模型大概长这个样子:

640.png

大概的步骤简单概述一下:

1、首先将元数据写入内存(bufCurrent)中

2、当满足一定条件的时候,我们会将两个内存进行交换

3、我们将 bufCurrent 里面的数据交换到 bufReady 里面,然后 bufCurrent 里面为空,继续接收写入内存的数据

bufReady 里面保存的是写入内存里面的数据,然后偷偷刷到磁盘,刷完后清空内存

4、然后周而复始,bufCurrent 永远去接收数据,然后会把数据传递给 bufReady,bufReady 在继续偷偷刷磁盘

那么通过这个这种双缓冲机制,就将原本写磁盘的操作变成了写内存操作。从而大大提高了效率。

2.2 源码-hadoop 是如何实现双缓冲+分段锁的

public void logSync() {
    long syncStart = 0;//用来记录事务的最大ID
    //获取当前线程ID
    long mytxid = myTransactionId.get().txid;
    //默认不同步(意思是第二块内存还没有数据,所以不需要刷磁盘,既不需要做同步操作)
    boolean sync = false;
    try {
      EditLogOutputStream logStream = null;//EditLog的输出流
      synchronized (this) {//TODO 分段锁开始
        try {
          printStatistics(false);//打印静态信息
          /**
           * 如果当前工作的线程> 最大事务ID && 是同步状态的,那么说明当前线程正处于刷盘状态。
           * 说明此事正处于刷盘状态,则等待1s
           * */
          while (mytxid > synctxid && isSyncRunning) {
            try {
              wait(1000);
            } catch (InterruptedException ie) {
            }
          }
          //
          // If this transaction was already flushed, then nothing to do
          //如果当前的线程ID < 当前处理事务的最大ID,则说明当前线程的任务已经被其他线程完成了,什么也不用做了
          if (mytxid <= synctxid) {
            numTransactionsBatchedInSync++;
            if (metrics != null) {
              // Metrics is non-null only when used inside name node
              metrics.incrTransactionsBatchedInSync();
            }
            return;
          }
          //此事开启同步状态,开始刷盘
          syncStart = txid;
          isSyncRunning = true;//开启同步
          sync = true;
          //TODO 双缓冲区,交换数据
          try {
            if (journalSet.isEmpty()) {
              throw new IOException("No journals available to flush");
            }
            //双缓冲区交换数据
            editLogStream.setReadyToFlush();
          } catch (IOException e) {
            final String msg =
                    "Could not sync enough journals to persistent storage " +
                            "due to " + e.getMessage() + ". " +
                            "Unsynced transactions: " + (txid - synctxid);
            LOG.fatal(msg, new Exception());
            synchronized(journalSetLock) {
              IOUtils.cleanup(LOG, journalSet);
            }
            terminate(1, msg);
          }
        } finally {
          // 防止RuntimeException阻止其他日志编辑写入
          doneWithAutoSyncScheduling();
        }
        //editLogStream may become null,
        //so store a local variable for flush.
        logStream = editLogStream;
      }//TODO 分段锁结束
      // do the sync
      long start = monotonicNow();
      try {
        if (logStream != null) {
          //TODO 将缓冲区数据刷到磁盘(没有上锁)
          logStream.flush();///tmp/hadoop-angel/dfs/name/current
        }
      } catch (IOException ex) {
        synchronized (this) {
          final String msg =
                  "Could not sync enough journals to persistent storage. "
                          + "Unsynced transactions: " + (txid - synctxid);
          LOG.fatal(msg, new Exception());
          synchronized(journalSetLock) {
            IOUtils.cleanup(LOG, journalSet);
          }
          //TODO
          terminate(1, msg);
        }
      }
      long elapsed = monotonicNow() - start;
      if (metrics != null) { // Metrics non-null only when used inside name node
        metrics.addSync(elapsed);
      }
    } finally {
      // 持久化完毕之后,第二块内存空了,然后我们在修改下标志位,告诉程序现在没有做刷磁盘操作了
      synchronized (this) {//TODO 分段锁开始
        if (sync) {
          synctxid = syncStart;
          isSyncRunning = false;
        }
        this.notifyAll();
      }
    }
  }

三、总结

第一把锁,主要是判断isAutoSyncScheduled以及对isAutoSyncScheduled的赋值,这个主要是说明bufCurrent和bufReady开始交换内存了。

第二把锁,主要是判断isSyncRunning以及对isSyncRunning和isAutoSyncScheduled的赋值。isSyncRunning是用来判断是否在写磁盘,isAutoSyncScheduled用来判断是否在交换内存,如果在交换,就不能写入bufCurrent,如果在写磁盘,那就不能写磁盘。

第三把锁,赋值isSyncRunning,说明磁盘写入完成。

这期间最耗时的操作并没有加锁,其他内存操作的加锁,但是速度比较快,采用在这种分段加锁的方式和双缓冲机制,大大提高了性能。

相关文章
|
29天前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
126 6
|
30天前
|
分布式计算 Hadoop Shell
Hadoop-35 HBase 集群配置和启动 3节点云服务器 集群效果测试 Shell测试
Hadoop-35 HBase 集群配置和启动 3节点云服务器 集群效果测试 Shell测试
65 4
|
30天前
|
SQL 分布式计算 Hadoop
Hadoop-37 HBase集群 JavaAPI 操作3台云服务器 POM 实现增删改查调用操作 列族信息 扫描全表
Hadoop-37 HBase集群 JavaAPI 操作3台云服务器 POM 实现增删改查调用操作 列族信息 扫描全表
31 3
|
30天前
|
分布式计算 Hadoop Shell
Hadoop-36 HBase 3节点云服务器集群 HBase Shell 增删改查 全程多图详细 列族 row key value filter
Hadoop-36 HBase 3节点云服务器集群 HBase Shell 增删改查 全程多图详细 列族 row key value filter
55 3
|
30天前
|
分布式计算 Java Hadoop
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
Hadoop-30 ZooKeeper集群 JavaAPI 客户端 POM Java操作ZK 监听节点 监听数据变化 创建节点 删除节点
61 1
|
30天前
|
分布式计算 监控 Hadoop
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器
36 1
|
30天前
|
分布式计算 Hadoop Unix
Hadoop-28 ZooKeeper集群 ZNode简介概念和测试 数据结构与监听机制 持久性节点 持久顺序节点 事务ID Watcher机制
Hadoop-28 ZooKeeper集群 ZNode简介概念和测试 数据结构与监听机制 持久性节点 持久顺序节点 事务ID Watcher机制
39 1
|
30天前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
42 1
|
30天前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
43 0
|
30天前
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
77 0