package net.rubyeye.concurrency.chapter7;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class PrimeGenerator implements Runnable {
private final List<BigInteger> primes = new ArrayList<BigInteger>();
private volatile boolean cancelled;
public void run() {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}
public void cancel() {
cancelled = true;
}
public synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(primes);
}
public static void main(String args[]) throws InterruptedException {
PrimeGenerator generator = new PrimeGenerator();
new Thread(generator).start();
try {
TimeUnit.SECONDS.sleep(1);
} finally {
generator.cancel();
}
}
}
main中启动一个素数生成的任务,线程运行一秒就取消掉。通过线程中的cancelled变量来表征任务是否继续执行。既然是最简单的策略,那么什么是例外情况?显然,阻塞操作下(比如调用join,wait,sleep方法),这样的策略会出问题。任务因为调用这些阻塞方法而被阻塞,它将不会去检查volatile变量,导致取消操作失效。那么解决办法是什么?中断!考虑我们用BlockingQueue去保存生成的素数,BlockingQueue的put方法是阻塞的(当BlockingQueue满的时候,put操作会阻塞直到有元素被take),让我们看看不采用中断,仍然采用简单策略会出现什么情况:import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class PrimeGenerator implements Runnable {
private final List<BigInteger> primes = new ArrayList<BigInteger>();
private volatile boolean cancelled;
public void run() {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}
public void cancel() {
cancelled = true;
}
public synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(primes);
}
public static void main(String args[]) throws InterruptedException {
PrimeGenerator generator = new PrimeGenerator();
new Thread(generator).start();
try {
TimeUnit.SECONDS.sleep(1);
} finally {
generator.cancel();
}
}
}
package net.rubyeye.concurrency.chapter7;
import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class BrokenPrimeProducer extends Thread {
static int i = 1000;
private final BlockingQueue<BigInteger> queue;
private volatile boolean cancelled = false;
BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
BigInteger p = BigInteger.ONE;
try {
while (!cancelled) {
p = p.nextProbablePrime();
queue.put(p);
}
} catch (InterruptedException cusumed) {
}
}
public void cancel() {
this.cancelled = false;
}
public static void main(String args[]) throws InterruptedException {
BlockingQueue<BigInteger> queue = new LinkedBlockingQueue<BigInteger>(
10);
BrokenPrimeProducer producer = new BrokenPrimeProducer(queue);
producer.start();
try {
while (needMorePrimes())
queue.take();
} finally {
producer.cancel();
}
}
public static boolean needMorePrimes() throws InterruptedException {
boolean result = true;
i--;
if (i == 0)
result = false;
return result;
}
}
我们在main中通过queue.take来消费产生的素数(虽然仅仅是取出扔掉),我们只消费了1000个素数,然后尝试取消产生素数的任务,很遗憾,取消不了,因为产生素数的线程产生素数的速度大于我们消费的速度,我们在消费1000后就停止消费了,那么任务将被queue的put方法阻塞,永远也不会去判断cancelled状态变量,任务取消不了。正确的做法应当是使用中断(interrupt):import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class BrokenPrimeProducer extends Thread {
static int i = 1000;
private final BlockingQueue<BigInteger> queue;
private volatile boolean cancelled = false;
BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
BigInteger p = BigInteger.ONE;
try {
while (!cancelled) {
p = p.nextProbablePrime();
queue.put(p);
}
} catch (InterruptedException cusumed) {
}
}
public void cancel() {
this.cancelled = false;
}
public static void main(String args[]) throws InterruptedException {
BlockingQueue<BigInteger> queue = new LinkedBlockingQueue<BigInteger>(
10);
BrokenPrimeProducer producer = new BrokenPrimeProducer(queue);
producer.start();
try {
while (needMorePrimes())
queue.take();
} finally {
producer.cancel();
}
}
public static boolean needMorePrimes() throws InterruptedException {
boolean result = true;
i--;
if (i == 0)
result = false;
return result;
}
}
package net.rubyeye.concurrency.chapter7;
import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class PrimeProducer extends Thread {
static int i = 1000;
private final BlockingQueue<BigInteger> queue;
private volatile boolean cancelled = false;
PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
BigInteger p = BigInteger.ONE;
try {
while (!Thread.currentThread().isInterrupted()) {
p = p.nextProbablePrime();
queue.put(p);
}
} catch (InterruptedException cusumed) {
}
}
public void cancel() {
interrupt();
}
public static void main(String args[]) throws InterruptedException {
BlockingQueue<BigInteger> queue = new LinkedBlockingQueue<BigInteger>(
10);
PrimeProducer producer = new PrimeProducer(queue);
producer.start();
try {
while (needMorePrimes())
queue.take();
} finally {
producer.cancel();
}
}
public static boolean needMorePrimes() throws InterruptedException {
boolean result = true;
i--;
if (i == 0)
result = false;
return result;
}
}
在run方法中,通过Thread的isInterrupted来判断interrupt status是否已经被修改,从而正确实现了任务的取消。关于interrupt,有一点需要特别说明,调用interrupt并不意味着必然停止目标线程的正在进行的工作,它仅仅是传递一个请求中断的信号给目标线程,目标线程会在下一个方便的时刻中断。而对于阻塞方法产生的InterruptedException的处理,两种选择:要么重新抛出让上层代码来处理,要么在catch块中调用Thread的interrupt来保存中断状态。除非你确定要让工作线程终止(如上所示代码),否则不要仅仅是catch而不做任务处理工作(生吞了InterruptedException),更详细可以参考这里。如果不清楚外部线程的中断策略,生搬硬套地调用interrupt可能产生不可预料的后果,可参见书中7.1.4例子。import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class PrimeProducer extends Thread {
static int i = 1000;
private final BlockingQueue<BigInteger> queue;
private volatile boolean cancelled = false;
PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
BigInteger p = BigInteger.ONE;
try {
while (!Thread.currentThread().isInterrupted()) {
p = p.nextProbablePrime();
queue.put(p);
}
} catch (InterruptedException cusumed) {
}
}
public void cancel() {
interrupt();
}
public static void main(String args[]) throws InterruptedException {
BlockingQueue<BigInteger> queue = new LinkedBlockingQueue<BigInteger>(
10);
PrimeProducer producer = new PrimeProducer(queue);
producer.start();
try {
while (needMorePrimes())
queue.take();
} finally {
producer.cancel();
}
}
public static boolean needMorePrimes() throws InterruptedException {
boolean result = true;
i--;
if (i == 0)
result = false;
return result;
}
}
另外一个取消任务的方法就是采用Future来管理任务,这是JDK5引入的,用于管理任务的生命周期,处理异常等。比如调用ExecutorService的sumit方法会返回一个Future来描述任务,而Future有一个cancel方法用于取消任务。
那么,如果任务调用了不可中断的阻塞方法,比如Socket的read、write方法,java.nio中的同步I/O,那么该怎么处理呢?简单地,关闭它们!参考下面的例子:
package net.rubyeye.concurrency.chapter7;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
/**
* 展示对于不可中断阻塞的取消任务 通过关闭socket引发异常来中断
*
* @author Admin
*
*/
public abstract class ReaderThread extends Thread {
private final Socket socket;
private final InputStream in;
public ReaderThread(Socket socket) throws IOException {
this.socket = socket;
this.in = socket.getInputStream();
}
// 重写interrupt方法
public void interrupt() {
try {
socket.close();
} catch (IOException e) {
} finally {
super.interrupt();
}
}
public void run() {
try {
byte[] buf = new byte[1024];
while (true) {
int count = in.read(buf);
if (count < 0)
break;
else if (count > 0)
processBuff(buf, count);
}
} catch (IOException e) {
}
}
public abstract void processBuff(byte[] buf, int count);
}
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
/**
* 展示对于不可中断阻塞的取消任务 通过关闭socket引发异常来中断
*
* @author Admin
*
*/
public abstract class ReaderThread extends Thread {
private final Socket socket;
private final InputStream in;
public ReaderThread(Socket socket) throws IOException {
this.socket = socket;
this.in = socket.getInputStream();
}
// 重写interrupt方法
public void interrupt() {
try {
socket.close();
} catch (IOException e) {
} finally {
super.interrupt();
}
}
public void run() {
try {
byte[] buf = new byte[1024];
while (true) {
int count = in.read(buf);
if (count < 0)
break;
else if (count > 0)
processBuff(buf, count);
}
} catch (IOException e) {
}
}
public abstract void processBuff(byte[] buf, int count);
}
Reader线程重写了interrupt方法,其中调用了socket的close方法用于中断read方法,最后,又调用了super.interrupt(),防止当调用可中断的阻塞方法时不能正常中断。文章转自庄周梦蝶 ,原文发布时间 2007-09-03
文章转自庄周梦蝶 ,原文发布时间