Hadoop源码分析之NameNode元数据的存储

在对NameNode节点进行格式化时,调用了FSImage的saveFSImage()方法和FSEditLog.createEditLogFile()存储当前的元数据,启动NameNode节点时,又要从镜像和编辑日志中读取元数据。所以先分析FSImage是如何存储元数据到镜像文件和如何加载元数据到内存的。

存储元数据到镜像文件

在NameNode运行时会将内存中的元数据信息存储到所指定的文件,即${dfs.name.dir}/current目录下的fsimage文件,此外还会将另外一部分对NameNode更改的日志信息存储到${dfs.name.dir}/current目录下的edits文件中。fsimage文件和edits文件可以确定NameNode节点当前的状态,这样在NameNode节点由于突发原因崩溃时,可以根据这两个文件中的内容恢复到节点崩溃前的状态,所以对NameNode节点中内存元数据的每次修改都必须保存下来。但是如果每次都保存到fsimage文件中,这样效率就特别低效,所以引入编辑日志文件edits,保存对对元数据的修改信息,也就是fsimage文件保存NameNode节点中某一时刻内存中的元数据(即目录树),edits保存这一时刻之后的对元数据的更改信息。

镜像的保存

NameNode节点通过方法FSImage.saveFSImage()方法保存内存元数据到fsimage文件中,方法的代码如下:

/**
   * 将当前时刻的命名空间镜像保存到newFile命名的文件中
   */
  void saveFSImage(File newFile) throws IOException {
    FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
    FSDirectory fsDir = fsNamesys.dir;
    long startTime = FSNamesystem.now();
    // Write out data,创建输出流
   DataOutputStream out = new DataOutputStream(
                                                new BufferedOutputStream(
                                                                         new FileOutputStream(newFile)));
    try {
    	//保存命名空间镜像的文件头
      out.writeInt(FSConstants.LAYOUT_VERSION);
      out.writeInt(namespaceID);
      out.writeLong(fsDir.rootDir.numItemsInTree());//目录树包含的节点数
      out.writeLong(fsNamesys.getGenerationStamp());//当前数据块版本号
      byte[] byteStore = new byte[4*FSConstants.MAX_PATH_LENGTH];
      ByteBuffer strbuf = ByteBuffer.wrap(byteStore);
      // save the root
      saveINode2Image(strbuf, fsDir.rootDir, out);//单独保存根节点,由于根节点是NameNode管理的特殊INode,它的成员属性INode.name长度为0,所以必须做特殊处理
      // save the rest of the nodes
      saveImage(strbuf, 0, fsDir.rootDir, out);//保存目录树中的其他节点
      fsNamesys.saveFilesUnderConstruction(out);//保存构建中的节点
      fsNamesys.saveSecretManagerState(out);//安全信息
      strbuf = null; } finally { out.close(); } }


saveFSImage方法只有一个参数,这个参数代表内存元数据将要保存的位置。在saveFSImage方法中,先得到已经创建的FSNamesystem对象和FSDirectory(在调用FSImage.saveFSImage()方法之前,会创建FSNamesystem对象和FSDirectory对象),然后创建到文件参数newFile的一个输出流。再向文件中写入镜像文件头信息,如表示HDFS存储系统信息结构的版本号的FSConstants.LAYOUT_VERSION,存储系统标识namespaceID,目录树的节点数,文件系统的时间戳信息等。

