Java 编程中,如果并发的任务想在指定的时间内,获取所有完成的作业,未完成则丢弃,则可以用 CompletionService 类。
CompletionService 类实际是一个接口,具体实现类为 ExecutorCompletionService,内部是通过 BlockingQueue 阻塞队列来实现了生产者-消费者模式,CompletionService 就是将生产者(新的异步线程生产的 Future)和消费者(获取生产出来的 Future)进行解耦,生产者只需要 submit(Callable/Runnable) 将新的任务提交,就可以生产 Future,而消费者只要 take() 或 poll() 即可获取消费对象。
推荐方式
利用 CompletionService 具体步骤如下:
- 创建一个具体实例 ExecutorCompletionService,并指定具体的线程池 Executor;
- 提交(submit)所有可异步的任务;
- 设置获取所有任务的超时总时间,while 轮询去调用 CompletionService 的 poll 方法,poll 可以以 5 毫秒时间粒度的超时获取已完成的任务。
具体示例代码如下:
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class CompletionServiceDemo {
/**
* 异步执行任务及在指定的时间内快速获取所有的任务结果
*
* @param taskList 待执行的任务
* @param executor 使用的线程池
* @param timeout 所有任务获取的总超时时间
*/
public void taskConcurrentExecuteAndGet(List<Object> taskList, Executor executor, int timeout) {
CompletionService<Object> completionService = new ExecutorCompletionService<>(executor);
for (Object task : taskList) {
completionService.submit(() -> {
Object result = new Object();
System.out.println("do task and set result");
return result;
});
}
// 待完成的任务
int taskSize = taskList.size();
// 已完成的任务
int completedTaskSize = 0;
Instant start = Instant.now();
try {
while (Duration.between(start, Instant.now()).toMillis() < timeout && (completedTaskSize < taskSize)) {
// 以 5 毫秒为超时时间粒度获取结果
Future<Object> future = completionService.poll(5, TimeUnit.MILLISECONDS);
if (Objects.nonNull(future)) {
Object result = future.get();
completedTaskSize++;
// handle result
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}