// 异步任务,每隔1s, count累加1class CounterServiceImpl implements CounterService {private volatile int count = 0;public void run() {new Thread(new Runnable() {@Overridepublic void run() {try {for (int index = 0; index < 5; index++) {Thread.sleep(1000);count += 1;}} catch (InterruptedException e) {throw new RuntimeException(e);}}}).start();}public int getCount() {return count;}}@Testpublic void testAsynchronousPoll() {final CounterService service = new CounterServiceImpl();service.run();// 轮询查询,pollInterval每隔多少时间段轮询,pollDelay延迟校验时间with().atMost(60, SECONDS).and().pollInterval(ONE_HUNDRED_MILLISECONDS).and().pollDelay(50, MILLISECONDS).await("count is greater 6").until(new Callable<Boolean>() {@Overridepublic Boolean call() throws Exception {return service.getCount() == 6;}});}
// 设置超时时间,6satMost(6, SECONDS)// 设置间隔100mspollInterval(ONE_HUNDRED_MILLISECONDS)// 设置延迟50mspollDelay(50, MILLISECONDS)// 设置提示语await("count is greater 6")// 连接and()// 等待java.util.concurrent.Callable返回trueuntil(new Callable<Boolean>() {@Overridepublic Boolean call() throws Exception {return service.getCount() == 6;}});
package org.awaitility.core
public static ConditionFactory with() {return new ConditionFactory(defaultWaitConstraint, defaultPollInterval, defaultPollDelay,defaultCatchUncaughtExceptions, defaultExceptionIgnorer, defaultConditionEvaluationListener);}
/*** Instantiates a new condition factory.** @param timeoutConstraint the timeout* @param pollInterval the poll interval* @param pollDelay The delay before the polling starts* @param exceptionsIgnorer the ignore exceptions* @param catchUncaughtExceptions the catch uncaught exceptions*/public ConditionFactory(WaitConstraint timeoutConstraint, PollInterval pollInterval, Duration pollDelay,boolean catchUncaughtExceptions, ExceptionIgnorer exceptionsIgnorer,ConditionEvaluationListener conditionEvaluationListener) {this(null, timeoutConstraint, pollInterval, pollDelay, catchUncaughtExceptions, exceptionsIgnorer,conditionEvaluationListener);}
/*** The default poll interval (fixed 100 ms).*/private static volatile PollInterval defaultPollInterval = DEFAULT_POLL_INTERVAL;/*** The default wait constraint (10 seconds).*/private static volatile WaitConstraint defaultWaitConstraint = AtMostWaitConstraint.TEN_SECONDS;/*** The default poll delay*/private static volatile Duration defaultPollDelay = DEFAULT_POLL_DELAY;/*** Catch all uncaught exceptions by default?*/private static volatile boolean defaultCatchUncaughtExceptions = true;/*** Ignore caught exceptions by default?*/private static volatile ExceptionIgnorer defaultExceptionIgnorer = new PredicateExceptionIgnorer(new Predicate<Exception>() {public boolean matches(Exception e) {return false;}});/*** Default listener of condition evaluation results.*/private static volatile ConditionEvaluationListener defaultConditionEvaluationListener = null;
public ConditionFactory atMost(Duration timeout) {return new ConditionFactory(alias, timeoutConstraint.withMaxWaitTime(timeout), pollInterval, pollDelay,catchUncaughtExceptions, exceptionsIgnorer, conditionEvaluationListener);}
public ConditionFactory pollInterval(Duration pollInterval) {return new ConditionFactory(alias, timeoutConstraint, pollInterval, pollDelay, catchUncaughtExceptions,exceptionsIgnorer, conditionEvaluationListener);}
timeoutConstraint为60spollInterval为100mspollDelay为50msalias为"count is greater 6"
public void until(Callable<Boolean> conditionEvaluator) {until(new CallableCondition(conditionEvaluator, generateConditionSettings()));}private <T> T until(Condition<T> condition) {return condition.await();}
new ConditionSettings(alias, catchUncaughtExceptions, timeoutConstraint, pollInterval, actualPollDelay,conditionEvaluationListener, exceptionsIgnorer);
public CallableCondition(final Callable<Boolean> matcher, ConditionSettings settings) {conditionEvaluationHandler = new ConditionEvaluationHandler<Object>(null, settings);ConditionEvaluationWrapper conditionEvaluationWrapper = new ConditionEvaluationWrapper(matcher, settings, conditionEvaluationHandler);conditionAwaiter = new ConditionAwaiter(conditionEvaluationWrapper, settings) {@SuppressWarnings("rawtypes")@Overrideprotected String getTimeoutMessage() {if (timeout_message != null) {return timeout_message;}final String timeoutMessage;if (matcher == null) {timeoutMessage = "";} else {final Class<? extends Callable> type = matcher.getClass();final Method enclosingMethod = type.getEnclosingMethod();if (type.isAnonymousClass() && enclosingMethod != null) {timeoutMessage = String.format("Condition returned by method \"%s\" in class %s was not fulfilled",enclosingMethod.getName(), enclosingMethod.getDeclaringClass().getName());} else {final String message;if (isLambdaClass(type)) {message = "with " + generateLambdaErrorMessagePrefix(type, false);} else {message = type.getName();}timeoutMessage = String.format("Condition %s was not fulfilled", message);}}return timeoutMessage;}};}
public ConditionAwaiter(final ConditionEvaluator conditionEvaluator,final ConditionSettings conditionSettings) {if (conditionEvaluator == null) {throw new IllegalArgumentException("You must specify a condition (was null).");}if (conditionSettings == null) {throw new IllegalArgumentException("You must specify the condition settings (was null).");}if (conditionSettings.shouldCatchUncaughtExceptions()) {Thread.setDefaultUncaughtExceptionHandler(this);}this.conditionSettings = conditionSettings;this.latch = new CountDownLatch(1);this.conditionEvaluator = conditionEvaluator;this.executor = initExecutorService();}
public Void await() {conditionAwaiter.await(conditionEvaluationHandler);return null;}
public <T> void await(final ConditionEvaluationHandler<T> conditionEvaluationHandler) {final Duration pollDelay = conditionSettings.getPollDelay();final Duration maxWaitTime = conditionSettings.getMaxWaitTime();final Duration minWaitTime = conditionSettings.getMinWaitTime();final long maxTimeout = maxWaitTime.getValue();final TimeUnit maxTimeoutUnit = maxWaitTime.getTimeUnit();long pollingStarted = System.currentTimeMillis() - pollDelay.getValueInMS();pollSchedulingThread(conditionEvaluationHandler, pollDelay, maxWaitTime).start();try {try {final boolean finishedBeforeTimeout;if (maxWaitTime == Duration.FOREVER) {latch.await();finishedBeforeTimeout = true;} else {finishedBeforeTimeout = latch.await(maxTimeout, maxTimeoutUnit);}Duration evaluationDuration =new Duration(System.currentTimeMillis() - pollingStarted, TimeUnit.MILLISECONDS).minus(pollDelay);if (throwable != null) {throw throwable;} else if (!finishedBeforeTimeout) {final String maxWaitTimeLowerCase = maxWaitTime.getTimeUnitAsString();final String message;if (conditionSettings.hasAlias()) {message = String.format("Condition with alias '%s' didn't complete within %s %s because %s.",conditionSettings.getAlias(), maxTimeout, maxWaitTimeLowerCase, Introspector.decapitalize(getTimeoutMessage()));} else {message = String.format("%s within %s %s.", getTimeoutMessage(), maxTimeout, maxWaitTimeLowerCase);}final ConditionTimeoutException e;// Not all systems support deadlock detection so ignore if ThreadMXBean & ManagementFactory is not in classpathif (existInCP("java.lang.management.ThreadMXBean") && existInCP("java.lang.management.ManagementFactory")) {java.lang.management.ThreadMXBean bean = java.lang.management.ManagementFactory.getThreadMXBean();Throwable cause = this.cause;try {long[] threadIds = bean.findDeadlockedThreads();if (threadIds != null) {cause = new DeadlockException(threadIds);}} catch (UnsupportedOperationException ignored) {// findDeadLockedThreads() not supported on this VM,// don't init cause and move on.}e = new ConditionTimeoutException(message, cause);} else {e = new ConditionTimeoutException(message, this.cause);}throw e;} else if (evaluationDuration.compareTo(minWaitTime) < 0) {String message = String.format("Condition was evaluated in %s %s which is earlier than expected " +"minimum timeout %s %s", evaluationDuration.getValue(), evaluationDuration.getTimeUnit(),minWaitTime.getValue(), minWaitTime.getTimeUnit());throw new ConditionTimeoutException(message);}} finally {executor.shutdown();if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {try {executor.shutdownNow();executor.awaitTermination(1, TimeUnit.SECONDS);} catch (InterruptedException e) {CheckedExceptionRethrower.safeRethrow(e);}}}} catch (Throwable e) {CheckedExceptionRethrower.safeRethrow(e);}}
private final CountDownLatch latch;
this.latch = new CountDownLatch(1);
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately.详情见:<https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html>
pollSchedulingThread(conditionEvaluationHandler, pollDelay, maxWaitTime).start();
private <T> Thread pollSchedulingThread(final ConditionEvaluationHandler<T> conditionEvaluationHandler,final Duration pollDelay, final Duration maxWaitTime) {final long maxTimeout = maxWaitTime.getValue();final TimeUnit maxTimeoutUnit = maxWaitTime.getTimeUnit();return new Thread(new Runnable() {public void run() {int pollCount = 0;try {conditionEvaluationHandler.start();if (!pollDelay.isZero()) {Thread.sleep(pollDelay.getValueInMS());}Duration pollInterval = pollDelay;while (!executor.isShutdown()) {if (conditionCompleted()) {break;}pollCount = pollCount + 1;Future<?> future = executor.submit(new ConditionPoller(pollInterval));if (maxWaitTime == Duration.FOREVER) {future.get();} else {future.get(maxTimeout, maxTimeoutUnit);}pollInterval = conditionSettings.getPollInterval().next(pollCount, pollInterval);Thread.sleep(pollInterval.getValueInMS());}} catch (Throwable e) {throwable = e;}}}, "awaitility-poll-scheduling");}
Future<?> future = executor.submit(new ConditionPoller(pollInterval));
private class ConditionPoller implements Runnable {private final Duration delayed;/*** @param delayed The duration of the poll interval*/public ConditionPoller(Duration delayed) {this.delayed = delayed;}public void run() {try {ConditionEvaluationResult result = conditionEvaluator.eval(delayed);if (result.isSuccessful()) {latch.countDown();} else if (result.hasThrowable()) {cause = result.getThrowable();}} catch (Exception e) {if (!conditionSettings.shouldExceptionBeIgnored(e)) {throwable = e;latch.countDown();}}}}
conditionAwaiter = new ConditionAwaiter(conditionEvaluationWrapper, settings)
public ConditionEvaluationResult eval(Duration pollInterval) throws Exception {boolean conditionFulfilled = matcher.call();if (conditionFulfilled) {conditionEvaluationHandler.handleConditionResultMatch(getMatchMessage(matcher, settings.getAlias()), true, pollInterval);} else {conditionEvaluationHandler.handleConditionResultMismatch(getMismatchMessage(matcher, settings.getAlias()), false, pollInterval);}return new ConditionEvaluationResult(conditionFulfilled);}
ConditionEvaluationWrapper(Callable<Boolean> matcher, ConditionSettings settings, ConditionEvaluationHandler<Object> conditionEvaluationHandler) {this.matcher = matcher;this.settings = settings;this.conditionEvaluationHandler = conditionEvaluationHandler;}
new Callable<Boolean>() {@Overridepublic Boolean call() throws Exception {return service.getCount() == 6;}}
ConditionEvaluationResult result = conditionEvaluator.eval(delayed);if (result.isSuccessful()) {latch.countDown();} else if (result.hasThrowable()) {cause = result.getThrowable();}
Thread.sleep(pollInterval.getValueInMS());
Future<?> future = executor.submit(new ConditionPoller(pollInterval));if (maxWaitTime == Duration.FOREVER) {future.get();} else {future.get(maxTimeout, maxTimeoutUnit);}
<code data-language="" livecodeserver"="">latch.await(): Causes the current thread to wait until the latch has counted down to zero。latch.await(long timeout, TimeUnit unit):最多等待timeout, true if the count reached zero and false if the waiting time elapsed before the count reached zero
集结各类场景实战经验,助你开发运维畅行无忧