两种思路解决线程服务死循环

背景
系统的error数量突然飙高,并且不停地触发Full GC。
最后发现是因为调用的外部jar包中方法触发bug导致死循环,不断产生新对象,导致内存大量占用无法释放,最终JVM内存回收机制崩溃。



解决思路

服务一旦进入死循环,对应线程一直处于running状态,难以通过该线程自己计时中断。
对于无法完全放心的第三方jar包方法,可以选择开子线程处理,并对线程加以监控,当超时时及时中断子线程并返回。
两种实现思路:

思路一: 通过FutureTask
    Future的get方法在设定的时间超时后会抛出TimeoutException,通常做法是捕获异常后执行future.cancel(true)方法。但cancel方法实际只是调用线程的interrupt方法,给线程设置中断标志位(interrupt status),并不能中断不响应中断的死循环子线程。
Future没有提供能够直接停止子线程的方法(也许是因为线程的stop方法可能产生不良后果)。
所以这里可以参照FutureTask源码,新建一个MyFutureTask类,改写或新建一个类似cancel的方法,调用线程的stop方法。
demo中的MyFuntureTask类(即上文所说的MyFutureTask,demo代码中类名拼写为MyFuntureTask)参考cancel方法,新建了stop方法,调用子线程的stop方法来中止子线程。

思路二:通过CountDownLatch
    主线程建立可能出现死循环的子线程时设立CountDownLatch值为1,子线程逻辑中当处理完毕执行CountDownLatch减1。这样主线程可以看到子线程是执行完毕还是超时,如果超时或子线程已处理完毕,在主线程中执行子线程的stop方法中止子线程。



demo代码


思路一:
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/**
 * Demo of approach one: a task that may loop forever is wrapped in a custom
 * future ({@link MyFuntureTask}) whose {@code stop} method forcibly terminates
 * the worker thread once the caller has given up waiting for the result.
 */
public class Test {
	/** Pool that executes the potentially runaway tasks. */
	public static ExecutorService threadPoolExecutor = Executors
			.newCachedThreadPool();

	/**
	 * Submits a task that spins forever, waits, then polls the future with a
	 * short timeout and force-stops the task when the timeout expires.
	 *
	 * @param args
	 *            unused
	 */
	public static void main(String[] args) {
		int count = 1;
		// Generic array creation is not allowed, so a raw array is created
		// here; it only ever holds MyFuntureTask<Object> instances.
		@SuppressWarnings({ "unchecked", "rawtypes" })
		MyFuntureTask<Object>[] fts = new MyFuntureTask[count];
		try {
			for (int i = 0; i < count; i++) {
				fts[i] = new MyFuntureTask<Object>(new Callable<Object>() {

					@Override
					public Object call() throws Exception {
						// Simulates third-party code stuck in a dead loop: it
						// never returns and never checks the interrupt flag.
						for (;;) {
							if (System.currentTimeMillis() % 10000 == 0) {
								System.out.println(System.currentTimeMillis() + ":"
										+ Thread.currentThread().getId() + ":"
										+ Thread.currentThread().getState());
							}
						}
					}
				});
				threadPoolExecutor.submit(fts[i]);
			}
			pause(10000);
			for (int i = 0; i < count; i++) {
				try {
					fts[i].get(1, TimeUnit.MILLISECONDS);
				} catch (TimeoutException e) {
					// A plain FutureTask only offers cancel(), which merely
					// sets the worker's interrupt flag. The added stop()
					// method terminates the worker thread directly.
					System.out.println("线程已强制中断" + fts[i].stop(true));
				} catch (InterruptedException e) {
					// Keep the interrupt visible to callers and still make
					// sure the runaway task is not left behind.
					Thread.currentThread().interrupt();
					System.out.println("线程已强制中断" + fts[i].stop(true));
				} catch (ExecutionException e) {
					// The task already terminated by itself; nothing to stop.
					e.printStackTrace();
				} catch (CancellationException e) {
					// The task was already cancelled; nothing to stop.
					e.printStackTrace();
				}
			}

			// Observation window: the worker should no longer print anything.
			pause(100000);
		} finally {
			// Let idle pool threads go away so they do not delay JVM exit.
			threadPoolExecutor.shutdown();
		}
	}

