构建一个新的线程是有一定代价的,因为涉及与操作系统的交互。若程序中创建了大量的生命期很短的线程,应该使用线程池。一个线程池中包含许多准备运行的空闲线程。将 Runnable 对象交给线程池,就会有一个线程调用 run 方法。当 run 方法退出时,线程不会死亡,而是在池中准备为下一个请求提供服务。
另一个使用线程池的理由是减少并发线程的数目。创建大量线程会大大降低性能甚至使虚拟机崩溃。如果有一个会创建许多线程的算法,应该使用一个线程数“固定的”线程池以限制并发线程的总数。
线程池
执行器 (Executor)类有许多静态工厂方法用来构建线程池:
方法 | 描述 |
---|---|
Executors.newCachedThreadPool | 必要时创建新线程;空闲线程会被保留 60 秒 |
Executors.newFixedThreadPool | 该池包含固定数量的线程;空闲线程会一直被保留 |
Executors.newSingleThreadExecutor | 只有一个线程的“池”,该线程顺序执行每一个提交的任务 |
Executors.newScheduledThreadPool | 用于预定执行而构建的固定线程池,替代 java.util.Timer |
Executors.newSingleThreadScheduledExecutor | 用于预定执行而构建的单线程“池” |
newCachedThreadPool 方法构建了一个线程池,对于每个任务,如果有空闲线程可用,立即让它执行任务,如果没有可用的空闲线程,则创建一个新线程。
newFixedThreadPool 方法构建一个具有固定大小的线程池。如果提交的任务数多于空闲的线程数,那么把得不到服务的任务放置到队列中。当其他任务完成以后再运行它们。
newSingleThreadExecutor 是一个退化了的大小为 1 的线程池:由一个线程执行提交的任务,一个接着一个。
如上 3 个方法是返回实现了 ExecutorService 接口的 ThreadPoolExecutor 类的对象。
可用下面的方法之一将一个 Runnable 对象或 Callable 对象提交给 ExecutorService:
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
该池会在方便的时候尽早执行提交的任务。调用 submit 时,会得到一个 Future 对象,可用来查询该任务的状态。
第一个 submit 方法返回一个奇怪样子的 Future<?>。可以使用这样一个对象来调用 isDone cancel 或 isCancelled。但是 get 方法在完成的时候只是简单地返回 null。
第二个版本的 submit 也提交一个 Runnable,并且 Future 的 get 方法在完成的时候返回指定的 result 对象。
第三个版本的 submit 提交一个 Callable,并且返回的 Future 对象将在计算结果准备好的时候得到它。
当用完一个线程池的时候,调用 shutdown。该方法启动该池的关闭序列。被关闭的执行器不再接受新的任务。当所有任务都完成以后,线程池中的线程死亡。另一种方法是调用 shutdownNow。该池取消尚未开始的所有任务并试图中断正在运行的线程。
下面总结了在使用连接池时应该做的事:
- 调用 Executors 类中静态的方法 newCachedThreadPool 或 newFixedThreadPool。
- 调用 submit 提交 Runnable 或 Callable 对象。
- 如果想要取消一个任务,或如果提交 Callable 对象,那就要保存好返回的 Future 对象。
- 当不再提交任何任务时,调用 shutdown。
预定执行
ScheduledExecutorService 接口具有为预定执行(Scheduled Execution)或重复执行任务而设计的方法。它是一种允许使用线程池机制的 java.util.Timer
的泛化。Executors 类的 newScheduledThreadPool 和 newSingleThreadScheduledExecutor 方法将返回实现了 ScheduledExecutorService 接口的对象。
可以预定 Runnable 或 Callable 在初始的延迟之后只运行一次,也可以预定一个 Runnable 对象周期性地运行。
java.util.concurrent.Executors
中构建预定执行器的相关 API:
// 返回一个线程池,它使用给定的线程数来调度任务
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
// 返回一个执行器,它在一个单独线程中调度任务
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
java.util.concurrent.ScheduledExecutorService
中任务调度的相关 API:
/**
* 预定在指定的时间之后执行 Runnable 任务
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
/**
* 预定在指定的时间之后执行 Callable 任务
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
/**
* 预定在初始的延迟结束后,周期性地运行给定的任务,周期长度是 period
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
/**
* 预定在初始的延迟结束后周期性地运行给定的任务,在一次调用完成和下一次调用开始之间有长度为 delay 的延迟
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
控制任务组
有时,使用执行器有更有实际意义的原因是控制一组相关任务。
例如,可以在执行器中使用 shutdownNow 方法取消所有的任务。
invokeAny 方法提交所有对象到一个 Callable 对象的集合中,并返回某个已经完成了的任务的结果。无法知道返回的究竟是哪个任务的结果,也许是最先完成的那个任务的结果。
invokeAll 方法提交所有对象到一个 Callable 对象的集合中,并返回一个 Future 对象的列表,代表所有任务的解决方案。当计算结果可获得时,可以像下面这样对结果进行处理:
List<Callab1e<T>> tasks = . . .;
List<Future<T>> results = executor.invokeAll(tasks);
for (Future<T> result : results)
processFurther(result.get());
这个方法的缺点是如果第一个任务恰巧花去了很多时间,则可能不得不进行等待。将结果按可获得的顺序保存起来更有实际意义。可以用 ExecutorCompletionService 来进行排列。
用常规的方法获得一个执行器。然后,构建一个 ExecutorCompletionService,提交任务给完成服务(completion service)。该服务管理 Future 对象的阻塞队列,其中包含已经提交的任务的执行结果(当这些结果成为可用时)。这样一来,相比前面的计算,一个更有效的组织形式如下:
ExecutorCompletionService<T> service = new ExecutorCompletionService<>(executor);
for (Callable<T> task : tasks)
service.submit(task);
for (int i = 0; i < tasks.size(); i++)
processFurther(service.take().get());
Fork-Join 框架
有些应用使用了大量线程,但其中大多数都是空闲的。举例来说,一个 Web 服务器可能会为每个连接分别使用一个线程。另外一些应用可能对每个处理器内核分别使用一个线程,来完成计算密集型任务,如图像或视频处理。Java SE 7 中新引入了 fork-join 框架,专门用来支持后一类应用,但其前提是假设有一个处理任务,它可以很自然地分解为子任务,即任务操作支持并行处理。
要采用框架可用的一种方式完成这种递归计算,需要提供一个扩展 RecursiveTask<T> 的类(如果计算会生成一个类型为 T 的结果)或者提供一个扩展 RecursiveAction 的类(如果不生成任何结果)。再覆盖 compute 方法来生成并调用子任务,然后合并其结果。
在后台,fork-join 框架使用了一种有效的智能方法来平衡可用线程的工作负载,这种方法称为工作密取(work stealing)。每个工作线程都有一个双端队列(deque)来完成任务。一个工作线程将子任务压入其双端队列的队头。(只有一个线程可以访问队头,所以不需要加锁。)一个工作线程空闲时,它会从另一个双端队列的队尾“密取”一个任务。由于大的子任务都在队尾,这种密取很少出现。
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinTask<Integer> fjt = new Fibonacci(45);
ForkJoinPool forkJoinPool = new ForkJoinPool();
Future<Integer> result = forkJoinPool.submit(fjt);
System.out.println(result.get());
}
static class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n) {
this.n = n;
}
private int compute(int small) {
final int[] results = {1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89};
return results[small];
}
public Integer compute() {
if (n <= 10) {
return compute(n);
}
Fibonacci f1 = new Fibonacci(n - 1);
Fibonacci f2 = new Fibonacci(n - 2);
f1.fork();
f2.fork();
return f1.join() + f2.join();
}
}
上例中,我们用到了 RecursiveTask 提供的方法 fork() 和 join()。它们分别表示:子任务的异步执行和阻塞等待结果完成。
可完成的 Future
处理非阻塞调用的传统方法是使用事件处理器,程序员为任务完成之后要出现的动作注册一个处理器。当然,如果下一个动作也是异步的,在它之后的下一个动作会在一个不同的事件处理器中。尽管程序员会认为“ 先做步骤 1,然后是步骤 2,再完成步骤 3”,但实际上程序逻辑会分散到不同的处理器中。如果必须增加错误处理,情况会更糟糕。假设步骤 2 是“用户登录”。可能需要重复这个步骤,因为用户输入凭据时可能会出错。要尝试在一组事件处理器中实现这样一个控制流,或者想要理解所实现的这样一组事件处理器,会很有难度。
Java SE 8 的 CompletableFuture 类提供了一种候选方法。与事件处理器不同,“可完成 future” 可以“组合”(composed)。