scan的调用代码示例如下:
// 创建HBase配置config Configuration config = HBaseConfiguration.create(); config.set("hbase.zookeeper.quorum", "192.168.1.226");// zookeeper部署的服务器IP config.set("hbase.zookeeper.property.clientPort", "2181");// zookeeper允许连接的客户端端口号 // 定义HBase连接 Connection connection = null; try { // 获取HBase数据库连接 connection = ConnectionFactory.createConnection(config); // 输出连接建立结果 while (!connection.isClosed()) { break; } // 获取HBase数据库表 Table table = connection.getTable(TableName.valueOf("test_table")); // 构造Scan实例 Scan scan = new Scan(); scan.addColumn(Bytes.toBytes("family1"), Bytes.toBytes("cloumn1")); // 获取查询结果 ResultScanner result = table.getScanner(scan); // 解析查询结果 for (Result r : result) { // 此处为处理Result的代码 byte[] row = r.getRow(); if(row.length == 0){ //... } } result.close(); table.close(); table = null; scan = null; } catch (IOException e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } }下面,我们对scan的整个流程进行分析。
首先从Table的getScanner(Scan scan)方法入手,它的定义如下:
/**
 * Returns a scanner on the current table as specified by the {@link Scan}
 * object.
 * Note that the passed {@link Scan}'s start row and caching properties
 * may be changed.
 *
 * @param scan A configured {@link Scan} object.
 * @return A scanner.
 * @throws IOException if a remote or network exception occurs.
 * @since 0.20.0
 */
