这个虚类是kafka.network.Acceptor和kafka.network.Processor的父类,提供了一个抽象的Sever线程。
它的有趣之处在于为子类的启动和停止提供了线程间的协作机制。
当子类的shutdown方法被调用时,子类可以得知自己被停止,在子类做了适当的处理和清理后,调用自己的shutdownComplete方法,使得对子类shutdown方法的调用从阻塞状态返回,从而使调用线程得知子类的对象已经恰当的停止。
即,在另一个线程中要关闭一个AbstractServerThread,可以执行它shutdown方法,当此方法从阻塞中返回,代表它已经恰当的关闭。
同样,对子类的awaitStartup方法调用也会阻塞,直到子类确认自己完全启动,这个方法调用才会返回。
这些功能是通过对CountdownLatch和AtomicBoolean的使用来实现的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
|
private [kafka] abstract
class AbstractServerThread extends
Runnable with Logging {
protected
val selector = Selector.open();
private
val startupLatch = new
CountDownLatch( 1 )
private
val shutdownLatch = new
CountDownLatch( 1 )
private
val alive = new
AtomicBoolean( false )
/**
* Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete
*/
def shutdown(): Unit = {
alive.set( false )
selector.wakeup()
shutdownLatch.await
}
/**
* Wait for the thread to completely start up
*/
def awaitStartup(): Unit = startupLatch.await
/**
* Record that the thread startup is complete
*/
protected
def startupComplete() = {
alive.set( true )
startupLatch.countDown
}
/**
* Record that the thread shutdown is complete
*/
protected
def shutdownComplete() = shutdownLatch.countDown
/**
* Is the server still running?
*/
protected
def isRunning = alive.get
/**
* Wakeup the thread for selection.
*/
def wakeup() = selector.wakeup()
} |
由于它代表了一个Server线程,在其内部使用了java.nio的Selector。所以在shutdown时,需要调用Selector的wakeup方法,使得对Selector的select方法的调用从阻塞中返回。
继承它的子类必须对isRunning进行判断,来确定自己是否已经被要求关闭。以及在处理关闭请求后,调用shutdownComplete()来确认已完闭完成。
由于Acceptor和Processor的实现太长,这里写了一个例子模拟它们
1
2
3
4
5
6
7
8
9
10
11
|
private
class Processor extends
AbstractServerThread {
override def run() {
while (isRunning) {
println( "processor is running" )
//执行一些操作
Thread.sleep( 1000 )
}
shutdownComplete()
}
} |
在工作循环中判断isRunning作为退出循环的条件。然后执行shutdownComplete, 这时对Processor 的shutdown方法的调用才会返回。