HBase2.0 replication wal znode大量积压问题定位解决

现象

线上有2个集群A和B,配置了双向同步,单活,即业务层某一时刻只会访问其中一个集群;
近期A集群的regionserver日志中报了很多异常,但监控页面正常,功能也未受影响。

HBase版本为2.0.0;

2019-09-04 02:44:58,115 WARN org.apache.zookeeper.ClientCnxn: Session 0x36abf38cec5531d for server host-15/ip-15:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Broken pipe
        at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
        at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
        at sun.nio.ch.IOUtil.write(IOUtil.java:65)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:117)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:355)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1073)

定位

1、查看前后日志,可以看出该异常与replication有关

2019-09-04 02:44:56,960 INFO org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl: Atomically moving host-17,16020,1561539485645/1's WALs to my queue
2019-09-04 02:44:58,115 WARN org.apache.zookeeper.ClientCnxn: Session 0x36abf38cec5531d for server host-15/ip-15:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Broken pipe
        at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
        at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
        at sun.nio.ch.IOUtil.write(IOUtil.java:65)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:117)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:355)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1073)
...
中间多次Broken pipe的异常日志
...    
2019-09-04 02:45:45,544 ERROR org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper: ZooKeeper multi failed after 4 attempts
2019-09-04 02:45:45,544 WARN org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl: Got exception in copyQueuesFromRSUsingMulti: 
org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
        at org.apache.zookeeper.ZooKeeper.multiInternal(ZooKeeper.java:944)
        at org.apache.zookeeper.ZooKeeper.multi(ZooKeeper.java:924)
        at org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.multi(RecoverableZooKeeper.java:663)
        at org.apache.hadoop.hbase.zookeeper.ZKUtil.multiOrSequential(ZKUtil.java:1670)
        at org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl.moveQueueUsingMulti(ReplicationQueuesZKImpl.java:318)
        at org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl.claimQueue(ReplicationQueuesZKImpl.java:210)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager$NodeFailoverWorker.run(ReplicationSourceManager.java:686)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

2、为什么会有这个操作

HBase的replication处理机制是,由各个regionServer负责自己产生的wal文件,如果某个rs退出,则其它rs会认领该rs尚未完成的wal文件同步任务,上面的日志就是认领动作产生的日志。

3、为什么会有rs退出

在cdh中查看命令运行日志,最近的一次重启是8月16,应该是当时修改了集群参数;
查看日志中最早出现异常的时间,也是8月16。

4、为什么认领失败

查看zk日志,有大量如下警告信息

2019-09-04 02:45:44,270 WARN org.apache.zookeeper.server.NIOServerCnxn: Exception causing close of session 0x36abf38cec5531a due to java.io.IOException: Len error 4448969

这个错是zk的一个bug,会在单次请求涉及过多数据量的时候触发,修复版本是3.4.7, 3.5.2, 3.6.0,我们的线上版本是3.4.5,相关的issue:
https://issues.apache.org/jira/browse/ZOOKEEPER-706

但这只是直接原因,即使修复这个bug,也只是将问题暂时掩盖,根本原因还需要继续分析。

5、为什么认领时涉及大量数据