ResultScanner getScanner(Scan scan) throws IOException;
它的实现是由HTable来完成的,源码如下:
/** * The underlying {@link HTable} must not be closed. * {@link HTableInterface#getScanner(Scan)} has other usage details. * * HBase中scan的入口方法 */ @Override public ResultScanner getScanner(final Scan scan) throws IOException { // small scan不可以设置batch if (scan.getBatch() > 0 && scan.isSmall()) { throw new IllegalArgumentException("Small scan should not be used with batching"); } // 设置caching // 取参数“hbase.client.scanner.caching”,如果参数未配置,则默认为100 if (scan.getCaching() <= 0) { scan.setCaching(getScannerCaching()); } // 取参数“HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY”,如果参数未配置,则默认为2 * 1024 * 1024 if (scan.getMaxResultSize() <= 0) { scan.setMaxResultSize(scannerMaxResultSize); } /** * scan总共分为四种类型: * 1、reversed、small--ClientSmallReversedScanner * 2、reversed、big--ReversedClientScanner * 3、notReversed、small--ClientSmallScanner * 4、notReversed、big--ClientScanner */ if (scan.isReversed()) {// 反向扫描 if (scan.isSmall()) { return new ClientSmallReversedScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); } else { return new ReversedClientScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); } } if (scan.isSmall()) { return new ClientSmallScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); } else { return new ClientScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); } }这里,我们先只研究ClientScanner,其他三种以后再说。
/** * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start * row maybe changed changed. * @param conf The {@link Configuration} to use. * @param scan {@link Scan} to use in this scanner * @param tableName The table that we wish to scan * @param connection Connection identifying the cluster * @throws IOException */ public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow())); } // 设置scan、tableName等成员变量 this.scan = scan; this.tableName = tableName; this.lastNext = System.currentTimeMillis(); this.connection = connection; this.pool = pool; this.primaryOperationTimeout = primaryOperationTimeout; // 重试次数,取参数“hbase.client.retries.number”,如果参数未配置,则默认为31 this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); if (scan.getMaxResultSize() > 0) { this.maxScannerResultSize = scan.getMaxResultSize(); } else { this.maxScannerResultSize = conf.getLong( HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); } // scanner超时时间 this.scannerTimeout = HBaseConfiguration.getInt(conf, HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); // check if application wants to collect scan metrics initScanMetrics(scan); // Use the caching from the Scan. If not set, use the default cache setting for this table. 
// 处理caching if (this.scan.getCaching() > 0) { this.caching = this.scan.getCaching(); } else { this.caching = conf.getInt( HConstants.HBASE_CLIENT_SCANNER_CACHING, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING); } // 初始化caller // caller为RpcRetryingCaller类型 this.caller = rpcFactory.<Result[]> newCaller(); // rpcControllerFactory为RpcControllerFactory类型 this.rpcControllerFactory = controllerFactory; this.conf = conf; // 初始化scanner initializeScannerInConstruction(); }ClientScanner的构造方法中,首先是对各种成员变量赋值,比如scan、tableName、connection等,然后是处理maxScannerResultSize、scannerTimeout、caching等scan需要用到的各种参数,这些都没有什么好说的。
接下来,是初始化两个重要的变量caller和rpcControllerFactory,caller为RpcRetryingCaller类型的,rpcControllerFactory为RpcControllerFactory类型的。
最后调用initializeScannerInConstruction()方法,ok,我们也跟着继续。
protected void initializeScannerInConstruction() throws IOException{ // initialize the scanner // 初始化scanner nextScanner(this.caching, false); }紧接着,调用nextScanner()方法,注意,传入两个参数,一个是ClientScanner对象生成时的caching,另外一个是false。
这个caching,如果在构造Scan对象时没有设置,则取参数hbase.client.scanner.caching配置的值,参数未配置则默认为100,它的含义是每次RPC请求的最大行数。
继续追踪nextScanner()方法,完整的代码如下:
/*
 * Gets a scanner for the next region. If this.currentRegion != null, then
 * we will move to the endrow of this.currentRegion. Else we will get
 * scanner at the scan.getStartRow(). We will go no further, just tidy
 * up outstanding scanners, if <code>currentRegion != null</code> and
 * <code>done</code> is true.
 * @param nbRows caching value handed to the newly created callable
 * @param done Server-side says we're done scanning.
 * @return true if a scanner was opened on the next region, false if the scan is finished
 */
protected boolean nextScanner(int nbRows, final boolean done) throws IOException {
  // Close the previous scanner if it's open.
  // On the very first call (from the constructor) callable is still null, so this
  // section is skipped. callable is assigned further down via getScannerCallable()
  // and is reset to null here once the close request has been sent.
  if (this.callable != null) {
    // Mark the callable (and its wrapped ScannerCallable) as closed ...
    this.callable.setClose();
    // ... then issue one more call; because closed is now true, the wrapped
    // ScannerCallable only performs its close() step.
    call(scan, callable, caller, scannerTimeout);
    // Drop the reference so this section is not executed again for this callable.
    this.callable = null;
  }
  // Where to start the next scanner
  byte [] localStartKey;
  // if we're at end of table, close and return false to stop iterating
  if (this.currentRegion != null) {
    byte [] endKey = this.currentRegion.getEndKey();
    // Stop when: the region has no end key (last region), the end key is at or
    // beyond the scan's stop row (as decided by checkScanStopRow), or the server
    // told us we are done.
    if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) || checkScanStopRow(endKey) || done) {
      close();
      if (LOG.isTraceEnabled()) {
        LOG.trace("Finished " + this.currentRegion);
      }
      return false;
    }
    // The next scanner starts at the end key of the region just finished.
    localStartKey = endKey;
    if (LOG.isTraceEnabled()) {
      LOG.trace("Finished " + this.currentRegion);
    }
  } else {
    // First region of the scan: start at the scan's own start row.
    localStartKey = this.scan.getStartRow();
  }
  if (LOG.isDebugEnabled() && this.currentRegion != null) {
    // Only worth logging if NOT first region in scan.
    LOG.debug("Advancing internal scanner to startKey at '" + Bytes.toStringBinary(localStartKey) + "'");
  }
  try {
    // Build a fresh ScannerCallableWithReplicas for the new start key.
    callable = getScannerCallable(localStartKey, nbRows);
    // Open a scanner on the region server starting at the
    // beginning of the region
    call(scan, callable, caller, scannerTimeout);
    // Remember which region we are now scanning.
    this.currentRegion = callable.getHRegionInfo();
    if (this.scanMetrics != null) {
      this.scanMetrics.countOfRegions.incrementAndGet();
    }
  } catch (IOException e) {
    // Opening failed: release this scanner before propagating the error.
    close();
    throw e;
  }
  return true;
}
我们一步步分析,由于ClientScanner在构造时,并没有初始化callable成员变量,所以它必定为null,第一部分代码略过,我们以后再讲。
接下来,由于currentRegion也没有被初始化,所以,程序走的是else分支,也就是,将localStartKey设置为scan的startRow。
localStartKey = this.scan.getStartRow();紧接着,调用getScannerCallable()方法为成员变量callable赋值,入参为行localStartKey和行数nbRows,这个nbRows即为ClientScanner构造时的caching值。方法定义如下:
@InterfaceAudience.Private protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey, int nbRows) { // 设置scan的startRow scan.setStartRow(localStartKey); // 构造ScannerCallable类型的对象s,将其作为ScannerCallableWithReplicas类型的sr的成员变量currentScannerCallable ScannerCallable s = new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics, this.rpcControllerFactory); // 设置caching s.setCaching(nbRows); // 构造ScannerCallableWithReplicas类型的对象sr,其成员变量currentScannerCallable为上面的ScannerCallable类型的对象s ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(), s, pool, primaryOperationTimeout, scan, retries, scannerTimeout, caching, conf, caller); return sr; }首先,把localStartKey设置为scan的startRow,后续每次迭代处理时,我们就能知道scan的起始行。
然后,生成一个ScannerCallable类型的对象s,这个s是要作为ScannerCallableWithReplicas类型的sr的成员变量currentScannerCallable来使用的,实际与Region所在RegionServer通信的正是这个对象,并且,这个ScannerCallable对象中有一个byte[]类型的row成员变量,它会被初始化为scan的startRow,被用来进行Region的定位和其行号的定位。
构造ScannerCallableWithReplicas类型的对象sr并返回,其成员变量currentScannerCallable为上面的ScannerCallable类型的对象s。
getScannerCallable()方法执行完后,紧接着会调用ClientScanner的call()方法,代码如下:
Result[] call(Scan scan, ScannerCallableWithReplicas callable, RpcRetryingCaller<Result[]> caller, int scannerTimeout) throws IOException, RuntimeException { if (Thread.interrupted()) { throw new InterruptedIOException(); } // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, // we do a callWithRetries // caller为RpcRetryingCaller类型 // callable为ScannerCallableWithReplicas类型 return caller.callWithoutRetries(callable, scannerTimeout); }之前我们已知道,caller为RpcRetryingCaller类型,它的方法定义如下:
/** * Call the server once only. * 仅仅调用server一次 * {@link RetryingCallable} has a strange shape so we can do retrys. Use this invocation if you * want to do a single call only (A call to {@link RetryingCallable#call(int)} will not likely * succeed). * @return an object of type T * @throws IOException if a remote or network exception occurs * @throws RuntimeException other unspecified error */ public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout) throws IOException, RuntimeException { // The code of this method should be shared with withRetries. this.globalStartTime = EnvironmentEdgeManager.currentTime(); try { // 先调用prepare()方法,再调用call()方法,超时时间为callTimeout callable.prepare(false); return callable.call(callTimeout); } catch (Throwable t) { Throwable t2 = translateException(t); ExceptionUtil.rethrowIfInterrupt(t2); // It would be nice to clear the location cache here. if (t2 instanceof IOException) { throw (IOException)t2; } else { throw new RuntimeException(t2); } } }由代码可以看出,它实际上的处理流程为先调用callable的prepare()方法,再调用callable的call()方法。
接下来,我们转入callable的分析,这个callable为ScannerCallableWithReplicas类型,它的prepare()方法为空,我们重点分析call()方法,代码如下:
/**
 * Runs the scan RPC against the primary replica first and, if it does not answer within
 * {@code timeBeforeReplicas} microseconds (or fails), against the other replicas as well,
 * returning the first result that arrives.
 *
 * @param timeout timeout forwarded to the wrapped ScannerCallable when closing
 * @return the results of the first replica to answer, possibly null
 * @throws IOException if all submitted calls fail, or the wait is cancelled/interrupted
 */
@Override
public Result [] call(int timeout) throws IOException {
  // If the active replica callable was closed somewhere, invoke the RPC to
  // really close it. In the case of regular scanners, this applies. We make couple
  // of RPCs to a RegionServer, and when that region is exhausted, we set
  // the closed flag. Then an RPC is required to actually close the scanner.
  // (Skipped on the first invocation, when closed is still false.)
  if (currentScannerCallable != null && currentScannerCallable.closed) {
    // For closing we target that exact scanner (and not do replica fallback like in
    // the case of normal reads)
    if (LOG.isTraceEnabled()) {
      LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId);
    }
    // With closed == true this call ends up in the callable's close() path.
    Result[] r = currentScannerCallable.call(timeout);
    currentScannerCallable = null;
    return r;
  }
  // We need to do the following:
  //1. When a scan goes out to a certain replica (default or not), we need to
  //   continue to hit that until there is a failure. So store the last successfully invoked
  //   replica
  //2. We should close the "losing" scanners (scanners other than the ones we hear back
  //   from first)

  // Locate the region for the callable's current row (first argument true: use the cache).
  RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName, currentScannerCallable.getRow());
  // allocate a boundedcompletion pool of some multiple of number of replicas.
  // We want to accommodate some RPCs for redundant replica scans (but are still in progress)
  // cs runs the submitted tasks on the executor and exposes their completion
  // through poll()/take().
  BoundedCompletionService<Pair<Result[], ScannerCallable>> cs = new BoundedCompletionService<Pair<Result[], ScannerCallable>>(pool, rl.size() * 5);
  List<ExecutionException> exceptions = null;
  int submitted = 0, completed = 0;
  AtomicBoolean done = new AtomicBoolean(false);
  replicaSwitched.set(false);
  // submit call for the primary replica.
  // (addCallsForCurrentReplica wraps currentScannerCallable in a RetryingRPC, submits it
  // and records it in outstandingCallables.)
  submitted += addCallsForCurrentReplica(cs, rl);
  try {
    // wait for the timeout to see whether the primary responds back
    Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
    if (f != null) {
      Pair<Result[], ScannerCallable> r = f.get();
      if (r != null && r.getSecond() != null) {
        updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
      }
      return r == null ? null : r.getFirst(); //great we got a response
    }
  } catch (ExecutionException e) {
    // the primary call failed with RetriesExhaustedException or DoNotRetryIOException
    // but the secondaries might still succeed. Continue on the replica RPCs.
    exceptions = new ArrayList<ExecutionException>(rl.size());
    exceptions.add(e);
    completed++;
  } catch (CancellationException e) {
    throw new InterruptedIOException(e.getMessage());
  } catch (InterruptedException e) {
    throw new InterruptedIOException(e.getMessage());
  }
  // submit call for the all of the secondaries at once
  // TODO: this may be an overkill for large region replication
  submitted += addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
  try {
    // Take completions until one succeeds or every submitted call has failed.
    while (completed < submitted) {
      try {
        Future<Pair<Result[], ScannerCallable>> f = cs.take();
        Pair<Result[], ScannerCallable> r = f.get();
        if (r != null && r.getSecond() != null) {
          updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
        }
        return r == null ? null : r.getFirst(); // great we got an answer
      } catch (ExecutionException e) {
        // if not cancel or interrupt, wait until all RPC's are done
        // one of the tasks failed. Save the exception for later.
        if (exceptions == null) exceptions = new ArrayList<ExecutionException>(rl.size());
        exceptions.add(e);
        completed++;
      }
    }
  } catch (CancellationException e) {
    throw new InterruptedIOException(e.getMessage());
  } catch (InterruptedException e) {
    throw new InterruptedIOException(e.getMessage());
  } finally {
    // We get there because we were interrupted or because one or more of the
    // calls succeeded or failed. In all case, we stop all our tasks.
    cs.cancelAll(true);
  }
  if (exceptions != null && !exceptions.isEmpty()) {
    RpcRetryingCallerWithReadReplicas.throwEnrichedException(exceptions.get(0), retries); // just rethrow the first exception for now.
  }
  return null; // unreachable
}
又是一个比较长的方法,我们还是一步步分析。
首先,currentScannerCallable虽然已被初始化,但是它的closed还是为false,那么方法的第一块代码会被跳过。
接下来,调用RpcRetryingCallerWithReadReplicas的getRegionLocations()方法,利用cConnection、tableName和currentScannerCallable.getRow()定位Region,得到Region的位置RegionLocations类型的rl。
然后,构造一个BoundedCompletionService类型的数据结构cs,cs中包含需要执行的task、已经完成的task和线程池executor,并提供了submit、poll、take等方法。依靠它,利用线程池完成任务的调度与执行,并同步获取执行结果。
addCallsForCurrentReplica方法就是实现上述逻辑的方法,代码如下:
/**
 * Submits the scan RPC for the replica currently being served.
 * The current {@code ScannerCallable} is wrapped in a {@code RetryingRPC}, recorded in
 * {@code outstandingCallables} and handed to the completion service.
 *
 * @param cs completion service that executes the task
 * @param rl region locations (not used by this method)
 * @return the number of tasks submitted, always 1
 */
