Big Data Study Notes: HDFS Write Process Source Code Analysis (1)

The method call flow of the HDFS write process & a reading of the source code comments

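The previous post on the HDFS module was practical: it showed how to use the various APIs and walked through the basic installation and configuration of IDEA. Starting with this post, I will work through how HDFS reads and writes actually work. Covering everything at once would make a single post far too long, so the material is split across several posts. This one gives a high-level view of the whole write process and digs into the comments in the source code. Enough preamble, let's get started!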

1. Overview diagram

[Figure: the HDFS file write process, from Hadoop: The Definitive Guide]

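The figure above is taken from Hadoop: The Definitive Guide. It divides the write process into seven steps. This first source code analysis post covers the first three:

- the HDFS client creates a FileSystem object;
- the file's metadata is created on the NameNode;
- an FSDataOutputStream object is created.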

2. Source code walkthrough

2.1 The HDFS client creates a FileSystem object

This operation actually consists of two main steps: creating a Configuration object, and obtaining a FileSystem object through the static get method of the FileSystem class.

2.1.1 Reading the Javadoc

First, let's go through the class-level documentation of FileSystem to get a high-level picture of the class:

An abstract base class for a fairly generic filesystem.  It
may be implemented as a distributed filesystem, or as a "local"
one that reflects the locally-connected disk. The local version
exists for small Hadoop instances and for testing.

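In short: FileSystem is the abstract base class for a generic file system. It can be implemented as a distributed file system, or as a "local" file system that reflects the locally attached disk. The local version is mainly meant for small Hadoop instances and for testing.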

All user code that may potentially use the Hadoop Distributed
File System should be written to use a FileSystem object. The
Hadoop DFS is a multi-machine system that appears as a single
disk. It's useful because of its fault tolerance and potentially
very large capacity.

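In short: any user code that may use HDFS should be written against a FileSystem object. HDFS spans many machines but presents itself as a single disk (that is, a single root directory). This is valuable because of its fault tolerance and its potentially very large capacity.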

2.1.2 Creating the Configuration object

Set a breakpoint at the line shown below and start debugging to see what happens when a Configuration object is created:

[Figure: breakpoint on the line that creates the Configuration object]

The key code is the static initializer block of Configuration:

static {
  // print deprecation warning if hadoop-site.xml is found in classpath
  ClassLoader cL = Thread.currentThread().getContextClassLoader();
  if (cL == null) {
    cL = Configuration.class.getClassLoader();
  }
  if (cL.getResource("hadoop-site.xml") != null) {
    LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
        "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
        + "mapred-site.xml and hdfs-site.xml to override properties of " +
        "core-default.xml, mapred-default.xml and hdfs-default.xml " +
        "respectively");
  }
  addDefaultResource("core-default.xml");
  addDefaultResource("core-site.xml");
}

So when the Configuration class is loaded, it registers two default resources: core-default.xml and core-site.xml.
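Resources are applied in the order they are added, so a property set in core-site.xml overrides the same property from core-default.xml. The sketch below models that lookup order with java.util.Properties.

It makes a few assumptions. LayeredConfig and its methods are hypothetical. The sketch ignores real Configuration features such as final properties and variable substitution.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical model of Configuration's resource layering (not Hadoop's class).
class LayeredConfig {
    // Resources in the order they were added; later ones override earlier ones.
    private final List<Properties> resources = new ArrayList<>();

    void addResource(Properties resource) {
        resources.add(resource);
    }

    // Search from the most recently added resource backwards, so the last value set wins.
    String get(String key) {
        for (int i = resources.size() - 1; i >= 0; i--) {
            String value = resources.get(i).getProperty(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Properties coreDefault = new Properties();   // plays the role of core-default.xml
        coreDefault.setProperty("fs.defaultFS", "file:///");
        coreDefault.setProperty("io.file.buffer.size", "4096");

        Properties coreSite = new Properties();      // plays the role of core-site.xml
        coreSite.setProperty("fs.defaultFS", "hdfs://namenode-host:8020"); // hypothetical address

        LayeredConfig conf = new LayeredConfig();
        conf.addResource(coreDefault);
        conf.addResource(coreSite);
        System.out.println(conf.get("fs.defaultFS"));        // hdfs://namenode-host:8020
        System.out.println(conf.get("io.file.buffer.size")); // 4096
    }
}
```

Keys that only appear in the defaults stay visible, while keys redefined in the site file are shadowed.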

2.1.3 Obtaining the FileSystem object

Now move the breakpoint to the position shown below:

[Figure: breakpoint on the FileSystem.get call]

Following the chain of method calls, we find that the FileSystem object is ultimately obtained by calling the getInternal method.

getInternal first calls the createFileSystem method:

[Figure: getInternal calling createFileSystem]

Stepping into createFileSystem, we reach the key part:

private static FileSystem createFileSystem(URI uri, Configuration conf)
    throws IOException {
  Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
  FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
  fs.initialize(uri, conf);
  return fs;
}

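So the FileSystem instance is created through reflection, in three steps:

- createFileSystem looks up the implementation class for the URI's scheme.
- It passes that Class object and the Configuration object to the newInstance method of the reflection utility class ReflectionUtils.
- It initializes the resulting instance with fs.initialize(uri, conf).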
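The same pattern can be shown in isolation: look up an implementation class by URI scheme, instantiate it reflectively, then initialize it. The plain-Java sketch below does exactly that.

All of its classes are hypothetical: SimpleFileSystem, LocalLikeFileSystem, DistributedLikeFileSystem and FileSystemFactory. The hard-coded scheme table replaces the configuration lookup that Hadoop performs, for example through keys such as fs.hdfs.impl.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, minimal stand-in for the abstract FileSystem class.
abstract class SimpleFileSystem {
    protected URI uri;

    // Plays the role of FileSystem.initialize(URI, Configuration).
    void initialize(URI uri) {
        this.uri = uri;
    }

    abstract String describe();
}

class LocalLikeFileSystem extends SimpleFileSystem {
    public LocalLikeFileSystem() { }   // reflection needs an accessible no-arg constructor

    @Override
    String describe() { return "local:" + uri; }
}

class DistributedLikeFileSystem extends SimpleFileSystem {
    public DistributedLikeFileSystem() { }

    @Override
    String describe() { return "distributed:" + uri; }
}

class FileSystemFactory {
    // Scheme -> implementation class. Hadoop resolves this from configuration instead.
    private static final Map<String, Class<? extends SimpleFileSystem>> IMPLS = new HashMap<>();
    static {
        IMPLS.put("file", LocalLikeFileSystem.class);
        IMPLS.put("hdfs", DistributedLikeFileSystem.class);
    }

    static SimpleFileSystem create(URI uri) {
        Class<? extends SimpleFileSystem> clazz = IMPLS.get(uri.getScheme());
        if (clazz == null) {
            throw new IllegalArgumentException("No FileSystem for scheme: " + uri.getScheme());
        }
        try {
            // Same idea as ReflectionUtils.newInstance(clazz, conf): instantiate via reflection.
            SimpleFileSystem fs = clazz.getDeclaredConstructor().newInstance();
            fs.initialize(uri);
            return fs;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + clazz.getName(), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(create(URI.create("file:///tmp")).describe());         // local:file:///tmp
        System.out.println(create(URI.create("hdfs://nn:8020/user")).describe()); // distributed:hdfs://nn:8020/user
    }
}
```

The factory never names a concrete class in its creation logic. Supporting another scheme only means registering one more class in the lookup table.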

2.2 Creating metadata on the NameNode

2.2.1 Reading the Javadoc

This step involves three classes: DistributedFileSystem, DFSClient and DFSOutputStream.

The DistributedFileSystem class

Implementation of the abstract FileSystem for the DFS system.
This object is the way end-user code interacts with a Hadoop
DistributedFileSystem.

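In short: DistributedFileSystem is the implementation of the abstract FileSystem class for the distributed file system. It is the object through which end-user code interacts with HDFS.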

The DFSClient class

DFSClient can connect to a Hadoop Filesystem and 
perform basic file tasks. It uses the ClientProtocol
to communicate with a NameNode daemon, and connects
directly to DataNodes to read/write block data.
Hadoop DFS users should obtain an instance of
DistributedFileSystem, which uses DFSClient to handle
filesystem tasks.

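In short: DFSClient connects to a Hadoop file system and performs basic file operations. It uses ClientProtocol to communicate with the NameNode daemon, and connects directly to DataNodes to read or write block data. HDFS users should obtain a DistributedFileSystem instance, which in turn uses DFSClient to carry out file system tasks.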

The DFSOutputStream class

DFSOutputStream creates files from a stream of bytes.

In short: DFSOutputStream creates files from a stream of bytes.

The client application writes data that is cached internally by
this stream. Data is broken up into packets, each packet is
typically 64K in size. A packet comprises of chunks. Each chunk
is typically 512 bytes and has an associated checksum with it.

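In short: data written by the client application is buffered inside this stream. The data is split into packets, each typically 64 KB in size. A packet is made up of chunks; each chunk is typically 512 bytes and carries its own checksum.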
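The numbers in that comment lead to simple arithmetic. Take 64 KB packets, 512-byte chunks and a 4-byte checksum per chunk. Ignoring the packet header, one packet holds 65536 / (512 + 4) = 127 chunks. The sketch below performs that calculation and computes one checksum per chunk.

It makes three assumptions:

- PacketMath is a hypothetical class.
- The headerLen parameter stands in for the real packet header size, which is not shown here.
- java.util.zip.CRC32 is only a stand-in checksum, since HDFS makes the checksum type configurable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

// Hypothetical helper showing how written bytes are grouped into checksummed chunks and packets.
class PacketMath {
    // How many (data + checksum) chunks fit into one packet body; always at least one.
    static int chunksPerPacket(int packetSize, int bytesPerChecksum, int checksumSize, int headerLen) {
        int bodySize = packetSize - headerLen;
        int chunkSize = bytesPerChecksum + checksumSize;
        return Math.max(bodySize / chunkSize, 1);
    }

    // Split data into chunks of at most bytesPerChecksum bytes and compute a checksum for each.
    static List<Long> chunkChecksums(byte[] data, int bytesPerChecksum) {
        List<Long> sums = new ArrayList<>();
        for (int off = 0; off < data.length; off += bytesPerChecksum) {
            int len = Math.min(bytesPerChecksum, data.length - off);
            CRC32 crc = new CRC32();
            crc.update(data, off, len);
            sums.add(crc.getValue());
        }
        return sums;
    }

    public static void main(String[] args) {
        // 64 KB packets, 512-byte chunks, 4-byte checksums, header ignored.
        System.out.println(chunksPerPacket(64 * 1024, 512, 4, 0)); // 127
        // 1300 bytes -> chunks of 512 + 512 + 276 bytes.
        System.out.println(chunkChecksums(new byte[1300], 512).size()); // 3
    }
}
```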

2.2.2 Walking through the metadata creation code

Set a breakpoint at the position shown below, then start debugging:

[Figure: breakpoint on the fs.create call]

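Stepping in, we first land in the FileSystem class. After a chain of overloaded create calls, we finally reach the point where the call leaves FileSystem: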

public FSDataOutputStream create(Path f,
                                            boolean overwrite,
                                            int bufferSize,
                                            short replication,
                                            long blockSize,
                                            Progressable progress
                                            ) throws IOException {
    return this.create(f, FsPermission.getFileDefault().applyUMask(
        FsPermission.getUMask(getConf())), overwrite, bufferSize,
        replication, blockSize, progress);
  }
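The permission argument built here is FsPermission.getFileDefault() with the client's umask applied. Applying a umask clears every bit that is set in the mask, i.e. permission & ~umask. With the file default of 0666 and a umask of 022, the result is 0644 (rw-r--r--). Below is a plain-Java sketch of that bit arithmetic; UmaskDemo is a hypothetical class.

```java
// Hypothetical demo of umask arithmetic on octal permission bits.
class UmaskDemo {
    // Clear every permission bit that is set in the umask.
    static int applyUMask(int permission, int umask) {
        return permission & ~umask;
    }

    public static void main(String[] args) {
        int fileDefault = 0666;   // rw-rw-rw-
        int umask = 0022;         // removes write permission for group and others
        System.out.println(Integer.toOctalString(applyUMask(fileDefault, umask))); // 644
    }
}
```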

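Continuing to debug, we find that the FSDataOutputStream is returned by the create method of DistributedFileSystem. FSDataOutputStream is a wrapper class, and the code shows that what it wraps is exactly a DFSOutputStream. That gives us the second exit point: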

@Override
  public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize,
    final short replication, final long blockSize, final Progressable progress,
    final ChecksumOpt checksumOpt) throws IOException {
    statistics.incrementWriteOps(1);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FSDataOutputStream>() {
      @Override
      public FSDataOutputStream doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
                cflags, replication, blockSize, progress, bufferSize,
                checksumOpt);
        return dfs.createWrappedOutputStream(dfsos, statistics);
      }
      // ... next(FileSystem, Path) override omitted ...
    }.resolve(this, absF);
  }
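Wrapping DFSOutputStream in FSDataOutputStream is the decorator pattern. The outer stream adds bookkeeping, such as the current position and write statistics, and delegates the actual bytes to the inner stream. The minimal sketch below shows the idea with java.io.FilterOutputStream. PositionTrackingStream is hypothetical, and a ByteArrayOutputStream stands in for DFSOutputStream.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical decorator: forwards bytes to the wrapped stream and counts how many were written.
class PositionTrackingStream extends FilterOutputStream {
    private long position;

    PositionTrackingStream(OutputStream inner) {
        super(inner);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        position++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // Delegate bulk writes in one call instead of FilterOutputStream's byte-by-byte default.
        out.write(b, off, len);
        position += len;
    }

    long getPos() {
        return position;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream(); // stands in for DFSOutputStream
        try (PositionTrackingStream out = new PositionTrackingStream(sink)) {
            out.write(new byte[5]);
            out.write(42);
            System.out.println(out.getPos()); // 6
        }
    }
}
```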

Continuing, we see that this DFSOutputStream is returned by the create method of the DFSClient class:

public DFSOutputStream create(String src, 
                             FsPermission permission,
                             EnumSet<CreateFlag> flag, 
                             boolean createParent,
                             short replication,
                             long blockSize,
                             Progressable progress,
                             int buffersize,
                             ChecksumOpt checksumOpt,
                             InetSocketAddress[] favoredNodes) throws IOException {
    checkOpen();
    if (permission == null) {
      permission = FsPermission.getFileDefault();
    }
    FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
    if(LOG.isDebugEnabled()) {
      LOG.debug(src + ": masked=" + masked);
    }
    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
        src, masked, flag, createParent, replication, blockSize, progress,
        buffersize, dfsClientConf.createChecksum(checksumOpt),
        getFavoredNodesStr(favoredNodes));
    beginFileLease(result.getFileId(), result);
    return result;
  }

The key lines show that DFSClient obtains its DFSOutputStream instance by calling the static newStreamForCreate method of DFSOutputStream. Stepping into that method, we finally find the key code that creates the metadata:

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
      short replication, long blockSize, Progressable progress, int buffersize,
      DataChecksum checksum, String[] favoredNodes) throws IOException {
    TraceScope scope =
        dfsClient.getPathTraceScope("newStreamForCreate", src);
    try {
      HdfsFileStatus stat = null;

      // Retry the create if we get a RetryStartFileException up to a maximum
      // number of times
      boolean shouldRetry = true;
      int retryCount = CREATE_RETRY_COUNT;
      while (shouldRetry) {
        shouldRetry = false;
        try {
          stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
              new EnumSetWritable<CreateFlag>(flag), createParent, replication,
              blockSize, SUPPORTED_CRYPTO_VERSIONS);
          break;
        } catch (RemoteException re) {
          IOException e = re.unwrapRemoteException(
              AccessControlException.class,
              DSQuotaExceededException.class,
              FileAlreadyExistsException.class,
              FileNotFoundException.class,
              ParentNotDirectoryException.class,
              NSQuotaExceededException.class,
              RetryStartFileException.class,
              SafeModeException.class,
              UnresolvedPathException.class,
              SnapshotAccessControlException.class,
              UnknownCryptoProtocolVersionException.class);
          if (e instanceof RetryStartFileException) {
            if (retryCount > 0) {
              shouldRetry = true;
              retryCount--;
            } else {
              throw new IOException("Too many retries because of encryption" +
                  " zone operations", e);
            }
          } else {
            throw e;
          }
        }
      }
      Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
      final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
          flag, progress, checksum, favoredNodes);
      out.start();
      return out;
    } finally {
      scope.close();
    }
  }
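The while loop around namenode.create is a bounded retry. Only RetryStartFileException leads to another attempt, at most CREATE_RETRY_COUNT times; every other exception is rethrown immediately. The sketch below reproduces the same control flow in plain Java. RetryableException, Attempt and BoundedRetry are hypothetical names, and unchecked exceptions play the role of the non-retryable errors.

```java
import java.io.IOException;

// Hypothetical stand-in for RetryStartFileException: the only error worth retrying.
class RetryableException extends Exception {
    RetryableException(String message) {
        super(message);
    }
}

// One attempt of the operation; it may signal that it should be retried.
@FunctionalInterface
interface Attempt<T> {
    T run() throws RetryableException;
}

class BoundedRetry {
    // Runs op until it succeeds, retrying only on RetryableException and at most maxRetries times.
    // Unchecked exceptions (standing in for non-retryable errors) propagate immediately.
    static <T> T call(Attempt<T> op, int maxRetries) throws IOException {
        int retriesLeft = maxRetries;
        while (true) {
            try {
                return op.run();
            } catch (RetryableException e) {
                if (retriesLeft > 0) {
                    retriesLeft--;
                } else {
                    throw new IOException("Too many retries", e);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int[] attempts = {0};
        String result = call(() -> {
            attempts[0]++;
            if (attempts[0] < 3) {
                throw new RetryableException("transient conflict");
            }
            return "created";
        }, 10);
        System.out.println(result + " after " + attempts[0] + " attempts"); // created after 3 attempts
    }
}
```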

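In the key code, the stat object is produced by calling namenode.create. Ctrl + clicking namenode shows that it is an instance of ClientProtocol, the protocol mentioned in the DFSClient comment earlier. ClientProtocol is an interface, and its implementation class ClientNamenodeProtocolTranslatorPB is the one we are after. Among that class's methods we find create. It builds a request and returns the result of calling rpcProxy.create; the request and response are serialized with Google's Protocol Buffers (Protobuf).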

@Override
  public HdfsFileStatus create(String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag,
      boolean createParent, short replication, long blockSize, 
      CryptoProtocolVersion[] supportedVersions)
      throws AccessControlException, AlreadyBeingCreatedException,
      DSQuotaExceededException, FileAlreadyExistsException,
      FileNotFoundException, NSQuotaExceededException,
      ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,
      IOException {
    CreateRequestProto.Builder builder = CreateRequestProto.newBuilder()
        .setSrc(src)
        .setMasked(PBHelper.convert(masked))
        .setClientName(clientName)
        .setCreateFlag(PBHelper.convertCreateFlag(flag))
        .setCreateParent(createParent)
        .setReplication(replication)
        .setBlockSize(blockSize);
    builder.addAllCryptoProtocolVersion(PBHelper.convert(supportedVersions));
    CreateRequestProto req = builder.build();
    try {
      CreateResponseProto res = rpcProxy.create(null, req);
      return res.hasFs() ? PBHelper.convert(res.getFs()) : null;
    } catch (ServiceException e) {
      throw ProtobufHelper.getRemoteException(e);
    }

  }
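ClientNamenodeProtocolTranslatorPB acts as an adapter. It implements the Java-level ClientProtocol interface, packs the arguments into a request message, and hands that message to rpcProxy, an object produced by Hadoop's RPC layer. The sketch below reproduces that shape with java.lang.reflect.Proxy and no networking at all.

It makes these assumptions:

- NamenodeApi, NamenodeWire and TranslatorSketch are hypothetical.
- The in-memory invocation handler is hypothetical and replaces the network round trip.
- A Map stands in for the Protobuf request and response messages.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

// Hypothetical Java-level protocol, in the role of ClientProtocol.
interface NamenodeApi {
    String create(String src, short replication, long blockSize);
}

// Hypothetical wire-level protocol: one request message in, one response message out.
interface NamenodeWire {
    Map<String, Object> create(Map<String, Object> request);
}

// Adapter in the spirit of ClientNamenodeProtocolTranslatorPB.
class TranslatorSketch implements NamenodeApi {
    private final NamenodeWire rpcProxy;

    TranslatorSketch(NamenodeWire rpcProxy) {
        this.rpcProxy = rpcProxy;
    }

    @Override
    public String create(String src, short replication, long blockSize) {
        // Pack the Java arguments into a request message (a Map instead of CreateRequestProto).
        Map<String, Object> req = new HashMap<>();
        req.put("src", src);
        req.put("replication", replication);
        req.put("blockSize", blockSize);
        Map<String, Object> res = rpcProxy.create(req);
        // Like res.hasFs() ? PBHelper.convert(res.getFs()) : null
        return res.containsKey("fs") ? (String) res.get("fs") : null;
    }

    // A dynamic proxy whose handler replaces the network round trip with a local answer.
    static NamenodeWire inMemoryProxy() {
        InvocationHandler handler = (proxy, method, args) -> {
            if (!method.getName().equals("create")) {
                throw new UnsupportedOperationException(method.getName());
            }
            @SuppressWarnings("unchecked")
            Map<String, Object> req = (Map<String, Object>) args[0];
            Map<String, Object> res = new HashMap<>();
            // A real NameNode would record the new file in its namespace here.
            res.put("fs", "created " + req.get("src") + " replication=" + req.get("replication"));
            return res;
        };
        return (NamenodeWire) Proxy.newProxyInstance(
                NamenodeWire.class.getClassLoader(),
                new Class<?>[] {NamenodeWire.class},
                handler);
    }

    public static void main(String[] args) {
        NamenodeApi namenode = new TranslatorSketch(inMemoryProxy());
        // prints: created /user/demo.txt replication=3
        System.out.println(namenode.create("/user/demo.txt", (short) 3, 128L * 1024 * 1024));
    }
}
```

The caller only ever sees the Java-level interface. Swapping the in-memory handler for one that serializes the message and sends it over a socket would not change the translator at all.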

2.3 Creating the FSDataOutputStream object

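The code above creates the metadata, but the process is not finished yet: a DFSOutputStream object still has to be created. In the same newStreamForCreate method we find the following lines. The method returns this out object, and calls out.start() just before returning it: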

final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
    flag, progress, checksum, favoredNodes);
out.start();
return out;

Clicking into start, we see that it simply calls start on the streamer object:

private synchronized void start() {
  streamer.start();
}
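Assume, as the start() call suggests, that the streamer is a java.lang.Thread. Then start() returns immediately and the JVM runs run() on the new thread. From then on, the thread calling write() and the streamer thread cooperate through a shared queue. They use wait()/notifyAll() on the queue's monitor, much as the run() method below does with dataQueue.

MiniStreamer, its methods and the integer "packets" are hypothetical. A list records what would have been sent to the DataNode.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical, heavily simplified streamer: a thread that drains packets queued by a writer.
class MiniStreamer extends Thread {
    private final LinkedList<Integer> dataQueue = new LinkedList<>();
    private final List<Integer> sent = new ArrayList<>();   // stands in for the DataNode socket
    private boolean closed = false;                          // guarded by dataQueue's monitor

    MiniStreamer() {
        setDaemon(true);   // a background thread that must not keep the JVM alive
    }

    // Producer side: called by the thread that writes data.
    void enqueue(int packet) {
        synchronized (dataQueue) {
            dataQueue.addLast(packet);
            dataQueue.notifyAll();   // wake the streamer if it is waiting
        }
    }

    // Producer side: no more packets will be queued.
    void finish() {
        synchronized (dataQueue) {
            closed = true;
            dataQueue.notifyAll();
        }
    }

    @Override
    public void run() {
        while (true) {
            int packet;
            synchronized (dataQueue) {
                while (dataQueue.isEmpty() && !closed) {
                    try {
                        dataQueue.wait(1000);   // bounded wait, like the real loop
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                if (dataQueue.isEmpty()) {
                    return;                      // finished and fully drained
                }
                packet = dataQueue.removeFirst();
            }
            // "Send" outside the queue lock so the writer is not blocked during I/O.
            synchronized (sent) {
                sent.add(packet);
            }
        }
    }

    List<Integer> sentPackets() {
        synchronized (sent) {
            return new ArrayList<>(sent);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniStreamer streamer = new MiniStreamer();
        streamer.start();             // returns at once; run() executes on the new thread
        for (int i = 0; i < 5; i++) {
            streamer.enqueue(i);
        }
        streamer.finish();
        streamer.join();
        System.out.println(streamer.sentPackets()); // [0, 1, 2, 3, 4]
    }
}
```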

Clicking into streamer, we find that it is an instance of DataStreamer, an inner class of DFSOutputStream. DataStreamer has a run method, and that is where the key data-writing code lives:

@Override
    public void run() {
      long lastPacket = Time.monotonicNow();
      TraceScope scope = NullScope.INSTANCE;
      while (!streamerClosed && dfsClient.clientRunning) {
        // if the Responder encountered an error, shutdown Responder
        if (hasError && response != null) {
          try {
            response.close();
            response.join();
            response = null;
          } catch (InterruptedException  e) {
            DFSClient.LOG.warn("Caught exception ", e);
          }
        }

        DFSPacket one;
        try {
          // process datanode IO errors if any
          boolean doSleep = false;
          if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) {
            doSleep = processDatanodeError();
          }

          synchronized (dataQueue) {
            // wait for a packet to be sent.
            long now = Time.monotonicNow();
            while ((!streamerClosed && !hasError && dfsClient.clientRunning 
                && dataQueue.size() == 0 && 
                (stage != BlockConstructionStage.DATA_STREAMING || 
                 stage == BlockConstructionStage.DATA_STREAMING && 
                 now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
              long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
              timeout = timeout <= 0 ? 1000 : timeout;
              timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
                 timeout : 1000;
              try {
                dataQueue.wait(timeout);
              } catch (InterruptedException  e) {
                DFSClient.LOG.warn("Caught exception ", e);
              }
              doSleep = false;
              now = Time.monotonicNow();
            }
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
              continue;
            }
            // get packet to be sent.
            if (dataQueue.isEmpty()) {
              one = createHeartbeatPacket();
              assert one != null;
            } else {
              one = dataQueue.getFirst(); // regular data packet
              long parents[] = one.getTraceParents();
              if (parents.length > 0) {
                scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0]));
                // TODO: use setParents API once it's available from HTrace 3.2
//                scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS);
//                scope.getSpan().setParents(parents);
              }
            }
          }

          // get new block from namenode.
          if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
            if(DFSClient.LOG.isDebugEnabled()) {
              DFSClient.LOG.debug("Allocating new block");
            }
            setPipeline(nextBlockOutputStream());
            initDataStreaming();
          } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
            if(DFSClient.LOG.isDebugEnabled()) {
              DFSClient.LOG.debug("Append to block " + block);
            }
            setupPipelineForAppendOrRecovery();
            initDataStreaming();
          }

          long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
          if (lastByteOffsetInBlock > blockSize) {
            throw new IOException("BlockSize " + blockSize +
                " is smaller than data size. " +
                " Offset of packet in block " + 
                lastByteOffsetInBlock +
                " Aborting file " + src);
          }

          if (one.isLastPacketInBlock()) {
            // wait for all data packets have been successfully acked
            synchronized (dataQueue) {
              while (!streamerClosed && !hasError && 
                  ackQueue.size() != 0 && dfsClient.clientRunning) {
                try {
                  // wait for acks to arrive from datanodes
                  dataQueue.wait(1000);
                } catch (InterruptedException  e) {
                  DFSClient.LOG.warn("Caught exception ", e);
                }
              }
            }
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
              continue;
            }
            stage = BlockConstructionStage.PIPELINE_CLOSE;
          }
          
          // send the packet
          Span span = null;
          synchronized (dataQueue) {
            // move packet from dataQueue to ackQueue
            if (!one.isHeartbeatPacket()) {
              span = scope.detach();
              one.setTraceSpan(span);
              dataQueue.removeFirst();
              ackQueue.addLast(one);
              dataQueue.notifyAll();
            }
          }

          if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug("DataStreamer block " + block +
                " sending packet " + one);
          }

          // write out data to remote datanode
          TraceScope writeScope = Trace.startSpan("writeTo", span);
          try {
            one.writeTo(blockStream);
            blockStream.flush();   
          } catch (IOException e) {
            // HDFS-3398 treat primary DN is down since client is unable to 
            // write to primary DN. If a failed or restarting node has already
            // been recorded by the responder, the following call will have no 
            // effect. Pipeline recovery can handle only one node error at a
            // time. If the primary node fails again during the recovery, it
            // will be taken out then.
            tryMarkPrimaryDatanodeFailed();
            throw e;
          } finally {
            writeScope.close();
          }
          lastPacket = Time.monotonicNow();
          
          // update bytesSent
          long tmpBytesSent = one.getLastByteOffsetBlock();
          if (bytesSent < tmpBytesSent) {
            bytesSent = tmpBytesSent;
          }

          if (streamerClosed || hasError || !dfsClient.clientRunning) {
            continue;
          }

          // Is this block full?
          if (one.isLastPacketInBlock()) {
            // wait for the close packet has been acked
            synchronized (dataQueue) {
              while (!streamerClosed && !hasError && 
                  ackQueue.size() != 0 && dfsClient.clientRunning) {
                dataQueue.wait(1000);// wait for acks to arrive from datanodes
              }
            }
            if (streamerClosed || hasError || !dfsClient.clientRunning) {
              continue;
            }

            endBlock();
          }
          if (progress != null) { progress.progress(); }

          // This is used by unit test to trigger race conditions.
          if (artificialSlowdown != 0 && dfsClient.clientRunning) {
            Thread.sleep(artificialSlowdown); 
          }
        } catch (Throwable e) {
          // Log warning if there was a real error.
          if (restartingNodeIndex.get() == -1) {
            DFSClient.LOG.warn("DataStreamer Exception", e);
          }
          if (e instanceof IOException) {
            setLastException((IOException)e);
          } else {
            setLastException(new IOException("DataStreamer Exception: ",e));
          }
          hasError = true;
          if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
            // Not a datanode issue
            streamerClosed = true;
          }
        } finally {
          scope.close();
        }
      }
      closeInternal();
    }

    private void closeInternal() {
      closeResponder();       // close and join
      closeStream();
      streamerClosed = true;
      setClosed();
      synchronized (dataQueue) {
        dataQueue.notifyAll();
      }
    }
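Two pieces of bookkeeping in run() are easy to lose in such a long method.

- **Packet queues.** A data packet moves from dataQueue to ackQueue just before it is written. It leaves ackQueue only when the DataNodes acknowledge it, so unacknowledged packets remain available if the pipeline has to be rebuilt.
- **Heartbeats.** When there is nothing to send during DATA_STREAMING, the streamer waits until socketTimeout/2 has passed since the last packet, then sends a heartbeat packet instead.

The single-threaded sketch below models both, under these assumptions:

- PacketBookkeeping and its methods are hypothetical.
- Packets are modeled as plain sequence numbers.
- The heartbeat marker value of -1 is chosen for the sketch.
- The timeout formula is copied from the wait loop above.

```java
import java.util.LinkedList;

// Hypothetical single-threaded model of DataStreamer's queue bookkeeping.
class PacketBookkeeping {
    static final long HEARTBEAT_SEQNO = -1L;   // marker for a heartbeat packet (assumption)

    final LinkedList<Long> dataQueue = new LinkedList<>();
    final LinkedList<Long> ackQueue = new LinkedList<>();

    // Same arithmetic as the wait loop in run(): while DATA_STREAMING, wait until half the
    // socket timeout has passed since the last packet (polling 1 s once it is overdue);
    // in any other stage, poll every second.
    static long waitTimeout(boolean dataStreaming, long socketTimeout, long now, long lastPacket) {
        long timeout = socketTimeout / 2 - (now - lastPacket);
        timeout = timeout <= 0 ? 1000 : timeout;
        return dataStreaming ? timeout : 1000;
    }

    // Next packet to send: the head of dataQueue, or a heartbeat if nothing is queued.
    long nextPacket() {
        return dataQueue.isEmpty() ? HEARTBEAT_SEQNO : dataQueue.getFirst();
    }

    // Before a data packet is written, move it from dataQueue to ackQueue.
    void markSending(long seqno) {
        if (seqno == HEARTBEAT_SEQNO) {
            return;   // heartbeats are never queued for acknowledgement
        }
        long head = dataQueue.removeFirst();
        if (head != seqno) {
            throw new IllegalStateException("expected to send " + head + " but got " + seqno);
        }
        ackQueue.addLast(head);
    }

    // Acks arrive in order, so each ack must match the head of ackQueue.
    void onAck(long seqno) {
        long expected = ackQueue.removeFirst();
        if (expected != seqno) {
            throw new IllegalStateException("expected ack for " + expected + " but got " + seqno);
        }
    }

    public static void main(String[] args) {
        // With a 60 s socket timeout, 10 s after the last packet the streamer waits up to 20 s more.
        System.out.println(waitTimeout(true, 60_000, 10_000, 0)); // 20000
    }
}
```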

