背景
系统突然error飚高,不停Full GC。
最后发现是因为调用的外部jar包中方法触发bug导致死循环,不断产生新对象,导致内存大量占用无法释放,最终JVM内存回收机制崩溃。
解决思路
服务一旦进入死循环,对应线程一直处于running状态,难以通过该线程自己计时中断。
对于无法完全放心的第三方jar包方法,可以选择开子线程处理,并对线程加以监控,当超时时及时中断子线程并返回。
两种实现思路:
思路一: 通过FutureTask
Future在设定的时间超时后会抛出timeout异常,通常做法是捕获异常后执行future.cancel()方法。但cancel方法实际是调用线程的interrupt方法,给线程树立interrupt status,并不能中断死循环的子线程。
Future没有提供能够直接停止子线程的方法(也许是因为线程的stop方法可能产生不良后果)
所以这里可以参照FutureTask源码,新建一个MyFutureTask类,改写或新建一个类似cancel的方法,调用线程的stop方法。
demo中的MyFutureTask类参考cancel方法,新建了MyFutureTask.stop方法,调用子线程的stop方法来中止子线程。
思路二:通过CountDownLatch
主线程建立可能出现死循环的子线程时设立CountDownLatch值为1,子线程逻辑中当处理完毕执行CountDownLatch减1。这样主线程可以看到子线程是执行完毕还是超时,如果超时或子线程已处理完毕,在主线程中执行子线程的stop方法中止子线程。
demo代码
思路一:
系统突然error飚高,不停Full GC。
最后发现是因为调用的外部jar包中方法触发bug导致死循环,不断产生新对象,导致内存大量占用无法释放,最终JVM内存回收机制崩溃。
解决思路
服务一旦进入死循环,对应线程一直处于running状态,难以通过该线程自己计时中断。
对于无法完全放心的第三方jar包方法,可以选择开子线程处理,并对线程加以监控,当超时时及时中断子线程并返回。
两种实现思路:
思路一: 通过FutureTask
Future在设定的时间超时后会抛出timeout异常,通常做法是捕获异常后执行future.cancel()方法。但cancel方法实际是调用线程的interrupt方法,给线程树立interrupt status,并不能中断死循环的子线程。
Future没有提供能够直接停止子线程的方法(也许是因为线程的stop方法可能产生不良后果)
所以这里可以参照FutureTask源码,新建一个MyFutureTask类,改写或新建一个类似cancel的方法,调用线程的stop方法。
demo中的MyFutureTask类参考cancel方法,新建了MyFutureTask.stop方法,调用子线程的stop方法来中止子线程。
思路二:通过CountDownLatch
主线程建立可能出现死循环的子线程时设立CountDownLatch值为1,子线程逻辑中当处理完毕执行CountDownLatch减1。这样主线程可以看到子线程是执行完毕还是超时,如果超时或子线程已处理完毕,在主线程中执行子线程的stop方法中止子线程。
demo代码
思路一:
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class Test {
public static ExecutorService threadPoolExecutor = Executors
.newCachedThreadPool();
@SuppressWarnings({ "unchecked", "rawtypes" })
public static void main(String[] args) {
int count = 1;
//参考FutureTask源代码,写有自己需要功能的FutureTask
MyFuntureTask[] fts = new MyFuntureTask[count];
for (int i = 0; i < count; i++) {
fts[i] = new MyFuntureTask(new Callable() {
@Override
public Object call() throws Exception {
for (;;) {
if (System.currentTimeMillis() % 10000 == 0) {
System.out.println(System.currentTimeMillis() + ":"
+ Thread.currentThread().getId() + ":"
+ Thread.currentThread().getState());
}
}
}
});
threadPoolExecutor.submit(fts[i]);
}
try {
Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
}
for (int i = 0; i < count; i++) {
try {
fts[i].get(1, TimeUnit.MILLISECONDS);
} catch (Exception e) {
/**
* 这里本来FutureTask只有.cancel()的功能,cancel之后树立Future线程的isInterupted标识位
* 由于增加了stop方法,可以直接通过FutureTask的stop方法中止线程
*/
System.out.println("线程已强制中断" + fts[i].stop(true));
}
}
try {
Thread.sleep(100000);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 参考FutureTask源代码,写有自己需要功能的FutureTask
*/
public static class MyFuntureTask<V> implements RunnableFuture<V> {
/** Synchronization control for FutureTask */
private final Sync sync;
/**
* Creates a <tt>FutureTask</tt> that will, upon running, execute the
* given <tt>Callable</tt>.
*
* @param callable
* the callable task
* @throws NullPointerException
* if callable is null
*/
public MyFuntureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
sync = new Sync(callable);
}
/**
* Creates a <tt>FutureTask</tt> that will, upon running, execute the
* given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return
* the given result on successful completion.
*
* @param runnable
* the runnable task
* @param result
* the result to return on successful completion. If you
* don't need a particular result, consider using
* constructions of the form:
* <tt>Future<?> f = new FutureTask<Object>(runnable, null)</tt>
* @throws NullPointerException
* if runnable is null
*/
public MyFuntureTask(Runnable runnable, V result) {
sync = new Sync(Executors.callable(runnable, result));
}
public boolean isCancelled() {
return sync.innerIsCancelled();
}
public boolean isDone() {
return sync.innerIsDone();
}
public boolean cancel(boolean mayInterruptIfRunning) {
return sync.innerCancel(mayInterruptIfRunning);
}
/**
* 自己加的!!!!
*
* @param mayStopIfRunning
* @return
*/
public boolean stop(boolean mayStopIfRunning) {
return sync.innerStop(mayStopIfRunning);
}
/**
* @throws CancellationException
* {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}
/**
* @throws CancellationException
* {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
return sync.innerGet(unit.toNanos(timeout));
}
/**
* Protected method invoked when this task transitions to state
* <tt>isDone</tt> (whether normally or via cancellation). The default
* implementation does nothing. Subclasses may override this method to
* invoke completion callbacks or perform bookkeeping. Note that you can
* query status inside the implementation of this method to determine
* whether this task has been cancelled.
*/
protected void done() {
}
/**
* Sets the result of this Future to the given value unless this future
* has already been set or has been cancelled. This method is invoked
* internally by the <tt>run</tt> method upon successful completion of
* the computation.
*
* @param v
* the value
*/
protected void set(V v) {
sync.innerSet(v);
}
/**
* Causes this future to report an <tt>ExecutionException</tt> with the
* given throwable as its cause, unless this Future has already been set
* or has been cancelled. This method is invoked internally by the
* <tt>run</tt> method upon failure of the computation.
*
* @param t
* the cause of failure
*/
protected void setException(Throwable t) {
sync.innerSetException(t);
}
// The following (duplicated) doc comment can be removed once
//
// 6270645: Javadoc comments should be inherited from most derived
// superinterface or superclass
// is fixed.
/**
* Sets this Future to the result of its computation unless it has been
* cancelled.
*/
public void run() {
sync.innerRun();
}
/**
* Executes the computation without setting its result, and then resets
* this Future to initial state, failing to do so if the computation
* encounters an exception or is cancelled. This is designed for use
* with tasks that intrinsically execute more than once.
*
* @return true if successfully run and reset
*/
protected boolean runAndReset() {
return sync.innerRunAndReset();
}
/**
* Synchronization control for FutureTask. Note that this must be a
* non-static inner class in order to invoke the protected <tt>done</tt>
* method. For clarity, all inner class support methods are same as
* outer, prefixed with "inner".
*
* Uses AQS sync state to represent run status
*/
private final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -7828117401763700385L;
/** State value representing that task is ready to run */
private static final int READY = 0;
/** State value representing that task is running */
private static final int RUNNING = 1;
/** State value representing that task ran */
private static final int RAN = 2;
/** State value representing that task was cancelled */
private static final int CANCELLED = 4;
/** The underlying callable */
private final Callable<V> callable;
/** The result to return from get() */
private V result;
/** The exception to throw from get() */
private Throwable exception;
/**
* The thread running task. When nulled after set/cancel, this
* indicates that the results are accessible. Must be volatile, to
* ensure visibility upon completion.
*/
private volatile Thread runner;
Sync(Callable<V> callable) {
this.callable = callable;
}
private boolean ranOrCancelled(int state) {
return (state & (RAN | CANCELLED)) != 0;
}
/**
* Implements AQS base acquire to succeed if ran or cancelled
*/
protected int tryAcquireShared(int ignore) {
return innerIsDone() ? 1 : -1;
}
/**
* Implements AQS base release to always signal after setting final
* done status by nulling runner thread.
*/
protected boolean tryReleaseShared(int ignore) {
runner = null;
return true;
}
boolean innerIsCancelled() {
return getState() == CANCELLED;
}
boolean innerIsDone() {
return ranOrCancelled(getState()) && runner == null;
}
V innerGet() throws InterruptedException, ExecutionException {
acquireSharedInterruptibly(0);
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}
V innerGet(long nanosTimeout) throws InterruptedException,
ExecutionException, TimeoutException {
if (!tryAcquireSharedNanos(0, nanosTimeout))
throw new TimeoutException();
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}
void innerSet(V v) {
for (;;) {
int s = getState();
if (s == RAN)
return;
if (s == CANCELLED) {
// aggressively release to set runner to null,
// in case we are racing with a cancel request
// that will try to interrupt runner
releaseShared(0);
return;
}
if (compareAndSetState(s, RAN)) {
result = v;
releaseShared(0);
done();
return;
}
}
}
void innerSetException(Throwable t) {
for (;;) {
int s = getState();
if (s == RAN)
return;
if (s == CANCELLED) {
// aggressively release to set runner to null,
// in case we are racing with a cancel request
// that will try to interrupt runner
releaseShared(0);
return;
}
if (compareAndSetState(s, RAN)) {
exception = t;
releaseShared(0);
done();
return;
}
}
}
/**
* 仿照innerCancel自己加的!!
*
* @param mayStopIfRunning
* @return
*/
boolean innerStop(boolean mayStopIfRunning) {
for (;;) {
int s = getState();
if (ranOrCancelled(s))
return false;
if (compareAndSetState(s, CANCELLED))
break;
}
if (mayStopIfRunning) {
Thread r = runner;
if (r != null) {
r.stop();//这里调用线程stop方法
}
}
releaseShared(0);
done();
return true;
}
boolean innerCancel(boolean mayInterruptIfRunning) {
for (;;) {
int s = getState();
if (ranOrCancelled(s))
return false;
if (compareAndSetState(s, CANCELLED))
break;
}
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null) {
r.interrupt();
}
}
releaseShared(0);
done();
return true;
}
void innerRun() {
if (!compareAndSetState(READY, RUNNING))
return;
runner = Thread.currentThread();
if (getState() == RUNNING) { // recheck after setting thread
V result;
try {
result = callable.call();
} catch (Throwable ex) {
setException(ex);
return;
}
set(result);
} else {
releaseShared(0); // cancel
}
}
boolean innerRunAndReset() {
if (!compareAndSetState(READY, RUNNING))
return false;
try {
runner = Thread.currentThread();
if (getState() == RUNNING)
callable.call(); // don't set result
runner = null;
return compareAndSetState(RUNNING, READY);
} catch (Throwable ex) {
setException(ex);
return false;
}
}
}
}
}
思路二:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class Test2 {
public static void main(String[] args) {
final CountDownLatch cdl = new CountDownLatch(1);
//处理可能发生死循环的子线程
Thread workThread = new Thread(new Runnable() {
@Override
public void run() {
try {
mayCauseDead(true);
} catch (InterruptedException e) {
e.printStackTrace();
}
//子线程处理完后通过countDownLatch通知主线程
cdl.countDown();
}
});
workThread.start();//开始子线程
try {
cdl.await(1, TimeUnit.MILLISECONDS);//当超时或子线程处理完毕
workThread.stop();//中指子线程
System.out.println("end");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 可能产生死循环的方法
*/
private static void mayCauseDead(boolean dead) throws InterruptedException{
while(dead){
System.out.println(System.currentTimeMillis() + ":"
+ Thread.currentThread().getId() + ":"
+ Thread.currentThread().getState());
}
for (int i = 0; i < 1000000; i++) {
System.out.println("work");
}
Thread.sleep(10000);
}
}