一、现象及异常
测试up1集群:
create database 操作 200s 多一点。
# sudo -u hive hive --hiveconf hive.metastore.uris=thrift://10.197.1.141:9084
hive> create database wgtestdb_region3_1;
OK
Time taken: 200.826 seconds
测试up2集群:
# sudo -u hive hive --hiveconf hive.metastore.uris=thrift://10.197.1.160:9084
hive> create database wgtestdb_region2_2;
OK
Time taken: 0.112 seconds
可以看到,当测试 up1 集群出现 HMS canary 异常时,up1 集群的 create/drop 操作时间基本都稳定在 200s 多一点。
同时 HMS 和 Sentry 服务端日志都出现一些异常。
HMS 异常日志:
2021-02-08 16:54:17,910 ERROR org.apache.sentry.core.common.transport.RetryClientInvocationHandler: [pool-5-thread-670]: failed to execute syncNotifications
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.sentry.core.common.transport.RetryClientInvocationHandler.invokeImpl(RetryClientInvocationHandler.java:95)
at org.apache.sentry.core.common.transport.SentryClientInvocationHandler.invoke(SentryClientInvocationHandler.java:41)
at com.sun.proxy.$Proxy26.syncNotifications(Unknown Source)
at org.apache.sentry.binding.metastore.SentrySyncHMSNotificationsPostEventListener.syncNotificationEvents(SentrySyncHMSNotificationsPostEventListener.java:153)
at org.apache.sentry.binding.metastore.SentrySyncHMSNotificationsPostEventListener.onDropDatabase(SentrySyncHMSNotificationsPostEventListener.java:113)
at org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier$13.notify(MetaStoreListenerNotifier.java:69)
at org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier.notifyEvent(MetaStoreListenerNotifier.java:167)
at org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier.notifyEvent(MetaStoreListenerNotifier.java:197)
at org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier.notifyEvent(MetaStoreListenerNotifier.java:235)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.drop_database_core(HiveMetaStore.java:1193)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.drop_database(HiveMetaStore.java:1229)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:140)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:99)
at com.sun.proxy.$Proxy11.drop_database(Unknown Source)
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Processor$drop_database.getResult(ThriftHiveMetastore.java:9624)
at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Processor$drop_database.getResult(ThriftHiveMetastore.java:9608)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.hadoop.hive.metastore.TUGIBasedProcessor$1.run(TUGIBasedProcessor.java:110)
at org.apache.hadoop.hive.metastore.TUGIBasedProcessor$1.run(TUGIBasedProcessor.java:106)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
at org.apache.hadoop.hive.metastore.TUGIBasedProcessor.process(TUGIBasedProcessor.java:118)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: timed out wait request for id 184474148. Server Stacktrace: java.util.concurrent.TimeoutException
at org.apache.sentry.service.thrift.CounterWait$ValueEvent.waitFor(CounterWait.java:299)
at org.apache.sentry.service.thrift.CounterWait.waitFor(CounterWait.java:212)
at org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessor.sentry_sync_notifications(SentryPolicyStoreProcessor.java:934)
at org.apache.sentry.provider.db.service.thrift.SentryPolicyService$Processor$sentry_sync_notifications.getResult(SentryPolicyService.java:1217)
at org.apache.sentry.provider.db.service.thrift.SentryPolicyService$Processor$sentry_sync_notifications.getResult(SentryPolicyService.java:1202)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.sentry.provider.db.service.thrift.SentryProcessorWrapper.process(SentryProcessorWrapper.java:36)
at org.apache.thrift.TMultiplexedProcessor.process(TMultiplexedProcessor.java:123)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
at org.apache.sentry.service.thrift.Status.throwIfNotOk(Status.java:109)
at org.apache.sentry.provider.db.service.thrift.SentryPolicyServiceClientDefaultImpl.syncNotifications(SentryPolicyServiceClientDefaultImpl.java:824)
... 35 more
Sentry 异常日志:
2021-02-08 15:29:09,028 WARN org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessor: timed out wait request for id 184444094
java.util.concurrent.TimeoutException
at org.apache.sentry.service.thrift.CounterWait$ValueEvent.waitFor(CounterWait.java:299)
at org.apache.sentry.service.thrift.CounterWait.waitFor(CounterWait.java:212)
at org.apache.sentry.provider.db.service.thrift.SentryPolicyStoreProcessor.sentry_sync_notifications(SentryPolicyStoreProcessor.java:934)
at org.apache.sentry.provider.db.service.thrift.SentryPolicyService$Processor$sentry_sync_notifications.getResult(SentryPolicyService.java:1217)
at org.apache.sentry.provider.db.service.thrift.SentryPolicyService$Processor$sentry_sync_notifications.getResult(SentryPolicyService.java:1202)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.sentry.provider.db.service.thrift.SentryProcessorWrapper.process(SentryProcessorWrapper.java:36)
at org.apache.thrift.TMultiplexedProcessor.process(TMultiplexedProcessor.java:123)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
日志异常大概原因是 HMS 向 Sentry 同步消息时出现延迟,Sentry 服务处理不过来出现超时。
二、原因分析
HMS 会实时向 Sentry 同步 Notifications 请求,当需要大批同步消息需要处理,后台线程处理不过来,消息压滞就会出现这个异常。这个异常不影响集群正常使用,只是会导致create, drop 等操作慢,需要等待 200s,等待的目的也是为了追上最新的 id。
下面是 sentry 和 hive 的消息同步的元数据信息,现象是 sentry 元数据的 SENTRY_HMS_NOTIFICATION_ID 表一直没有更新,而 hive 元数据 NOTIFICATION_SEQUENCE 表一直在更新,也就是说 sentry 消费 HMS 端的信息不及时出现了滞后情况。此时 Hive HMS 便会出现 canary 异常,导致上面 create database 操作的时间基本都是在 200.**s 左右,这个参数是由 sentry.notification.sync.timeout.ms(200s) 参数控制的,这也能解释为什么 create/drop 操作时间都在 200s 多一点。
# sentry 元数据信息
mysql -uroot -p123456
use sentry;
mysql> select * from SENTRY_HMS_NOTIFICATION_ID order by NOTIFICATION_ID desc limit 5;(一段时间不更新,此时 HMS canary 异常)
+-----------------+
| NOTIFICATION_ID |
+-----------------+
| 184485024 |
| 184485023 |
| 184485023 |
| 184485022 |
| 184485021 |
+-----------------+
5 rows in set (0.00 sec)
# hive 元数据信息
use hive_warehouse;
MySQL [hive_warehouse]> select * from NOTIFICATION_SEQUENCE limit 10;(持续更新)
+--------+---------------+
| NNI_ID | NEXT_EVENT_ID |
+--------+---------------+
| 1 | 184486656 |
+--------+---------------+
1 row in set (0.00 sec)
三、源码分析
Hive 中 create/drop/alter 等操作都会向 Sentry Server 发送 Notification 请求,这里以 create_table 操作为例。
从源码可以看出 create_table 操作会通过 MetaStoreListenerNotifier 类的静态方法 notifyEvent() 发送事件请求。
//位置:org/apache/hadoop/hive/metastore/HiveMetaStore.java
private void create_table_core(final RawStore ms, final Table tbl,
final EnvironmentContext envContext)
throws AlreadyExistsException, MetaException,
InvalidObjectException, NoSuchObjectException {
...
if (!transactionalListeners.isEmpty()) {
transactionalListenerResponses =
MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
EventType.CREATE_TABLE,
new CreateTableEvent(tbl, true, this),
envContext);
}
...
}
@Override
public void create_table(final Table tbl) throws AlreadyExistsException,
MetaException, InvalidObjectException {
create_table_with_environment_context(tbl, null);
}
具体发送事件请求方式如下,可以看到这里会发送所有继承自 MetaStoreEventListener 类的事件请求。
//位置:org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
/**
* Notify a list of listeners about a specific metastore event. Each listener notified might update
* the (ListenerEvent) event by setting a parameter key/value pair. These updated parameters will
* be returned to the caller.
*
* @param listeners List of MetaStoreEventListener listeners.
* @param eventType Type of the notification event.
* @param event The ListenerEvent with information about the event.
* @return A list of key/value pair parameters that the listeners set. The returned object will return an empty
* map if no parameters were updated or if no listeners were notified.
* @throws MetaException If an error occurred while calling the listeners.
*/
public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners,
EventType eventType,
ListenerEvent event) throws MetaException {
Preconditions.checkNotNull(listeners, "Listeners must not be null.");
Preconditions.checkNotNull(event, "The event must not be null.");
for (MetaStoreEventListener listener : listeners) {
notificationEvents.get(eventType).notify(listener, event);
}
// Each listener called above might set a different parameter on the event.
// This write permission is allowed on the listener side to avoid breaking compatibility if we change the API
// method calls.
return event.getParameters();
}
HMS 发送事件请求后 Sentry Server 是如何接收的呢?Sentry 会通过 SentrySyncHMSNotificationsPostEventListener 类的 syncNotificationEvents() 方法对所有的 DDL 事情操作进行同步,交给 Sentry Server 处理。
//位置:org/apache/sentry/binding/metastore/SentrySyncHMSNotificationsPostEventListener.java
/**
* This HMS post-event listener is used only to synchronize with HMS notifications on the Sentry server
* whenever a DDL event happens on the Hive metastore.
*/
public class SentrySyncHMSNotificationsPostEventListener extends MetaStoreEventListener {
/**
* It requests the Sentry server the synchronization of recent notification events.
*
* After the sync call, the latest processed ID will be stored for future reference to avoid
* syncing an ID that was already processed.
*
* @param event An event that contains a DB_NOTIFICATION_EVENT_ID_KEY_NAME value to request.
*/
private void syncNotificationEvents(ListenerEvent event, String eventName) {
// Do not sync notifications if the event has failed.
if (failedEvent(event, eventName)) {
return;
}
Map<String, String> eventParameters = event.getParameters();
if (!eventParameters.containsKey(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME)) {
return;
}
/* If the HMS is running in an active transaction, then we do not want to sync with Sentry
* because the desired eventId is not available for Sentry yet, and Sentry may block the HMS
* forever or until a read time-out happens. */
if (isMetastoreTransactionActive(eventParameters)) {
return;
}
long eventId =
Long.parseLong(eventParameters.get(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME));
// This check is only for performance reasons to avoid calling the sync thrift call if the Sentry server
// already processed the requested eventId.
if (eventId <= latestProcessedId.get()) {
return;
}
try(SentryPolicyServiceClient sentryClient = this.getSentryServiceClient()) {
LOGGER.debug("Starting Sentry/HMS notifications sync for {} (id: {})", eventName, eventId);
long sentryLatestProcessedId = sentryClient.syncNotifications(eventId);
LOGGER.debug("Finishedd Sentry/HMS notifications sync for {} (id: {})", eventName, eventId);
LOGGER.debug("Latest processed event ID returned by the Sentry server: {}", sentryLatestProcessedId);
updateProcessedId(sentryLatestProcessedId);
} catch (Exception e) {
// This error is only logged. There is no need to throw an error to Hive because HMS sync is called
// after the notification is already generated by Hive (as post-event).
LOGGER.error("Failed to sync requested HMS notifications up to the event ID: " + eventId, e);
}
}
}
Sentry Server 会对通过过来的 HMS 事件请求进行处理,即交由 SentryPolicyStoreProcessor 类处理,这里的 waitFor() 方法是关键,会等待 Sentry Server 处理对应的 HMS 请求,如果事件处理不及时会出现超时异常,也就是第一小节中 HMS 日志中的异常。
//位置:org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
@Override
public TSentrySyncIDResponse sentry_sync_notifications(TSentrySyncIDRequest request)
throws TException {
TSentrySyncIDResponse response = new TSentrySyncIDResponse();
try (Timer.Context timerContext = hmsWaitTimer.time()) {
// Wait until Sentry Server processes specified HMS Notification ID.
response.setId(sentryStore.getCounterWait().waitFor(request.getId()));
response.setStatus(Status.OK());
} catch (InterruptedException e) {
String msg = String.format("wait request for id %d is interrupted",
request.getId());
LOGGER.error(msg, e);
response.setId(0);
response.setStatus(Status.RuntimeError(msg, e));
Thread.currentThread().interrupt();
} catch (TimeoutException e) {
String msg = String.format("timed out wait request for id %d", request.getId());
LOGGER.warn(msg, e);
response.setId(0);
response.setStatus(Status.RuntimeError(msg, e));
}
return response;
}
waitFor() 方法的具体逻辑,无非就是对接收的 HMS 事件 id 与 Sentry Server 中的最新 id (即 Sentry 元数据 SENTRY_HMS_NOTIFICATION_ID 表的最新值)进行比较,并进行更新。
//位置:org/apache/sentry/service/thrift/CounterWait.java
/**
* Wait for specified counter value.
* Returns immediately if the value is reached or blocks until the value
* is reached.
* Multiple threads can call the method concurrently.
*
* @param value requested counter value
* @return current counter value that should be no smaller then the requested
* value
* @throws InterruptedException if the wait was interrupted, TimeoutException if
* wait was not successfull within the timeout value specified at the construction time.
*/
public long waitFor(long value) throws InterruptedException, TimeoutException {
// Fast path - counter value already reached, no need to block
if (value <= currentId.get()) {
return currentId.get();
}
// Enqueue the waiter for this value
ValueEvent eid = new ValueEvent(value);
waiters.put(eid);
// It is possible that between the fast path check and the time the
// value event is enqueued, the counter value already reached the requested
// value. In this case we return immediately.
if (value <= currentId.get()) {
return currentId.get();
}
// At this point we may be sure that by the time the event was enqueued,
// the counter was below the requested value. This means that update()
// is guaranteed to wake us up when the counter reaches the requested value.
// The wake up may actually happen before we start waiting, in this case
// the event's blocking queue will be non-empty and the waitFor() below
// will not block, so it is safe to wake up before the wait.
// So sit tight and wait patiently.
eid.waitFor();
LOGGER.debug("CounterWait added new value to waitFor: value = {}, currentId = {}", value, currentId.get());
return currentId.get();
}
至此,Sentry Server 已对 HMS 的一次事件请求进行了处理。
四、解决措施/建议
1、适当调小 sentry.notification.sync.timeout.ms 参数
该参数默认是 200s,调小该参数,可适当减小 create/drop/alter 等操作的等待时间,消息积压不多的情况可以选择这种方式让 sentry 自行消费处理掉。
Cloudera 修改 Sentry 服务的参数配置:
修改参数后重启 Sentry 服务,发现 HMS 出现 canary 异常后超时时间在 50s 多一点,说明参数生效。
2、监控 sentry 元数据 SENTRY_HMS_NOTIFICATION_ID 信息
直接获取 sentry 元数据 SENTRY_HMS_NOTIFICATION_ID 表的最新记录,如果没有更新则表示消息出现了滞后,此时 HMS 必会出现 canary 异常。
mysql> select * from SENTRY_HMS_NOTIFICATION_ID order by NOTIFICATION_ID desc limit 1;
+-----------------+
| NOTIFICATION_ID |
+-----------------+
| 184490926 |
+-----------------+
1 row in set (0.00 sec)
3、更新 sentry 消息同步记录
如果消息积压的太多,sentry 慢慢消费的时间太长的话,可能一直追不上 HMS 的最新 id,此时可以选择丢掉这些信息,具体操作在 sentry 元数据的 SENTRY_HMS_NOTIFICATION_ID 表中插入一条最大值(该最大值等于当前消息的 id 值,从 hive 元数据的 NOTIFICATION_SEQUENCE 表中获取 ),重启 sentry 服务。
use sentry;
insert into SENTRY_HMS_NOTIFICATION_ID values(184472866);
更新后 create 操作时间正常