两种思路解决线程服务死循环

简介: 背景系统突然error飚高,不停Full GC。最后发现是因为调用的外部jar包中方法触发bug导致死循环,不断产生新对象,导致内存大量占用无法释放,最终JVM内存回收机制崩溃。解决思路服务一旦进入死循环,对应线程一直处于running状态,难以通过该线程自己计时中断。对于无法完全放心的第三方jar包方法,可以选择开子线程处理,并对线程加以监控,当超时时及时中断子线程并返回。两种实现思路:思路一:
背景
系统突然error飚高,不停Full GC。
最后发现是因为调用的外部jar包中方法触发bug导致死循环,不断产生新对象,导致内存大量占用无法释放,最终JVM内存回收机制崩溃。



解决思路

服务一旦进入死循环,对应线程一直处于running状态,难以通过该线程自己计时中断。
对于无法完全放心的第三方jar包方法,可以选择开子线程处理,并对线程加以监控,当超时时及时中断子线程并返回。
两种实现思路:

思路一: 通过FutureTask
    Future在设定的时间超时后会抛出timeout异常,通常做法是捕获异常后执行future.cancel()方法。但cancel方法实际是调用线程的interrupt方法,给线程树立interrupt status,并不能中断死循环的子线程。
1
Future没有提供能够直接停止子线程的方法(也许是因为线程的stop方法可能产生不良后果)
所以这里可以参照FutureTask源码,新建一个MyFutureTask类,改写或新建一个类似cancel的方法,调用线程的stop方法。
demo中的MyFutureTask类参考cancel方法,新建了MyFutureTask.stop方法,调用子线程的stop方法来中止子线程。

思路二:通过CountDownLatch
    主线程建立可能出现死循环的子线程时设立CountDownLatch值为1,子线程逻辑中当处理完毕执行CountDownLatch减1。这样主线程可以看到子线程是执行完毕还是超时,如果超时或子线程已处理完毕,在主线程中执行子线程的stop方法中止子线程。



demo代码