rs在认领任务时,对于每个wal文件会生成一个OP对象,然后封装到一个List中,最后调用multi执行

  private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) {
    try {
      // hbase/replication/rs/deadrs
      String deadRSZnodePath = ZNodePaths.joinZNode(this.queuesZNode, znode);
      List<ZKUtilOp> listOfOps = new ArrayList<>();
      ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);

      String newPeerId = peerId + "-" + znode;
      String newPeerZnode = ZNodePaths.joinZNode(this.myQueuesZnode, newPeerId);
      // check the logs queue for the old peer cluster
      String oldClusterZnode = ZNodePaths.joinZNode(deadRSZnodePath, peerId);
      List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);

      if (!peerExists(replicationQueueInfo.getPeerId())) {
        LOG.warn("Peer " + replicationQueueInfo.getPeerId() +
                " didn't exist, will move its queue to avoid the failure of multi op");
        for (String wal : wals) {
          String oldWalZnode = ZNodePaths.joinZNode(oldClusterZnode, wal);
          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
        }
        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
        ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
        return null;
      }

      SortedSet<String> logQueue = new TreeSet<>();
      if (wals == null || wals.isEmpty()) {
        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
      } else {
        // create the new cluster znode
        ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
        listOfOps.add(op);
        // get the offset of the logs and set it to new znodes
        for (String wal : wals) {
          String oldWalZnode = ZNodePaths.joinZNode(oldClusterZnode, wal);
          byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode);
          LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset));
          String newLogZnode = ZNodePaths.joinZNode(newPeerZnode, wal);
          listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
          listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
          logQueue.add(wal);
        }
        // add delete op for peer
        listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));

        if (LOG.isTraceEnabled())
          LOG.trace(" The multi list size is: " + listOfOps.size());
      }
      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);

      LOG.info("Atomically moved " + znode + "/" + peerId + "'s WALs to my queue");
      return new Pair<>(newPeerId, logQueue);
    } catch (KeeperException e) {
      // Multi call failed; it looks like some other regionserver took away the logs.
      LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
    } catch (InterruptedException e) {
      LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
      Thread.currentThread().interrupt();
    }
    return null;
  }

这意味着,退出的rs遗留了大量的wal同步任务,查看zk上该rs的任务列表,路径是/hbase/replication/rs/#rs实例名#/#peerId#;
结果打印出茫茫多的子节点,10秒钟都没打印结束;
具体数量不好统计,但从cdh中可以看到集群的zk有30多万个znode,作为参考,马驹桥集群的zk只有1.2万个znode。

6、为什么会出现这么多znode

查看replication模块的源码,梳理核心流程如下:
HBase2.0 replication wal znode大量积压问题定位解决

walReader和walShipper的实现类分别为ReplicationSourceWALReader和ReplicationSourceShipper,各自起了一个线程,以生产者消费者方式通过一个队列通信;
删除wal znode是由shipper负责的,实际逻辑是在shipEdits方法中,在数据发送结束后执行,可确保数据不丢失,删除时比该wal更旧的wal对应的znode会一起删除;
由于积压了很多znode,猜测该shipEdits方法应该是长时间没有被调用,查看这2个线程的栈信息:

"main-EventThread.replicationSource,2.replicationSource.host-17%2C16020%2C1567586932902.host-17%2C16020%2C1567586932902.regiongroup-0,2.replicationSource.wal-reader.host-17%2C16020%2C1567586932902.host-17%2C16020%2C1567586932902.regiongroup-0,2" #157238 daemon prio=5 os_prio=0 tid=0x00007f7634be8800 nid=0x377ef waiting on condition [0x00007f6114c0e000]"main-EventThread.replicationSource,2.replicationSource.host-17%2C16020%2C1567586932902.host-17%2C16020%2C1567586932902.regiongroup-0,2.replicationSource.wal-reader.host-17%2C16020%2C1567586932902.host-17%2C16020%2C1567586932902.regiongroup-0,2" #157238 daemon prio=5 os_prio=0 tid=0x00007f7634be8800 nid=0x377ef waiting on condition [0x00007f6114c0e000]   java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep(Native Method) at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.handleEmptyWALEntryBatch(ReplicationSourceWALReader.java:192) at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.run(ReplicationSourceWALReader.java:142)

"main-EventThread.replicationSource,2.replicationSource.host-17%2C16020%2C1567586932902.host-17%2C16020%2C1567586932902.regiongroup-0,2" #157237 daemon prio=5 os_prio=0 tid=0x00007f76350b0000 nid=0x377ee waiting on condition [0x00007f6108173000]   java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for  <0x00007f6f99bb6718> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442) at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.take(ReplicationSourceWALReader.java:248) at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceShipper.run(ReplicationSourceShipper.java:108)

从栈信息可以看出,shipper确实处于阻塞状态;

查看walReader的相关代码,写入队列的部分如下:

