HBase Scan Source Code Analysis, Client Side: The Overall Flow (Part 1)

        A typical client-side invocation of scan looks like this:

		// Create the HBase configuration
		Configuration config = HBaseConfiguration.create();
		config.set("hbase.zookeeper.quorum", "192.168.1.226");// IP of the server running ZooKeeper
		config.set("hbase.zookeeper.property.clientPort", "2181");// client port on which ZooKeeper accepts connections

		// The HBase connection
		Connection connection = null;

		try {
			// Obtain a connection to the HBase cluster
			connection = ConnectionFactory.createConnection(config);

			// Verify that the connection was established
			if (connection.isClosed()) {
				throw new IOException("Failed to establish the HBase connection");
			}

			// Get the HBase table
			Table table = connection.getTable(TableName.valueOf("test_table"));

			// Build the Scan instance
			Scan scan = new Scan();
			scan.addColumn(Bytes.toBytes("family1"), Bytes.toBytes("column1"));

			// Run the scan
			ResultScanner result = table.getScanner(scan);

			// Iterate over the results
			for (Result r : result) {

				// Process each Result here
				byte[] row = r.getRow();
				if(row.length == 0){
					//...
				}
			}

			result.close();
			table.close();
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			if (connection != null) {
				try {
					connection.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
        Below, we analyze the whole scan flow step by step.

        We start from Table's getScanner(Scan scan) method, which is defined as follows:

/**
   * Returns a scanner on the current table as specified by the {@link Scan}
   * object.
   * Note that the passed {@link Scan}'s start row and caching properties
   * maybe changed.
   *
   * @param scan A configured {@link Scan} object.
   * @return A scanner.
   * @throws IOException if a remote or network exception occurs.
   * @since 0.20.0
   */
  ResultScanner getScanner(Scan scan) throws IOException;
        Its implementation is provided by HTable; the source code is as follows:

/**
   * The underlying {@link HTable} must not be closed.
   * {@link HTableInterface#getScanner(Scan)} has other usage details.
   * 
   * Entry point for scan in the HBase client
   */
  @Override
  public ResultScanner getScanner(final Scan scan) throws IOException {
    // A small scan cannot be combined with batching
    if (scan.getBatch() > 0 && scan.isSmall()) {
      throw new IllegalArgumentException("Small scan should not be used with batching");
    }

    // Set caching
    // Taken from the "hbase.client.scanner.caching" parameter; defaults to 100 if not configured
    if (scan.getCaching() <= 0) {
      scan.setCaching(getScannerCaching());
    }
    
    // Taken from the parameter named by HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY; defaults to 2 * 1024 * 1024 if not configured
    if (scan.getMaxResultSize() <= 0) {
      scan.setMaxResultSize(scannerMaxResultSize);
    }

    /**
     * There are four kinds of scan in total:
     * 1. reversed + small  -- ClientSmallReversedScanner
     * 2. reversed + big    -- ReversedClientScanner
     * 3. not reversed + small -- ClientSmallScanner
     * 4. not reversed + big   -- ClientScanner
     */
    if (scan.isReversed()) {// reversed scan
      if (scan.isSmall()) {
        return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
            this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
            pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
      } else {
        return new ReversedClientScanner(getConfiguration(), scan, getName(),
            this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
            pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
      }
    }

    if (scan.isSmall()) {
      return new ClientSmallScanner(getConfiguration(), scan, getName(),
          this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
          pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
    } else {
      return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
          this.rpcCallerFactory, this.rpcControllerFactory,
          pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
    }
  }
      For now, we study only ClientScanner; the other three types will be covered later.

/**
   * Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
   * row may be changed.
   * @param conf The {@link Configuration} to use.
   * @param scan {@link Scan} to use in this scanner
   * @param tableName The table that we wish to scan
   * @param connection Connection identifying the cluster
   * @throws IOException
   */
  public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
      ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) throws IOException {
      
	  if (LOG.isTraceEnabled()) {
        LOG.trace("Scan table=" + tableName
            + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
      }
      
      // Assign scan, tableName and the other member fields
      this.scan = scan;
      this.tableName = tableName;
      this.lastNext = System.currentTimeMillis();
      this.connection = connection;
      this.pool = pool;
      this.primaryOperationTimeout = primaryOperationTimeout;
      // Retry count, taken from "hbase.client.retries.number"; defaults to 31 if not configured
      this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
      if (scan.getMaxResultSize() > 0) {
        this.maxScannerResultSize = scan.getMaxResultSize();
      } else {
        this.maxScannerResultSize = conf.getLong(
          HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
          HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
      }
      
      // Scanner timeout
      this.scannerTimeout = HBaseConfiguration.getInt(conf,
        HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
        HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);

      // check if application wants to collect scan metrics
      initScanMetrics(scan);

      // Use the caching from the Scan.  If not set, use the default cache setting for this table.
      // Determine caching
      if (this.scan.getCaching() > 0) {
        this.caching = this.scan.getCaching();
      } else {
        this.caching = conf.getInt(
            HConstants.HBASE_CLIENT_SCANNER_CACHING,
            HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
      }

      // Initialize the caller
      // caller is of type RpcRetryingCaller
      this.caller = rpcFactory.<Result[]> newCaller();
      // rpcControllerFactory is of type RpcControllerFactory
      this.rpcControllerFactory = controllerFactory;

      this.conf = conf;
      // Initialize the scanner
      initializeScannerInConstruction();
    }
        In the ClientScanner constructor, the various member fields such as scan, tableName and connection are assigned first, and then the parameters the scan will need, such as maxScannerResultSize, scannerTimeout and caching, are worked out. Nothing remarkable there.

        Next, two important members are initialized: caller, of type RpcRetryingCaller, and rpcControllerFactory, of type RpcControllerFactory.

        Finally, initializeScannerInConstruction() is called, so let's follow it.

protected void initializeScannerInConstruction() throws IOException{
      // initialize the scanner
      nextScanner(this.caching, false);
    }
        Immediately afterwards it calls the nextScanner() method; note that two arguments are passed: the caching value determined when the ClientScanner object was created, and false.

        If caching was not set when the Scan object was constructed, it takes the value of the hbase.client.scanner.caching parameter, defaulting to 100 when that parameter is not configured; it is the maximum number of rows fetched per RPC request.
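
        As a quick illustration of where this value can come from, here is a minimal sketch (the numbers are arbitrary examples, not recommendations): caching can be set per scan, or as a client-wide default through the configuration.

Configuration config = HBaseConfiguration.create();
// Client-wide default, used only when Scan.setCaching() is not called explicitly
config.setInt("hbase.client.scanner.caching", 500);

Scan scan = new Scan();
// Per-scan override: each scanner RPC will ask the RegionServer for at most 200 rows
scan.setCaching(200);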

        Continuing into the nextScanner() method, the complete code is as follows:

/*
     * Gets a scanner for the next region.  If this.currentRegion != null, then
     * we will move to the endrow of this.currentRegion.  Else we will get
     * scanner at the scan.getStartRow().  We will go no further, just tidy
     * up outstanding scanners, if <code>currentRegion != null</code> and
     * <code>done</code> is true.
     * @param nbRows
     * @param done Server-side says we're done scanning.
     */
  protected boolean nextScanner(int nbRows, final boolean done)
    throws IOException {
      // Close the previous scanner if it's open
      // Close the scanner opened previously.
      // On the first invocation callable has not been initialized yet, so it is null and
      // this block is skipped. When does callable get assigned, and when is it cleared
      // again? We will see that below.
      if (this.callable != null) {
        // setClose() sets the closed flag of the ScannerCallable held inside the
        // ScannerCallableWithReplicas (its currentScannerCallable member) to true
        this.callable.setClose();

        // Issue one more request via call(); callable is not null and the closed flag of
        // its currentScannerCallable is true, so the call eventually reaches
        // currentScannerCallable.call(), which, because closed is true, only runs close()
        call(scan, callable, caller, scannerTimeout);
        // Set callable to null so that this block is not executed next time
        this.callable = null;
      }

      // Where to start the next scanner
      byte [] localStartKey;

      // if we're at end of table, close and return false to stop iterating
      if (this.currentRegion != null) {
        byte [] endKey = this.currentRegion.getEndKey();
        // done, or endKey is empty, or endKey >= stopRow
        if (endKey == null ||
            Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
            checkScanStopRow(endKey) ||
            done) {
          // Close
          close();
          if (LOG.isTraceEnabled()) {
            LOG.trace("Finished " + this.currentRegion);
          }
          // Return false
          return false;
        }
        
        // Set localStartKey to the endKey of the current Region
        localStartKey = endKey;
        if (LOG.isTraceEnabled()) {
          LOG.trace("Finished " + this.currentRegion);
        }
      } else {
        localStartKey = this.scan.getStartRow();
      }

      if (LOG.isDebugEnabled() && this.currentRegion != null) {
        // Only worth logging if NOT first region in scan.
        LOG.debug("Advancing internal scanner to startKey at '" +
          Bytes.toStringBinary(localStartKey) + "'");
      }
      try {
    	// callable is of type ScannerCallableWithReplicas
    	// Obtain a new callable
        callable = getScannerCallable(localStartKey, nbRows);
        // Open a scanner on the region server starting at the
        // beginning of the region
        // (i.e. on the RegionServer hosting the Region that contains localStartKey)
        call(scan, callable, caller, scannerTimeout);
        // Set currentRegion
        this.currentRegion = callable.getHRegionInfo();
        if (this.scanMetrics != null) {
          this.scanMetrics.countOfRegions.incrementAndGet();
        }
      } catch (IOException e) {
        close();
        throw e;
      }
      return true;
    }
        Let's go through it step by step. Since the ClientScanner constructor did not initialize the callable member, it must be null here, so the first block of code is skipped; we will come back to it later.

        Next, since currentRegion has not been initialized either, the else branch is taken, i.e. localStartKey is set to the scan's startRow:

localStartKey = this.scan.getStartRow();
        Right after that, getScannerCallable() is called to assign the callable member, with the row localStartKey and the row count nbRows as arguments; this nbRows is exactly the caching value from the ClientScanner constructor. The method is defined as follows:

@InterfaceAudience.Private
    protected ScannerCallableWithReplicas getScannerCallable(byte [] localStartKey,
        int nbRows) {
      // Set the scan's startRow
      scan.setStartRow(localStartKey);
      
      // Build a ScannerCallable s; it will become the currentScannerCallable member of the ScannerCallableWithReplicas sr below
      ScannerCallable s =
          new ScannerCallable(getConnection(), getTable(), scan, this.scanMetrics,
              this.rpcControllerFactory);
      // Set caching
      s.setCaching(nbRows);

      // Build the ScannerCallableWithReplicas sr, whose currentScannerCallable member is the ScannerCallable s above
      ScannerCallableWithReplicas sr = new ScannerCallableWithReplicas(tableName, getConnection(),
       s, pool, primaryOperationTimeout, scan,
       retries, scannerTimeout, caching, conf, caller);
      return sr;
    }
        First, localStartKey is set as the scan's startRow, so on each subsequent iteration we know where the scan should start from.

        Next, a ScannerCallable object s is created; it will serve as the currentScannerCallable member of the ScannerCallableWithReplicas object sr. It is this object that actually communicates with the RegionServer hosting the Region, and it holds a byte[] member named row, initialized to the scan's startRow, which is used to locate the Region and position the scan within it.

        Finally, a ScannerCallableWithReplicas object sr is constructed and returned, with the ScannerCallable s above as its currentScannerCallable member, as pictured in the sketch below.
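
        To keep the wrapping relationship straight, here is a heavily simplified sketch with made-up names (it is not the real ScannerCallableWithReplicas, which additionally handles replica racing, timeouts and retries); it only shows the replica-aware wrapper holding one per-region callable and delegating to it:

import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;

// Illustrative only: a toy version of the wrapper/delegate relationship
interface RegionScanCall {
  Result[] call(int timeout) throws IOException;
}

class ReplicaAwareScanCall {
  // Plays the role of currentScannerCallable: the callable that actually
  // talks to the RegionServer currently serving this scan
  private RegionScanCall current;

  ReplicaAwareScanCall(RegionScanCall primary) {
    this.current = primary;
  }

  Result[] call(int timeout) throws IOException {
    // The real class may race replicas and switch `current`; here we only delegate
    return current.call(timeout);
  }
}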

        Once getScannerCallable() has run, ClientScanner's call() method is invoked right away; the code is as follows:

Result[] call(Scan scan, ScannerCallableWithReplicas callable,
      RpcRetryingCaller<Result[]> caller, int scannerTimeout)
      throws IOException, RuntimeException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException();
    }
    // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas,
    // we do a callWithRetries
    // caller is of type RpcRetryingCaller
    // callable is of type ScannerCallableWithReplicas
    return caller.callWithoutRetries(callable, scannerTimeout);
  }
        As we saw earlier, caller is of type RpcRetryingCaller; the method is defined as follows:

/**
   * Call the server once only.
   * {@link RetryingCallable} has a strange shape so we can do retrys.  Use this invocation if you
   * want to do a single call only (A call to {@link RetryingCallable#call(int)} will not likely
   * succeed).
   * @return an object of type T
   * @throws IOException if a remote or network exception occurs
   * @throws RuntimeException other unspecified error
   */
  public T callWithoutRetries(RetryingCallable<T> callable, int callTimeout)
  throws IOException, RuntimeException {
    // The code of this method should be shared with withRetries.
    this.globalStartTime = EnvironmentEdgeManager.currentTime();
    try {
      // Call prepare() first, then call(), with callTimeout as the timeout
      callable.prepare(false);
      return callable.call(callTimeout);
    } catch (Throwable t) {
      Throwable t2 = translateException(t);
      ExceptionUtil.rethrowIfInterrupt(t2);
      // It would be nice to clear the location cache here.
      if (t2 instanceof IOException) {
        throw (IOException)t2;
      } else {
        throw new RuntimeException(t2);
      }
    }
  }
        As the code shows, the actual flow is to call the callable's prepare() method first and then its call() method.

        Next, we turn to the callable itself. This callable is of type ScannerCallableWithReplicas; its prepare() method is empty, so we focus on its call() method, whose code is as follows:

@Override
  public Result [] call(int timeout) throws IOException {
    // If the active replica callable was closed somewhere, invoke the RPC to
    // really close it. In the case of regular scanners, this applies. We make couple
    // of RPCs to a RegionServer, and when that region is exhausted, we set
    // the closed flag. Then an RPC is required to actually close the scanner.
	 
    // On the first invocation this block is skipped
    if (currentScannerCallable != null && currentScannerCallable.closed) {
      // For closing we target that exact scanner (and not do replica fallback like in
      // the case of normal reads)
      if (LOG.isTraceEnabled()) {
        LOG.trace("Closing scanner id=" + currentScannerCallable.scannerId);
      }
      
      // Eventually this only calls close()
      Result[] r = currentScannerCallable.call(timeout);
      currentScannerCallable = null;
      return r;
    }
    // We need to do the following:
    //1. When a scan goes out to a certain replica (default or not), we need to
    //   continue to hit that until there is a failure. So store the last successfully invoked
    //   replica
    //2. We should close the "losing" scanners (scanners other than the ones we hear back
    //   from first)

    // Locate the Region for the scan's startRow (currentScannerCallable.getRow()), using the cached location
    RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
        RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
        currentScannerCallable.getRow());

    // allocate a boundedcompletion pool of some multiple of number of replicas.
    // We want to accomodate some RPCs for redundant replica scans (but are still in progress)
    // Allocate a bounded completion pool sized as a multiple of the number of replicas.
    // Build a BoundedCompletionService cs: it holds the tasks to run, the tasks already
    // completed, and the executor thread pool, and offers submit, poll, take, etc.
    BoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
        new BoundedCompletionService<Pair<Result[], ScannerCallable>>(pool, rl.size() * 5);

    List<ExecutionException> exceptions = null;
    int submitted = 0, completed = 0;
    AtomicBoolean done = new AtomicBoolean(false);
    replicaSwitched.set(false);
    // submit call for the primary replica.

    // Submit the task and add it to outstandingCallables;
    // the task is a RetryingRPC object wrapping this.currentScannerCallable
    submitted += addCallsForCurrentReplica(cs, rl);
    try {
      // wait for the timeout to see whether the primary responds back
      // Poll the completed queue (a BlockingQueue<Future<V>>) of cs for a finished task
      Future<Pair<Result[], ScannerCallable>> f = cs.poll(timeBeforeReplicas,
          TimeUnit.MICROSECONDS); // Yes, microseconds
      if (f != null) {
        Pair<Result[], ScannerCallable> r = f.get();
        if (r != null && r.getSecond() != null) {
          updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
        }
        return r == null ? null : r.getFirst(); //great we got a response
      }
    } catch (ExecutionException e) {
      // the primary call failed with RetriesExhaustedException or DoNotRetryIOException
      // but the secondaries might still succeed. Continue on the replica RPCs.
      exceptions = new ArrayList<ExecutionException>(rl.size());
      exceptions.add(e);
      completed++;
    } catch (CancellationException e) {
      throw new InterruptedIOException(e.getMessage());
    } catch (InterruptedException e) {
      throw new InterruptedIOException(e.getMessage());
    }
    // submit call for the all of the secondaries at once
    // TODO: this may be an overkill for large region replication
    submitted += addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
    try {
      while (completed < submitted) {
        try {
          Future<Pair<Result[], ScannerCallable>> f = cs.take();
          Pair<Result[], ScannerCallable> r = f.get();
          if (r != null && r.getSecond() != null) {
            updateCurrentlyServingReplica(r.getSecond(), r.getFirst(), done, pool);
          }
          return r == null ? null : r.getFirst(); // great we got an answer
        } catch (ExecutionException e) {
          // if not cancel or interrupt, wait until all RPC's are done
          // one of the tasks failed. Save the exception for later.
          if (exceptions == null) exceptions = new ArrayList<ExecutionException>(rl.size());
          exceptions.add(e);
          completed++;
        }
      }
    } catch (CancellationException e) {
      throw new InterruptedIOException(e.getMessage());
    } catch (InterruptedException e) {
      throw new InterruptedIOException(e.getMessage());
    } finally {
      // We get there because we were interrupted or because one or more of the
      // calls succeeded or failed. In all case, we stop all our tasks.
      cs.cancelAll(true);
    }

    if (exceptions != null && !exceptions.isEmpty()) {
      RpcRetryingCallerWithReadReplicas.throwEnrichedException(exceptions.get(0),
          retries); // just rethrow the first exception for now.
    }
    return null; // unreachable
  }
        Another rather long method; again, let's go through it step by step.

        First, although currentScannerCallable has already been initialized, its closed flag is still false, so the first block of the method is skipped.

        Next, RpcRetryingCallerWithReadReplicas.getRegionLocations() is called with cConnection, tableName and currentScannerCallable.getRow() to locate the Region, producing rl, of type RegionLocations.
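
        That getRegionLocations() helper is internal client plumbing; for reference, the public API that performs the same row-to-region lookup (for the default replica only) is RegionLocator. A minimal sketch, assuming the connection and table name from the opening example and a made-up row key, and meant to live inside a try block that already handles IOException:

try (RegionLocator locator = connection.getRegionLocator(TableName.valueOf("test_table"))) {
  // Resolve the Region (and the RegionServer) hosting this row key.
  // The client caches these locations, so repeated lookups are usually cheap.
  HRegionLocation location = locator.getRegionLocation(Bytes.toBytes("row-0001"));
  System.out.println("region=" + location.getRegionInfo().getRegionNameAsString()
      + ", server=" + location.getServerName());
}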

        Then a data structure cs of type BoundedCompletionService is built; cs holds the tasks to execute, the tasks already completed, and the executor thread pool, and provides submit, poll, take and similar methods. Through it, tasks are scheduled and executed on the thread pool and their results collected synchronously.
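
        BoundedCompletionService is an HBase-internal class, but the submit/poll/take pattern it provides is essentially that of the JDK's ExecutorCompletionService, plus bookkeeping so that every submitted future can later be cancelled. The JDK-only sketch below (illustrative, not the HBase code) shows the same "give the primary a head start, then race the secondaries" shape that call() implements:

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ReplicaRaceSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    CompletionService<String> cs = new ExecutorCompletionService<>(pool);

    // 1. Submit the call against the primary replica
    cs.submit(() -> "result from the primary replica");

    // 2. Give the primary a bounded head start (poll with a timeout)
    Future<String> first = cs.poll(10, TimeUnit.MILLISECONDS);
    if (first == null) {
      // 3. The primary did not answer in time: also submit the secondaries
      //    and take whichever future completes first
      cs.submit(() -> "result from a secondary replica");
      first = cs.take();
    }
    System.out.println(first.get());

    // 4. Cancel everything that is still outstanding
    pool.shutdownNow();
  }
}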

        addCallsForCurrentReplica() is the method that implements the logic described above; the code is as follows:

private int addCallsForCurrentReplica(
      BoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
    RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
    outstandingCallables.add(currentScannerCallable);
    cs.submit(retryingOnReplica);
    return 1;
  }
        It wraps currentScannerCallable into a RetryingRPC object, submits it to cs for execution, and adds it to the outstandingCallables collection.

        We know that this currentScannerCallable object is of type ScannerCallable; when it is scheduled and run by the thread pool, the work is done in its call() method, defined as follows:

@Override
  public Result [] call(int callTimeout) throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException();
    }
    if (closed) {
      if (scannerId != -1) {
        close();
      }
    } else {
      if (scannerId == -1L) {
        this.scannerId = openScanner();
      } else {
        Result [] rrs = null;
        ScanRequest request = null;
        try {
          incRPCcallsMetrics();
          request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
          ScanResponse response = null;
          PayloadCarryingRpcController controller = controllerFactory.newController();
          controller.setPriority(getTableName());
          controller.setCallTimeout(callTimeout);
          try {
            response = getStub().scan(controller, request);
            // Client and RS maintain a nextCallSeq number during the scan. Every next() call
            // from client to server will increment this number in both sides. Client passes this
            // number along with the request and at RS side both the incoming nextCallSeq and its
            // nextCallSeq will be matched. In case of a timeout this increment at the client side
            // should not happen. If at the server side fetching of next batch of data was over,
            // there will be mismatch in the nextCallSeq number. Server will throw
            // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
            // as the last successfully retrieved row.
            // See HBASE-5974
            nextCallSeq++;
            long timestamp = System.currentTimeMillis();
            // Results are returned via controller
            CellScanner cellScanner = controller.cellScanner();
            rrs = ResponseConverter.getResults(cellScanner, response);
            if (logScannerActivity) {
              long now = System.currentTimeMillis();
              if (now - timestamp > logCutOffLatency) {
                int rows = rrs == null ? 0 : rrs.length;
                LOG.info("Took " + (now-timestamp) + "ms to fetch "
                  + rows + " rows from scanner=" + scannerId);
              }
            }
            // moreResults is only used for the case where a filter exhausts all elements
            if (response.hasMoreResults() && !response.getMoreResults()) {
              scannerId = -1L;
              closed = true;
              // Implied that no results were returned back, either.
              return null;
            }
            // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
            // to size or quantity of results in the response.
            if (response.hasMoreResultsInRegion()) {
              // Set what the RS said
              setHasMoreResultsContext(true);
              setServerHasMoreResults(response.getMoreResultsInRegion());
            } else {
              // Server didn't respond whether it has more results or not.
              setHasMoreResultsContext(false);
            }
          } catch (ServiceException se) {
            throw ProtobufUtil.getRemoteException(se);
          }
          updateResultsMetrics(rrs);
        } catch (IOException e) {
          if (logScannerActivity) {
            LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
              + " to " + getLocation(), e);
          }
          IOException ioe = e;
          if (e instanceof RemoteException) {
            ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
          }
          if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
            try {
              HRegionLocation location =
                getConnection().relocateRegion(getTableName(), scan.getStartRow());
              LOG.info("Scanner=" + scannerId
                + " expired, current region location is " + location.toString());
            } catch (Throwable t) {
              LOG.info("Failed to relocate region", t);
            }
          }
          // The below convertion of exceptions into DoNotRetryExceptions is a little strange.
          // Why not just have these exceptions implment DNRIOE you ask?  Well, usually we want
          // ServerCallable#withRetries to just retry when it gets these exceptions.  In here in
          // a scan when doing a next in particular, we want to break out and get the scanner to
          // reset itself up again.  Throwing a DNRIOE is how we signal this to happen (its ugly,
          // yeah and hard to follow and in need of a refactor).
          if (ioe instanceof NotServingRegionException) {
            // Throw a DNRE so that we break out of cycle of calling NSRE
            // when what we need is to open scanner against new location.
            // Attach NSRE to signal client that it needs to re-setup scanner.
            if (this.scanMetrics != null) {
              this.scanMetrics.countOfNSRE.incrementAndGet();
            }
            throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
          } else if (ioe instanceof RegionServerStoppedException) {
            // Throw a DNRE so that we break out of cycle of the retries and instead go and
            // open scanner against new location.
            throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
          } else {
            // The outer layers will retry
            throw ioe;
          }
        }
        return rrs;
      }
    }
    return null;
  }
        When this method is entered, closed is false, so the else branch runs; and since scannerId has not been assigned yet, it ends up executing openScanner(), which opens a scanner on the RegionServer and assigns its ID to scannerId.

        Next, the cs.poll() call retrieves the completion status of the task from cs's completed field, a BlockingQueue<Future<V>>.

        Then updateCurrentlyServingReplica() is called to record which replica responded and to close the scanners opened against the other, "losing" replicas.

        At this point the whole Table.getScanner() method has been analyzed. In summary, it only performs the initialization of the ResultScanner and does not actually request any data. Along the way it does the following:

        1. It creates an object implementing the ResultScanner interface, normally a ClientScanner (the other three types will be analyzed later);

        2. It assigns the callable member of the ClientScanner and opens a scanner (via an openScanner RPC) on the RegionServer hosting the scan's start row, although no data rows have been fetched yet.

        Next, let's continue with what happens afterwards. Once the ResultScanner has been initialized, how is the data actually fetched?

        We know that ResultScanner extends the Iterable interface, and its iterator() implementation lives in AbstractClientScanner, the abstract parent class of ClientScanner; traversal is driven by its hasNext() and next() methods.
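
        The sketch below is a paraphrase of AbstractClientScanner.iterator(), reconstructed from memory of this HBase version rather than quoted verbatim; the key idea is a one-Result lookahead so that hasNext() can probe by calling next():

@Override
public Iterator<Result> iterator() {
  return new Iterator<Result>() {
    // One-Result lookahead buffer, filled by hasNext()
    Result next = null;

    @Override
    public boolean hasNext() {
      if (next == null) {
        try {
          next = AbstractClientScanner.this.next(); // pull one Result ahead
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
      return next != null;
    }

    @Override
    public Result next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      Result temp = next;
      next = null;
      return temp;
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException();
    }
  };
}

        With that adapter in mind, let's look at ClientScanner's next() method; the code is as follows: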

@Override
    public Result next() throws IOException {
      // If the scanner is closed and there's nothing left in the cache, next is a no-op.
      if (cache.size() == 0 && this.closed) {
        return null;
      }
      if (cache.size() == 0) {
        loadCache();
      }

      if (cache.size() > 0) {
        return cache.poll();
      }

      // if we exhausted this scanner before calling close, write out the scan metrics
      writeScanMetrics();
      return null;
    }
        Very simple: if the scanner's closed flag is true and the cache is empty, null is returned directly; if closed is false and the cache is empty, data is loaded via loadCache(); otherwise, as long as the cache holds data, results are returned from it directly.

        Here we can see that scan does not load all the data into the client up front; it fetches data only when it is needed. What are the benefits of doing it this way?

        1. It avoids caching the entire result set on the client, which especially for large data volumes would put the client under pressure;

        2. If the data could only be used once all of it had been fetched, client I/O and cluster network I/O would stay high throughout the fetch phase and latency would suffer; this lazy-loading-like mechanism, fetching data as it is consumed, works much better, as the sketch below illustrates.
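
        Conceptually, the client-side scanner behaves like the queue-backed lazy reader sketched below (made-up names and a generic fetch function, not the actual HBase classes): results are handed out from a local queue, and a new batch is fetched only when the queue runs dry.

import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

// Toy model of the ClientScanner cache (illustrative only): results are served
// from a local queue, and a remote fetch happens only when the queue is empty
class LazyBatchReader<T> {
  private final Queue<T> cache = new LinkedList<>();
  private final Supplier<List<T>> fetchNextBatch; // stands in for the scan RPC
  private boolean closed = false;

  LazyBatchReader(Supplier<List<T>> fetchNextBatch) {
    this.fetchNextBatch = fetchNextBatch;
  }

  T next() {
    if (cache.isEmpty() && closed) {
      return null;                          // nothing cached, nothing left remotely
    }
    if (cache.isEmpty()) {
      List<T> batch = fetchNextBatch.get(); // the "RPC": fetch the next batch
      if (batch == null || batch.isEmpty()) {
        closed = true;                      // the server says we are done
      } else {
        cache.addAll(batch);
      }
    }
    return cache.poll();                    // may still be null if the fetch was empty
  }
}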

        Let's continue with the loadCache() method.

/**
   * Contact the servers to load more {@link Result}s in the cache.
   */
  protected void loadCache() throws IOException {
    Result[] values = null;
    long remainingResultSize = maxScannerResultSize;
    int countdown = this.caching;

    // We need to reset it if it's a new callable that was created
    // with a countdown in nextScanner
    callable.setCaching(this.caching);
    // This flag is set when we want to skip the result returned.  We do
    // this when we reset scanner because it split under us.
    boolean skipFirst = false;
    boolean retryAfterOutOfOrderException = true;
    // We don't expect that the server will have more results for us if
    // it doesn't tell us otherwise. We rely on the size or count of results
    boolean serverHasMoreResults = false;
    do {
      try {
        if (skipFirst) {
          // Skip only the first row (which was the last row of the last
          // already-processed batch).
          callable.setCaching(1);
          values = call(scan, callable, caller, scannerTimeout);
          // When the replica switch happens, we need to do certain operations
          // again. The scannercallable will openScanner with the right startkey
          // but we need to pick up from there. Bypass the rest of the loop
          // and let the catch-up happen in the beginning of the loop as it
          // happens for the cases where we see exceptions. Since only openScanner
          // would have happened, values would be null
          if (values == null && callable.switchedToADifferentReplica()) {
            if (this.lastResult != null) { //only skip if there was something read earlier
              skipFirst = true;
            }
            this.currentRegion = callable.getHRegionInfo();
            continue;
          }
          callable.setCaching(this.caching);
          skipFirst = false;
        }
        // Server returns a null values if scanning is to stop. Else,
        // returns an empty array if scanning is to go on and we've just
        // exhausted current region.
        values = call(scan,   callable, caller, scannerTimeout);
        if (skipFirst && values != null && values.length == 1) {
          skipFirst = false; // Already skipped, unset it before scanning again
          values = call(scan, callable, caller, scannerTimeout);
        }
        // When the replica switch happens, we need to do certain operations
        // again. The callable will openScanner with the right startkey
        // but we need to pick up from there. Bypass the rest of the loop
        // and let the catch-up happen in the beginning of the loop as it
        // happens for the cases where we see exceptions. Since only openScanner
        // would have happened, values would be null
        if (values == null && callable.switchedToADifferentReplica()) {
          if (this.lastResult != null) { //only skip if there was something read earlier
            skipFirst = true;
          }
          this.currentRegion = callable.getHRegionInfo();
          continue;
        }
        retryAfterOutOfOrderException = true;
      } catch (DoNotRetryIOException e) {
        // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us
        // to reset the scanner and come back in again.
        if (e instanceof UnknownScannerException) {
          long timeout = lastNext + scannerTimeout;
          // If we are over the timeout, throw this exception to the client wrapped in
          // a ScannerTimeoutException. Else, it's because the region moved and we used the old
          // id against the new region server; reset the scanner.
          if (timeout < System.currentTimeMillis()) {
            LOG.info("For hints related to the following exception, please try taking a look at: " +
                "https://hbase.apache.org/book.html#trouble.client.scantimeout");
            long elapsed = System.currentTimeMillis() - lastNext;
            ScannerTimeoutException ex =
                new ScannerTimeoutException(elapsed + "ms passed since the last invocation, "
                    + "timeout is currently set to " + scannerTimeout);
            ex.initCause(e);
            throw ex;
          }
        } else {
          // If exception is any but the list below throw it back to the client; else setup
          // the scanner and retry.
          Throwable cause = e.getCause();
          if ((cause != null && cause instanceof NotServingRegionException) ||
              (cause != null && cause instanceof RegionServerStoppedException) ||
              e instanceof OutOfOrderScannerNextException) {
            // Pass
            // It is easier writing the if loop test as list of what is allowed rather than
            // as a list of what is not allowed... so if in here, it means we do not throw.
          } else {
            throw e;
          }
        }
        // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
        if (this.lastResult != null) {
          // The region has moved. We need to open a brand new scanner at
          // the new location.
          // Reset the startRow to the row we've seen last so that the new
          // scanner starts at the correct row. Otherwise we may see previously
          // returned rows again.
          // (ScannerCallable by now has "relocated" the correct region)
          this.scan.setStartRow(this.lastResult.getRow());

          // Skip first row returned.  We already let it out on previous
          // invocation.
          skipFirst = true;
        }
        if (e instanceof OutOfOrderScannerNextException) {
          if (retryAfterOutOfOrderException) {
            retryAfterOutOfOrderException = false;
          } else {
            // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
            throw new DoNotRetryIOException("Failed after retry of " +
                "OutOfOrderScannerNextException: was there a rpc timeout?", e);
          }
        }
        // Clear region.
        this.currentRegion = null;
        // Set this to zero so we don't try and do an rpc and close on remote server when
        // the exception we got was UnknownScanner or the Server is going down.
        callable = null;
        // This continue will take us to while at end of loop where we will set up new scanner.
        continue;
      }
      
      // Take the current time
      long currentTime = System.currentTimeMillis();

      // Update metrics
      if (this.scanMetrics != null) {
        this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
      }

      // Update lastNext
      lastNext = currentTime;

      // Iterate over values, add each Result to the cache, decrement remainingResultSize
      // and countdown, and record lastResult
      if (values != null && values.length > 0) {
        for (Result rs : values) {
          cache.add(rs);
          // We don't make Iterator here
          for (Cell cell : rs.rawCells()) {
            remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
          }
          countdown--;
          this.lastResult = rs;
        }
      }
      // We expect that the server won't have more results for us when we exhaust
      // the size (bytes or count) of the results returned. If the server *does* inform us that
      // there are more results, we want to avoid possiblyNextScanner(...). Only when we actually
      // get results is the moreResults context valid.
      if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
        // Only adhere to more server results when we don't have any partialResults
        // as it keeps the outer loop logic the same.
        serverHasMoreResults = callable.getServerHasMoreResults();
      }
      // Values == null means server-side filter has determined we must STOP
      // !partialResults.isEmpty() means that we are still accumulating partial Results for a
      // row. We should not change scanners before we receive all the partial Results for that
      // row.
    } while (remainingResultSize > 0 && countdown > 0 && !serverHasMoreResults
        && possiblyNextScanner(countdown, values == null));
    // At this point countdown is the number of rows that may still be fetched
  }
        First, two variables are initialized: remainingResultSize and countdown. They represent, respectively, the total amount of data (in bytes) and the total number of rows to fetch in one round of RPC calls. Of course, if the remaining size budget drops below the size of a single row while there is still data to fetch and the row count is still below countdown, a complete row is fetched anyway; rows are never truncated. Both variables are decremented as data is retrieved and take part in deciding whether more data should be fetched; the relevant code is:

// We don't make Iterator here
          for (Cell cell : rs.rawCells()) {
            remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell);
          }
          countdown--;
while (remainingResultSize > 0 && countdown > 0 && !serverHasMoreResults
        && possiblyNextScanner(countdown, values == null));
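
        To make the interplay of the two budgets concrete, here is a toy simulation with assumed numbers (2 MB and 100 are the defaults mentioned earlier; the per-row heap size is invented): whichever budget runs out first ends the fetch loop.

// Toy simulation of the loadCache() bookkeeping (assumed numbers, illustrative only)
long remainingResultSize = 2 * 1024 * 1024; // default max result size: 2 MB
int countdown = 100;                        // default caching: 100 rows per round
long estimatedRowHeapSize = 50 * 1024;      // pretend every row costs about 50 KB on the heap

int rowsFetched = 0;
while (remainingResultSize > 0 && countdown > 0) {
  remainingResultSize -= estimatedRowHeapSize;
  countdown--;
  rowsFetched++;
}
// With these numbers the byte budget is exhausted after 41 rows,
// well before the 100-row caching budget is used up
System.out.println("rows fetched before the loop stops: " + rowsFetched);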
        Next we enter a do-while loop. Because skipFirst is initially false, the first iteration calls call() directly, which again brings us to the call() method of ScannerCallableWithReplicas. Walking through it once more: the Region is located first, its location information is obtained, and then call() of currentScannerCallable, i.e. the ScannerCallable, is executed. The difference is that this time, because a scannerId already exists, it actually invokes the RPC service on the RegionServer and really fetches data. The code is as follows:

Result [] rrs = null;
        ScanRequest request = null;
        try {
          incRPCcallsMetrics();
          request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
          ScanResponse response = null;
          PayloadCarryingRpcController controller = controllerFactory.newController();
          controller.setPriority(getTableName());
          controller.setCallTimeout(callTimeout);
          try {
            response = getStub().scan(controller, request);
            // Client and RS maintain a nextCallSeq number during the scan. Every next() call
            // from client to server will increment this number in both sides. Client passes this
            // number along with the request and at RS side both the incoming nextCallSeq and its
            // nextCallSeq will be matched. In case of a timeout this increment at the client side
            // should not happen. If at the server side fetching of next batch of data was over,
            // there will be mismatch in the nextCallSeq number. Server will throw
            // OutOfOrderScannerNextException and then client will reopen the scanner with startrow
            // as the last successfully retrieved row.
            // See HBASE-5974
            nextCallSeq++;
            long timestamp = System.currentTimeMillis();
            // Results are returned via controller
            CellScanner cellScanner = controller.cellScanner();
            rrs = ResponseConverter.getResults(cellScanner, response);
            if (logScannerActivity) {
              long now = System.currentTimeMillis();
              if (now - timestamp > logCutOffLatency) {
                int rows = rrs == null ? 0 : rrs.length;
                LOG.info("Took " + (now-timestamp) + "ms to fetch "
                  + rows + " rows from scanner=" + scannerId);
              }
            }
            // moreResults is only used for the case where a filter exhausts all elements
            if (response.hasMoreResults() && !response.getMoreResults()) {
              scannerId = -1L;
              closed = true;
              // Implied that no results were returned back, either.
              return null;
            }
            // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due
            // to size or quantity of results in the response.
            if (response.hasMoreResultsInRegion()) {
              // Set what the RS said
              setHasMoreResultsContext(true);
              setServerHasMoreResults(response.getMoreResultsInRegion());
            } else {
              // Server didn't respond whether it has more results or not.
              setHasMoreResultsContext(false);
            }
          } catch (ServiceException se) {
            throw ProtobufUtil.getRemoteException(se);
          }
          updateResultsMetrics(rrs);
        } catch (IOException e) {
          if (logScannerActivity) {
            LOG.info("Got exception making request " + TextFormat.shortDebugString(request)
              + " to " + getLocation(), e);
          }
          IOException ioe = e;
          if (e instanceof RemoteException) {
            ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
          }
          if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
            try {
              HRegionLocation location =
                getConnection().relocateRegion(getTableName(), scan.getStartRow());
              LOG.info("Scanner=" + scannerId
                + " expired, current region location is " + location.toString());
            } catch (Throwable t) {
              LOG.info("Failed to relocate region", t);
            }
          }
          // The below convertion of exceptions into DoNotRetryExceptions is a little strange.
          // Why not just have these exceptions implment DNRIOE you ask?  Well, usually we want
          // ServerCallable#withRetries to just retry when it gets these exceptions.  In here in
          // a scan when doing a next in particular, we want to break out and get the scanner to
          // reset itself up again.  Throwing a DNRIOE is how we signal this to happen (its ugly,
          // yeah and hard to follow and in need of a refactor).
          if (ioe instanceof NotServingRegionException) {
            // Throw a DNRE so that we break out of cycle of calling NSRE
            // when what we need is to open scanner against new location.
            // Attach NSRE to signal client that it needs to re-setup scanner.
            if (this.scanMetrics != null) {
              this.scanMetrics.countOfNSRE.incrementAndGet();
            }
            throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
          } else if (ioe instanceof RegionServerStoppedException) {
            // Throw a DNRE so that we break out of cycle of the retries and instead go and
            // open scanner against new location.
            throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
          } else {
            // The outer layers will retry
            throw ioe;
          }
        }
        return rrs;
        At the end of this code, if no data could be fetched, scannerId is reset to -1 and closed is set to true, so that on the next invocation the previous callable is closed and a new scanner is created.
        The logic that follows is: if the scanner still has data to fetch, the scan's startRow is repositioned and an RPC request is sent to the RegionServer to keep fetching data from that Region; otherwise, the previous callable is closed, a new scanner and a new callable are created, and the earlier logic repeats, fetching data until the endRow, or a row greater than the endRow, is reached.

        There are still many details and corner cases that have not been analyzed; we will get to them later.







