Some work can only run on a single server, the master being the typical example. HA (High Availability) therefore first requires deploying several such servers, and second requires that they automatically elect one of themselves to be active while the others remain standby; only the active server is allowed to perform those operations. When the active server can no longer serve for whatever reason (it crashes, loses network connectivity, and so on), a new active server is promptly and automatically elected from the remaining standbys, so the service fails over seamlessly.
Hadoop implements master HA on top of ZooKeeper. There are two variants, HDFS HA (for the NameNode) and YARN HA (for the ResourceManager); they share a common mechanism but differ in some details.
1 Observed behavior
1.1 HDFS HA
ZooKeeper paths:
/hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb
/hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock
Configuration:
<property>
<name>ha.zookeeper.parent-znode</name>
<value>/hadoop-ha</value>
<description>
The ZooKeeper znode under which the ZK failover controller stores
its information. Note that the nameservice ID is automatically
appended to this znode, so it is not normally necessary to
configure this, even in a federated environment.
</description>
</property>
<property>
<name>dfs.nameservices</name>
<value></value>
<description>
Comma-separated list of nameservices.
</description>
</property>
1.2 YARN HA
ZooKeeper paths:
/yarn-leader-election/${yarn.resourcemanager.cluster-id}/ActiveBreadCrumb
/yarn-leader-election/${yarn.resourcemanager.cluster-id}/ActiveStandbyElectorLock
Configuration:
<property>
<description>The base znode path to use for storing leader information,
when using ZooKeeper based leader election.</description>
<name>yarn.resourcemanager.ha.automatic-failover.zk-base-path</name>
<value>/yarn-leader-election</value>
</property>
<property>
<description>Name of the cluster. In a HA setting,
this is used to ensure the RM participates in leader
election for this cluster and ensures it does not affect
other clusters</description>
<name>yarn.resourcemanager.cluster-id</name>
<!--value>yarn-cluster</value-->
</property>
Why are there two znodes, ActiveBreadCrumb and ActiveStandbyElectorLock? ActiveStandbyElectorLock is the node used for the actual lock, while ActiveBreadCrumb is used for fencing.
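Both znodes can be inspected with a plain ZooKeeper client if you want to see who currently holds the lock. The sketch below is only illustrative: the connect string and the nameservice name ns1 are assumptions for the example, and the breadcrumb payload is the serialized node info written by the elector, so it is only reported as a byte count here.
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class HaZnodeInspector {
  public static void main(String[] args) throws Exception {
    CountDownLatch connected = new CountDownLatch(1);
    // Assumed connect string and nameservice id ("ns1") -- adjust for your cluster.
    ZooKeeper zk = new ZooKeeper("zk1:2181,zk2:2181,zk3:2181", 5000, event -> {
      if (event.getState() == KeeperState.SyncConnected) {
        connected.countDown();
      }
    });
    connected.await();

    String base = "/hadoop-ha/ns1";

    // ActiveStandbyElectorLock is ephemeral: it exists only while some ZKFC's
    // ZooKeeper session holds the active lock.
    Stat lock = zk.exists(base + "/ActiveStandbyElectorLock", false);
    System.out.println("lock node present: " + (lock != null));

    // ActiveBreadCrumb is persistent and records the last known active node;
    // a newly elected active reads it to decide whom to fence.
    byte[] crumb = zk.getData(base + "/ActiveBreadCrumb", false, null);
    System.out.println("breadcrumb payload: " + crumb.length + " bytes");

    zk.close();
  }
}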
2 Implementation
Both HDFS HA and YARN HA ultimately rely on ActiveStandbyElector; let's look at each in turn.
2.1 HDFS HA
ZKFC startup command
$HADOOP_HOME/bin/hdfs
elif [ "$COMMAND" = "zkfc" ] ; then
CLASS='org.apache.hadoop.hdfs.tools.DFSZKFailoverController'
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_ZKFC_OPTS"
Code
org.apache.hadoop.hdfs.tools.DFSZKFailoverController
public static void main(String args[])
throws Exception {
if (DFSUtil.parseHelpArgument(args,
ZKFailoverController.USAGE, System.out, true)) {
System.exit(0);
}
GenericOptionsParser parser = new GenericOptionsParser(
new HdfsConfiguration(), args);
DFSZKFailoverController zkfc = DFSZKFailoverController.create(
parser.getConfiguration());
int retCode = 0;
try {
retCode = zkfc.run(parser.getRemainingArgs());
} catch (Throwable t) {
LOG.fatal("Got a fatal error, exiting now", t);
}
System.exit(retCode);
}
DFSZKFailoverController.main calls run, which is inherited from the parent class ZKFailoverController and internally calls doRun. Now for the parent class:
org.apache.hadoop.ha.ZKFailoverController
private static final String ZK_PARENT_ZNODE_KEY = "ha.zookeeper.parent-znode";
static final String ZK_PARENT_ZNODE_DEFAULT = "/hadoop-ha";

private int doRun(String[] args)
throws HadoopIllegalArgumentException, IOException, InterruptedException {
try {
initZK();
} catch (KeeperException ke) {
LOG.fatal("Unable to start failover controller. Unable to connect "
+ "to ZooKeeper quorum at " + zkQuorum + ". Please check the "
+ "configured value for " + ZK_QUORUM_KEY + " and ensure that "
+ "ZooKeeper is running.");
return ERR_CODE_NO_ZK;
}
if (args.length > 0) {
if ("-formatZK".equals(args[0])) {
boolean force = false;
boolean interactive = true;
for (int i = 1; i < args.length; i++) {
if ("-force".equals(args[i])) {
force = true;
} else if ("-nonInteractive".equals(args[i])) {
interactive = false;
} else {
badArg(args[i]);
}
}
return formatZK(force, interactive);
} else {
badArg(args[0]);
}
}
if (!elector.parentZNodeExists()) {
LOG.fatal("Unable to start failover controller. "
+ "Parent znode does not exist.\n"
+ "Run with -formatZK flag to initialize ZooKeeper.");
return ERR_CODE_NO_PARENT_ZNODE;
}
try {
localTarget.checkFencingConfigured();
} catch (BadFencingConfigurationException e) {
LOG.fatal("Fencing is not configured for " + localTarget + ".\n" +
"You must configure a fencing method before using automatic " +
"failover.", e);
return ERR_CODE_NO_FENCER;
}
initRPC();
initHM();
startRPC();
try {
mainLoop();
} finally {
rpcServer.stopAndJoin();
elector.quitElection(true);
healthMonitor.shutdown();
healthMonitor.join();
}
return 0;
}

private void initZK() throws HadoopIllegalArgumentException, IOException,
KeeperException {
...
elector = new ActiveStandbyElector(zkQuorum,
zkTimeout, getParentZnode(), zkAcls, zkAuths,
new ElectorCallbacks(), maxRetryNum);
}

private String getParentZnode() {
String znode = conf.get(ZK_PARENT_ZNODE_KEY,
ZK_PARENT_ZNODE_DEFAULT);
if (!znode.endsWith("/")) {
znode += "/";
}
return znode + getScopeInsideParentNode();
}

class ElectorCallbacks implements ActiveStandbyElectorCallback {
@Override
public void becomeActive() throws ServiceFailedException {
ZKFailoverController.this.becomeActive();
}

@Override
public void becomeStandby() {
ZKFailoverController.this.becomeStandby();
}

@Override
public void enterNeutralMode() {
}

@Override
public void notifyFatalError(String errorMessage) {
fatalError(errorMessage);
}

@Override
public void fenceOldActive(byte[] data) {
ZKFailoverController.this.fenceOldActive(data);
}

@Override
public String toString() {
synchronized (ZKFailoverController.this) {
return "Elector callbacks for " + localTarget;
}
}
}
doRun calls several methods, the two most important being initZK and initHM:
initZK computes the ZooKeeper parent path via getParentZnode and constructs the ActiveStandbyElector. The crucial detail is that an instance of the inner class ElectorCallbacks is handed to the ActiveStandbyElector; every later ZooKeeper state change is propagated along the chain ActiveStandbyElector -> ElectorCallbacks -> ZKFailoverController, which is what ultimately drives the state transitions such as becomeActive and becomeStandby (the callback contract is sketched just below).
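For reference, the contract those callbacks implement looks roughly like this; it is reconstructed from the @Override methods of ElectorCallbacks shown above (the real interface is ActiveStandbyElector.ActiveStandbyElectorCallback), so treat it as a sketch rather than the exact source:
import org.apache.hadoop.ha.ServiceFailedException;

// Sketch of the elector callback contract, reconstructed from ElectorCallbacks.
interface ActiveStandbyElectorCallback {
  // This node won the lock: transition the local service to active.
  void becomeActive() throws ServiceFailedException;

  // Another node holds the lock: transition the local service to standby.
  void becomeStandby();

  // The ZooKeeper session was disrupted; leadership is unknown for the moment.
  void enterNeutralMode();

  // Unrecoverable elector error: the controller treats this as fatal.
  void notifyFatalError(String errorMessage);

  // Data (the ActiveBreadCrumb contents) left by a previous active that must be fenced.
  void fenceOldActive(byte[] oldActiveData);
}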
initZK only determines the ZooKeeper path and wires up the callbacks; nothing is actually created on ZooKeeper yet. The real work is driven from initHM, so let's look at initHM:
org.apache.hadoop.ha.ZKFailoverController
private void initHM() {
healthMonitor = new HealthMonitor(conf, localTarget);
healthMonitor.addCallback(new HealthCallbacks());
healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
healthMonitor.start();
}
initHM creates a HealthMonitor, passes in HealthCallbacks, and then starts the HealthMonitor. Let's look at HealthMonitor:
org.apache.hadoop.ha.HealthMonitor
void start() {
daemon.start();
}

private void doHealthChecks() throws InterruptedException {
while (shouldRun) {
HAServiceStatus status = null;
boolean healthy = false;
try {
status = proxy.getServiceStatus();
proxy.monitorHealth();
healthy = true;
} catch (HealthCheckFailedException e) {
LOG.warn("Service health check failed for " + targetToMonitor
+ ": " + e.getMessage());
enterState(State.SERVICE_UNHEALTHY);
} catch (Throwable t) {
LOG.warn("Transport-level exception trying to monitor health of " +
targetToMonitor + ": " + t.getLocalizedMessage());
RPC.stopProxy(proxy);
proxy = null;
enterState(State.SERVICE_NOT_RESPONDING);
Thread.sleep(sleepAfterDisconnectMillis);
return;
}
if (status != null) {
setLastServiceStatus(status);
}
if (healthy) {
enterState(State.SERVICE_HEALTHY);
}
Thread.sleep(checkIntervalMillis);
}
}

private synchronized void enterState(State newState) {
if (newState != state) {
LOG.info("Entering state " + newState);
state = newState;
synchronized (callbacks) {
for (Callback cb : callbacks) {
cb.enteredState(newState);
}
}
}
}
HealthMonitor.start launches the internal MonitorDaemon thread, whose run loop repeatedly calls HealthMonitor.doHealthChecks. doHealthChecks calls enterState as the health state changes, and enterState iterates over all registered callbacks. This is a classic Observer pattern, so the interesting part is the callback itself, i.e. HealthCallbacks; a simplified model of the monitor/callback relationship follows.
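As a minimal, self-contained model of that Observer pattern (not the actual Hadoop classes; every name below is invented for illustration), the monitor side boils down to a state holder that notifies its registered callbacks only on a real transition:
import java.util.ArrayList;
import java.util.List;

// Simplified model of the HealthMonitor/Callback relationship described above.
class MiniHealthMonitor {
  enum State { INITIALIZING, SERVICE_HEALTHY, SERVICE_UNHEALTHY, SERVICE_NOT_RESPONDING }

  interface Callback {
    void enteredState(State newState);
  }

  private final List<Callback> callbacks = new ArrayList<>();
  private State state = State.INITIALIZING;

  synchronized void addCallback(Callback cb) {
    callbacks.add(cb);
  }

  // Mirrors enterState(): observers fire only when the state actually changes.
  synchronized void enterState(State newState) {
    if (newState != state) {
      state = newState;
      for (Callback cb : callbacks) {
        cb.enteredState(newState);
      }
    }
  }
}
In the real code, ZKFailoverController.initHM registers HealthCallbacks through healthMonitor.addCallback, and enteredState pushes the new state into recheckElectability, shown further below.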
First, the MonitorDaemon thread:
org.apache.hadoop.ha.HealthMonitor.MonitorDaemon
public void run() {
while (shouldRun) {
try {
loopUntilConnected();
doHealthChecks();
} catch (InterruptedException ie) {
Preconditions.checkState(!shouldRun,
"Interrupted but still supposed to run");
}
}
}
Next, HealthCallbacks:
org.apache.hadoop.ha.ZKFailoverController.HealthCallbacks
class HealthCallbacks implements HealthMonitor.Callback {
@Override
public void enteredState(HealthMonitor.State newState) {
setLastHealthState(newState);
recheckElectability();
}
}
This in turn calls ZKFailoverController.recheckElectability:
org.apache.hadoop.ha.ZKFailoverController
private void recheckElectability() {
// Maintain lock ordering of elector -> ZKFC
synchronized (elector) {
synchronized (this) {
boolean healthy = lastHealthState == State.SERVICE_HEALTHY;

long remainingDelay = delayJoiningUntilNanotime - System.nanoTime();
if (remainingDelay > 0) {
if (healthy) {
LOG.info("Would have joined master election, but this node is " +
"prohibited from doing so for " +
TimeUnit.NANOSECONDS.toMillis(remainingDelay) + " more ms");
}
scheduleRecheck(remainingDelay);
return;
}

switch (lastHealthState) {
case SERVICE_HEALTHY:
elector.joinElection(targetToData(localTarget));
if (quitElectionOnBadState) {
quitElectionOnBadState = false;
}
break;

case INITIALIZING:
LOG.info("Ensuring that " + localTarget + " does not " +
"participate in active master election");
elector.quitElection(false);
serviceState = HAServiceState.INITIALIZING;
break;

case SERVICE_UNHEALTHY:
case SERVICE_NOT_RESPONDING:
LOG.info("Quitting master election for " + localTarget +
" and marking that fencing is necessary");
elector.quitElection(true);
serviceState = HAServiceState.INITIALIZING;
break;

case HEALTH_MONITOR_FAILED:
fatalError("Health monitor failed!");
break;

default:
throw new IllegalArgumentException("Unhandled state:" + lastHealthState);
}
}
}
}
When the state is SERVICE_HEALTHY, ActiveStandbyElector.joinElection is called. Now let's look at ActiveStandbyElector:
org.apache.hadoop.ha.ActiveStandbyElector
public class ActiveStandbyElector implements StatCallback, StringCallback {

public ActiveStandbyElector(String zookeeperHostPorts,
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
List<ZKAuthInfo> authInfo,
ActiveStandbyElectorCallback app, int maxRetryNum) throws IOException,
HadoopIllegalArgumentException, KeeperException {
...
znodeWorkingDir = parentZnodeName;
zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
...

public synchronized void joinElection(byte[] data)
throws HadoopIllegalArgumentException {
if (data == null) {
throw new HadoopIllegalArgumentException("data cannot be null");
}
if (wantToBeInElection) {
LOG.info("Already in election. Not re-connecting.");
return;
}
appData = new byte[data.length];
System.arraycopy(data, 0, appData, 0, data.length);
LOG.debug("Attempting active election for " + this);
joinElectionInternal();
}

private void joinElectionInternal() {
Preconditions.checkState(appData != null,
"trying to join election without any app data");
if (zkClient == null) {
if (!reEstablishSession()) {
fatalError("Failed to reEstablish connection with ZooKeeper");
return;
}
}
createRetryCount = 0;
wantToBeInElection = true;
createLockNodeAsync();
}

private void createLockNodeAsync() {
zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL,
this, zkClient);
}
ActiveStandbyElector implements two ZooKeeper callback interfaces, StatCallback and StringCallback. The call chain is joinElection -> joinElectionInternal -> createLockNodeAsync, which ends in the asynchronous ZooKeeper.create call with the elector itself passed in as the callback. From then on, ZooKeeper events are delivered to ActiveStandbyElector.processResult, which in turn invokes ElectorCallbacks, closing the whole loop.
ZooKeeper's StringCallback interface is:
org.apache.zookeeper.AsyncCallback.StringCallback
interface StringCallback extends AsyncCallback {
public void processResult(int rc, String path, Object ctx, String name);
}
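To make the asynchronous handshake concrete, here is a deliberately minimal, hypothetical elector built on the same ZooKeeper calls: it asynchronously creates an ephemeral lock node and decides between active and standby inside processResult. This is roughly what ActiveStandbyElector does, minus retries, watches, fencing and session handling; MiniElector and all of its names are made up for the example.
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

// Hypothetical, heavily simplified elector: not the Hadoop implementation.
class MiniElector implements StringCallback {
  private final ZooKeeper zk;
  private final String lockPath;
  private final byte[] myInfo;

  MiniElector(ZooKeeper zk, String lockPath, byte[] myInfo) {
    this.zk = zk;
    this.lockPath = lockPath;
    this.myInfo = myInfo;
  }

  void joinElection() {
    // Asynchronous create of an ephemeral node; 'this' is the StringCallback.
    zk.create(lockPath, myInfo, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, this, null);
  }

  @Override
  public void processResult(int rc, String path, Object ctx, String name) {
    Code code = Code.get(rc);
    if (code == Code.OK) {
      System.out.println("won the lock -> become active");
    } else if (code == Code.NODEEXISTS) {
      System.out.println("someone else holds the lock -> become standby");
      // A real elector would now watch the lock node and rejoin the election
      // when the ephemeral node disappears.
    } else {
      System.out.println("unexpected ZooKeeper result: " + code);
    }
  }
}
In the real ActiveStandbyElector, the standby side additionally watches the lock node so that it can re-enter the election as soon as the ephemeral node goes away; that monitoring is the StatCallback half of the class.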
2.2 YARN HA
org.apache.hadoop.yarn.conf.YarnConfiguration
public static final String AUTO_FAILOVER_ZK_BASE_PATH =
AUTO_FAILOVER_PREFIX + "zk-base-path";
public static final String DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH =
"/yarn-leader-election";
This is where the configuration keys shown in 1.2 are defined.
org.apache.hadoop.yarn.server.resourcemanager.EmbeddedElectorService
protected void serviceInit(Configuration conf)
throws Exception {
...
String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
String electionZNode = zkBasePath + "/" + clusterId;
...
elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
electionZNode, zkAcls, zkAuths, this, maxRetryNum);
...

@Override
protected void serviceStart() throws Exception {
elector.joinElection(localActiveNodeInfo);
super.serviceStart();
}
The flow closely mirrors ZKFailoverController above: EmbeddedElectorService.serviceInit builds the election znode path and creates the ActiveStandbyElector, and serviceStart then calls ActiveStandbyElector.joinElection.
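Putting the two integrations side by side, embedding the elector always has the same shape: implement the callback interface, construct an ActiveStandbyElector pointing at the ZooKeeper quorum and the election znode, and join the election with this node's identifying data. The sketch below follows the constructor signature quoted earlier; the class name, quorum, timeout, znode path and retry count are all placeholder assumptions, and the callback bodies are left empty.
import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.hadoop.ha.ActiveStandbyElector;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.zookeeper.ZooDefs.Ids;

// Hypothetical service embedding the elector the way ZKFC and
// EmbeddedElectorService do; quorum, timeout and paths are placeholders.
public class MyElectorService implements ActiveStandbyElectorCallback {

  public void start() throws Exception {
    ActiveStandbyElector elector = new ActiveStandbyElector(
        "zk1:2181,zk2:2181,zk3:2181",    // ZooKeeper quorum (assumed)
        10000,                           // session timeout in ms (assumed)
        "/my-service-election/cluster1", // election znode, like /yarn-leader-election/<cluster-id>
        Ids.OPEN_ACL_UNSAFE,             // ACLs; a secured cluster would restrict these
        Collections.emptyList(),         // ZooKeeper auth info
        this,                            // callbacks, as in ElectorCallbacks / EmbeddedElectorService
        3);                              // max create retries (assumed)
    // Identifying data for this node; it is what ends up in the lock and breadcrumb znodes.
    elector.joinElection("node-1".getBytes(StandardCharsets.UTF_8));
  }

  @Override public void becomeActive() throws ServiceFailedException { /* transition to active */ }
  @Override public void becomeStandby() { /* transition to standby */ }
  @Override public void enterNeutralMode() { /* ZK session lost; leadership unknown */ }
  @Override public void notifyFatalError(String errorMessage) { /* abort */ }
  @Override public void fenceOldActive(byte[] oldActiveData) { /* fence the previous active */ }
}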