	/**
	 * Sleeps for the given time; if interrupted, restores the interrupt flag
	 * and returns early.
	 *
	 * @param millis
	 *            time to sleep in milliseconds
	 */
	private static void pause(long millis) {
		try {
			Thread.sleep(millis);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}

	/**
	 * A future modelled on the JDK's {@code FutureTask} source, extended with
	 * a {@link #stop(boolean)} operation that forcibly terminates the thread
	 * running the task.
	 *
	 * @param <V>
	 *            the result type returned by {@code get}
	 */
	public static class MyFuntureTask<V> implements RunnableFuture<V> {
		/** Synchronization control for this task */
		private final Sync sync;

		/**
		 * Creates a task that will, upon running, execute the given
		 * {@code Callable}.
		 *
		 * @param callable
		 *            the callable task
		 * @throws NullPointerException
		 *             if callable is null
		 */
		public MyFuntureTask(Callable<V> callable) {
			if (callable == null)
				throw new NullPointerException();

			sync = new Sync(callable);
		}

		/**
		 * Creates a task that will, upon running, execute the given
		 * {@code Runnable}, and arrange that {@code get} will return the
		 * given result on successful completion.
		 *
		 * @param runnable
		 *            the runnable task
		 * @param result
		 *            the result to return on successful completion; pass
		 *            {@code null} if no particular result is needed
		 * @throws NullPointerException
		 *             if runnable is null
		 */
		public MyFuntureTask(Runnable runnable, V result) {
			sync = new Sync(Executors.callable(runnable, result));
		}

		@Override
		public boolean isCancelled() {
			return sync.innerIsCancelled();
		}

		@Override
		public boolean isDone() {
			return sync.innerIsDone();
		}

		@Override
		public boolean cancel(boolean mayInterruptIfRunning) {
			return sync.innerCancel(mayInterruptIfRunning);
		}

		/**
		 * Custom addition: cancels this task and, if requested, forcibly
		 * stops the thread that is currently running it.
		 *
		 * @param mayStopIfRunning
		 *            {@code true} if the thread executing this task should be
		 *            stopped; otherwise the task is only marked as cancelled
		 * @return {@code false} if the task had already completed or been
		 *         cancelled; {@code true} otherwise
		 */
		public boolean stop(boolean mayStopIfRunning) {
			return sync.innerStop(mayStopIfRunning);
		}

		/**
		 * @throws CancellationException
		 *             {@inheritDoc}
		 */
		@Override
		public V get() throws InterruptedException, ExecutionException {
			return sync.innerGet();
		}

		/**
		 * @throws CancellationException
		 *             {@inheritDoc}
		 */
		@Override
		public V get(long timeout, TimeUnit unit) throws InterruptedException,
				ExecutionException, TimeoutException {
			return sync.innerGet(unit.toNanos(timeout));
		}

		/**
		 * Protected method invoked when this task transitions to state
		 * {@code isDone} (whether normally or via cancellation/stop). The
		 * default implementation does nothing. Subclasses may override this
		 * method to invoke completion callbacks or perform bookkeeping.
		 */
		protected void done() {
		}

		/**
		 * Sets the result of this future to the given value unless this
		 * future has already been set or has been cancelled. Invoked
		 * internally by {@code run} upon successful completion.
		 *
		 * @param v
		 *            the value
		 */
		protected void set(V v) {
			sync.innerSet(v);
		}

		/**
		 * Causes this future to report an {@code ExecutionException} with the
		 * given throwable as its cause, unless this future has already been
		 * set or has been cancelled. Invoked internally by {@code run} upon
		 * failure of the computation.
		 *
		 * @param t
		 *            the cause of failure
		 */
		protected void setException(Throwable t) {
			sync.innerSetException(t);
		}

		/**
		 * Sets this future to the result of its computation unless it has
		 * been cancelled.
		 */
		@Override
		public void run() {
			sync.innerRun();
		}

		/**
		 * Executes the computation without setting its result, and then
		 * resets this future to initial state, failing to do so if the
		 * computation encounters an exception or is cancelled.
		 *
		 * @return true if successfully run and reset
		 */
		protected boolean runAndReset() {
			return sync.innerRunAndReset();
		}

		/**
		 * Synchronization control for the task. This must be a non-static
		 * inner class in order to invoke the protected {@code done} method.
		 * All inner class support methods are named like the outer ones,
		 * prefixed with "inner".
		 *
		 * Uses AQS sync state to represent run status.
		 */
		private final class Sync extends AbstractQueuedSynchronizer {
			private static final long serialVersionUID = -7828117401763700385L;

			/** State value representing that task is ready to run */
			private static final int READY = 0;
			/** State value representing that task is running */
			private static final int RUNNING = 1;
			/** State value representing that task ran */
			private static final int RAN = 2;
			/** State value representing that task was cancelled */
			private static final int CANCELLED = 4;

			/** The underlying callable */
			private final Callable<V> callable;
			/** The result to return from get() */
			private V result;
			/** The exception to throw from get() */
			private Throwable exception;

			/**
			 * The thread running task. When nulled after set/cancel, this
			 * indicates that the results are accessible. Must be volatile, to
			 * ensure visibility upon completion.
			 */
			private volatile Thread runner;

			Sync(Callable<V> callable) {
				this.callable = callable;
			}

			/** Returns true if the given state has the RAN or CANCELLED bit. */
			private boolean ranOrCancelled(int state) {
				return (state & (RAN | CANCELLED)) != 0;
			}

			/**
			 * Implements AQS base acquire to succeed if ran or cancelled
			 */
			@Override
			protected int tryAcquireShared(int ignore) {
				return innerIsDone() ? 1 : -1;
			}

			/**
			 * Implements AQS base release to always signal after setting final
			 * done status by nulling runner thread.
			 */
			@Override
			protected boolean tryReleaseShared(int ignore) {
				runner = null;
				return true;
			}

			boolean innerIsCancelled() {
				return getState() == CANCELLED;
			}

			boolean innerIsDone() {
				return ranOrCancelled(getState()) && runner == null;
			}

			V innerGet() throws InterruptedException, ExecutionException {
				acquireSharedInterruptibly(0);
				if (getState() == CANCELLED)
					throw new CancellationException();
				if (exception != null)
					throw new ExecutionException(exception);
				return result;
			}

			V innerGet(long nanosTimeout) throws InterruptedException,
					ExecutionException, TimeoutException {
				if (!tryAcquireSharedNanos(0, nanosTimeout))
					throw new TimeoutException();
				if (getState() == CANCELLED)
					throw new CancellationException();
				if (exception != null)
					throw new ExecutionException(exception);
				return result;
			}

			void innerSet(V v) {
				for (;;) {
					int s = getState();
					if (s == RAN)
						return;
					if (s == CANCELLED) {
						// aggressively release to set runner to null,
						// in case we are racing with a cancel request
						// that will try to interrupt runner
						releaseShared(0);
						return;
					}
					if (compareAndSetState(s, RAN)) {
						result = v;
						releaseShared(0);
						done();
						return;
					}
				}
			}

			void innerSetException(Throwable t) {
				for (;;) {
					int s = getState();
					if (s == RAN)
						return;
					if (s == CANCELLED) {
						// aggressively release to set runner to null,
						// in case we are racing with a cancel request
						// that will try to interrupt runner
						releaseShared(0);
						return;
					}
					if (compareAndSetState(s, RAN)) {
						exception = t;
						releaseShared(0);
						done();
						return;
					}
				}
			}

			/**
			 * Custom addition modelled on {@link #innerCancel(boolean)}:
			 * marks the task as cancelled and, if requested, forcibly stops
			 * the thread running it.
			 *
			 * The release and the {@code done()} callback run in a
			 * {@code finally} block so that waiters in {@code get} are always
			 * woken up, even if terminating the runner thread fails.
			 *
			 * @param mayStopIfRunning
			 *            whether the runner thread may be stopped
			 * @return false if the task already ran or was cancelled
			 */
			boolean innerStop(boolean mayStopIfRunning) {
				for (;;) {
					int s = getState();
					if (ranOrCancelled(s))
						return false;
					if (compareAndSetState(s, CANCELLED))
						break;
				}
				try {
					if (mayStopIfRunning) {
						Thread r = runner;
						if (r != null) {
							forceStop(r);
						}
					}
				} finally {
					releaseShared(0);
					done();
				}
				return true;
			}

			/**
			 * Terminates the given thread with {@code Thread.stop()}. On
			 * runtimes where {@code Thread.stop()} is no longer supported and
			 * throws {@code UnsupportedOperationException}, falls back to
			 * interrupting the thread, which only helps if the task responds
			 * to interruption.
			 *
			 * @param r
			 *            the thread currently running the task
			 */
			@SuppressWarnings({ "deprecation", "removal" })
			private void forceStop(Thread r) {
				try {
					r.stop();
				} catch (UnsupportedOperationException e) {
					r.interrupt();
				}
			}

			boolean innerCancel(boolean mayInterruptIfRunning) {
				for (;;) {
					int s = getState();
					if (ranOrCancelled(s))
						return false;
					if (compareAndSetState(s, CANCELLED))
						break;
				}
				try {
					if (mayInterruptIfRunning) {
						Thread r = runner;
						if (r != null) {
							r.interrupt();
						}
					}
				} finally {
					// Always wake waiters, even if interrupt() is denied
					releaseShared(0);
					done();
				}
				return true;
			}

			void innerRun() {
				if (!compareAndSetState(READY, RUNNING))
					return;

				runner = Thread.currentThread();
				if (getState() == RUNNING) { // recheck after setting thread
					V result;
					try {
						result = callable.call();
					} catch (Throwable ex) {
						setException(ex);
						return;
					}
					set(result);
				} else {
					releaseShared(0); // cancel
				}
			}

			boolean innerRunAndReset() {
				if (!compareAndSetState(READY, RUNNING))
					return false;
				try {
					runner = Thread.currentThread();
					if (getState() == RUNNING)
						callable.call(); // don't set result
					runner = null;
					return compareAndSetState(RUNNING, READY);
				} catch (Throwable ex) {
					setException(ex);
					return false;
				}
			}
		}
	}

}

 

思路二:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Demo of approach two: the main thread starts a worker that may loop forever,
 * waits on a {@link CountDownLatch} with a timeout, and forcibly ends the
 * worker if it has not signalled completion in time.
 */
public class Test2 {
	/**
	 * Starts the worker, waits at most one millisecond for it to finish and
	 * force-stops it on timeout.
	 *
	 * @param args
	 *            unused
	 */
	public static void main(String[] args) {
		final CountDownLatch cdl = new CountDownLatch(1);

		// Worker thread running the code that may end up in a dead loop
		Thread workThread = new Thread(new Runnable() {

			@Override
			public void run() {
				try {
					mayCauseDead(true);
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				} finally {
					// Always notify the main thread, even on failure
					cdl.countDown();
				}
			}
		});
		// A daemon worker cannot keep the JVM alive if it cannot be stopped
		workThread.setDaemon(true);
		workThread.start();

		try {
			// Returns true when the worker finished, false on timeout
			boolean finished = cdl.await(1, TimeUnit.MILLISECONDS);
			if (!finished) {
				forceStop(workThread);
			}
			System.out.println("end");
		} catch (InterruptedException e) {
			// Preserve the interrupt and do not leave the worker running
			Thread.currentThread().interrupt();
			forceStop(workThread);
		}

	}

