Curator源码阅读 - ConnectionState的管理与监听

看看Curator框架 为实现对 连接状态ConnectionState的监听,都是怎么构造框架的。后面我们也可以应用到业务的各种监听中。

Curator2.13实现

接口 Listener

Listener接口,给用户实现stateChange()传入新的状态,用户实现对这新的状态要做什么逻辑处理。

public interface ConnectionStateListener
{
    /**
     * Called when there is a state change in the connection
     * @param client the client
     * @param newState the new state
     */
    public void stateChanged(CuratorFramework client, ConnectionState newState);
}

接口 Listenable

/**
 * Abstracts a listenable object
 */
public interface Listenable<T>
{
    /**
     * Add the given listener. The listener will be executed in the containing instance's thread.
     *
     * @param listener listener to add
     */
    public void     addListener(T listener);

    /**
     * Add the given listener. The listener will be executed using the given
     * executor
     *
     * @param listener listener to add
     * @param executor executor to run listener in
     */
    public void     addListener(T listener, Executor executor);

    public void     removeListener(T listener);
}

抽象类 ListenerContainer<T> implements Listenable<T>

/**
 * Abstracts an object that has listeners 装Listener的容器
 * <T> Listener类型
 */
public class ListenerContainer<T> implements Listenable<T>
{
    private final Map<T, ListenerEntry<T>> listeners = Maps.newConcurrentMap();

    @Override
    public void addListener(T listener)
    {
        addListener(listener, MoreExecutors.sameThreadExecutor());
    }

    @Override
    public void addListener(T listener, Executor executor)
    {
        listeners.put(listener, new ListenerEntry<T>(listener, executor));
    }
    
    /**
     * 对 Listener 列表的遍历进行封装
     * Utility - apply the given function to each listener. 
     * @param function function to call for each listener
     */
    public void forEach(final Function<T, Void> function)
    {
        for ( final ListenerEntry<T> entry : listeners.values() )
        {
            entry.executor.execute
            (
                new Runnable()
                {
                    @Override
                    public void run()
                    {
                        try
                        {
                            function.apply(entry.listener);
                        }
                        catch ( Throwable e )
                        {
                            ThreadUtils.checkInterrupted(e);
                            log.error(String.format("Listener (%s) threw an exception", entry.listener), e);
                        }
                    }
                }
            );
        }
    }
    
    
    public void clear()
    {
        listeners.clear();
    }

    public int size()
    {
        return listeners.size();
    }
    
}

ConnectionStateManager

// to manage connection state
public class ConnectionStateManager {
    // 又是队列? 玩消息什么的都是用队列。现在是存放 ConnectionState
    BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
    // 持有 ListenerContainer
    private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
    
    /**
     * Start the manager,起一个线程去执行 processEvents(),要是这线程挂了怎么办?异常怎么处理的?框架怎么处理的。。
     */
    public void start()
    {
        service.submit
            (
                new Callable<Object>()
                {
                    @Override
                    public Object call() throws Exception
                    {
                        processEvents();
                        return null;
                    }
                }
            );
    }
    
    @Override
    public void close()
    {
        if ( state.compareAndSet(State.STARTED, State.CLOSED) )
        {
            service.shutdownNow();
            listeners.clear();
        }
    }
    
