HDFS Write Path: Method Call Flow and Source Comment Walkthrough
In the previous post on the HDFS module we focused on the practical side: how to use the various APIs, plus the basic installation and configuration of IDEA. Starting with this post, I will work through the internals of HDFS reads and writes. Covering everything at once would make for far too long a post, so I will split the material over several installments. This one lays out the overall framework of the write path and chews through the comments in the source code. Enough preamble, let's dive in!
1. The Framework Diagram
The figure above is taken from Hadoop: The Definitive Guide. As it shows, the entire write process breaks down into seven steps. In this first source-analysis post I will cover the first three: the HDFS client creates a FileSystem object, the metadata is created on the NameNode, and an FSDataOutputStream object is created.
2. Reading the Source
2.1 The HDFS Client Creates a FileSystem Object
This step actually involves two operations: creating a Configuration object, and then obtaining the FileSystem instance through the static FileSystem.get() method.
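To see what these two operations look like in actual client code, here is a minimal sketch; the NameNode URI and the file path are placeholders for illustration, not values taken from this post:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteDemo {
  public static void main(String[] args) throws Exception {
    // Step 1: create the Configuration (loads core-default.xml / core-site.xml)
    Configuration conf = new Configuration();
    // Step 2: obtain the FileSystem for the given URI; for an hdfs://
    // scheme this returns a DistributedFileSystem instance.
    FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf);
    try (FSDataOutputStream out = fs.create(new Path("/demo/hello.txt"))) {
      out.writeUTF("hello hdfs");
    }
    fs.close();
  }
}

For an hdfs:// URI, the FileSystem returned here is in fact a DistributedFileSystem, exactly the class we will meet again in section 2.2.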
2.1.1 Reading the Javadoc
First, let's read the Javadoc of the FileSystem class to get a high-level picture of it:
An abstract base class for a fairly generic filesystem. It
may be implemented as a distributed filesystem, or as a "local"
one that reflects the locally-connected disk. The local version
exists for small Hadoop instances and for testing.
In short: this is the abstract base class behind every concrete file system, whether distributed or local, and the local implementation exists mainly for small Hadoop instances and for testing.
All user code that may potentially use the Hadoop Distributed
File System should be written to use a FileSystem object. The
Hadoop DFS is a multi-machine system that appears as a single
disk. It's useful because of its fault tolerance and potentially
very large capacity.
The point to remember: user code should always be written against the FileSystem abstraction. HDFS spans many machines yet appears as a single disk, and it is valuable precisely because of its fault tolerance and potentially very large capacity.
2.1.2 Creating the Configuration Object
Set a breakpoint at the position shown in the figure below and start debugging to see what actually happens when a Configuration object is created.
The key code is the following static initializer:
static {
  // print deprecation warning if hadoop-site.xml is found in classpath
  ClassLoader cL = Thread.currentThread().getContextClassLoader();
  if (cL == null) {
    cL = Configuration.class.getClassLoader();
  }
  if (cL.getResource("hadoop-site.xml") != null) {
    LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
        "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, " +
        "mapred-site.xml and hdfs-site.xml to override properties of " +
        "core-default.xml, mapred-default.xml and hdfs-default.xml " +
        "respectively");
  }
  addDefaultResource("core-default.xml");
  addDefaultResource("core-site.xml");
}
As we can see, a Configuration object registers two default resources: core-default.xml (the built-in defaults) and core-site.xml (site-specific overrides).
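The effect of these two default resources is easy to observe: a key set in core-site.xml overrides the same key's value from core-default.xml, and values set programmatically override both. A small sketch (the property values shown are the stock defaults, to the best of my knowledge):

import org.apache.hadoop.conf.Configuration;

public class ConfDemo {
  public static void main(String[] args) {
    // The constructor registers core-default.xml, then core-site.xml.
    Configuration conf = new Configuration();
    // If core-site.xml on the classpath sets fs.defaultFS, that value wins;
    // otherwise the built-in default from core-default.xml (file:///) applies.
    System.out.println(conf.get("fs.defaultFS"));
    // Programmatic settings take precedence over both resource files.
    conf.set("dfs.replication", "2");
    System.out.println(conf.get("dfs.replication")); // prints 2
  }
}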
2.1.3 Obtaining the FileSystem Object
Now move the breakpoint to the position shown in the figure below:
After stepping through layer upon layer of method calls, we find that the FileSystem object is ultimately produced by the getInternal method.
Inside getInternal, the createFileSystem method is called.
Step into createFileSystem, and here comes the key part!
private static FileSystem createFileSystem(URI uri, Configuration conf
    ) throws IOException {
  Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
  FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
  fs.initialize(uri, conf);
  return fs;
}
So the FileSystem instance is obtained via reflection: getFileSystemClass first resolves the URI scheme to a concrete implementation class, then ReflectionUtils.newInstance is called with that Class object and the Configuration as arguments to construct the instance, which is finally initialized with the URI.
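To make the reflection step concrete, here is a simplified, self-contained sketch of the pattern that ReflectionUtils.newInstance follows. Treat it as an illustration under simplifying assumptions, not the real implementation; the actual utility also caches constructors for speed:

import java.lang.reflect.Constructor;

public final class MiniReflectionUtils {
  // Find the no-arg constructor (even a non-public one), make it
  // accessible, and invoke it to build the instance.
  public static <T> T newInstance(Class<T> clazz) {
    try {
      Constructor<T> ctor = clazz.getDeclaredConstructor();
      ctor.setAccessible(true);
      return ctor.newInstance();
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException(e);
    }
  }
}

One detail worth knowing: the real ReflectionUtils.newInstance also calls setConf(conf) on the freshly built object when it implements Configurable, which is how the new FileSystem receives its Configuration before initialize(uri, conf) runs.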
2.2 Creating the Metadata on the NameNode
2.2.1 Reading the Javadoc
This step involves three classes: DistributedFileSystem, DFSClient, and DFSOutputStream.
The DistributedFileSystem class
Implementation of the abstract FileSystem for the DFS system.
This object is the way end-user code interacts with a Hadoop
DistributedFileSystem.
Takeaway: DistributedFileSystem is the concrete FileSystem subclass for DFS; it is the object end-user code actually interacts with.
The DFSClient class
DFSClient can connect to a Hadoop Filesystem and
perform basic file tasks. It uses the ClientProtocol
to communicate with a NameNode daemon, and connects
directly to DataNodes to read/write block data.
Hadoop DFS users should obtain an instance of
DistributedFileSystem, which uses DFSClient to handle
filesystem tasks.
Takeaway: DFSClient is the workhorse. It talks to the NameNode daemon over ClientProtocol and connects directly to DataNodes for block reads and writes, but user code should reach it only indirectly, through a DistributedFileSystem instance.
The DFSOutputStream class
DFSOutputStream creates files from a stream of bytes.
The client application writes data that is cached internally by
this stream. Data is broken up into packets, each packet is
typically 64K in size. A packet comprises of chunks. Each chunk
is typically 512 bytes and has an associated checksum with it.
Takeaway: data written by the client is cached inside this stream and split into packets, each typically 64K; a packet is made up of chunks, each typically 512 bytes with an associated checksum.
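The chunk/packet arithmetic is worth spelling out. Assuming the stock defaults of a 512-byte chunk (io.bytes.per.checksum), a 4-byte CRC32 checksum per chunk, and a 64K packet (dfs.client-write-packet-size), a rough calculation of how many chunks fit into one packet looks like this; note that the real computation in DFSOutputStream also reserves room for the packet header, so its result is slightly smaller:

public class PacketMath {
  public static void main(String[] args) {
    int bytesPerChecksum = 512;      // data bytes per chunk
    int checksumSize = 4;            // CRC32 checksum bytes per chunk
    int writePacketSize = 64 * 1024; // target packet size

    int chunkSize = bytesPerChecksum + checksumSize;              // 516 bytes
    int chunksPerPacket = Math.max(writePacketSize / chunkSize, 1); // 127
    int dataBytesPerPacket = chunksPerPacket * bytesPerChecksum;

    System.out.println("chunks per packet: " + chunksPerPacket);
    System.out.println("user data per packet: " + dataBytesPerPacket + " bytes");
  }
}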
2.2.2 Reading the Metadata-Creation Code
Set a breakpoint at the position shown in the figure below, then start debugging.
The first stop is the FileSystem class; following the chain of create overloads, we eventually find the exit point:
public FSDataOutputStream create(Path f,
    boolean overwrite,
    int bufferSize,
    short replication,
    long blockSize,
    Progressable progress
    ) throws IOException {
  return this.create(f, FsPermission.getFileDefault().applyUMask(
      FsPermission.getUMask(getConf())), overwrite, bufferSize,
      replication, blockSize, progress);
}
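Note the permission handling in this overload: the default file permission has the configured umask applied to it. A quick sketch of what applyUMask computes; the 022 umask here is an assumption (it is the usual default of fs.permissions.umask-mode):

import org.apache.hadoop.fs.permission.FsPermission;

public class UmaskDemo {
  public static void main(String[] args) {
    FsPermission fileDefault = FsPermission.getFileDefault(); // 666
    FsPermission umask = new FsPermission((short) 022);       // assumed umask
    // applyUMask clears the umask bits: 666 & ~022 = 644, i.e. rw-r--r--
    System.out.println(fileDefault.applyUMask(umask));
  }
}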
Continuing to debug, we discover that FSDataOutputStream is a wrapper class, returned by DistributedFileSystem's create method, and a look at the code shows that what it wraps is precisely DFSOutputStream! With that, the second exit point is found.
@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize,
    final short replication, final long blockSize,
    final Progressable progress, final ChecksumOpt checksumOpt)
    throws IOException {
  statistics.incrementWriteOps(1);
  Path absF = fixRelativePart(f);
  return new FileSystemLinkResolver<FSDataOutputStream>() {
    @Override
    public FSDataOutputStream doCall(final Path p)
        throws IOException, UnresolvedLinkException {
      final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
          cflags, replication, blockSize, progress, bufferSize,
          checksumOpt);
      return dfs.createWrappedOutputStream(dfsos, statistics);
    }
    // ... rest of the FileSystemLinkResolver (the next() override and the
    // closing resolve call) omitted in this excerpt
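The relationship here is a classic wrapper (decorator): createWrappedOutputStream wraps the low-level DFSOutputStream in an FSDataOutputStream subclass that adds bookkeeping, such as position tracking and statistics, and delegates the real I/O inward. A hypothetical minimal sketch of the pattern, with simplified names that are not the real classes:

import java.io.IOException;
import java.io.OutputStream;

// The outer stream adds position tracking; the inner stream,
// standing in for DFSOutputStream, does the actual work.
class PositionTrackingStream extends OutputStream {
  private final OutputStream inner;
  private long pos;

  PositionTrackingStream(OutputStream inner) {
    this.inner = inner;
  }

  @Override
  public void write(int b) throws IOException {
    inner.write(b); // delegate the actual I/O
    pos++;          // bookkeeping layered on top
  }

  long getPos() {
    return pos;
  }
}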
Debugging onward, we see that this DFSOutputStream is returned from DFSClient's create method:
public DFSOutputStream create(String src,
    FsPermission permission,
    EnumSet<CreateFlag> flag,
    boolean createParent,
    short replication,
    long blockSize,
    Progressable progress,
    int buffersize,
    ChecksumOpt checksumOpt,
    InetSocketAddress[] favoredNodes) throws IOException {
  checkOpen();
  if (permission == null) {
    permission = FsPermission.getFileDefault();
  }
  FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
  if (LOG.isDebugEnabled()) {
    LOG.debug(src + ": masked=" + masked);
  }
  final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
      src, masked, flag, createParent, replication, blockSize, progress,
      buffersize, dfsClientConf.createChecksum(checksumOpt),
      getFavoredNodesStr(favoredNodes));
  beginFileLease(result.getFileId(), result);
  return result;
}
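One line deserves a comment before we move on: beginFileLease. HDFS allows only one writer per file, so the client must hold and periodically renew a lease on the file for as long as the stream is open; a background LeaseRenewer thread takes care of this. The following is only a conceptual sketch of that renewal pattern, with made-up names, not the real LeaseRenewer:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Conceptual illustration: keep telling the NameNode that this client
// is alive and still writing the file.
class LeaseSketch {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  void beginFileLease(long fileId) {
    // Renew well within the lease soft limit; 30s is an arbitrary choice.
    scheduler.scheduleAtFixedRate(
        () -> System.out.println("renew lease for file " + fileId),
        0, 30, TimeUnit.SECONDS);
  }

  void endFileLease() {
    scheduler.shutdownNow();
  }
}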
The highlighted code shows that DFSClient obtains its DFSOutputStream instance by calling the static newStreamForCreate method of the DFSOutputStream class. Stepping into that method, we finally arrive at the key code that creates the metadata!
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize, Progressable progress, int buffersize,
    DataChecksum checksum, String[] favoredNodes) throws IOException {
  TraceScope scope =
      dfsClient.getPathTraceScope("newStreamForCreate", src);
  try {
    HdfsFileStatus stat = null;

    // Retry the create if we get a RetryStartFileException up to a maximum
    // number of times
    boolean shouldRetry = true;
    int retryCount = CREATE_RETRY_COUNT;
    while (shouldRetry) {
      shouldRetry = false;
      try {
        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
            new EnumSetWritable<CreateFlag>(flag), createParent, replication,
            blockSize, SUPPORTED_CRYPTO_VERSIONS);
        break;
      } catch (RemoteException re) {
        IOException e = re.unwrapRemoteException(
            AccessControlException.class,
            DSQuotaExceededException.class,
            FileAlreadyExistsException.class,
            FileNotFoundException.class,
            ParentNotDirectoryException.class,
            NSQuotaExceededException.class,
            RetryStartFileException.class,
            SafeModeException.class,
            UnresolvedPathException.class,
            SnapshotAccessControlException.class,
            UnknownCryptoProtocolVersionException.class);
        if (e instanceof RetryStartFileException) {
          if (retryCount > 0) {
            shouldRetry = true;
            retryCount--;
          } else {
            throw new IOException("Too many retries because of encryption" +
                " zone operations", e);
          }
        } else {
          throw e;
        }
      }
    }
    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
        flag, progress, checksum, favoredNodes);
    out.start();
    return out;
  } finally {
    scope.close();
  }
}
Looking at the key code, the stat object is produced by calling the create method on namenode. Ctrl-clicking namenode reveals that it is an instance of the ClientProtocol interface mentioned in the Javadoc earlier. ClientProtocol is an interface, and the implementation we want is ClientNamenodeProtocolTranslatorPB. Searching that class, we find its create method: the result is obtained by calling create on rpcProxy, and the serialization underneath is Google's Protocol Buffers.
@Override
public HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions)
    throws AccessControlException, AlreadyBeingCreatedException,
    DSQuotaExceededException, FileAlreadyExistsException,
    FileNotFoundException, NSQuotaExceededException,
    ParentNotDirectoryException, SafeModeException,
    UnresolvedLinkException, IOException {
  CreateRequestProto.Builder builder = CreateRequestProto.newBuilder()
      .setSrc(src)
      .setMasked(PBHelper.convert(masked))
      .setClientName(clientName)
      .setCreateFlag(PBHelper.convertCreateFlag(flag))
      .setCreateParent(createParent)
      .setReplication(replication)
      .setBlockSize(blockSize);
  builder.addAllCryptoProtocolVersion(PBHelper.convert(supportedVersions));
  CreateRequestProto req = builder.build();
  try {
    CreateResponseProto res = rpcProxy.create(null, req);
    return res.hasFs() ? PBHelper.convert(res.getFs()) : null;
  } catch (ServiceException e) {
    throw ProtobufHelper.getRemoteException(e);
  }
}
2.3 Creating the FSDataOutputStream Object
The code above handles metadata creation, but the process does not end there: a DFSOutputStream object still has to be created. In the same newStreamForCreate method we find the following lines; the method ultimately returns the out object, and it calls out's start method just before returning.
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
    flag, progress, checksum, favoredNodes);
out.start();
return out;
Stepping into start, we see that it calls the streamer object's start method:
private synchronized void start() {
  streamer.start();
}
Clicking through to streamer, we find that it is an instance of DataStreamer, an inner class of DFSOutputStream. That class has a run method, and the key code for writing data lives in that run method!
@Override
public void run() {
  long lastPacket = Time.monotonicNow();
  TraceScope scope = NullScope.INSTANCE;
  while (!streamerClosed && dfsClient.clientRunning) {
    // if the Responder encountered an error, shutdown Responder
    if (hasError && response != null) {
      try {
        response.close();
        response.join();
        response = null;
      } catch (InterruptedException e) {
        DFSClient.LOG.warn("Caught exception ", e);
      }
    }

    DFSPacket one;
    try {
      // process datanode IO errors if any
      boolean doSleep = false;
      if (hasError && (errorIndex >= 0 || restartingNodeIndex.get() >= 0)) {
        doSleep = processDatanodeError();
      }

      synchronized (dataQueue) {
        // wait for a packet to be sent.
        long now = Time.monotonicNow();
        while ((!streamerClosed && !hasError && dfsClient.clientRunning
            && dataQueue.size() == 0 &&
            (stage != BlockConstructionStage.DATA_STREAMING ||
                stage == BlockConstructionStage.DATA_STREAMING &&
                    now - lastPacket < dfsClient.getConf().socketTimeout/2))
            || doSleep) {
          long timeout =
              dfsClient.getConf().socketTimeout/2 - (now - lastPacket);
          timeout = timeout <= 0 ? 1000 : timeout;
          timeout = (stage == BlockConstructionStage.DATA_STREAMING) ?
              timeout : 1000;
          try {
            dataQueue.wait(timeout);
          } catch (InterruptedException e) {
            DFSClient.LOG.warn("Caught exception ", e);
          }
          doSleep = false;
          now = Time.monotonicNow();
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        // get packet to be sent.
        if (dataQueue.isEmpty()) {
          one = createHeartbeatPacket();
          assert one != null;
        } else {
          one = dataQueue.getFirst(); // regular data packet
          long parents[] = one.getTraceParents();
          if (parents.length > 0) {
            scope = Trace.startSpan("dataStreamer",
                new TraceInfo(0, parents[0]));
            // TODO: use setParents API once it's available from HTrace 3.2
            // scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS);
            // scope.getSpan().setParents(parents);
          }
        }
      }

      // get new block from namenode.
      if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Allocating new block");
        }
        setPipeline(nextBlockOutputStream());
        initDataStreaming();
      } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Append to block " + block);
        }
        setupPipelineForAppendOrRecovery();
        initDataStreaming();
      }

      long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
      if (lastByteOffsetInBlock > blockSize) {
        throw new IOException("BlockSize " + blockSize +
            " is smaller than data size. " +
            " Offset of packet in block " + lastByteOffsetInBlock +
            " Aborting file " + src);
      }

      if (one.isLastPacketInBlock()) {
        // wait for all data packets have been successfully acked
        synchronized (dataQueue) {
          while (!streamerClosed && !hasError &&
              ackQueue.size() != 0 && dfsClient.clientRunning) {
            try {
              // wait for acks to arrive from datanodes
              dataQueue.wait(1000);
            } catch (InterruptedException e) {
              DFSClient.LOG.warn("Caught exception ", e);
            }
          }
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }
        stage = BlockConstructionStage.PIPELINE_CLOSE;
      }

      // send the packet
      Span span = null;
      synchronized (dataQueue) {
        // move packet from dataQueue to ackQueue
        if (!one.isHeartbeatPacket()) {
          span = scope.detach();
          one.setTraceSpan(span);
          dataQueue.removeFirst();
          ackQueue.addLast(one);
          dataQueue.notifyAll();
        }
      }

      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("DataStreamer block " + block +
            " sending packet " + one);
      }

      // write out data to remote datanode
      TraceScope writeScope = Trace.startSpan("writeTo", span);
      try {
        one.writeTo(blockStream);
        blockStream.flush();
      } catch (IOException e) {
        // HDFS-3398 treat primary DN is down since client is unable to
        // write to primary DN. If a failed or restarting node has already
        // been recorded by the responder, the following call will have no
        // effect. Pipeline recovery can handle only one node error at a
        // time. If the primary node fails again during the recovery, it
        // will be taken out then.
        tryMarkPrimaryDatanodeFailed();
        throw e;
      } finally {
        writeScope.close();
      }
      lastPacket = Time.monotonicNow();

      // update bytesSent
      long tmpBytesSent = one.getLastByteOffsetBlock();
      if (bytesSent < tmpBytesSent) {
        bytesSent = tmpBytesSent;
      }

      if (streamerClosed || hasError || !dfsClient.clientRunning) {
        continue;
      }

      // Is this block full?
      if (one.isLastPacketInBlock()) {
        // wait for the close packet has been acked
        synchronized (dataQueue) {
          while (!streamerClosed && !hasError &&
              ackQueue.size() != 0 && dfsClient.clientRunning) {
            dataQueue.wait(1000); // wait for acks to arrive from datanodes
          }
        }
        if (streamerClosed || hasError || !dfsClient.clientRunning) {
          continue;
        }

        endBlock();
      }
      if (progress != null) {
        progress.progress();
      }

      // This is used by unit test to trigger race conditions.
      if (artificialSlowdown != 0 && dfsClient.clientRunning) {
        Thread.sleep(artificialSlowdown);
      }
    } catch (Throwable e) {
      // Log warning if there was a real error.
      if (restartingNodeIndex.get() == -1) {
        DFSClient.LOG.warn("DataStreamer Exception", e);
      }
      if (e instanceof IOException) {
        setLastException((IOException) e);
      } else {
        setLastException(new IOException("DataStreamer Exception: ", e));
      }
      hasError = true;
      if (errorIndex == -1 && restartingNodeIndex.get() == -1) {
        // Not a datanode issue
        streamerClosed = true;
      }
    } finally {
      scope.close();
    }
  }
  closeInternal();
}

private void closeInternal() {
  closeResponder(); // close and join
  closeStream();
  streamerClosed = true;
  setClosed();
  synchronized (dataQueue) {
    dataQueue.notifyAll();
  }
}