ServiceManager类:
用于监控服务集的管理器,该类提供了诸如startAsync、stopAsync、servicesByState方法来运行、结束和检查服务集,而且,通过监听器机制还能监控状态的改变。
建议通过此类来管理服务的生命周期,状态转变则通过其它机制来执行。例如,如果服务集通过除startAsync之外的某些机制启动,那么监听器将在合适的时候被唤醒,同时awaitHealthy 也仍将按预期工作。
NEW
|
服务在这个状态下是不活跃的,它的作用和消耗的资源都非常小
|
STARTING
|
服务在这个状态过渡到RUNNING
|
RUNNING
|
服务在这个状态下运作
|
STOPPING
|
服务在这个状态过渡到RUNNING
|
TERMINATED
|
服务在这个状态下,说明已正常执行
|
FAILED
|
服务遇到问题,已无法启动或终止
|
ServiceManager主要方法
void addListener(Listener listener, Executor executor)
在executor上注册一个监听器
ServiceManager startAsync()
启动所有被管理的服务,只有当所有服务的状态为NEW时,调用此方法才不会抛异常
void awaitHealthy()
等待服务管理器变得healthy(所有服务的状态都为RUNNING )
void awaitHealthy(long timeout, TimeUnit unit)
在一定时间内,等待服务管理器变得healthy(所有服务的状态都为RUNNING )
ServiceManager stopAsync()
stop()所有管理的服务
void awaitStopped()等待所有服务都到达终止状态(所有服务的状态应该为TERMINATED或FAILED)
void awaitStopped(long timeout, TimeUnit unit)
在一定时间内,等待所有服务都到达终止状态
boolean isHealthy()
如果所有服务的状态为RUNNING,则返回true
ImmutableMultimap<State, Service> servicesByState()
获取所有服务的当前状态的快照
ImmutableMap<Service, Long> startupTimes()
返回完成启动的服务的加载时间
ServiceManager的构建——
1)创建一个ServiceManagerState;
2)创建一个Executor ;
3)根据每一个Service及ServiceManagerState创建相应的ServiceListener;
4)将每一个ServiceListener注册到Executor 上,此时会调用到每个Service的addListener方法;
5)为services 添加Service与ServiceListener的键值对。
几个主要函数:
启动所有服务:
public ServiceManager startAsync() {
//验证所有服务的初始状态是否为NEW。
for (Map.Entry<Service, ServiceListener> entry : services.entrySet()) {
Service service = entry.getKey();
State state = service.state();
checkState(state == State.NEW, "Service %s is %s, cannot start it.", service,
state);
}
//执行所有服务的start方法。
for (ServiceListener service : services.values()) {
service.start();
}
return this;
}
终止所有服务:
public ServiceManager stopAsync() {
for (Service service : services.keySet()) {
service.stop();
}
return this;
}
在重写start方法时,可以做一些初始化工作,返回结果为ListenableFuture<State>,调用其get方法时,将在该服务启动后,获得其状态变量(RUNNING,STOPPING或TERMINATED),如果服务启动失败,get方法将抛出ExecutionException异常,同时服务状态将为FAILED。
在重写stop方法时,返回结果也为ListenableFuture<State>,调用其get方法时,将在该服务结束后,获得其状态变量TERMINATED或抛出ExecutionException异常。
|
ServiceManagerState类——ServiceManager的所有可变状态的封装体。
void serviceFinishedStarting(Service service, boolean currentlyHealthy)
|
在服务完成启动时调用
|
void serviceTerminated(Service service)
|
在服务TERMINATED时调用
|
void serviceFailed(final Service service)
|
在服务FAILED时调用
|
void serviceStopped(Service service)
|
在服务TERMINATED或FAILED时调用
|
void executeListeners()
|
执行queuedListeners中所有的监听器
|
void addListener(Listener listener, Executor executor)
|
同上
|
void awaitHealthy()
|
同上
|
boolean awaitHealthy(long timeout, TimeUnit unit)
|
同上
|
void awaitStopped()
|
同上
|
boolean awaitStopped(long timeout, TimeUnit unit)
|
同上
|
ServiceManager中的方式实际调用的即为以上方法。
主要成员变量:
1)记录了服务总数(numberOfServices),未启动服务数(unstartedServices)及未终止服务数(unstoppedServices),初始值都为服务总数。
这些状态的作用体现在两个地方:
① 等待所有服务处于健康状态,涉及的条件awaitHealthGuard为:所有服务都已经启动 或 部分服务已终止。
该条件应用在,等待所有服务处于健康状态的主体函数中:
public void awaitHealthy() {
state.awaitHealthy();
checkState(isHealthy(), "Expected to be healthy after starting");
}
|
主要分两部分:
首先在state.awaitHealthy()中,通过下面函数,等待所有服务都启动
private void waitUninterruptibly(Guard guard, boolean signalBeforeWaiting) {
//所有服务都已启动 或 部分服务已终止,则直接返回,无需等待;
if (!guard.isSatisfied()) {
if (signalBeforeWaiting) {
signalConditionsOfSatisfiedGuards(null);
}
incrementWaiters(guard);
try {
final Condition condition = guard.condition;
do {
condition.awaitUninterruptibly();
} while (!guard.isSatisfied());//若存在未启动的服务,且没有全部终止,则继续等待;
} finally {
decrementWaiters(guard);
}
}
}
|
接着通过isHealthy()函数,判断所有服务处在RUNNING状态。
② 判断所有服务是否处于终止状态,需要满足的条件stoppedGuard即为:未终止服务数等于0。
2)Queue<Runnable> queuedListeners队列保存等待执行的监听器,调用executeListeners()时,会依次将队列中的所有方法执行并移除。
3)List<ListenerExecutorPair> listeners 保存了在状态转变时,需要通知的监听器,应用:
① serviceStopped方法被调用时,unstoppedServices减一,添加listeners中的所有监听器的stopped方法至queuedListeners队列。
② serviceTerminated方法被调用时,实际调用serviceStopped方法。
③ serviceFailed方法被调用时,添加listeners中的所有监听器的failure方法至queuedListeners队列,并调用serviceStopped方法。
④ serviceFinishedStarting方法被调用时,unstartedServices减1,如果参数currentlyHealthy为真,且所有服务都已启动且没终止,添加listeners中的所有监听器的 healthy方法至queuedListeners队列。
添加方法都如下:
for (final ListenerExecutorPair pair : listeners) {
queuedListeners.add(new Runnable() {
@Override public void run() {
pair.execute(new Runnable() {
@Override public void run() {
pair.listener.stopped();
}
});
}
});
}
|
ServiceListener类:
封装了另一种服务及其启动时间,同时会根据当前状态调用ServiceManagerState的serviceFinishedStarting ,serviceTerminated ,serviceFailed 方法。
DEMO:
下面从终止服务为例,说下Service及ServiceManager的监听器,也不知道自己这样用对不对....
我们可以在实现Service接口的类中,重写addListener方法,该方法将在构造服务管理器时,被调用:
调用处:
ServiceListener listener = new ServiceListener(service, state);
service.
addListener(listener, executor); |
重写的addListener方法:
@Override
public void addListener(Listener listener, Executor executor) {
this.listener =
listener;
System.out.println(serviceInfo+"注册监听器");
}
|
接着重写stop方法,当服务管理器执行stopAsync() 方法时,将依次执行每个服务的stop方法:
@Override
public ListenableFuture<State> stop() {
listener. terminated(state);
System.out.println(serviceInfo + "终止...");
return null;
}
|
此处我们调用listener的terminated方法:
@Override public void terminated(State from) {
logger.info("Service " + service + " has terminated. Previous state was " + from + " state.");
state.monitor.enter();
try {
if (from == State.NEW) {
startTimer();
finishedStarting(false);
}
state.serviceTerminated(service);
} finally {
state.monitor.leave();
state.executeListeners();
}
}
|
serviceTerminated方法实际调用的为serviceStopped方法,在该方法中将unstoppedServices(未终止服务数)减一,若所有服务都已终止,则将listeners中所有监听器的stopped方法加入queuedListeners队列中稍后执行。
private void serviceStopped(Service service) {
checkState(unstoppedServices > 0, "All services should have already stopped but %s just stopped.", service);
unstoppedServices--;
if (unstoppedServices == 0) {
checkState(unstartedServices == 0, "All services are stopped but %d services haven't finished starting", unstartedServices);
for (final
ListenerExecutorPair pair : listeners) {
queuedListeners.add(new Runnable() {
@Override public void run() {
pair.execute(new Runnable() {
@Override public void run() {
pair.listener.stopped();
}
});
}
});
}
listeners.clear();
}
}
|
listeners中的监听器,是通过服务管理器的addListener方法(Service也有addListener方法)添加的,在该方法中会将监听器添加到listeners队列中,当服务状态发生改变时,通知队列中的监听器。
这些监听器除了有stopped方法,还有healthy及failure方法,通过这几个方法,我们可以监视所有服务的运行状态,在下面的例子中,我们可以看到运行结果中出现"服务运行结束!"字样,说明已检测到所有服务都运行结束。
ServiceImp实现类:
public class ServiceImp implements Service {
private State state = NEW;
private String serviceInfo;
private Listener listener;
ServiceImp(int num){
this.serviceInfo = "第"+num+"个任务:";
}
@Override
public ListenableFuture<State> start() {
System.out.println(serviceInfo+"启动...");
listener.starting();
return null;
}
@Override
public State startAndWait() {
return state;
}
@Override
public boolean isRunning() {
if(state== RUNNING)
return true;
else
return false;
}
@Override
public State state() {
return state;
}
@Override
public ListenableFuture<State> stop() {
listener.
terminated(state);
System.out.println(serviceInfo + "终止...");
return null;
}
@Override
public State stopAndWait() {
return state;
}
@Override
public Throwable failureCause() {
return null;
}
@Override
public void addListener(Listener listener, Executor executor) {
this.listener =
listener;
System.out.println(serviceInfo+"注册监听器");
}
}
|
运行服务:
public class Server {
public static void main(String[] args) {
List<Service> services = Lists.newArrayList();
for (int num = 0; num < 5; num++) {
ServiceImp serviceImp = new ServiceImp(num);
services.add(serviceImp);
}
System.out.println("*******构造服务管理器*******");
final ServiceManager serviceManager = new ServiceManager(services);
serviceManager.
addListener(new ServiceManager.Listener() {
@Override
public void healthy() {
System.out.println("服务运行健康!");
}
@Override
public void stopped() {
System.out.println("服务运行结束!");
}
@Override
public void failure(Service service) {
System.out.println("服务运行失败!");
}
}, MoreExecutors.sameThreadExecutor());
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
System.out.println("********终止所有任务********");
serviceManager.stopAsync().awaitStopped(10, TimeUnit.SECONDS);
} catch (Exception timeout) {
System.out.println("timeout");
}
}
});
System.out.println("********启动所有任务********");
serviceManager.startAsync();
}
}
|
运行结果:
*******构造服务管理器*******
第0个任务:注册监听器
第1个任务:注册监听器
第2个任务:注册监听器
第3个任务:注册监听器
第4个任务:注册监听器
********启动所有任务********
第0个任务:启动...
第1个任务:启动...
第2个任务:启动...
第3个任务:启动...
第4个任务:启动...
********终止所有任务********
第0个任务:终止...
第1个任务:终止...
第2个任务:终止...
第3个任务:终止...
服务运行终止!
第4个任务:终止...
|