在saveFSImage方法中,使用了FSConstants.MAX_PATH_LENGTH常量,它表示HDFS文件/目录的绝对路径所占用的最大空间,为8000字节,但是定义的缓存大小则是4*8000即31.25KB,为什么要使用8000*4呢?个人觉得这是为了兼容三种Unicode编码UTF-8/UTF-16/UTF-32,最大编码占用4个字节(参考http://hi.baidu.com/tinggu_android/item/77c2930ecb7811ca90571855),与次相关的另一个常量是FSConstants.MAX_PATH_DEPTH,它表示目录深度最多是1000层,这两个变量为何定义成这两个值,在https://issues.apache.org/jira/browse/HADOOP-438这里有讨论。定义完缓冲区的大小之后,再使用ButeBuffer类将其包装成一个ByteBuffer对象,这样对这个ByteBuffer对象的操作,可以在字节数组中表现出来,对字节数组的操作也可以在ByteBuffer对象中表现出来。

下面,调用saveINode2Image()方法来保存元数据所表示的目录树的根节点,因为根节点是NameNode管理的特殊INode,它的成员属性INode.name长度为0,所以必须做特殊处理,该刚发也会在saveImage()方法中调用,所以先来分析saveImage方法,代码如下:

private static void saveImage(ByteBuffer parentPrefix,
                                int prefixLength,
                                INodeDirectory current,
                                DataOutputStream out) throws IOException {
    int newPrefixLength = prefixLength;
    if (current.getChildrenRaw() == null)
      return;//空目录
    for(INode child : current.getChildren()) {//输出当前节点的所有子节点
      // print all children first
      parentPrefix.position(prefixLength);
      parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
      saveINode2Image(parentPrefix, child, out);
    }
    for(INode child : current.getChildren()) {//子节点是目录,输出该目录下面的节点
      if(!child.isDirectory())
        continue;//文件,忽略
      parentPrefix.position(prefixLength);
      //准别参数
      parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
      newPrefixLength = parentPrefix.position();
      //递归调用
      saveImage(parentPrefix, newPrefixLength, (INodeDirectory)child, out);
    }
    parentPrefix.position(prefixLength);
  }

saveImage方法有四个参数,第一个参数parentPrefix是一个字节缓冲区,存储着当前目录要保存的目录的父目录路径,第二个参数是父路径的长度,即路径在字节缓冲区占内存空间的大小,第三个参数是当前的目录,第四个参数是镜像数据的输出流。如果当前的目录是/current目录,那么parentPrefix中的内容就是目录/current的字节表示。在saveImage方法中首先记录下当前的parentPrefix路径长度,因为saveImage方法是递归调用的,整个过程中都需要使用同一个缓冲区,所以每次保存一个目录时都需要保存当前的prefixLength变量。再判断当前目录下是否存在文件或目录,不存在就直接返回,如果存在,遍历这个目录下面的所有子节点,分别调用saveINode2Image方法进行输出。如何输出子节点呢?首先记录下子节点的路径,再调用saveINode2Image。ByteBuffer.position()方法用于设置ByteBuffer类的position变量,表示从下次对缓冲区的读写从position开始进行,然后向缓冲中加入路径分隔符和当前要输出的节点名(参考http://ifeve.com/buffers/)。再调用saveINode2Image方法输出节点。saveImage方法输出完所有的节点之后,会再次遍历当前目录下的节点,对目录子节点进行递归的输出,同样是要先设置缓冲区的position变量,再加入子目录的目录名,然后递归调用。方法的最后是恢复表示路径的字节缓冲区,方便与current目录在同一个目录下的目录进行输出。

FSImage.saveINode2Image()方法的功能就是保存一个节点数据到镜像文件中,方法的代码如下:

private static void saveINode2Image(ByteBuffer name,
                                      INode node,
                                      DataOutputStream out) throws IOException {
    int nameLen = name.position();
    out.writeShort(nameLen);
    out.write(name.array(), name.arrayOffset(), nameLen);
    if (!node.isDirectory()) {  // write file inode,输出文件的INode
      INodeFile fileINode = (INodeFile)node;
      out.writeShort(fileINode.getReplication());
      out.writeLong(fileINode.getModificationTime());
      out.writeLong(fileINode.getAccessTime());
      out.writeLong(fileINode.getPreferredBlockSize());
      Block[] blocks = fileINode.getBlocks();
      out.writeInt(blocks.length);
      for (Block blk : blocks)
        blk.write(out);
      FILE_PERM.fromShort(fileINode.getFsPermissionShort());
      PermissionStatus.write(out, fileINode.getUserName(),
                             fileINode.getGroupName(),
                             FILE_PERM);
    } else {   // write directory inode,输出目录的INode
      out.writeShort(0);  // replication
      out.writeLong(node.getModificationTime());
      out.writeLong(0);   // access time
      out.writeLong(0);   // preferred block size
      out.writeInt(-1);    // # of blocks,数据块数,-1代表这是目录
      out.writeLong(node.getNsQuota());
      out.writeLong(node.getDsQuota());
      FILE_PERM.fromShort(node.getFsPermissionShort());
      PermissionStatus.write(out, node.getUserName(),
                             node.getGroupName(),
                             FILE_PERM);
    }
  }

saveINode2Image方法比较简单,先保存文件/目录的绝对路径长度和绝对路径,再根据文件和目录分别进行保存。如果是文件,则输出文件副本数量,最后修改时间,访问时间,数据块大小,数据块数量,各个数据块信息和文件权限信息,如果是目录对应于文件的字段中的副本数,访问时间,数据块大小这几个字段设置为0,数据块数量设置为-1,读镜像文件信息时正是以这个信息区分是文件还是目录,然后输出目录的节点配额和空间配额,最后是目录的权限信息。

输出完内存中的元数据之后,再返回到saveFSImage()方法中,再调用FSNamesystem.saveFilesUnderConstruction()方法保存当前系统中以写方式打开的文件,即当前系统中存在的INodeFileUnderConstruction类型的对象。最后会保存一些安全信息。这部分的代码暂时还为研究,以后再来分析。

编辑日志的保存

编辑日志用于保存对某一时刻的镜像文件之后的元数据的修改信息,镜像文件并不能完全反映NameNode节点中真实的数据,因为在每个时刻将NameNode节点的内存修改信息保追加到同一个镜像文件中是非常低效的,所以HDFS的开发者引入了编辑日志,来保存对NameNode节点内存元数据的修改。与编辑日志相关的代码在FSEditLog类,EditLogOutputStream类中。

编辑日志文件在NameNode节点格式化的时候会创建,在文章Hadoop源码分析之NameNode的格式化 中分析NameNode格式化的过程中,分析到saveCurrent方法,在saveCurrent方法中,会针对保存编辑日志类型的目录调用FSEditLog.createEditLogFile()方法创建一个编辑日志文件,该方法的代码如下:

public synchronized void createEditLogFile(File name) throws IOException {
    EditLogOutputStream eStream = new EditLogFileOutputStream(name);
    eStream.create();
    eStream.close();
  }

类EditLogOutputStream抽象了编辑日志输出流,它是一个抽象类,定义了与编辑日志输出相关的一系列方法,它有一个子类EditLogFileOutputStream,这个类是FSEditLog类中的一个静态内部类。在createEditLogFile方法中,先创建一个EditLogFileOutputStream类型的对象,然后调用其create方法,最后关闭这个输出流。这样就创建了一个编辑日志文件,那么这个过程到底做了些什么呢?

先来看看EditLogFileOutputStream类的定义,代码如下:

static class EditLogFileOutputStream extends EditLogOutputStream {
    /** Preallocation buffer, padded with OP_INVALID */
    private static final ByteBuffer PREALLOCATION_BUFFER
        = ByteBuffer.allocateDirect(MIN_PREALLOCATION_LENGTH);
    static {
      PREALLOCATION_BUFFER.position(0).limit(MIN_PREALLOCATION_LENGTH);
      for(int i = 0; i < PREALLOCATION_BUFFER.capacity(); i++) {
        PREALLOCATION_BUFFER.put(OP_INVALID);
      }
    }
    /**输出文件对应的文件通道**/
    private File file;
    /**日志写入缓冲区**/
    private FileOutputStream fp;    // file stream for storing edit logs
    /**日志输出通道**/
    private FileChannel fc;         // channel of the file stream for sync
    /**日志写入缓冲区**/
    private DataOutputBuffer bufCurrent;  // current buffer for writing
    /**写文件缓冲区**/
    private DataOutputBuffer bufReady;    // buffer ready for flushing

    EditLogFileOutputStream(File name) throws IOException {
      super();
      file = name;
      bufCurrent = new DataOutputBuffer(sizeFlushBuffer);
      bufReady = new DataOutputBuffer(sizeFlushBuffer);
      RandomAccessFile rp = new RandomAccessFile(name, "rw");
      fp = new FileOutputStream(rp.getFD()); // open for append
      fc = rp.getChannel();
      fc.position(fc.size());
    }

EditLogFileOutputStream类预先定义了一个缓冲区,这个缓冲区中全部存储OP_INVALID,OP_INVALID是HDFS定义的常量,是对NameNode节点内存的操作码,表示无效数据或者文件结束,定义在FSEditLog类中,其余的操作码的代码及注释在本文的最后部分全部贴出来。此外EditLogFileOutputStream类定义了五个成员变量,其中file对应者要将编辑日志写入到的文件,fp是一个文件输出流,fc是文件输出通道,bufCurrent和bufReady是两个相同大小的缓冲区。EditLogFileOutputStream类有一个构造方法,在构造方法中对几个成员变量进行了初始化。fp变量定义为对日志输出文件的随机读写对象,fc变量者是这个随机读写对象的管道,并且管道的positon设置为了当前管道的大小,这样就可以向文件中追加数据,而不覆盖文件中的已有数据。在EditLogFileOutputStream中定义的两个缓冲区大小一样,EditLogFileOutputStream类的对象使用这两个缓冲区时,先将日志数据写到bufCurrent中,当bufCurrent中的内容需要写往文件时,则会交换量几个缓冲区,即将bufCurrent指向bufReady所代表的缓冲区,bufReady指向bufCurrent所代表的缓冲区。

再回到FSEditLog.createEditLogFile()方法,创建EditLogFileOutputStream对象后,再调用其create()方法,只有调用了create方法之后才能向这个日志输出文件写数据。create方法的代码如下:

/**
     * 通过日志输出文件对应的文件通道fc,将文件清空,并向缓冲区bufCurrent写入版本号
     */
    @Override
    void create() throws IOException {
      fc.truncate(0);
      fc.position(0);
      bufCurrent.writeInt(FSConstants.LAYOUT_VERSION);//保存日志格式的版本号
      setReadyToFlush();//交换bufCurrent和bufReady
      flush();//将bufReady缓冲区中的数据写出到文件
    }

create方法先将日志输出文件清空,再将当前的position设置为文件的开始处,然后向文件中写入HDFS系统的版本号,再调用setReadyToflush()方法交换bufCurrent和bufReady缓冲区,最后输出。

setReadyToFlush()方法主要是交换bufCurrent缓冲区和bufReady缓冲区,为写入数据到日志文件中做准备,方法的代码如下:

 void setReadyToFlush() throws IOException {
      assert bufReady.size() == 0 : "previous data is not flushed yet";
      DataOutputBuffer tmp = bufReady;
      bufReady = bufCurrent;
      bufCurrent = tmp;
    }

那么,为什么要交换这两个缓冲区呢?上文提到,在EditLogFileOutputStream类中定义了两个缓冲区bufCurrent和bufReady,其中bufCurrent称为写入日志缓冲区,bufReady称为写文件缓冲区,保存日志时,先将日志数据写到bufCurrent中,在将数据写入前,交换bufCurrent和bufReady,让bufReady指向bufCurrent指向的缓冲区,bufCurrent指向bufReady原来指向的缓冲区中,交换两个缓冲区前,bufReady缓冲区必须是空的,这样,交换完成之后,bufReady缓冲区的数据就是当前要写入到文件的日志数据,bufCurrent缓冲区就是空的,其他的日志数据就又可以写入bufCurrent中,而bufReady中的数据就直接写入到文件中。

再来分析flush方法,这个方法的作用就是将数据刷出到文件中,方法的代码如下:

public void flush() throws IOException {
    numSync++;
    long start = FSNamesystem.now();//记录统计信息
    flushAndSync();
    long end = FSNamesystem.now();
    totalTimeSync += (end - start);
  }

flush方法在EditLogOutputStream类中实现,这个方法主要是通过调用flushAndSync()方法将数据写入到文件中。flushAndSync方法的代码如下:

/ ** 输出内存缓冲区中的日志数据并持久化到磁盘
     */
    @Override
    protected void flushAndSync() throws IOException {
      preallocate();            // preallocate file if necessary
      bufReady.writeTo(fp);     // write data to file
      bufReady.reset();         // erase all data in the buffer
      fc.force(false);          // metadata updates not needed because of preallocation
    }

在flushAndSync()方法中,先调用preallocate()方法给文件预先分配好一个文件块,保证写日志的操作空间足够。然后就将bufReady缓冲区中的数据通过通道写出到文件中,再将bufReady中的内容清空,方法最后调用FileChannel.force()方法,保证将对文件任何的更改写入到文件中,传入参数false则值只将对文件内容的更改写入文件中,如果参数为true,则文件的元数据信息(如权限)的更改也会写入到文件中,这将会增加一次I/O操作,如果不调用FileChannel.force()方法,那么对文件的修改可能只是写到的操作系统内核的缓冲区中,而没有实际写入到磁盘设备中,调用force方法之后,就可以保证数据写入到的磁盘中。

preallocate()方法很有意思,它会为文件预先分配一定的数据块,方法的代码如下:

 private void preallocate() throws IOException {
      long size = fc.size();
      int bufSize = bufReady.getLength();
      long need = bufSize - (size - fc.position());
      if (need <= 0) {
        return;
      }
      long oldSize = size;
      long total = 0;
      long fillCapacity = PREALLOCATION_BUFFER.capacity();
      while (need > 0) {
        PREALLOCATION_BUFFER.position(0);
        do {
          size += fc.write(PREALLOCATION_BUFFER, size);
        } while (PREALLOCATION_BUFFER.remaining() > 0);
        need -= fillCapacity;
        total += fillCapacity;
      }
      if(FSNamesystem.LOG.isDebugEnabled()) {
        FSNamesystem.LOG.debug("Preallocated " + total + " bytes at the end of " +
            "the edit log (offset " + oldSize + ")");
      }
    }

这个方法使用到了EditLogFileOutputStream的缓冲区PREALLOCATION_BUFFER,这个缓冲区是一个静态final类型,并且在类的static块中初始化,大小为1024 * 1024字节,并且全部存储的是OP_INVALID(表示无效或文件结束)。在preallocate方法中先计算bufReady缓冲区的大小,然后再计算是否需要给fc对应的文件预先分配一段数据块,用于存储数据,要分配的块大小为need,如果需要就不断的向文件中写入PREALLOCATION_BUFFER缓冲区中的数据,直到need<=0,即已经给文件分配了足够的空间。既然预先给文件分配了这么一个块,用于向文件中写入日志信息,如果预先分配的这块数据块没有用到,岂不是浪费了存储空间?因为日志数据可能会不断的增长,并且HDFS还使用了SecondaryNameNode节点来合并镜像文件和日志,所以这样做是合理的。由preallocate()方法可以看出,日志文件的结尾处是至少有一个OP_INVALID标记,来日志数据结束。

分析完了如何将编辑日志写入文件,那么在修改了NameNode节点的元数据之后,是如何创建EditLogFileOutputStream对象,进而调用响应的方法将日志写入文件呢?HDFS使用了FSEditLog类。在这个类中,定义了大量以log开头的方法,如logOpenFile方法表示打开一个文件的日志记录,logMkDir表示创建一个目点的日志等,每种log方法都需要记录相应的信息,如路径信息,最后访问时间,最后修改时间等。在这些以log开头的方法中有两个比较特殊,分别是logEdit和logSync方法,对NameNode节点元数据进行修改的log方法都需要调用logEdit方法将日志信息写入到bufCurrent缓冲区,日志文件的写入是通过调用logSync方法实现的。先来看看方法logEdit,方法代码如下:

synchronized void logEdit(byte op, Writable ... writables) {
    if (getNumEditStreams() < 1) {
      throw new AssertionError("No edit streams to log to");
    }
    long start = FSNamesystem.now();
    for (int idx = 0; idx < editStreams.size(); idx++) {
      EditLogOutputStream eStream = editStreams.get(idx);
      try {
        eStream.write(op, writables);
      } catch (IOException ioe) {
        removeEditsAndStorageDir(idx);
        idx--; 
      }
    }
    exitIfNoStreams();
    // get a new transactionId
    txid++;//获取一个新的irzhi交易标识并记录

    //
    // record the transactionId when new data was written to the edits log
    //
    TransactionId id = myTransactionId.get();
    id.txid = txid;

    // update statistics,记录一些统计信息
    long end = FSNamesystem.now();
    numTransactions++;
    totalTimeTransactions += (end-start);
    if (metrics != null) // Metrics is non-null only when used inside name node
      metrics.addTransaction(end-start);
  }

logEdit方法中使用的集合editStreams是编辑日志输出文件的集合,集合中每个元素对应一个编辑日志文件的输出流。在FSEditLog.open()方法中对editStreams变量进行初始化,然后遍历所有的编辑日志保存目录,再针对每个目录创建一个编辑日志文件输出流。

logEdit方法先遍历输出流集合,将要日志数据保存在bufCurrent中,再给当前线程分配一个事务ID(在《Hadoop技术内幕:深入理解Hadoop Common和HDFS架构设计与实现原理》称TransactionId为交易标识,但是个人觉得这里仿照关系数据块中译为事务更合适些)。并记录统计信息。

每个线程都有个事务ID,这个事务ID标识用于同步线程间对FSEditLog对象的调用,具体这个事务ID如何起作用,在logSync方法中分析。当调用logEdit方法后,数据写入到了EditLogFileOutputStream对象的bufCurrent缓冲区中,那么在何时将数据刷出到文件中呢?下面来分析logSync方法,代码如下:

/**
  * Sync all modifications done by this thread.<br/>
  * 同步日志修改
  */
  public void logSync() throws IOException {
    ArrayList<EditLogOutputStream> errorStreams = null;
    long syncStart = 0;

    // Fetch the transactionId of this thread. 
    long mytxid = myTransactionId.get().txid;

    ArrayList<EditLogOutputStream> streams = new ArrayList<EditLogOutputStream>();
    boolean sync = false;
    try {
      synchronized (this) {
        printStatistics(false);

        // if somebody is already syncing, then wait,有其他线程在执行日志同步操作
        while (mytxid > synctxid && isSyncRunning) {
          try {
            wait(1000);
          } catch (InterruptedException ie) { 
          }
        }

        //
        // If this transaction was already flushed, then nothing to do,日志已经被其他线程同步
        //
        if (mytxid <= synctxid) {
          numTransactionsBatchedInSync++;
          if (metrics != null) // Metrics is non-null only when used inside name node
            metrics.incrTransactionsBatchedInSync();
          return;
        }

        // now, this thread will do the sync,同步由当前线程执行,记录相关信息
        syncStart = txid;
        isSyncRunning = true;
        sync = true;

        // swap buffers,交换缓冲区
        exitIfNoStreams();
        for(EditLogOutputStream eStream : editStreams) {
          try {
            eStream.setReadyToFlush();
            streams.add(eStream);
          } catch (IOException ie) {
            LOG.error("Unable to get ready to flush.", ie);
            //
            // remember the streams that encountered an error.
            //
            if (errorStreams == null) {
              errorStreams = new ArrayList<EditLogOutputStream>(1);
            }
            errorStreams.add(eStream);
          }
        }
      }

      // do the sync,执行日志同步
      long start = FSNamesystem.now();
      for (EditLogOutputStream eStream : streams) {
        try {
          eStream.flush();
        } catch (IOException ie) {
          LOG.error("Unable to sync edit log.", ie);
          //
          // remember the streams that encountered an error.
          //
          if (errorStreams == null) {
            errorStreams = new ArrayList<EditLogOutputStream>(1);
          }
          errorStreams.add(eStream);
        }
      }
      long elapsed = FSNamesystem.now() - start;
      removeEditsStreamsAndStorageDirs(errorStreams);
      exitIfNoStreams();

      if (metrics != null) // Metrics is non-null only when used inside name node
        metrics.addSync(elapsed);

    } finally {
      synchronized (this) {
        if(sync) {
          synctxid = syncStart;
          isSyncRunning = false;
        }
        this.notifyAll();
      }
    }
  }

在logSync方法中定义了一个集合streams用于存放需要向文件中刷出日志的日志流输出对象,还定义了一个用于存放出错的输出流的集合对象errorStreams,当调用日志输出流(EditLogFileOutputStream对象)的setReadyToFlush方法出现错误时,会将这个输出流记录在这个错误输出流集合中,在logSync方法的最后进行处理。

logSync方法可以分为四个部分,第一个部分是等待其他同步线程执行完毕,直到当前线程可以执行;第二个部分是,判断当前的线程是否还需要执行,如果当前线程的事务id小于FSEditLog对象记录的事务id,那么当前线程就没有执行的必要了,第三个部分是遍历日志输出流集合,将正常的日志输出流加入到集合streams中,第四个部分是将streams集合中所有的日志输出流中的日志数据刷出到对应文件中。下面分别对这四个部分进行分析。

第一个部分是等待其他同步线程执行完毕,为什么要等待呢?在NameNode节点运行过程中,可能有多个线程在执行,这多个线程中可能都持有FSEditLog对象的引用,那么就需要对这些线程进行同步,在一个线程调用FSEditLog的同步方法的过程中,其他线程不能调用这些同步方法。logSync方法使用synctxid变量记录当前正在使用logSync方法同步块的线程事务ID,使用isSyncRunning布尔变量记录logSync方法的同步块正在被使用,这样如果本线程的事务ID(mytxid)大于synctxid,且isSyncRunning为true,那么就这个线程就进入等待,将计算资源让给其他线程执行。

第二个部分是判断当前线程是否还需要执行,如果当前线程的事务id小于FSEditLog对象记录的事务id,那么当前线程就没有执行的必要了。为什么这么说?每个线程将日志数据刷出到文件都要执行logEdit方法和logSync方法,假设有一个线程执行了logEdit方法后由于线程切换等原因,计算资源被其他线程占用,而此时线程2连续执行了logEdit方法和logSync方法,线程1执行了完了logEdit方法,说明线程1的日志数据已经存放在了EditLogFileOutputStream对象的bufCurrent缓冲区中,那么此时线程2继续执行logSync方法,假设线程1的事务ID是N,线程2的事务ID是2,将isSyncRunning变量设置为true,其他变量都不能执行logSync方法的第一个synchronized块,线程2将FSEditLog对象的集合editStreams中的数据全部刷出到文件,并且在方法的最后给synctxid变量赋值为线程2的事务ID,那么此时FSEditLog对象的变量synctxid就记录了线程2的事务ID,线程1的事务ID为N就比synctxid变量的值小,说明此时线程2已经将所有执行了logEdit方法的线程的日志输出流刷出到了文件中,这样线程1就不需要再执行logSync方法进行刷出数据了,线程1就返回。

第三部分是遍历日志输出流集合,将正常的日志输出流加入到集合streams中。为什么多要定义streams,直接使用FSEditLog对象的editStreams集合不行吗?因为第一部分,第二部分,第三部分都是在一个synchronized代码块中执行的,并且同步的是FSEditLog这个对像,且logEdit整个方法是一个同步方法,如果使用editStreams执行数据刷出,则同一时间只有一个线程能使用editStreams集合,那么效率肯定相当低效,多定义一个集合streams临时变量,用于刷出数据,这样效率会高很多,比如一个线程在执行logEdit方法时,另一个线程可执行logSync的非同步代码块将streams中的数据刷出。

第四部分就是将streams的数据刷出到文件中,即调用了EditLogOutputSteam的flush方法,这个方法上面已经分析了。

在方法的最后有一个同步代码块,给synctxid赋值当前线程的事务ID,并且将变量isSyncRunning设置为false。

以上的过程就是NameNode节点的元数据的存储过程。

Reference

http://ifeve.com/buffers/

http://hi.baidu.com/tinggu_android/item/77c2930ecb7811ca90571855

https://issues.apache.org/jira/browse/HADOOP-438

《Hadoop技术内幕:深入理解Hadoop Common和HDFS架构设计与实现原理》

HDFS运行过程中记录编辑日志的操作码

/**无效/结束**/
  static final byte OP_INVALID = -1;
  /**创建文件**/
  private static final byte OP_ADD = 0;
  /**改名**/
  private static final byte OP_RENAME = 1;  // rename
  /**删除**/
  private static final byte OP_DELETE = 2;  // delete
  /**创建目录**/
  private static final byte OP_MKDIR = 3;   // create directory
  /**设置文件副本数**/
  private static final byte OP_SET_REPLICATION = 4; // set replication
  //the following two are used only for backward compatibility :
  /**添加节点,已经废弃**/
  @Deprecated private static final byte OP_DATANODE_ADD = 5;
  /**移除数据节点**/
  @Deprecated private static final byte OP_DATANODE_REMOVE = 6;
  /**设置权限**/
  private static final byte OP_SET_PERMISSIONS = 7;
  /**设置文件主**/
  private static final byte OP_SET_OWNER = 8;
  /**关闭文件**/
  private static final byte OP_CLOSE = 9;    // close after write
  /**设置数据块版本号**/
  private static final byte OP_SET_GENSTAMP = 10;    // store genstamp
  /* The following two are not used any more. Should be removed once
   * LAST_UPGRADABLE_LAYOUT_VERSION is -17 or newer. */
  /**设置节点配额**/
  private static final byte OP_SET_NS_QUOTA = 11; // set namespace quota
  /**清除节点配额**/
  private static final byte OP_CLEAR_NS_QUOTA = 12; // clear namespace quota
  /**设置访问或修改时间**/
  private static final byte OP_TIMES = 13; // sets mod & access time on a file
  /**设置配额**/
  private static final byte OP_SET_QUOTA = 14; // sets name and disk quotas.
  /**连接文件**/
  private static final byte OP_CONCAT_DELETE = 16; // concat files.
  /**新授权令牌**/
  private static final byte OP_GET_DELEGATION_TOKEN = 18; //new delegation token
  /**更新授权令牌**/
  private static final byte OP_RENEW_DELEGATION_TOKEN = 19; //renew delegation token
  /**取消授权令牌**/
  private static final byte OP_CANCEL_DELEGATION_TOKEN = 20; //cancel delegation token
  /**更新主密钥**/
  private static final byte OP_UPDATE_MASTER_KEY = 21; //update master key


Hadoop源码分析之NameNode元数据的存储

上一篇:Zabbix实现微信告警


下一篇:Spring框架学习【IoC容器依赖注入】