    // 对不断产生的 ConnectionState 进行处理,生产者?
     private void processEvents(){
        // 当 ConnectionStateManager 启动完成
        while ( state.get() == State.STARTED )
        {
            // 不断从队列拿 Conection 状态
            final ConnectionState newState = eventQueue.take();
            // 对每个 状态监听接口  应用 Function, 状态监听接口作为 主语
            // forEach 是 listeners封装的 遍历所有 listener 的方法而已。。。
            listeners.forEach(
                new Function<ConnectionStateListener, Void>() {
                    // ConnectionStateListener是我们自己要实现的接口,stateChanged是要实现的方法
                    @Override 
                    public Void apply(ConnectionStateListener listener)
                    {
                        listener.stateChanged(client, newState);
                        return null;
                    }
                }
            );
            /**
            上面这段
            如果没有封装 Listener 到 ListenerContainer 的话, 所有 Listener 就是个 List列表,就直接调 Listener 的 stateChanged 方法了吧。
            for Listener {
                listener.stateChanged(client, newState);
            }
            
            因为 封装 Listener 到 ListenerContainer了, 上面的 forEach 方法内部就可以有些内部实现,比如 对每个 Listener 都是用对应的 executor 来执行。
            **/
        }
    }
    
    
    // 上面的方法是处理 ConnectionState 的,那 ConnectionState 是怎么传进来的呢? 生产者?
    /**
     * Post a state change. If the manager is already in that state the change
     * is ignored. Otherwise the change is queued for listeners.
     *
     * @param newConnectionState new state
     * @return true if the state actually changed, false if it was already at that state
     */
    public synchronized boolean addStateChange(ConnectionState newConnectionState)
    {
        // 先判断 ConnectionStateManager 是否已经启动好, state 是内部 Enum
        if ( state.get() != State.STARTED )
        {
            return false;
        }

        ConnectionState previousState = currentConnectionState;
        if ( previousState == newConnectionState )
        {
            return false;
        }
        ConnectionState localState = newConnectionState;
        // !!!
        notifyAll();

        while ( !eventQueue.offer(state) )
        {
            eventQueue.poll();
            log.warn("ConnectionStateManager queue full - dropping events to make room");
        }
        return true;
    }
    
}
    
   

调用

启动

// 启动 connectionStateManager,不断检测 connectionState 变化
connectionStateManager.start(); // must be called before client.start()
// 来个匿名默认的 ConnectionStateListener
final ConnectionStateListener listener = new ConnectionStateListener()
{
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState)
    {
        if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )
        {
            logAsErrorConnectionErrors.set(true);
        }
    }
};
this.getConnectionStateListenable().addListener(listener);

生产 ConnectionState,把zk那里拿到的state转一下,然后addStateChange

void validateConnection(Watcher.Event.KeeperState state)
{
    if ( state == Watcher.Event.KeeperState.Disconnected )
    {
        suspendConnection();
    }
    else if ( state == Watcher.Event.KeeperState.Expired )
    {
        connectionStateManager.addStateChange(ConnectionState.LOST);
    }
    else if ( state == Watcher.Event.KeeperState.SyncConnected )
    {
        connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
    }
    else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
    {
        connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
    }
}

复用?

还有其他各种Listener,都可以放到 ListenerContainer

private final ListenerContainer<CuratorListener> listeners;
private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;


/**
 * Receives notifications about errors and background events
 */
public interface CuratorListener {
    /**
     * Called when a background task has completed or a watch has triggered
     * @param event the event
     * @throws Exception any errors
     */
    public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}

public interface UnhandledErrorListener
{
    /**
     * Called when an exception is caught in a background thread, handler, etc. Before this
     * listener is called, the error will have been logged and a {@link ConnectionState#LOST}
     * event will have been queued for any {@link ConnectionStateListener}s.
     * @param message Source message
     * @param e exception
     */
    public void     unhandledError(String message, Throwable e);
}

总结一下源码技巧

  1. ConnectionStateManager 就是个 生产者消费者模式的代码,特点就是: public addStateChange() 暴露给外部用户生产 ConnectionState,通过队列eventQueue传递,private processEvents()在内部对ConnectionState进行消费。
  2. 直接new匿名类,对接口进行默认实现。
  3. Listener列表对象进行Container封装,然后 封装foreach方法,传入Function接口 就是foreach每个元素要执行的业务逻辑,方法体就可以加一些其他福利。
上一篇:ZooKeeper系列(四)—— Java 客户端 Apache Curator


下一篇:「C语言」文件的概念与简单数据流的读写函数