定制并发类(六)自定义在计划的线程池内运行的任务

简介:

声明:本文是《 Java 7 Concurrency Cookbook 》的第七章, 作者: Javier Fernández González 译者:郑玉婷

自定义在计划的线程池内运行的任务

计划的线程池是 Executor 框架的基本线程池的扩展,允许你定制一个计划来执行一段时间后需要被执行的任务。 它通过 ScheduledThreadPoolExecutor 类来实现,并允许运行以下这两种任务:

  • Delayed 任务:这种任务在一段时间后仅执行一次。
  • Periodic 任务:这种任务在延迟后执行,然后通常周期性运行

Delayed 任务可以执行 Callable 和 Runnable 对象,但是 periodic任务只能执行 Runnable 对象。全部任务通过计划池执行的都必须实现 RunnableScheduledFuture 接口。在这个指南,你将学习如何实现你自己的 RunnableScheduledFuture 接口来执行延迟和周期性任务。

准备

指南中的例子是使用Eclipse IDE 来实现的。如果你使用Eclipse 或者其他的IDE,例如NetBeans, 打开并创建一个新的java项目。

怎么做呢…

按照这些步骤来实现下面的例子:

//1.  创建一个类,名为 MyScheduledTask,使名为 V 的泛型类型参数化。它扩展 FutureTask 类并实现 RunnableScheduledFuture 接口。
public class MyScheduledTask<V> extends FutureTask<V> implements
RunnableScheduledFuture<V> {

//2.   声明一个私有 RunnableScheduledFuture 属性,名为 task.
private RunnableScheduledFuture<V> task;

//3.   声明一个私有 ScheduledThreadPoolExecutor,名为 executor.
private ScheduledThreadPoolExecutor executor;

//4.   声明一个私有long属性,名为 period。
private long period;

//5.   声明一个私有long属性,名为 startDate。
private long startDate;

//6.   实现类的构造函数。它接收任务:将要运行的 Runnable 对象,任务要返回的 result,将被用来创建 MyScheduledTask 对象的 RunnableScheduledFuture 任务,和要执行这个任务的 ScheduledThreadPoolExecutor 对象。 调用它的父类的构造函数并储存任务和执行者属性。
public MyScheduledTask(Runnable runnable, V result, RunnableScheduledFuture<V> task, ScheduledThreadPoolExecutor executor) {
	super(runnable, result);
	this.task=task;
	this.executor=executor;
}

//7.	实现 getDelay() 方法。如果是周期性任务且 startDate 形象的值非0,计算并返回 startDate 属性与当前日期的相差值。否则,返回储存在 task 属性的原先任务的延迟值。不要忘记你要返回结果时,要传递 time unit 作为参数哦。
@Override
public long getDelay(TimeUnit unit) {
	if (!isPeriodic()) {
		return task.getDelay(unit);
	} else {
		if (startDate==0){
			return task.getDelay(unit);
		} else {
			Date now=new Date();
			long delay=startDate-now.getTime();
			return unit.convert(delay, TimeUnit.MILLISECONDS);
		}
	}
}

//8.  实现 compareTo() 方法。调用原先任务的 compareTo() 方法。
@Override
public int compareTo(Delayed o) {
	return task.compareTo(o);
}

//9.  实现 isPeriodic() 方法。调用原来任务的 isPeriodic() 方法。
@Override
public boolean isPeriodic() {
return task.isPeriodic();
}

//10. 实现方法 run()。如果这是一个周期性任务,你要用下一个执行任务的开始日期更新它的 startDate 属性。用当前日期和时间间隔的和计算它。 然后,把再次把任务添加到 ScheduledThreadPoolExecutor 对象的 queue中。
@Override
public void run() {
	if (isPeriodic() && (!executor.isShutdown())) {
		Date now=new Date();
		startDate=now.getTime()+period;
		executor.getQueue().add(this);
	}

//11.打印当前日期的信息到操控台,调用 runAndReset() 方法运行任务,然后再打印另一条关于当前日期的信息到操控台。
System.out.printf("Pre-MyScheduledTask: %s\n",new Date());
System.out.printf("MyScheduledTask: Is Periodic:%s\n",isPeriodic());
super.runAndReset();
System.out.printf("Post-MyScheduledTask: %s\n",new Date());
}

//12. 实现 setPeriod() 方法,来确立任务的周期时间。
public void setPeriod(long period) {
	this.period=period;
}

//13. 创建一个类,名为 MyScheduledThreadPoolExecutor 来实现一个运行 MyScheduledTask 任务的 ScheduledThreadPoolExecutor 对象。特别扩展 ScheduledThreadPoolExecutor 类。
public class MyScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {

//14. 实现类的构造函数,只要调用它的父类的构造函数。
public MyScheduledThreadPoolExecutor(int corePoolSize) {
	super(corePoolSize);
}

//15. 实现方法 decorateTask()。它接收将要被运行的 Runnable 对象和将运行 Runnable 对象的 RunnableScheduledFuture 任务作为参数。使用这些对象来构造来创建并返回 MyScheduledTask 任务。
@Override
//译者:前面那个<V>是打错吧多余的吧?
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
	MyScheduledTask<V> myTask=new MyScheduledTask<V>(runnable, null, task,this);
	return myTask;
}

//16. 覆盖方法 scheduledAtFixedRate()。调用它的父类的方法,调用它的父类的方法,  method. Call the method of its parent class, convert the returned object into a MyScheduledTask object, and establish the period of that task using the setPeriod() method.

@Override
//译者:不知道怎么出现?号的。应该是V。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
	ScheduledFuture<?> task= super.scheduleAtFixedRate(command, initialDelay, period, unit);
	MyScheduledTask<?> myTask=(MyScheduledTask<?>)task;
	myTask.setPeriod(TimeUnit.MILLISECONDS.convert(period,unit));
	return task;
}