private int addCallsForCurrentReplica(
    BoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
  final int submittedTasks = 1;
  RetryingRPC primaryTask = new RetryingRPC(currentScannerCallable);
  outstandingCallables.add(currentScannerCallable);
  cs.submit(primaryTask);
  return submittedTasks;
}
它将currentScannerCallable封装出成为一个RetryingRPC对象,提交到cs中执行,并添加到数据结构outstandingCallables中。
我们知道,这个currentScannerCallable对象是ScannerCallable类型的,它被线程池调度执行时,依靠call()方法完成业务逻辑,call方法定义如下:
/**
 * Performs one step of the scanner life cycle, depending on the current state:
 * <ul>
 *   <li>closed and scannerId set: close the scanner;</li>
 *   <li>not closed and no scannerId yet: open the scanner and store its id;</li>
 *   <li>otherwise: issue a scan RPC for the next batch and return the results.</li>
 * </ul>
 *
 * @param callTimeout timeout set on the RPC controller of the scan request
 * @return the fetched results for a "next" step; null for open/close steps and when the
 *         server reports there are no more results
 * @throws InterruptedIOException if the thread was interrupted before the call
 * @throws IOException on RPC failure; NotServingRegionException and
 *         RegionServerStoppedException are wrapped in DoNotRetryIOException
 */
