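Let's look at how the Curator framework is structured to listen for changes in the connection state (ConnectionState). The same design can later be applied to all kinds of listeners in our own business code.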
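Curator 2.13 implementation
Interface: Listener
The Listener interface is the one users implement. stateChanged() receives the new state, and the user's implementation decides what to do with that state.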
public interface ConnectionStateListener
{
/**
* Called when there is a state change in the connection
* @param client the client
* @param newState the new state
*/
public void stateChanged(CuratorFramework client, ConnectionState newState);
}
Interface: Listenable
/**
* Abstracts a listenable object
*/
public interface Listenable<T>
{
/**
* Add the given listener. The listener will be executed in the containing instance's thread.
*
* @param listener listener to add
*/
public void addListener(T listener);
/**
* Add the given listener. The listener will be executed using the given
* executor
*
* @param listener listener to add
* @param executor executor to run listener in
*/
public void addListener(T listener, Executor executor);
public void removeListener(T listener);
}
Class: ListenerContainer<T> implements Listenable<T>
/**
* Abstracts an object that has listeners; a container that holds Listeners
* <T> the Listener type
*/
public class ListenerContainer<T> implements Listenable<T>
{
private final Map<T, ListenerEntry<T>> listeners = Maps.newConcurrentMap();
@Override
public void addListener(T listener)
{
addListener(listener, MoreExecutors.sameThreadExecutor());
}
@Override
public void addListener(T listener, Executor executor)
{
listeners.put(listener, new ListenerEntry<T>(listener, executor));
}
/**
* Wraps iteration over the Listener list
* Utility - apply the given function to each listener.
* @param function function to call for each listener
*/
public void forEach(final Function<T, Void> function)
{
for ( final ListenerEntry<T> entry : listeners.values() )
{
entry.executor.execute
(
new Runnable()
{
@Override
public void run()
{
try
{
function.apply(entry.listener);
}
catch ( Throwable e )
{
ThreadUtils.checkInterrupted(e);
log.error(String.format("Listener (%s) threw an exception", entry.listener), e);
}
}
}
);
}
}
public void clear()
{
listeners.clear();
}
public int size()
{
return listeners.size();
}
}
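To make the container idea concrete, here is a minimal, self-contained sketch of the same pattern. It is not Curator's code: `MiniListenerContainer` and `MiniListenerContainerDemo` are hypothetical names, `java.util.function.Consumer` stands in for Guava's `Function<T, Void>`, and `Runnable::run` stands in for the same-thread executor. The point it shows is that `forEach` runs each listener on its own executor and keeps one failing listener from affecting the others.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for Curator's ListenerContainer.
class MiniListenerContainer<T> {
    // Each listener is stored together with the executor that should run it.
    private final Map<T, Executor> listeners = new ConcurrentHashMap<>();

    // Default: run the listener on whichever thread calls forEach.
    public void addListener(T listener) {
        addListener(listener, Runnable::run);
    }

    public void addListener(T listener, Executor executor) {
        listeners.put(listener, executor);
    }

    public void removeListener(T listener) {
        listeners.remove(listener);
    }

    // Apply the action to every listener, on that listener's executor.
    // A failure in one listener is reported and does not stop the others.
    public void forEach(Consumer<T> action) {
        for (Map.Entry<T, Executor> entry : listeners.entrySet()) {
            entry.getValue().execute(() -> {
                try {
                    action.accept(entry.getKey());
                } catch (Throwable e) {
                    System.err.println("Listener threw an exception: " + e);
                }
            });
        }
    }
}

class MiniListenerContainerDemo {
    // Registers one failing and one working listener, then notifies both.
    static List<String> run() {
        List<String> received = new ArrayList<>();
        MiniListenerContainer<Consumer<String>> container = new MiniListenerContainer<>();
        container.addListener(state -> { throw new IllegalStateException("boom"); });
        container.addListener(received::add);
        container.forEach(listener -> listener.accept("RECONNECTED"));
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The caller of `forEach` only supplies "what to do with one listener"; which thread runs it and what happens on failure stay inside the container.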
ConnectionStateManager
// to manage connection state
public class ConnectionStateManager {
// A queue again: anything that passes messages around tends to use one. Here it holds ConnectionState values
BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
// Holds a ListenerContainer
private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
/**
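* Start the manager: submits a task that runs processEvents() on a separate thread. Open questions: what happens if that thread dies, and how does the framework handle exceptions there?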
*/
public void start()
{
service.submit
(
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
processEvents();
return null;
}
}
);
}
@Override
public void close()
{
if ( state.compareAndSet(State.STARTED, State.CLOSED) )
{
service.shutdownNow();
listeners.clear();
}
}
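// Processes ConnectionState values as they arrive: this is the consumer side
// (declared to throw InterruptedException because take() below can throw it)
private void processEvents() throws InterruptedException {
// Loop for as long as the ConnectionStateManager is in the STARTED state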
while ( state.get() == State.STARTED )
{
// Keep taking connection states off the queue (take() blocks until one is available)
final ConnectionState newState = eventQueue.take();
// Apply the Function to every state listener; the listener is the argument the Function operates on
// forEach is just the ListenerContainer's wrapper for iterating over all listeners
listeners.forEach(
new Function<ConnectionStateListener, Void>() {
// ConnectionStateListener is the interface we implement ourselves; stateChanged is the method to implement
@Override
public Void apply(ConnectionStateListener listener)
{
listener.stateChanged(client, newState);
return null;
}
}
);
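/**
About the block above:
If the Listeners were not wrapped in a ListenerContainer, they would simply be a List, and we would call stateChanged on each Listener directly:
for ( ConnectionStateListener listener : listenerList ) {
listener.stateChanged(client, newState);
}
Because the Listeners are wrapped in a ListenerContainer, forEach can do extra work internally, such as running each Listener on its own executor.
**/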
}
}
// The method above consumes ConnectionState values. So how do they get in? Through the producer side below.
/**
* Post a state change. If the manager is already in that state the change
* is ignored. Otherwise the change is queued for listeners.
*
* @param newConnectionState new state
* @return true if the state actually changed, false if it was already at that state
*/
public synchronized boolean addStateChange(ConnectionState newConnectionState)
{
// First check that the ConnectionStateManager has been started; state holds a value of the internal State enum
if ( state.get() != State.STARTED )
{
return false;
}
ConnectionState previousState = currentConnectionState;
if ( previousState == newConnectionState )
{
return false;
}
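// Record the new state so that a repeated post of the same state is ignored next time
currentConnectionState = newConnectionState;
ConnectionState localState = newConnectionState;
// Wake up any threads waiting on this manager's monitor
notifyAll();
// Enqueue the state; if the queue is full, drop the oldest event to make room
while ( !eventQueue.offer(localState) )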
{
eventQueue.poll();
log.warn("ConnectionStateManager queue full - dropping events to make room");
}
return true;
}
}
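The producer side above can be sketched on its own. The following is a simplified illustration, not Curator's implementation: `StateQueueSketch`, its `State` enum, and the `drain()` helper are hypothetical names introduced here. It shows the two behaviours worth remembering: a repeated state is ignored, and when the bounded queue is full the oldest event is dropped so the producer never blocks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of the producer side of a state manager.
class StateQueueSketch {
    enum State { CONNECTED, SUSPENDED, RECONNECTED, LOST }

    private final BlockingQueue<State> eventQueue;
    private State current; // last accepted state; null until the first post

    StateQueueSketch(int capacity) {
        eventQueue = new ArrayBlockingQueue<>(capacity);
    }

    // Producer: ignore repeats, otherwise enqueue, evicting the oldest entry if full.
    synchronized boolean addStateChange(State newState) {
        if (current == newState) {
            return false;
        }
        current = newState;
        while (!eventQueue.offer(newState)) {
            eventQueue.poll(); // drop the oldest event to make room
        }
        return true;
    }

    // Demo helper: remove and return everything queued right now.
    List<State> drain() {
        List<State> out = new ArrayList<>();
        eventQueue.drainTo(out);
        return out;
    }

    public static void main(String[] args) {
        StateQueueSketch manager = new StateQueueSketch(2);
        manager.addStateChange(State.CONNECTED);
        manager.addStateChange(State.CONNECTED); // ignored: same state
        manager.addStateChange(State.SUSPENDED);
        manager.addStateChange(State.LOST);      // queue full: CONNECTED is dropped
        System.out.println(manager.drain());
    }
}
```

In a real consumer, a dedicated thread would call `eventQueue.take()` in a loop, as processEvents() does, rather than draining the queue in one go.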
Usage
Startup
// Start the connectionStateManager so that it keeps processing connectionState changes
connectionStateManager.start(); // must be called before client.start()
// Register a default ConnectionStateListener, written as an anonymous class
final ConnectionStateListener listener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )
{
logAsErrorConnectionErrors.set(true);
}
}
};
this.getConnectionStateListenable().addListener(listener);
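The anonymous-class trick in the startup code can be tried without Curator on the classpath. The sketch below is an illustration under assumptions: `DefaultListenerSketch`, `StateListener`, and the `State` enum are hypothetical stand-ins, and the `client` parameter is omitted. Only the flag name `logAsErrorConnectionErrors` and the CONNECTED/RECONNECTED check come from the code above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a component that registers its own default listener.
class DefaultListenerSketch {
    enum State { CONNECTED, SUSPENDED, RECONNECTED, LOST }

    // Stand-in for ConnectionStateListener (client parameter omitted).
    interface StateListener {
        void stateChanged(State newState);
    }

    final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);

    // Built-in listener: set the flag whenever the connection is (re)established.
    final StateListener defaultListener = new StateListener() {
        @Override
        public void stateChanged(State newState) {
            if (newState == State.CONNECTED || newState == State.RECONNECTED) {
                logAsErrorConnectionErrors.set(true);
            }
        }
    };

    public static void main(String[] args) {
        DefaultListenerSketch sketch = new DefaultListenerSketch();
        sketch.defaultListener.stateChanged(State.SUSPENDED);
        sketch.defaultListener.stateChanged(State.RECONNECTED);
        System.out.println(sketch.logAsErrorConnectionErrors.get());
    }
}
```

The framework uses its own listener mechanism for internal bookkeeping, so internal and user listeners go through exactly the same path.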
Producing ConnectionState: convert the state received from ZooKeeper, then call addStateChange.
void validateConnection(Watcher.Event.KeeperState state)
{
if ( state == Watcher.Event.KeeperState.Disconnected )
{
suspendConnection();
}
else if ( state == Watcher.Event.KeeperState.Expired )
{
connectionStateManager.addStateChange(ConnectionState.LOST);
}
else if ( state == Watcher.Event.KeeperState.SyncConnected )
{
connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
}
else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
{
connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
}
}
Reuse
The other kinds of Listener can all be stored in a ListenerContainer as well:
private final ListenerContainer<CuratorListener> listeners;
private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;
/**
* Receives notifications about errors and background events
*/
public interface CuratorListener {
/**
* Called when a background task has completed or a watch has triggered
* @param event the event
* @throws Exception any errors
*/
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}
public interface UnhandledErrorListener
{
/**
* Called when an exception is caught in a background thread, handler, etc. Before this
* listener is called, the error will have been logged and a {@link ConnectionState#LOST}
* event will have been queued for any {@link ConnectionStateListener}s.
* @param message Source message
* @param e exception
*/
public void unhandledError(String message, Throwable e);
}
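Summary of source-code techniques
- ConnectionStateManager is a straightforward producer-consumer design: the public addStateChange() is exposed so that external callers can produce ConnectionState values, the eventQueue carries them, and the private processEvents() consumes them internally.
- An anonymous class created directly with new provides a default implementation of an interface.
- The list of Listener objects is wrapped in a Container, which in turn exposes a forEach method. The Function passed in is the business logic to run on each element, and the method body can add extras on top, such as running each Listener on its own executor.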