//17.  创建一个类,名为 Task,实现 Runnable 接口。
public class Task implements Runnable {

//18. 实现方法 run() 。在任务开始时打印一条信息,再让当前线程进入休眠2秒。最后在任务结束时,再打印另一条信息。
@Override
public void run() {
	System.out.printf("Task: Begin.\n");
	try {
		TimeUnit.SECONDS.sleep(2);
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
	System.out.printf("Task: End.\n");
}

//19. 创建例子的主类通过创建一个类,名为 Main 并添加 main()方法。
public class Main {

public static void main(String[] args) throws Exception{

//20. 创建一个 MyScheduledThreadPoolExecutor 对象,名为 executor。使用2作为参数来在池中获得2个线程。
MyScheduledThreadPoolExecutor executor=new MyScheduledThreadPoolExecutor(2);

//21. 创建 Task 对象,名为 task。把当前日期写入操控台。
Task task=new Task(); System.out.printf("Main: %s\n",new Date());

//22. 使用 schedule() 方法发送一个延迟任务给执行者。此任务在延迟一秒后运行。
executor.schedule(task, 1, TimeUnit.SECONDS);

//23. 让主线程休眠3秒。
TimeUnit.SECONDS.sleep(3);

//24. 创建另一个 Task 对象。再次在操控台打印当前日期。
task=new Task();
System.out.printf("Main: %s\n",new Date());

//25. 使用方法 scheduleAtFixedRate()发送一个周期性任务给执行者。此任务在延迟一秒后被运行,然后每3秒执行。
executor.scheduleAtFixedRate(task, 1, 3, TimeUnit.SECONDS);

//26. 让主线程休眠10秒。
TimeUnit.SECONDS.sleep(10);

//27. 使用 shutdown() 方法关闭执行者。使用 awaitTermination() 方法等待执行者的完结。
executor.shutdown();
executor.awaitTermination(1, TimeUnit.DAYS);

//28. 写信息到操控台表明任务结束。
System.out.printf("Main: End of the program.\n");

它是怎么工作的…

在这个指南,你实现了 MyScheduledTask 类实现在 ScheduledThreadPoolExecutor 执行者中执行的自定义任务。这个类扩展 FutureTask 类并实现了 RunnableScheduledFuture 接口。它实现 RunnableScheduledFuture 接口, 因为在计划的执行者中执行的全部任务都一定要实现 这个接口,并扩展了 FutureTask 类,因为这个类提供了能有效的实现在 RunnableScheduledFuture 接口声明的方法。 之前提到的全部接口和类都被参数化成任务要返回的数据类型。

为了在计划的执行者中使用 MyScheduledTask 任务,要重写在 MyScheduledThreadPoolExecutor 类的 decorateTask() 方法。这个类扩展 ScheduledThreadPoolExecutor 执行者和它的方法提供一个把 ScheduledThreadPoolExecutor 执行者默认的计划任务转换成 MyScheduledTask 任务来实现的机制。所以,当你实现你的版本的计划任务时,你必须实现你的版本的计划的执行者。

decorateTask() 方法只是简单的创建了新的带有参数的 MyScheduledTask 对象:将要在任务中执行的 Runnable 对象; 将被任务返回结果对象,在这个例子,任务将不会返回结果,所以你要使用null值;原来执行 Runnable 对象的任务,新的对象将在池中代替这个任务;和
将执行任务的执行者,在这个例子,你使用 this 关键词指向创建这个任务的执行者。

The MyScheduledTask 类可以执行延迟和周期性任务。你已经实现了有全部必须的算法可以执行这2种任务的方法。他们是 getDelay() 和 run() 方法。

  • The getDelay() 方法被计划的执行者调用来确认它是否需要运行任务。此方法对延迟任务和周期任务的响应是不同的。在之前提到的, MyScheduledClass 类的构造函数接收 原先的将要执行 Runnable 对象的 ScheduledRunnableFuture 对象, 并储存它作为类的属性来获取它的方法和它的数据。当我们要运行延迟任务时,getDelay() 方法返回原先任务的延迟,但是在周期任务的例子中,getDelay() 方法返回 startDate 属性值与当前时间的相差值。
  • run() 方法是用来执行任务的。周期性任务的一个特别之处是你必须把下一次任务的执行作为一个新的任务放入到执行者的queue中,如果你要再次运行任务的话。所以,如果你执行周期性任务,你确定 startDate 属性值通过把当前时间和任务的执行周期相加,然后把任务储存在执行者的queue中。startDate 属性储存下一次任务将开始运行的时间。然后,使用 FutureTask 类提供的 runAndReset() 方法来运行任务。 在这个例子的延迟任务由于他们仅仅执行一次,就不用把他们放入执行者的queue中了。

你必须要注意如果执行者已经关闭。在这个例子,你不不需要再次把周期性任务储存进执行者的queue。

最后,你重写了在 MyScheduledThreadPoolExecutor 类的 scheduleAtFixedRate() 方法。我们之前提到的,对于周期任务,你要使用任务的周期来确定 startDate 属性值,但是你还没有初始这个周期呢。你必须重写此方法接收周期作为参数,然后传递给 MyScheduledTask 类这样它才能使用。

有了 Task 类例子总是完成了,它实现 Runnable 接口,也是在计划的执行者中运行的任务。这个例子的主类创建了 MyScheduledThreadPoolExecutor 执行者,然后给他们发送了以下2个任务:

  • 一个延迟任务,在当前时间过一秒后运行
  • 一个周期任务,在当前时间过一秒后运行,接着每隔3秒运行

以下裁图展示了这个例子的运行的一部分。你可以检查2种任务运行正常:

7-6

更多…

ScheduledThreadPoolExecutor 类提供了另一个版本的 decorateTask() 方法,它接收 Callable 对象作为参数来代替 Runnable 对象。

参见

第四章,线程执行者:执行者在延迟后运行任务
第四章,线程执行者:执行者周期性运行任务

目录
相关文章
|
1月前
|
存储 Java 数据库
如何处理线程池关闭时未完成的任务?
总之,处理线程池关闭时未完成的任务需要综合考虑多种因素,并根据实际情况选择合适的处理方式。通过合理的处理,可以最大程度地减少任务丢失和数据不一致等问题,确保系统的稳定运行和业务的顺利开展。
119 64
|
1月前
|
消息中间件 监控 Java
线程池关闭时未完成的任务如何保证数据的一致性?
保证线程池关闭时未完成任务的数据一致性需要综合运用多种方法和机制。通过备份与恢复、事务管理、任务状态记录与恢复、数据同步与协调、错误处理与补偿、监控与预警等手段的结合,以及结合具体业务场景进行分析和制定策略,能够最大程度地确保数据的一致性,保障系统的稳定运行和业务的顺利开展。同时,不断地优化和改进这些方法和机制,也是提高系统性能和可靠性的重要途径。
119 62
|
25天前
|
缓存 监控 Java
Java线程池提交任务流程底层源码与源码解析
【11月更文挑战第30天】嘿,各位技术爱好者们,今天咱们来聊聊Java线程池提交任务的底层源码与源码解析。作为一个资深的Java开发者,我相信你一定对线程池并不陌生。线程池作为并发编程中的一大利器,其重要性不言而喻。今天,我将以对话的方式,带你一步步深入线程池的奥秘,从概述到功能点,再到背景和业务点,最后到底层原理和示例,让你对线程池有一个全新的认识。
53 12
|
20小时前
|
Java
【JavaEE】——多线程常用类
Callable的call方法,FutureTask类,ReentrantLock可重入锁和对比,Semaphore信号量(PV操作)CountDownLatch锁存器,
|
20小时前
|
Java 程序员 调度
【JavaEE】线程创建和终止,Thread类方法,变量捕获(7000字长文)
创建线程的五种方式,Thread常见方法(守护进程.setDaemon() ,isAlive),start和run方法的区别,如何提前终止一个线程,标志位,isinterrupted,变量捕获
|
20小时前
|
安全 Java API
【JavaEE】多线程编程引入——认识Thread类
Thread类,Thread中的run方法,在编程中怎么调度多线程
|
1月前
|
安全 Java
线程安全的艺术:确保并发程序的正确性
在多线程环境中,确保线程安全是编程中的一个核心挑战。线程安全问题可能导致数据不一致、程序崩溃甚至安全漏洞。本文将分享如何确保线程安全,探讨不同的技术策略和最佳实践。
41 6
|
3天前
|
NoSQL Redis
单线程传奇Redis,为何引入多线程?
Redis 4.0 引入多线程支持,主要用于后台对象删除、处理阻塞命令和网络 I/O 等操作,以提高并发性和性能。尽管如此,Redis 仍保留单线程执行模型处理客户端请求,确保高效性和简单性。多线程仅用于优化后台任务,如异步删除过期对象和分担读写操作,从而提升整体性能。
12 1
|
2月前
|
存储 消息中间件 资源调度
C++ 多线程之初识多线程
这篇文章介绍了C++多线程的基本概念,包括进程和线程的定义、并发的实现方式,以及如何在C++中创建和管理线程,包括使用`std::thread`库、线程的join和detach方法,并通过示例代码展示了如何创建和使用多线程。
60 1
|
2月前
|
Java 开发者
在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口
【10月更文挑战第20天】在Java多线程编程中,创建线程的方法有两种:继承Thread类和实现Runnable接口。本文揭示了这两种方式的微妙差异和潜在陷阱,帮助你更好地理解和选择适合项目需求的线程创建方式。
32 3