@Override
public Result [] call(int callTimeout) throws IOException {
  if (Thread.interrupted()) {
    throw new InterruptedIOException();
  }
  if (closed) {
    // Close step: only meaningful if a scanner is open on the server.
    if (scannerId != -1) {
      close();
    }
  } else {
    if (scannerId == -1L) {
      // Open step: no scanner yet, open one and remember its id.
      this.scannerId = openScanner();
    } else {
      // Next step: fetch another batch through the open scanner.
      Result [] rrs = null;
      ScanRequest request = null;
      try {
        incRPCcallsMetrics();
        request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
        ScanResponse response = null;
        PayloadCarryingRpcController controller = controllerFactory.newController();
        controller.setPriority(getTableName());
        controller.setCallTimeout(callTimeout);
        try {
          response = getStub().scan(controller, request);
          // Client and RS maintain a nextCallSeq number during the scan. Every next() call
          // from client to server will increment this number in both sides. Client passes this
          // number along with the request and at RS side both the incoming nextCallSeq and its
          // nextCallSeq will be matched. In case of a timeout this increment at the client side
          // should not happen. If at the server side fetching of next batch of data was over,
          // there will be mismatch in the nextCallSeq number. Server will throw
          // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
          // as the last successfully retrieved row.
          // See HBASE-5974
          nextCallSeq++;
          // NOTE(review): the timestamp is taken after the scan RPC has returned, so the
          // latency logged below covers only result extraction - confirm this is intended.
          long timestamp = System.currentTimeMillis();
          // Results are returned via controller
          CellScanner cellScanner = controller.cellScanner();
          rrs = ResponseConverter.getResults(cellScanner, response);
          if (logScannerActivity) {
            long now = System.currentTimeMillis();
            if (now - timestamp > logCutOffLatency) {
              int rows = rrs == null ? 0 : rrs.length;
              LOG.info("Took " + (now-timestamp) + "ms to fetch " + rows + " rows from scanner=" + scannerId);
            }
          }
          // moreResults is only used for the case where a filter exhausts all elements
          if (response.hasMoreResults() && !response.getMoreResults()) {
            // Reset the id and mark this callable closed.
            scannerId = -1L;
            closed = true;
            // Implied that no results were returned back, either.
            return null;
          }
          // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
          // to size or quantity of results in the response.
          if (response.hasMoreResultsInRegion()) {
            // Set what the RS said
            setHasMoreResultsContext(true);
            setServerHasMoreResults(response.getMoreResultsInRegion());
          } else {
            // Server didn't respond whether it has more results or not.
            setHasMoreResultsContext(false);
          }
        } catch (ServiceException se) {
          throw ProtobufUtil.getRemoteException(se);
        }
        updateResultsMetrics(rrs);
      } catch (IOException e) {
        if (logScannerActivity) {
          LOG.info("Got exception making request " + TextFormat.shortDebugString(request) + " to " + getLocation(), e);
        }
        IOException ioe = e;
        if (e instanceof RemoteException) {
          ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
        }
        if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
          // Diagnostic only: log where the region lives now; failures here are just logged.
          try {
            HRegionLocation location = getConnection().relocateRegion(getTableName(), scan.getStartRow());
            LOG.info("Scanner=" + scannerId + " expired, current region location is " + location.toString());
          } catch (Throwable t) {
            LOG.info("Failed to relocate region", t);
          }
        }
        // The below conversion of exceptions into DoNotRetryExceptions is a little strange.
        // Why not just have these exceptions implement DNRIOE you ask? Well, usually we want
        // ServerCallable#withRetries to just retry when it gets these exceptions. In here in
        // a scan when doing a next in particular, we want to break out and get the scanner to
        // reset itself up again. Throwing a DNRIOE is how we signal this to happen (its ugly,
        // yeah and hard to follow and in need of a refactor).
        if (ioe instanceof NotServingRegionException) {
          // Throw a DNRE so that we break out of cycle of calling NSRE
          // when what we need is to open scanner against new location.
          // Attach NSRE to signal client that it needs to re-setup scanner.
          if (this.scanMetrics != null) {
            this.scanMetrics.countOfNSRE.incrementAndGet();
          }
          throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
        } else if (ioe instanceof RegionServerStoppedException) {
          // Throw a DNRE so that we break out of cycle of the retries and instead go and
          // open scanner against new location.
          throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
        } else {
          // The outer layers will retry
          throw ioe;
        }
      }
      return rrs;
    }
  }
  // Open and close steps return no results.
  return null;
}
进入该方法时,closed由于为false,所以执行else分支,并且,由于scannerId未被赋值,所以它最终执行的是openScanner()方法,打开一个Scanner,并将ID赋值给scannerId。
接下来,调用cs.poll()方法,在timeBeforeReplicas(单位为微秒)的等待时间内,从cs的BlockingQueue<Future<V>>类型的completed队列中获取任务的完成情况。
然后,如果拿到的结果及其对应的ScannerCallable都不为空,则调用updateCurrentlyServingReplica()方法,把最先成功响应的ScannerCallable记录为当前提供服务的副本,并将本次调用得到的Result[]返回。
至此,整个Table.getScanner()方法分析完毕。总结起来,它主要完成了ResultScanner的初始化工作,并未真正请求数据。同时,它还做了以下几件事情:
1、生成一个实现了ResultScanner接口的对象,一般为ClientScanner(其他三个类型以后再分析);
2、ClientScanner对象中的callable被赋值,并通过它向startRow所在Region的RegionServer发起了一次RPC请求,打开scanner并得到scannerId,同时currentRegion也被设置为该Region。
接下来,我们继续分析之后的工作。在完成了ResultScanner的初始化后,数据是如何获取的呢?
我们知道,ResultScanner继承自Iterable接口,那么在其实现ClientScanner的抽象父类AbstractClientScanner类中,定义了iterator()方法的实现,主要是通过hasNext()和next()方法来完成遍历的。那我们就来看下ClientScanner的next()方法,代码如下:
@Override public Result next() throws IOException { // If the scanner is closed and there's nothing left in the cache, next is a no-op. if (cache.size() == 0 && this.closed) { return null; } if (cache.size() == 0) { loadCache(); } if (cache.size() > 0) { return cache.poll(); } // if we exhausted this scanner before calling close, write out the scan metrics writeScanMetrics(); return null; }非常简单,如果scanner的closed为true,并且cache没有数据,则直接返回null,如果closed为false,并且cache没有数据,那么通过loadCache()方法加载数据,然后其他的情况是,只要cache有数据,则直接返回。
在这里,我们就能知道,scan实际上并不是把数据直接全部加载到客户端的,而是在用到的时候才去取。这么做有什么好处呢?
1、避免全部数据尤其是大数据量的情况下全部缓存在客户端,造成客户端的压力;
2、如果等到全部数据都获取后才可以用,客户端的IO、集群网络IO会在数据获取阶段居高不下,延迟较高,远不如这种类似懒加载、数据边用边取的机制效果好。
我们继续分析loadCache()方法。
/**
 * Contact the servers to load more {@link Result}s in the cache.
 *
 * <p>Repeatedly calls the current scanner callable and appends the returned rows to
 * {@code cache} until the size budget ({@code maxScannerResultSize}) or the row budget
 * ({@code caching}) is used up, the server reports it has more results for the current
 * call, or {@code possiblyNextScanner} reports there is nothing further to scan. Certain
 * DoNotRetryIOExceptions cause the scanner to be reset and re-opened from the last row
 * seen.
 *
 * @throws IOException if the scan fails in a way that is not handled by resetting the scanner
 */
protected void loadCache() throws IOException {
  Result[] values = null;
  // Remaining budgets for this load: bytes and rows.
  long remainingResultSize = maxScannerResultSize;
  int countdown = this.caching;
  // We need to reset it if it's a new callable that was created
  // with a countdown in nextScanner
  callable.setCaching(this.caching);
  // This flag is set when we want to skip the result returned. We do
  // this when we reset scanner because it split under us.
  boolean skipFirst = false;
  boolean retryAfterOutOfOrderException = true;
  // We don't expect that the server will have more results for us if
  // it doesn't tell us otherwise. We rely on the size or count of results
  boolean serverHasMoreResults = false;
  do {
    try {
      if (skipFirst) {
        // Skip only the first row (which was the last row of the last
        // already-processed batch).
        callable.setCaching(1);
        values = call(scan, callable, caller, scannerTimeout);
        // When the replica switch happens, we need to do certain operations
        // again. The scannercallable will openScanner with the right startkey
        // but we need to pick up from there. Bypass the rest of the loop
        // and let the catch-up happen in the beginning of the loop as it
        // happens for the cases where we see exceptions. Since only openScanner
        // would have happened, values would be null
        if (values == null && callable.switchedToADifferentReplica()) {
          if (this.lastResult != null) { //only skip if there was something read earlier
            skipFirst = true;
          }
          this.currentRegion = callable.getHRegionInfo();
          continue;
        }
        callable.setCaching(this.caching);
        skipFirst = false;
      }
      // Server returns a null values if scanning is to stop. Else,
      // returns an empty array if scanning is to go on and we've just
      // exhausted current region.
      values = call(scan, callable, caller, scannerTimeout);
      if (skipFirst && values != null && values.length == 1) {
        skipFirst = false; // Already skipped, unset it before scanning again
        values = call(scan, callable, caller, scannerTimeout);
      }
      // When the replica switch happens, we need to do certain operations
      // again. The callable will openScanner with the right startkey
      // but we need to pick up from there. Bypass the rest of the loop
      // and let the catch-up happen in the beginning of the loop as it
      // happens for the cases where we see exceptions. Since only openScanner
      // would have happened, values would be null
      if (values == null && callable.switchedToADifferentReplica()) {
        if (this.lastResult != null) { //only skip if there was something read earlier
          skipFirst = true;
        }
        this.currentRegion = callable.getHRegionInfo();
        continue;
      }
      retryAfterOutOfOrderException = true;
    } catch (DoNotRetryIOException e) {
      // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us
      // to reset the scanner and come back in again.
      if (e instanceof UnknownScannerException) {
        long timeout = lastNext + scannerTimeout;
        // If we are over the timeout, throw this exception to the client wrapped in
        // a ScannerTimeoutException. Else, it's because the region moved and we used the old
        // id against the new region server; reset the scanner.
        if (timeout < System.currentTimeMillis()) {
          LOG.info("For hints related to the following exception, please try taking a look at: " + "https://hbase.apache.org/book.html#trouble.client.scantimeout");
          long elapsed = System.currentTimeMillis() - lastNext;
          ScannerTimeoutException ex = new ScannerTimeoutException(elapsed + "ms passed since the last invocation, " + "timeout is currently set to " + scannerTimeout);
          ex.initCause(e);
          throw ex;
        }
      } else {
        // If exception is any but the list below throw it back to the client; else setup
        // the scanner and retry.
        Throwable cause = e.getCause();
        if ((cause != null && cause instanceof NotServingRegionException) || (cause != null && cause instanceof RegionServerStoppedException) || e instanceof OutOfOrderScannerNextException) {
          // Pass
          // It is easier writing the if loop test as list of what is allowed rather than
          // as a list of what is not allowed... so if in here, it means we do not throw.
        } else {
          throw e;
        }
      }
      // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
      if (this.lastResult != null) {
        // The region has moved. We need to open a brand new scanner at
        // the new location.
        // Reset the startRow to the row we've seen last so that the new
        // scanner starts at the correct row. Otherwise we may see previously
        // returned rows again.
        // (ScannerCallable by now has "relocated" the correct region)
        this.scan.setStartRow(this.lastResult.getRow());
        // Skip first row returned. We already let it out on previous
        // invocation.
        skipFirst = true;
      }
      if (e instanceof OutOfOrderScannerNextException) {
        // Allow exactly one retry after an out-of-order exception.
        if (retryAfterOutOfOrderException) {
          retryAfterOutOfOrderException = false;
        } else {
          // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
          throw new DoNotRetryIOException("Failed after retry of " + "OutOfOrderScannerNextException: was there a rpc timeout?", e);
        }
      }
      // Clear region.
      this.currentRegion = null;
      // Set this to null so we don't try and do an rpc and close on remote server when
      // the exception we got was UnknownScanner or the Server is going down.
      callable = null;
      // This continue will take us to while at end of loop where we will set up new scanner.
      continue;
    }
    // Take the current time and record the interval since the previous fetch in the metrics.
    long currentTime = System.currentTimeMillis();
    if (this.scanMetrics != null) {
      this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
    }
    // Remember when this fetch happened.
    lastNext = currentTime;
    // Append every returned Result to the cache, charge its cells against the size
    // budget, decrement the row budget and remember the last row seen.
    if (values != null && values.length > 0) {
      for (Result rs : values) {
        cache.add(rs);
        // We don't make Iterator here
        for (Cell cell : rs.rawCells()) {
          remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
        }
        countdown--;
        this.lastResult = rs;
      }
    }
    // We expect that the server won't have more results for us when we exhaust
    // the size (bytes or count) of the results returned. If the server *does* inform us that
    // there are more results, we want to avoid possiblyNextScanner(...). Only when we actually
    // get results is the moreResults context valid.
    if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
      // Only adhere to more server results when we don't have any partialResults
      // as it keeps the outer loop logic the same.
      serverHasMoreResults = callable.getServerHasMoreResults();
    }
    // Values == null means server-side filter has determined we must STOP
    // !partialResults.isEmpty() means that we are still accumulating partial Results for a
    // row. We should not change scanners before we receive all the partial Results for that
    // row.
  } while (remainingResultSize > 0 && countdown > 0 && !serverHasMoreResults && possiblyNextScanner(countdown, values == null));
  // In the loop condition above, countdown is the number of rows still wanted.
}
首先,初始化两个变量,remainingResultSize和countdown,这两个变量的含义分别是一次RPC调用获取的数据总大小和总行数,当然,如果remainingResultSize减去已整数行数据大小,比一行数据小,且还有数据要获取,总行数又低于countdown的话,那么我们还是要获取完整的一行数据的,这两个变量随着数据的获取,不断的递减,并参与到是否应该获取数据的逻辑判断,相应代码如下:
// We don't make Iterator here for (Cell cell : rs.rawCells()) { remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell); } countdown--;
while (remainingResultSize > 0 && countdown > 0 && !serverHasMoreResults && possiblyNextScanner(countdown, values == null));接下来,进入一个do-while循环,因为skipFirst初始值为false,所以,第一次会直接调用call()方法,结果又到了执行ScannerCallableWithReplicas的call()方法。我们再次分析,同样是先定位Region,获取Region的位置信息,并执行currentScannerCallable即ScannerCallable的call()方法,只不过这次调用,因为有了scannerId,而实际调用RegionServer上的RPC服务,真正去获取数据。代码如下:
Result [] rrs = null; ScanRequest request = null; try { incRPCcallsMetrics(); request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq); ScanResponse response = null; PayloadCarryingRpcController controller = controllerFactory.newController(); controller.setPriority(getTableName()); controller.setCallTimeout(callTimeout); try { response = getStub().scan(controller, request); // Client and RS maintain a nextCallSeq number during the scan. Every next() call // from client to server will increment this number in both sides. Client passes this // number along with the request and at RS side both the incoming nextCallSeq and its // nextCallSeq will be matched. In case of a timeout this increment at the client side // should not happen. If at the server side fetching of next batch of data was over, // there will be mismatch in the nextCallSeq number. Server will throw // OutOfOrderScannerNextException and then client will reopen the scanner with startrow // as the last successfully retrieved row. // See HBASE-5974 nextCallSeq++; long timestamp = System.currentTimeMillis(); // Results are returned via controller CellScanner cellScanner = controller.cellScanner(); rrs = ResponseConverter.getResults(cellScanner, response); if (logScannerActivity) { long now = System.currentTimeMillis(); if (now - timestamp > logCutOffLatency) { int rows = rrs == null ? 0 : rrs.length; LOG.info("Took " + (now-timestamp) + "ms to fetch " + rows + " rows from scanner=" + scannerId); } } // moreResults is only used for the case where a filter exhausts all elements if (response.hasMoreResults() && !response.getMoreResults()) { scannerId = -1L; closed = true; // Implied that no results were returned back, either. return null; } // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due // to size or quantity of results in the response. 
if (response.hasMoreResultsInRegion()) { // Set what the RS said setHasMoreResultsContext(true); setServerHasMoreResults(response.getMoreResultsInRegion()); } else { // Server didn't respond whether it has more results or not. setHasMoreResultsContext(false); } } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } updateResultsMetrics(rrs); } catch (IOException e) { if (logScannerActivity) { LOG.info("Got exception making request " + TextFormat.shortDebugString(request) + " to " + getLocation(), e); } IOException ioe = e; if (e instanceof RemoteException) { ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e); } if (logScannerActivity && (ioe instanceof UnknownScannerException)) { try { HRegionLocation location = getConnection().relocateRegion(getTableName(), scan.getStartRow()); LOG.info("Scanner=" + scannerId + " expired, current region location is " + location.toString()); } catch (Throwable t) { LOG.info("Failed to relocate region", t); } } // The below convertion of exceptions into DoNotRetryExceptions is a little strange. // Why not just have these exceptions implment DNRIOE you ask? Well, usually we want // ServerCallable#withRetries to just retry when it gets these exceptions. In here in // a scan when doing a next in particular, we want to break out and get the scanner to // reset itself up again. Throwing a DNRIOE is how we signal this to happen (its ugly, // yeah and hard to follow and in need of a refactor). if (ioe instanceof NotServingRegionException) { // Throw a DNRE so that we break out of cycle of calling NSRE // when what we need is to open scanner against new location. // Attach NSRE to signal client that it needs to re-setup scanner. 
if (this.scanMetrics != null) { this.scanMetrics.countOfNSRE.incrementAndGet(); } throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe); } else if (ioe instanceof RegionServerStoppedException) { // Throw a DNRE so that we break out of cycle of the retries and instead go and // open scanner against new location. throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe); } else { // The outer layers will retry throw ioe; } } return rrs;这段代码的最后,如果不能获取到数据,则将scannerId恢复为-1,并将closed设置为true,以便下次调用时,将上次的callable关闭,并重新生成一个scanner。
后续的逻辑是,如果scanner还有需要获取的数据,重新定位scan的startRow,发送RPC请求至RegionServer,继续获取该Region上的数据,否则,关闭上一个callable,并重新生成一个scanner和callable,并重复以前的逻辑,继续获取数据,直到到达表的末尾,或者检索到stopRow(即scan的结束行)以及比stopRow大的数据。
还有很多细节和特殊情况没有分析,留待以后慢慢分析吧~~