Analyzing Slow Hive HMS Canary Operations

I. Symptoms and Errors

Test cluster up1:
A create database operation takes slightly over 200 s.

# sudo -u hive hive --hiveconf hive.metastore.uris=thrift://10.197.1.141:9084
hive> create database wgtestdb_region3_1;
OK
Time taken: 200.826 seconds

Test cluster up2:

# sudo -u hive hive --hiveconf hive.metastore.uris=thrift://10.197.1.160:9084
hive> create database wgtestdb_region2_2;
OK
Time taken: 0.112 seconds

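As the two runs show, while the HMS canary error is occurring on test cluster up1, create/drop operations on up1 consistently take slightly over 200 s.

At the same time, both the HMS and the Sentry server logs show errors.

HMS error log: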

2021-02-08 16:54:17,910 ERROR org.apache.sentry.core.common.transport.RetryClientInvocationHandler: [pool-5-thread-670]: failed to execute syncNotifications
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.sentry.core.common.transport.RetryClientInvocationHandler.invokeImpl(RetryClientInvocationHandler.java:95)
        at org.apache.sentry.core.common.transport.SentryClientInvocationHandler.invoke(SentryClientInvocationHandler.java:41)
        at com.sun.proxy.$Proxy26.syncNotifications(Unknown Source)
        at org.apache.sentry.binding.metastore.SentrySyncHMSNotificationsPostEventListener.syncNotificationEvents(SentrySyncHMSNotificationsPostEventListener.java:153)
        at org.apache.sentry.binding.metastore.SentrySyncHMSNotificationsPostEventListener.onDropDatabase(SentrySyncHMSNotificationsPostEventListener.java:113)
        at org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier$13.notify(MetaStoreListenerNotifier.java:69)
        at org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier.notifyEvent(MetaStoreListenerNotifier.java:167)
        at org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier.notifyEvent(MetaStoreListenerNotifier.java:197)
        at org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier.notifyEvent(MetaStoreListenerNotifier.java:235)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.drop_database_core(HiveMetaStore.java:1193)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.drop_database(HiveMetaStore.java:1229)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:140)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:99)
        at com.sun.proxy.$Proxy11.drop_database(Unknown Source)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Processor$drop_database.getResult(ThriftHiveMetastore.java:9624)
        at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Processor$drop_database.getResult(ThriftHiveMetastore.java:9608)
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
        at org.apache.hadoop.hive.metastore.TUGIBasedProcessor$1.run(TUGIBasedProcessor.java:110)
        at org.apache.hadoop.hive.metastore.TUGIBasedProcessor$1.run(TUGIBasedProcessor.java:106)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
        at org.apache.hadoop.hive.metastore.TUGIBasedProcessor.process(TUGIBasedProcessor.java:118)
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: timed out wait request for id 184474148. Server Stacktrace: java.util.concurrent.TimeoutException
        at org.apache.sentry.service.thrift.CounterWait$ValueEvent.waitFor(CounterWait.java:299)
        at org.apache.sentry.service.thrift.CounterWait.waitFor(CounterWait.java:212)
        at org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessor.sentry_sync_notifications(SentryPolicyStoreProcessor.java:934)
        at org.apache.sentry.provider.db.service.thrift.SentryPolicyService$Processor$sentry_sync_notifications.getResult(SentryPolicyService.java:1217)
        at org.apache.sentry.provider.db.service.thrift.SentryPolicyService$Processor$sentry_sync_notifications.getResult(SentryPolicyService.java:1202)
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
        at org.apache.sentry.provider.db.service.thrift.SentryProcessorWrapper.process(SentryProcessorWrapper.java:36)
        at org.apache.thrift.TMultiplexedProcessor.process(TMultiplexedProcessor.java:123)
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

        at org.apache.sentry.service.thrift.Status.throwIfNotOk(Status.java:109)
        at org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl.syncNotifications(SentryPolicyServiceClientDefaultImpl.java:824)
        ... 35 more

Sentry error log:

2021-02-08 15:29:09,028 WARN org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessor: timed out wait request for id 184444094
java.util.concurrent.TimeoutException
        at org.apache.sentry.service.thrift.CounterWait$ValueEvent.waitFor(CounterWait.java:299)
        at org.apache.sentry.service.thrift.CounterWait.waitFor(CounterWait.java:212)
        at org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessor.sentry_sync_notifications(SentryPolicyStoreProcessor.java:934)
        at org.apache.sentry.provider.db.service.thrift.SentryPolicyService$Processor$sentry_sync_notifications.getResult(SentryPolicyService.java:1217)
        at org.apache.sentry.provider.db.service.thrift.SentryPolicyService$Processor$sentry_sync_notifications.getResult(SentryPolicyService.java:1202)
        at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
        at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
        at org.apache.sentry.provider.db.service.thrift.SentryProcessorWrapper.process(SentryProcessorWrapper.java:36)
        at org.apache.thrift.TMultiplexedProcessor.process(TMultiplexedProcessor.java:123)
        at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

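The logs suggest that notification syncing from HMS to Sentry is lagging: the Sentry service cannot keep up, so the sync requests time out.

II. Root Cause Analysis

HMS sends notification sync requests to Sentry in real time. When a large batch of notifications has to be processed and Sentry's background thread cannot keep up, a backlog builds and this error appears. The error does not stop the cluster from working; it only makes operations such as create and drop slow, because each one waits up to 200 s for Sentry to catch up to the latest notification id.

The metadata that tracks notification syncing between Sentry and Hive is shown below. The SENTRY_HMS_NOTIFICATION_ID table in the Sentry metadata had stopped updating, while the NOTIFICATION_SEQUENCE table in the Hive metadata kept advancing. In other words, Sentry was falling behind in consuming HMS events. In this state the Hive HMS canary fails, and the create database operation above takes about 200 s. The wait is bounded by the sentry.notification.sync.timeout.ms parameter (200 s here), which explains why create/drop operations all take slightly over 200 s.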

# Sentry metadata
mysql -uroot -p123456
use sentry;
mysql> select * from  SENTRY_HMS_NOTIFICATION_ID order by NOTIFICATION_ID desc limit 5;   (no updates for some time; the HMS canary is failing at this point)
+-----------------+
| NOTIFICATION_ID |
+-----------------+
|       184485024 |
|       184485023 |
|       184485023 |
|       184485022 |
|       184485021 |
+-----------------+
5 rows in set (0.00 sec)

# Hive metadata
use hive_warehouse;
MySQL [hive_warehouse]>  select * from NOTIFICATION_SEQUENCE limit 10;   (keeps advancing)
+--------+---------------+
| NNI_ID | NEXT_EVENT_ID |
+--------+---------------+
|      1 |     184486656 |
+--------+---------------+
1 row in set (0.00 sec)
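
The gap between the two values gives a rough size for the backlog. A minimal sketch of that arithmetic in Java follows. NotificationLag and backlog() are hypothetical names used only for illustration and are not part of Hive or Sentry. The result is approximate, on the assumption that NEXT_EVENT_ID is the next id HMS will assign rather than the last one it issued.

```java
// Hypothetical helper for illustration; not part of Hive or Sentry.
final class NotificationLag {
    private NotificationLag() {}

    /**
     * Approximate number of HMS events that Sentry has not processed yet.
     *
     * @param nextEventId     NOTIFICATION_SEQUENCE.NEXT_EVENT_ID from the Hive metadata
     * @param lastProcessedId largest SENTRY_HMS_NOTIFICATION_ID.NOTIFICATION_ID from the Sentry metadata
     */
    static long backlog(long nextEventId, long lastProcessedId) {
        // Clamp at zero so a stale HMS reading never yields a negative backlog.
        return Math.max(0L, nextEventId - lastProcessedId);
    }

    public static void main(String[] args) {
        // Values copied from the two queries above.
        System.out.println(backlog(184486656L, 184485024L));
    }
}
```

With the values from the two queries above this prints 1632, so Sentry was roughly 1,600 events behind at that moment.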

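III. Source Code Analysis

In Hive, operations such as create/drop/alter all send a notification request to the Sentry server. The create_table operation is used as the example here.

The source shows that create_table raises the event through the static method notifyEvent() of the MetaStoreListenerNotifier class.

// Location: org/apache/hadoop/hive/metastore/HiveMetaStore.java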
    private void create_table_core(final RawStore ms, final Table tbl,
        final EnvironmentContext envContext)
        throws AlreadyExistsException, MetaException,
        InvalidObjectException, NoSuchObjectException {

      ...
        if (!transactionalListeners.isEmpty()) {
          transactionalListenerResponses =
              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
                                                    EventType.CREATE_TABLE,
                                                    new CreateTableEvent(tbl, true, this),
                                                    envContext);
        }
      ...
    }

    @Override
    public void create_table(final Table tbl) throws AlreadyExistsException,
        MetaException, InvalidObjectException {
      create_table_with_environment_context(tbl, null);
    }

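The event is dispatched as follows. The notifier calls every registered listener, each of which extends MetaStoreEventListener. The listing shows the three-argument overload; create_table_core above passes an additional EnvironmentContext.

// Location: org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java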
  /**
   * Notify a list of listeners about a specific metastore event. Each listener notified might update
   * the (ListenerEvent) event by setting a parameter key/value pair. These updated parameters will
   * be returned to the caller.
   *
   * @param listeners List of MetaStoreEventListener listeners.
   * @param eventType Type of the notification event.
   * @param event The ListenerEvent with information about the event.
   * @return A list of key/value pair parameters that the listeners set. The returned object will return an empty
   *         map if no parameters were updated or if no listeners were notified.
   * @throws MetaException If an error occurred while calling the listeners.
   */
  public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners,
                                                EventType eventType,
                                                ListenerEvent event) throws MetaException {

    Preconditions.checkNotNull(listeners, "Listeners must not be null.");
    Preconditions.checkNotNull(event, "The event must not be null.");

    for (MetaStoreEventListener listener : listeners) {
      notificationEvents.get(eventType).notify(listener, event);
    }

    // Each listener called above might set a different parameter on the event.
    // This write permission is allowed on the listener side to avoid breaking compatibility if we change the API
    // method calls.
    return event.getParameters();
  }
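
The loop above runs on the calling thread, so a listener that blocks also blocks the DDL call that triggered it. The sketch below is a simplified model of that behavior, not Hive code: the Listener interface, notifyEvent() and demo() are hypothetical stand-ins, and the sleep stands in for a blocking remote sync call.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of synchronous listener dispatch; all names are hypothetical.
final class SyncDispatchSketch {
    interface Listener {
        void onEvent(String eventType);
    }

    /** Calls each listener in turn on the caller's thread. */
    static void notifyEvent(List<Listener> listeners, String eventType) {
        for (Listener listener : listeners) {
            listener.onEvent(eventType);
        }
    }

    /** Runs one fast and one slow listener and records the order in which steps complete. */
    static List<String> demo(long slowMillis) {
        List<String> order = new ArrayList<>();
        List<Listener> listeners = new ArrayList<>();
        listeners.add(e -> order.add("fast listener done"));
        listeners.add(e -> {
            try {
                Thread.sleep(slowMillis); // stands in for a blocking remote sync call
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            order.add("slow listener done");
        });
        notifyEvent(listeners, "CREATE_TABLE");
        order.add("caller continues"); // reached only after every listener has returned
        return order;
    }

    public static void main(String[] args) {
        System.out.println(demo(100));
    }
}
```

The caller's entry is always recorded last, which mirrors how a slow sync listener stretches the total time of a create or drop.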

How does the Sentry server receive the event after HMS dispatches it? Sentry uses the syncNotificationEvents() method of the SentrySyncHMSNotificationsPostEventListener class to request a sync for every DDL event and hand it to the Sentry server for processing.

// Location: org/apache/sentry/binding/metastore/SentrySyncHMSNotificationsPostEventListener.java
/**
 * This HMS post-event listener is used only to synchronize with HMS notifications on the Sentry server
 * whenever a DDL event happens on the Hive metastore.
 */
public class SentrySyncHMSNotificationsPostEventListener extends MetaStoreEventListener {
    /**
   * It requests the Sentry server the synchronization of recent notification events.
   *
   * After the sync call, the latest processed ID will be stored for future reference to avoid
   * syncing an ID that was already processed.
   *
   * @param event An event that contains a DB_NOTIFICATION_EVENT_ID_KEY_NAME value to request.
   */
  private void syncNotificationEvents(ListenerEvent event, String eventName) {
    // Do not sync notifications if the event has failed.
    if (failedEvent(event, eventName)) {
      return;
    }

    Map<String, String> eventParameters = event.getParameters();
    if (!eventParameters.containsKey(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME)) {
      return;
    }

    /* If the HMS is running in an active transaction, then we do not want to sync with Sentry
     * because the desired eventId is not available for Sentry yet, and Sentry may block the HMS
     * forever or until a read time-out happens. */
    if (isMetastoreTransactionActive(eventParameters)) {
      return;
    }

    long eventId =
        Long.parseLong(eventParameters.get(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME));

    // This check is only for performance reasons to avoid calling the sync thrift call if the Sentry server
    // already processed the requested eventId.
    if (eventId <= latestProcessedId.get()) {
      return;
    }

    try(SentryPolicyServiceClient sentryClient = this.getSentryServiceClient()) {
      LOGGER.debug("Starting Sentry/HMS notifications sync for {} (id: {})", eventName, eventId);
      long sentryLatestProcessedId = sentryClient.syncNotifications(eventId);
      LOGGER.debug("Finished Sentry/HMS notifications sync for {} (id: {})", eventName, eventId);
      LOGGER.debug("Latest processed event ID returned by the Sentry server: {}", sentryLatestProcessedId);

      updateProcessedId(sentryLatestProcessedId);
    } catch (Exception e) {
      // This error is only logged. There is no need to throw an error to Hive because HMS sync is called
      // after the notification is already generated by Hive (as post-event).
      LOGGER.error("Failed to sync requested HMS notifications up to the event ID: " + eventId, e);
    }
  }
}
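
Two details of this listener explain the behavior seen in Section I: an id that is already known to be processed skips the remote call, and a failed sync is only logged, which is why the Hive CLI still reports OK after the long wait. The sketch below models just those two points under simplifying assumptions. SyncClient, sync() and remoteCalls are hypothetical names, and keeping the maximum of the stored and returned ids is an assumption about what updateProcessedId() does.

```java
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

// Simplified model of the post-event sync logic; all names are hypothetical.
final class PostEventSyncSketch {
    /** Stand-in for the Thrift client call to the Sentry server. */
    interface SyncClient {
        long syncNotifications(long eventId) throws TimeoutException;
    }

    private final AtomicLong latestProcessedId = new AtomicLong(0);
    private final SyncClient client;
    int remoteCalls = 0; // counts how often the remote call was actually made

    PostEventSyncSketch(SyncClient client) {
        this.client = client;
    }

    /** Returns true if the sync succeeded or was skipped, false if it failed and was only logged. */
    boolean sync(long eventId) {
        if (eventId <= latestProcessedId.get()) {
            return true; // already processed, no remote call needed
        }
        remoteCalls++;
        try {
            long processed = client.syncNotifications(eventId);
            // Assumption: remember the highest id reported by the server.
            latestProcessedId.accumulateAndGet(processed, Math::max);
            return true;
        } catch (TimeoutException e) {
            // Logged only; the caller (the DDL operation) is not failed.
            System.err.println("Failed to sync up to event ID " + eventId + ": " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        PostEventSyncSketch failing = new PostEventSyncSketch(id -> {
            throw new TimeoutException("timed out wait request for id " + id);
        });
        System.out.println(failing.sync(42L)); // false, and no exception reaches the caller
    }
}
```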

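The Sentry server handles the incoming HMS sync request in the SentryPolicyStoreProcessor class. The waitFor() call is the key: it blocks until the Sentry server has processed the requested HMS notification id. If processing does not catch up in time, a timeout exception is raised, which is the error seen in the HMS log in Section I.

// Location: org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java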
  @Override
  public TSentrySyncIDResponse sentry_sync_notifications(TSentrySyncIDRequest request)
          throws TException {
    TSentrySyncIDResponse response = new TSentrySyncIDResponse();
    try (Timer.Context timerContext = hmsWaitTimer.time()) {
      // Wait until Sentry Server processes specified HMS Notification ID.
      response.setId(sentryStore.getCounterWait().waitFor(request.getId()));
      response.setStatus(Status.OK());
    } catch (InterruptedException e) {
      String msg = String.format("wait request for id %d is interrupted",
              request.getId());
      LOGGER.error(msg, e);
      response.setId(0);
      response.setStatus(Status.RuntimeError(msg, e));
      Thread.currentThread().interrupt();
    } catch (TimeoutException e) {
      String msg = String.format("timed out wait request for id %d", request.getId());
      LOGGER.warn(msg, e);
      response.setId(0);
      response.setStatus(Status.RuntimeError(msg, e));
    }
    return response;
  }

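The logic of waitFor() is straightforward: it compares the requested HMS event id with the latest id the Sentry server has processed (the newest value in the SENTRY_HMS_NOTIFICATION_ID table of the Sentry metadata). If the requested id has already been reached it returns immediately; otherwise it enqueues a waiter and blocks until the counter reaches that id or the wait times out.

// Location: org/apache/sentry/service/thrift/CounterWait.java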
  /**
   * Wait for specified counter value.
   * Returns immediately if the value is reached or blocks until the value
   * is reached.
   * Multiple threads can call the method concurrently.
   *
   * @param value requested counter value
   * @return current counter value that should be no smaller then the requested
   * value
   * @throws InterruptedException if the wait was interrupted, TimeoutException if
   * wait was not successfull within the timeout value specified at the construction time.
   */
  public long waitFor(long value) throws InterruptedException, TimeoutException {
    // Fast path - counter value already reached, no need to block
    if (value <= currentId.get()) {
      return currentId.get();
    }

    // Enqueue the waiter for this value
    ValueEvent eid = new ValueEvent(value);
    waiters.put(eid);

    // It is possible that between the fast path check and the time the
    // value event is enqueued, the counter value already reached the requested
    // value. In this case we return immediately.
    if (value <= currentId.get()) {
      return currentId.get();
    }

    // At this point we may be sure that by the time the event was enqueued,
    // the counter was below the requested value. This means that update()
    // is guaranteed to wake us up when the counter reaches the requested value.
    // The wake up may actually happen before we start waiting, in this case
    // the event's blocking queue will be non-empty and the waitFor() below
    // will not block, so it is safe to wake up before the wait.
    // So sit tight and wait patiently.
    eid.waitFor();
    LOGGER.debug("CounterWait added new value to waitFor: value = {}, currentId = {}", value, currentId.get());
    return currentId.get();
  }
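
The same wait-with-timeout idea can be modeled in a few lines. The sketch below is not Sentry's implementation: it swaps the waiter queue for a plain monitor with wait()/notifyAll(), and CounterWaitSketch with its constructor argument is hypothetical. It only illustrates why a caller returns at once when the counter is ahead and fails after the configured timeout when the counter stalls.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Simplified monitor-based model of a counter wait; not Sentry's implementation.
final class CounterWaitSketch {
    private final long timeoutMs;
    private long current = 0;

    CounterWaitSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Advances the counter (never backwards) and wakes all waiters. */
    synchronized void update(long value) {
        if (value > current) {
            current = value;
            notifyAll();
        }
    }

    /** Blocks until the counter reaches value, or throws once timeoutMs has elapsed. */
    synchronized long waitFor(long value) throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (current < value) {
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMs <= 0) {
                throw new TimeoutException("timed out waiting for id " + value);
            }
            wait(remainingMs); // releases the monitor so update() can run
        }
        return current;
    }

    public static void main(String[] args) throws Exception {
        CounterWaitSketch counter = new CounterWaitSketch(200);
        new Thread(() -> counter.update(10)).start();
        System.out.println(counter.waitFor(5)); // returns once the counter has reached 5
        try {
            counter.waitFor(11); // nobody advances the counter any further
        } catch (TimeoutException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the incident described here the counter corresponds to the stalled Sentry notification id, so every waiter ran into the timeout branch.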

At this point the Sentry server has handled one event request from HMS.

IV. Fixes and Recommendations

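1. Lower sentry.notification.sync.timeout.ms as appropriate
The parameter defaults to 200 s. Lowering it shortens the wait for operations such as create/drop/alter. When the backlog is small, this option lets Sentry work through it on its own.
The parameter is changed in the Sentry service configuration in Cloudera.
After the change and a restart of the Sentry service, the wait during an HMS canary failure dropped to slightly over 50 s, which shows that the new value took effect.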

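2. Monitor SENTRY_HMS_NOTIFICATION_ID in the Sentry metadata
Query the latest record of the SENTRY_HMS_NOTIFICATION_ID table in the Sentry metadata directly. If it is not advancing, notification processing has fallen behind, and the HMS canary will fail.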

mysql>  select * from  SENTRY_HMS_NOTIFICATION_ID order by NOTIFICATION_ID desc limit 1;
+-----------------+
| NOTIFICATION_ID |
+-----------------+
|       184490926 |
+-----------------+
1 row in set (0.00 sec)
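
One way to turn this check into an alert is to sample both ids at a fixed interval and flag the case where HMS has moved on while Sentry has not. The sketch below covers only that comparison. SyncStallDetector and sample() are hypothetical names, and fetching the two values (for example with the queries shown in this article) is left to the caller.

```java
// Hypothetical stall check for illustration; not part of Hive, Sentry, or Cloudera Manager.
final class SyncStallDetector {
    private long lastSentryId = -1;
    private long lastHmsNextId = -1;

    /**
     * Feed one sample per polling interval.
     *
     * @param sentryId       latest NOTIFICATION_ID in SENTRY_HMS_NOTIFICATION_ID
     * @param hmsNextEventId NEXT_EVENT_ID in NOTIFICATION_SEQUENCE
     * @return true when HMS produced new events since the previous sample
     *         but the Sentry-side id did not move
     */
    boolean sample(long sentryId, long hmsNextEventId) {
        boolean stalled = lastSentryId >= 0
                && sentryId == lastSentryId
                && hmsNextEventId > lastHmsNextId;
        lastSentryId = sentryId;
        lastHmsNextId = hmsNextEventId;
        return stalled;
    }

    public static void main(String[] args) {
        SyncStallDetector detector = new SyncStallDetector();
        System.out.println(detector.sample(184485024L, 184486000L)); // first sample, nothing to compare
        System.out.println(detector.sample(184485024L, 184486656L)); // Sentry unchanged, HMS advanced
    }
}
```

Comparing against the HMS side avoids a false alarm when Sentry's id is unchanged simply because no new events were generated.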

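3. Reset the Sentry notification sync record
If the backlog is too large, Sentry may take too long to work through it and never catch up with the latest HMS id. In that case you can choose to discard the backlog, which means Sentry will not process the skipped notifications: insert a row holding the current maximum id into the SENTRY_HMS_NOTIFICATION_ID table of the Sentry metadata (take the value from the NOTIFICATION_SEQUENCE table of the Hive metadata), then restart the Sentry service.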

use sentry;
insert into SENTRY_HMS_NOTIFICATION_ID values(184472866);
After the update, create operations return to normal speed.
