参考资料:
ReferenceQueue源代码里面很好的展示了java队列的实现思路, 以及多线程观察者的实现思路
多线程观察者实现思路:
入队的时候, 调用notify()方法
remove方法调用的时候,如果对列不为空, 出队列, 方法结束, 否则调用wait()方法(这里notify和wait()使用的监视器都是ReferenceQueue.class)
remove(long), remove(), remove()方法调用wait(0)(wait参数为0, 表示一直等待), remove(long timeout), 这里参数代表超时时间, 超过时间没有获取到,就结束啦(return null) , 这里超时的实现也很特殊,详见下面贴出来的代码吧
public synchronized Reference<? extends T> remove(long timeoutMillis)
throws InterruptedException {
if (timeoutMillis < 0) {
throw new IllegalArgumentException("timeout < 0: " + timeoutMillis);
} if (head != null) {
return poll();
} // avoid overflow: if total > 292 years, just wait forever
if (timeoutMillis == 0 || (timeoutMillis > Long.MAX_VALUE / NANOS_PER_MILLI)) {
do {
wait(0);
} while (head == null);
return poll();
} // guaranteed to not overflow
long nanosToWait = timeoutMillis * NANOS_PER_MILLI;
int timeoutNanos = 0; // wait until notified or the timeout has elapsed
long startTime = System.nanoTime();
while (true) {
wait(timeoutMillis, timeoutNanos);
if (head != null) {
break;
}
long nanosElapsed = System.nanoTime() - startTime;
long nanosRemaining = nanosToWait - nanosElapsed;
if (nanosRemaining <= 0) {
break;
}
timeoutMillis = nanosRemaining / NANOS_PER_MILLI;
timeoutNanos = (int) (nanosRemaining - timeoutMillis * NANOS_PER_MILLI);
}
return poll();
}
同时从这个类里面也可以看到java对列的基本实现
入队:
synchronized void enqueue(Reference<? extends T> reference) { //对列为空,那么直接将head指向新的入队元素
if (tail == null) {
head = reference;
} else { //对列不为空, 新元素放入队尾
tail.queueNext = reference;
} // The newly enqueued reference becomes the new tail, and always
// points to itself.
tail = reference; //队尾指向后移
tail.queueNext = reference;
notify();
}
出队:
public synchronized Reference<? extends T> poll() {
if (head == null) { //空对列返回null
return null;
} Reference<? extends T> ret = head; if (head == tail) {//head == tail也是空对列
tail = null;
head = null;
} else { //不是空对列, head指向后移
head = head.queueNext;
} ret.queueNext = null;
return ret;
}