	/**
	 * Terminates the given thread with {@code Thread.stop()}. On runtimes
	 * where {@code Thread.stop()} is no longer supported and throws
	 * {@code UnsupportedOperationException}, falls back to interrupting the
	 * thread, which only helps if the running code responds to interruption.
	 *
	 * @param thread
	 *            the worker thread to terminate
	 */
	@SuppressWarnings({ "deprecation", "removal" })
	private static void forceStop(Thread thread) {
		try {
			thread.stop();
		} catch (UnsupportedOperationException e) {
			thread.interrupt();
		}
	}

	/**
	 * Simulates a method that may get stuck in a dead loop.
	 *
	 * @param dead
	 *            when {@code true} the method loops forever, printing the
	 *            current time, thread id and thread state; when {@code false}
	 *            it prints "work" one million times and then sleeps
	 * @throws InterruptedException
	 *             if the final sleep is interrupted
	 */
	private static void mayCauseDead(boolean dead) throws InterruptedException{
		while(dead){
			System.out.println(System.currentTimeMillis() + ":"
					+ Thread.currentThread().getId() + ":"
					+ Thread.currentThread().getState());
		}
		for (int i = 0; i < 1000000; i++) {
			System.out.println("work");
		}
		Thread.sleep(10000);
	}
}

 

上一篇:阿里云服务器双十一配置仅需85元/年


下一篇:创建3层的服务模板(4)--- 创建一个App Server的VM Template