WALEntryBatch batch = readWALEntries(entryStream);
if (batch != null && batch.getNbEntries() > 0) {
  if (LOG.isTraceEnabled()) {
    LOG.trace(String.format("Read %s WAL entries eligible for replication",
      batch.getNbEntries()));
  }
  entryBatchQueue.put(batch);
  sleepMultiplier = 1;
} else { // got no entries and didn't advance position in WAL
  handleEmptyWALEntryBatch(batch, entryStream.getCurrentPath());
}

推测是一直进入了else逻辑,handleEmptyWALEntryBatch方法代码如下:

protected void handleEmptyWALEntryBatch(WALEntryBatch batch, Path currentPath)
    throws InterruptedException {
  LOG.trace("Didn't read any new entries from WAL");
  Thread.sleep(sleepForRetries);
}

打开该类的trace级别log,可以看到不停的打印如下日志:

2019-09-10 17:09:34,093 TRACE org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader: Didn't read any new entries from WAL
2019-09-10 17:09:35,096 TRACE org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader: Didn't read any new entries from WAL
2019-09-10 17:09:36,099 TRACE org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader: Didn't read any new entries from WAL
2019-09-10 17:09:37,102 TRACE org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader: Didn't read any new entries from WAL
2019-09-10 17:09:38,105 TRACE org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader: Didn't read any new entries from WAL
2019-09-10 17:09:39,108 TRACE org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader: Didn't read any new entries from WAL

同时,并没有类似"WAL entries eligible for replication"的日志出现;
至此,可以确认是walReader读取时一直返回了empty的batch对象导致;
wal里面有数据,但是返回为空的原因是,被过滤掉了,readWALEntries方法的代码如下:

  private WALEntryBatch readWALEntries(WALEntryStream entryStream) throws IOException {
    WALEntryBatch batch = null;
    while (entryStream.hasNext()) {
      if (batch == null) {
        batch = new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
      }
      Entry entry = entryStream.next();
      entry = filterEntry(entry);
      if (entry != null) {
        WALEdit edit = entry.getEdit();
        if (edit != null && !edit.isEmpty()) {
          long entrySize = getEntrySize(entry);
          batch.addEntry(entry);
          updateBatchStats(batch, entry, entryStream.getPosition(), entrySize);
          boolean totalBufferTooLarge = acquireBufferQuota(entrySize);
          // Stop if too many entries or too big
          if (totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
              || batch.getNbEntries() >= replicationBatchCountCapacity) {
            break;
          }
        }
      }
    }
    return batch;
  }

这里起作用的是ClusterMarkingEntryFilter,HBase为了避免master-master模式下出现replication环路,会在同步的walEntry中写入源集群的clusterId,如果当前处理的entry中的clusterId与目标集群相同,说明它是从目标集群同步过来的,就不用再同步回去了,代码如下;

  public Entry filter(Entry entry) {
    // don't replicate if the log entries have already been consumed by the cluster
    if (replicationEndpoint.canReplicateToSameCluster()
        || !entry.getKey().getClusterIds().contains(peerClusterId)) {
      WALEdit edit = entry.getEdit();
      WALKeyImpl logKey = (WALKeyImpl)entry.getKey();

      if (edit != null && !edit.isEmpty()) {
        // Mark that the current cluster has the change
        logKey.addClusterId(clusterId);
        // We need to set the CC to null else it will be compressed when sent to the sink
        entry.setCompressionContext(null);
        return entry;
      }
    }
    return null;
  }

至此,原因基本清楚,线上的2个HBase集群配置为master-master模式的双向同步,但是只有1个作为active集群会写入数据,而作为backup的集群接受同步数据时会产生wal,但一直没有写入操作就一直触发不了shipper的删除动作。

解决

在读取的batch为empty时依然更新其lastWalPosition,并写入带有到队列中,以触发shipper的清理动作;

这个bug存在于2.0分支,2.1之后这部分代码在实现serial replication时做了改动,问题已经不存在;

对于2.0分支,已提了一个issue(https://issues.apache.org/jira/browse/HBASE-23008)到社区,但由于提交pr时该分支已停止维护,因此并未合并进去;

上一篇:HBase2.0 procedureV2原理简析


下一篇:HBase scan过程简析