vertx模块HAManager高可用

HAManager

public HAManager(VertxInternal vertx, DeploymentManager deploymentManager,
ClusterManager clusterManager, int quorumSize, String group, boolean enabled) {
this.vertx = vertx;
this.deploymentManager = deploymentManager;
this.clusterManager = clusterManager;
//仲裁结点数量,默认1
this.quorumSize = enabled ? quorumSize : ;
//定义逻辑组名,默认__DEFAULT__
this.group = enabled ? group : "__DISABLED__";
this.enabled = enabled;
this.haInfo = new JsonObject();
haInfo.put("verticles", new JsonArray());
haInfo.put("group", this.group);
//获取集群管理 __vertx.haInfo,添加结点信息
this.clusterMap = clusterManager.getSyncMap(CLUSTER_MAP_NAME);
this.nodeID = clusterManager.getNodeID();
synchronized (haInfo) {
clusterMap.put(nodeID, haInfo.encode());
}
/**添加一个节点侦听器,侦听节点join or leaves */
clusterManager.nodeListener(new NodeListener() {
@Override
public void nodeAdded(String nodeID) {
HAManager.this.nodeAdded(nodeID);
} @Override
public void nodeLeft(String leftNodeID) {
HAManager.this.nodeLeft(leftNodeID);
}
});
//定义周期器,每隔 1s 检查 HADeployments
quorumTimerID = vertx.setPeriodic(QUORUM_CHECK_PERIOD, tid -> checkHADeployments());
// 调用检查仲裁来计算是否有初始仲裁
synchronized (this) {
checkQuorum();
}
} private void checkQuorum() {
if (quorumSize == ) {//判断仲裁数量
this.attainedQuorum = true;
} else {
 /**获取group node数量*/
List<String> nodes = clusterManager.getNodes();
int count = ;
for (String node : nodes) {
String json = clusterMap.get(node);
if (json != null) {
JsonObject clusterInfo = new JsonObject(json);
String group = clusterInfo.getString("group");
if (group.equals(this.group)) {
count++;
}
}
}
/**计算是否到达仲裁数量*/
boolean attained = count >= quorumSize;
if (!attainedQuorum && attained) {
log.info("A quorum has been obtained. Any deploymentIDs waiting on a quorum will now be deployed");
this.attainedQuorum = true;
} else if (attainedQuorum && !attained) {
log.info("There is no longer a quorum. Any HA deploymentIDs will be undeployed until a quorum is re-attained");
this.attainedQuorum = false;
}
}
}

deploy an HA verticle

public void deployVerticle(final String verticleName, DeploymentOptions deploymentOptions,
final Handler<AsyncResult<String>> doneHandler) {
if (attainedQuorum) {//根据是否达到仲裁数量,否则添加到delay Queue
doDeployVerticle(verticleName, deploymentOptions, doneHandler);
} else {
log.info("Quorum not attained. Deployment of verticle will be delayed until there's a quorum.");
addToHADeployList(verticleName, deploymentOptions, doneHandler);
}
} /**
* 部署verticle
*/
private void doDeployVerticle(final String verticleName, DeploymentOptions deploymentOptions,
final Handler<AsyncResult<String>> doneHandler) {
/**添加deploy verticle 后的回调 handler*/
final Handler<AsyncResult<String>> wrappedHandler = asyncResult -> {
if (asyncResult.succeeded()) {
// 添加当前 node 的 HA 相关信息,以便 other node了解
addToHA(asyncResult.result(), verticleName, deploymentOptions);
}
/**触发已添加添加回调 hander*/
if (doneHandler != null) {
doneHandler.handle(asyncResult);
} else if (asyncResult.failed()) {
log.error("Failed to deploy verticle", asyncResult.cause());
}
};
//部署verticle
deploymentManager.deployVerticle(verticleName, deploymentOptions, wrappedHandler);
} /**
* 添加deploy 任务到delay Queue
*/
private void addToHADeployList(final String verticleName, final DeploymentOptions deploymentOptions,
final Handler<AsyncResult<String>> doneHandler) {
toDeployOnQuorum.add(() -> {
ContextImpl ctx = vertx.getContext();
try {
ContextImpl.setContext(null);
//部署verticle
deployVerticle(verticleName, deploymentOptions, doneHandler);
} finally {
ContextImpl.setContext(ctx);
}
});
}

周期每秒检测

private void checkHADeployments() {
try {
if (attainedQuorum) {//判断仲裁数量是否达到
deployHADeployments();
} else {
undeployHADeployments();
}
} catch (Throwable t) {
log.error("Failed when checking HA deploymentIDs", t);
}
} private void deployHADeployments() {
  //获取delay Queue 任务数
int size = toDeployOnQuorum.size();
if (size != ) {
log.info("There are " + size + " HA deploymentIDs waiting on a quorum. These will now be deployed");
Runnable task;
/**处理所有 delay 部署任务*/
while ((task = toDeployOnQuorum.poll()) != null) {
try {
task.run();
} catch (Throwable t) {
log.error("Failed to run redeployment task", t);
}
}
}
} private void undeployHADeployments() {
  /** 遍历所有deploy verticle */
for (String deploymentID: deploymentManager.deployments()) {
Deployment dep = deploymentManager.getDeployment(deploymentID);
if (dep != null) {
if (dep.deploymentOptions().isHa()) {
ContextImpl ctx = vertx.getContext();
try {
ContextImpl.setContext(null);
//卸载
deploymentManager.undeployVerticle(deploymentID, result -> {
if (result.succeeded()) {
log.info("Successfully undeployed HA deployment " + deploymentID + "-" + dep.verticleIdentifier() + " as there is no quorum");
/**添加HA verticle 到 delay Queue 重新部署*/
addToHADeployList(dep.verticleIdentifier(), dep.deploymentOptions(), result1 -> {
if (result1.succeeded()) {
log.info("Successfully redeployed verticle " + dep.verticleIdentifier() + " after quorum was re-attained");
} else {
log.error("Failed to redeploy verticle " + dep.verticleIdentifier() + " after quorum was re-attained", result1.cause());
}
});
} else {
log.error("Failed to undeploy deployment on lost quorum", result.cause());
}
});
} finally {
ContextImpl.setContext(ctx);
}
}
}
}
}

note:不建议使用HA模块,还是利用Health Check作为verticle服务检查,出错时自动重启服务,
"启用高可用性(HA)的情况下部署Verticle。在该上下文中,当Verticle部署在突然死亡的vert.x实例上时,
Verticle将从集群中的另一个vert.x实例上重新部署".
分析:没有verticle持久化,上传集群中心或序列化分发other node,其它集群结点local根本没有
verticle(compile class)如何重新部署,毕竟不是共享内存

上一篇:对象转型、迭代器Iterator、Set集合、装箱与拆箱、基本数据类型与字符串的转换、TreeSet集合与对象


下一篇:PAT 1016. Phone Bills