思路一:
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class Test {
	public static ExecutorService threadPoolExecutor = Executors
			.newCachedThreadPool();

	@SuppressWarnings({ "unchecked", "rawtypes" })
	public static void main(String[] args) {
		int count = 1;
		//参考FutureTask源代码,写有自己需要功能的FutureTask
		MyFuntureTask[] fts = new MyFuntureTask[count];
		for (int i = 0; i < count; i++) {
			fts[i] = new MyFuntureTask(new Callable() {

				@Override
				public Object call() throws Exception {
					for (;;) {
						if (System.currentTimeMillis() % 10000 == 0) {
							System.out.println(System.currentTimeMillis() + ":"
									+ Thread.currentThread().getId() + ":"
									+ Thread.currentThread().getState());
						}
					}
				}
			});
			threadPoolExecutor.submit(fts[i]);
		}
		try {
			Thread.sleep(10000);
		} catch (Exception e) {
			e.printStackTrace();
		}
		for (int i = 0; i < count; i++) {
			try {
				fts[i].get(1, TimeUnit.MILLISECONDS);
			} catch (Exception e) {
				/**
				 * 这里本来FutureTask只有.cancel()的功能,cancel之后树立Future线程的isInterupted标识位
				 * 由于增加了stop方法,可以直接通过FutureTask的stop方法中止线程
				 */
				System.out.println("线程已强制中断" + fts[i].stop(true));
			}
		}

		try {
			Thread.sleep(100000);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * 参考FutureTask源代码,写有自己需要功能的FutureTask
	 */
	public static class MyFuntureTask<V> implements RunnableFuture<V> {
		/** Synchronization control for FutureTask */
		private final Sync sync;

		/**
		 * Creates a <tt>FutureTask</tt> that will, upon running, execute the
		 * given <tt>Callable</tt>.
		 *
		 * @param callable
		 *            the callable task
		 * @throws NullPointerException
		 *             if callable is null
		 */
		public MyFuntureTask(Callable<V> callable) {
			if (callable == null)
				throw new NullPointerException();

			sync = new Sync(callable);
		}

		/**
		 * Creates a <tt>FutureTask</tt> that will, upon running, execute the
		 * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return
		 * the given result on successful completion.
		 *
		 * @param runnable
		 *            the runnable task
		 * @param result
		 *            the result to return on successful completion. If you
		 *            don't need a particular result, consider using
		 *            constructions of the form:
		 *            <tt>Future&lt;?&gt; f = new FutureTask&lt;Object&gt;(runnable, null)</tt>
		 * @throws NullPointerException
		 *             if runnable is null
		 */
		public MyFuntureTask(Runnable runnable, V result) {
			sync = new Sync(Executors.callable(runnable, result));
		}

		public boolean isCancelled() {
			return sync.innerIsCancelled();
		}

		public boolean isDone() {
			return sync.innerIsDone();
		}

		public boolean cancel(boolean mayInterruptIfRunning) {
			return sync.innerCancel(mayInterruptIfRunning);
		}

		/**
		 * 自己加的!!!!
		 * 
		 * @param mayStopIfRunning
		 * @return
		 */
		public boolean stop(boolean mayStopIfRunning) {
			return sync.innerStop(mayStopIfRunning);
		}

		/**
		 * @throws CancellationException
		 *             {@inheritDoc}
		 */
		public V get() throws InterruptedException, ExecutionException {
			return sync.innerGet();
		}

		/**
		 * @throws CancellationException
		 *             {@inheritDoc}
		 */
		public V get(long timeout, TimeUnit unit) throws InterruptedException,
				ExecutionException, TimeoutException {
			return sync.innerGet(unit.toNanos(timeout));
		}

		/**
		 * Protected method invoked when this task transitions to state
		 * <tt>isDone</tt> (whether normally or via cancellation). The default
		 * implementation does nothing. Subclasses may override this method to
		 * invoke completion callbacks or perform bookkeeping. Note that you can
		 * query status inside the implementation of this method to determine
		 * whether this task has been cancelled.
		 */
		protected void done() {
		}

		/**
		 * Sets the result of this Future to the given value unless this future
		 * has already been set or has been cancelled. This method is invoked
		 * internally by the <tt>run</tt> method upon successful completion of
		 * the computation.
		 * 
		 * @param v
		 *            the value
		 */
		protected void set(V v) {
			sync.innerSet(v);
		}

		/**
		 * Causes this future to report an <tt>ExecutionException</tt> with the
		 * given throwable as its cause, unless this Future has already been set
		 * or has been cancelled. This method is invoked internally by the
		 * <tt>run</tt> method upon failure of the computation.
		 * 
		 * @param t
		 *            the cause of failure
		 */
		protected void setException(Throwable t) {
			sync.innerSetException(t);
		}

		// The following (duplicated) doc comment can be removed once
		//
		// 6270645: Javadoc comments should be inherited from most derived
		// superinterface or superclass
		// is fixed.
		/**
		 * Sets this Future to the result of its computation unless it has been
		 * cancelled.
		 */
		public void run() {
			sync.innerRun();
		}

		/**
		 * Executes the computation without setting its result, and then resets
		 * this Future to initial state, failing to do so if the computation
		 * encounters an exception or is cancelled. This is designed for use
		 * with tasks that intrinsically execute more than once.
		 * 
		 * @return true if successfully run and reset
		 */
		protected boolean runAndReset() {
			return sync.innerRunAndReset();
		}

		/**
		 * Synchronization control for FutureTask. Note that this must be a
		 * non-static inner class in order to invoke the protected <tt>done</tt>
		 * method. For clarity, all inner class support methods are same as
		 * outer, prefixed with "inner".
		 *
		 * Uses AQS sync state to represent run status
		 */
		private final class Sync extends AbstractQueuedSynchronizer {
			private static final long serialVersionUID = -7828117401763700385L;

			/** State value representing that task is ready to run */
			private static final int READY = 0;
			/** State value representing that task is running */
			private static final int RUNNING = 1;
			/** State value representing that task ran */
			private static final int RAN = 2;
			/** State value representing that task was cancelled */
			private static final int CANCELLED = 4;

			/** The underlying callable */
			private final Callable<V> callable;
			/** The result to return from get() */
			private V result;
			/** The exception to throw from get() */
			private Throwable exception;

			/**
			 * The thread running task. When nulled after set/cancel, this
			 * indicates that the results are accessible. Must be volatile, to
			 * ensure visibility upon completion.
			 */
			private volatile Thread runner;

			Sync(Callable<V> callable) {
				this.callable = callable;
			}

			private boolean ranOrCancelled(int state) {
				return (state & (RAN | CANCELLED)) != 0;
			}

			/**
			 * Implements AQS base acquire to succeed if ran or cancelled
			 */
			protected int tryAcquireShared(int ignore) {
				return innerIsDone() ? 1 : -1;
			}

			/**
			 * Implements AQS base release to always signal after setting final
			 * done status by nulling runner thread.
			 */
			protected boolean tryReleaseShared(int ignore) {
				runner = null;
				return true;
			}

			boolean innerIsCancelled() {
				return getState() == CANCELLED;
			}

			boolean innerIsDone() {
				return ranOrCancelled(getState()) && runner == null;
			}

			V innerGet() throws InterruptedException, ExecutionException {
				acquireSharedInterruptibly(0);
				if (getState() == CANCELLED)
					throw new CancellationException();
				if (exception != null)
					throw new ExecutionException(exception);
				return result;
			}

			V innerGet(long nanosTimeout) throws InterruptedException,
					ExecutionException, TimeoutException {
				if (!tryAcquireSharedNanos(0, nanosTimeout))
					throw new TimeoutException();
				if (getState() == CANCELLED)
					throw new CancellationException();
				if (exception != null)
					throw new ExecutionException(exception);
				return result;
			}

			void innerSet(V v) {
				for (;;) {
					int s = getState();
					if (s == RAN)
						return;
					if (s == CANCELLED) {
						// aggressively release to set runner to null,
						// in case we are racing with a cancel request
						// that will try to interrupt runner
						releaseShared(0);
						return;
					}
					if (compareAndSetState(s, RAN)) {
						result = v;
						releaseShared(0);
						done();
						return;
					}
				}
			}

			void innerSetException(Throwable t) {
				for (;;) {
					int s = getState();
					if (s == RAN)
						return;
					if (s == CANCELLED) {
						// aggressively release to set runner to null,
						// in case we are racing with a cancel request
						// that will try to interrupt runner
						releaseShared(0);
						return;
					}
					if (compareAndSetState(s, RAN)) {
						exception = t;
						releaseShared(0);
						done();
						return;
					}
				}
			}

			/**
			 * 仿照innerCancel自己加的!!
			 * 
			 * @param mayStopIfRunning
			 * @return
			 */
			boolean innerStop(boolean mayStopIfRunning) {
				for (;;) {
					int s = getState();
					if (ranOrCancelled(s))
						return false;
					if (compareAndSetState(s, CANCELLED))
						break;
				}
				if (mayStopIfRunning) {
					Thread r = runner;
					if (r != null) {
						r.stop();//这里调用线程stop方法
					}
				}
				releaseShared(0);
				done();
				return true;
			}

			boolean innerCancel(boolean mayInterruptIfRunning) {
				for (;;) {
					int s = getState();
					if (ranOrCancelled(s))
						return false;
					if (compareAndSetState(s, CANCELLED))
						break;
				}
				if (mayInterruptIfRunning) {
					Thread r = runner;
					if (r != null) {
						r.interrupt();
					}
				}
				releaseShared(0);
				done();
				return true;
			}

			void innerRun() {
				if (!compareAndSetState(READY, RUNNING))
					return;

				runner = Thread.currentThread();
				if (getState() == RUNNING) { // recheck after setting thread
					V result;
					try {
						result = callable.call();
					} catch (Throwable ex) {
						setException(ex);
						return;
					}
					set(result);
				} else {
					releaseShared(0); // cancel
				}
			}

			boolean innerRunAndReset() {
				if (!compareAndSetState(READY, RUNNING))
					return false;
				try {
					runner = Thread.currentThread();
					if (getState() == RUNNING)
						callable.call(); // don't set result
					runner = null;
					return compareAndSetState(RUNNING, READY);
				} catch (Throwable ex) {
					setException(ex);
					return false;
				}
			}
		}
	}

}

 

思路二:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class Test2 {
	public static void main(String[] args) {
		final CountDownLatch cdl = new CountDownLatch(1);

		//处理可能发生死循环的子线程
		Thread workThread = new Thread(new Runnable() {
			
			@Override
			public void run() {
				try {
					mayCauseDead(true);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				//子线程处理完后通过countDownLatch通知主线程
				cdl.countDown();
			}
		});
		workThread.start();//开始子线程
		
		try {
			cdl.await(1, TimeUnit.MILLISECONDS);//当超时或子线程处理完毕
			workThread.stop();//中指子线程
			System.out.println("end");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
	}
	
	/**
	 * 可能产生死循环的方法
	 */
	private static void mayCauseDead(boolean dead) throws InterruptedException{
		while(dead){
			System.out.println(System.currentTimeMillis() + ":"
					+ Thread.currentThread().getId() + ":"
					+ Thread.currentThread().getState());
		}
		for (int i = 0; i < 1000000; i++) {
			System.out.println("work");
		}
		Thread.sleep(10000);
	}
}

 

目录
相关文章
|
6月前
|
监控 测试技术 程序员
解决线程死循环问题的有效方法
作为开发者想必都清楚,多线程应用程序的开发为我们日常开发工作中提供了并发执行任务的能力,但线程死循环问题却是一个常见而令人头疼的挑战,因为线程死循环可能导致系统的不稳定性、资源浪费以及应用程序的异常运行,所以准确地定位和妥善处理线程死循环现象,并在编码阶段就避免潜在风险,成为开发人员必须面对的重要问题,线程死循环问题的解决不仅有助于提高系统的稳定性和可用性,还能优化资源利用和提升应用程序的性能,通过采取适当的预防和处理措施,开发人员能够避免线程陷入无尽的循环,并及时发现和解决潜在问题。那么本文就来分享一下关于如何处理线程死循环问题,以及如何在编码阶段规避潜在风险。
195 2
解决线程死循环问题的有效方法
|
2月前
|
消息中间件 监控 安全
服务Down机了,线程池中的数据如何保证不丢失?
在分布式系统与高并发应用开发中,服务的稳定性和数据的持久性是两个至关重要的考量点。当服务遭遇Down机时,如何确保线程池中处理的数据不丢失,是每一位开发者都需要深入思考的问题。以下,我将从几个关键方面分享如何在这种情况下保障数据的安全与完整性。
63 2
|
28天前
|
监控 安全 算法
线程死循环确实是多线程编程中的一个常见问题,在编码阶段规避潜在风险
【10月更文挑战第12天】线程死循环确实是多线程编程中的一个常见问题,在编码阶段规避潜在风险
45 2
|
30天前
|
监控 安全 算法
线程死循环确实是多线程编程中的一个常见问题,它可能导致应用程序性能下降,甚至使整个系统变得不稳定。
线程死循环是多线程编程中常见的问题,可能导致性能下降或系统不稳定。通过代码审查、静态分析、日志监控、设置超时、使用锁机制、测试、选择线程安全的数据结构、限制线程数、使用现代并发库及培训,可有效预防和解决死循环问题。
48 1
|
1月前
|
监控 安全 算法
线程死循环是多线程编程中的常见问题,可能导致应用性能下降甚至系统不稳定。
【10月更文挑战第6天】线程死循环是多线程编程中的常见问题,可能导致应用性能下降甚至系统不稳定。为了解决这一问题,可以通过代码审查、静态分析、添加日志监控、设置超时机制、使用锁和同步机制、进行全面测试、选用线程安全的数据结构、限制线程数量、利用现代并发库,并对团队进行培训等方法来预防和减少死循环的发生。尽管如此,多线程编程的复杂性仍需要持续监控和维护以确保系统稳定。
50 3
|
2月前
|
消息中间件 存储 Java
服务重启了,如何保证线程池中的数据不丢失?
【8月更文挑战第30天】为确保服务重启时线程池数据不丢失,可采用数据持久化(如数据库或文件存储)、使用可靠的任务队列(如消息队列或分布式任务队列系统)、状态监测与恢复机制,以及分布式锁等方式。这些方法能有效提高系统稳定性和可靠性,需根据具体需求选择合适方案并进行测试优化。
189 5
|
3月前
|
消息中间件 监控 Java
最佳实践|一文讲解端线程死循环的治理
本文旨在介绍钉钉 Android 团队死循环检测工具建设的思路和典型案例的修复历程。希望通过此次分享,对同样面临类似死循环问题的团队能够有所启发。
|
4月前
|
存储 安全 Java
Java面试题:假设你正在开发一个Java后端服务,该服务需要处理高并发的用户请求,并且对内存使用效率有严格的要求,在多线程环境下,如何确保共享资源的线程安全?
Java面试题:假设你正在开发一个Java后端服务,该服务需要处理高并发的用户请求,并且对内存使用效率有严格的要求,在多线程环境下,如何确保共享资源的线程安全?
68 0
|
6月前
|
监控 IDE 测试技术
预防和处理线程死循环的关键步骤
【5月更文挑战第24天】预防和处理线程死循环的关键步骤包括理解死循环成因(逻辑错误、竞争条件、资源泄漏)、编码阶段采取预防措施(明确退出条件、避免无限递归、正确使用锁、资源管理、健壮的错误处理)、调试定位(断点、日志、线程分析工具、性能分析)、解决问题(修改代码、临时解决方案、逐步排查)以及测试验证(充分测试、专用测试用例)。遵循这些步骤可有效管理线程死循环风险。
115 1
|
6月前
|
监控 Java 测试技术
在多线程开发中,线程死循环可能导致系统资源耗尽,影响应用性能和稳定性
【5月更文挑战第16天】在多线程开发中,线程死循环可能导致系统资源耗尽,影响应用性能和稳定性。为解决这一问题,建议通过日志记录、线程监控工具和堆栈跟踪来定位死循环;处理时,及时终止线程、清理资源并添加错误处理机制;编码阶段要避免无限循环,正确使用同步互斥,进行代码审查和测试,以降低风险。
90 3