CompletionService是什么?

  • Callable+Future 可以实现多个task并行执行,但是如果遇到前面的task执行较慢时需要阻塞等待前面的task执行完后面task才能取得结果。

  • CompletionService的主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了。

  • 他只有一个实现类:ExecutorCompletionService

CompletionService原理

  • 内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序,内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果。

CompletionService的应用场景

  • 当需要批量提交异步任务的时候建议你使用CompletionService:CompletionService将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。

  • CompletionService能够让异步任务的执行结果有序化:先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如Forking Cluster这样的需求。

  • 线程池隔离:CompletionService支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。

CompletionService的使用方式

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CompletionServiceTest {

public static void main(String[] args) throws ExecutionException, InterruptedException {

// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 创建CompletionService
CompletionService<Integer> cs = new ExecutorCompletionService<>(executor);
// 开启5个任务
for (int i = 0; i < 5; i++) {
cs.submit(new CompletionTask());
}
// 将询价结果异步保存到数据库
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + " 获取到任务结果:" + cs.take().get());
}
}
}

/**
* 自定义的任务
*/

class CompletionTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
// 获取随机的世界
int randomSleepTime = new Random().nextInt(10) * 1000;
// 开始执行
System.out.println(Thread.currentThread().getName() + " 开始执行,当前任务需要等待" + randomSleepTime + "毫秒");
// 线程等待,模拟正在执行
TimeUnit.MILLISECONDS.sleep(randomSleepTime);
// 结束执行
System.out.println(Thread.currentThread().getName() + "结束执行,当前任务需要等待" + randomSleepTime + "毫秒");
// 返回他等待的时间
return randomSleepTime;
}
}

运行结果:等待时间短的线程优先返回结果

CompletionServiceTest执行结果.png

CompletionService的构造方法源码分析

/**
* 传入线程池的构造方法
*/

public ExecutorCompletionService(Executor executor) {
// 线程池为null,直接抛出异常
if (executor == null)
throw new NullPointerException();
// 把使用的线程池赋值
this.executor = executor;
// 判断是不是AbstractExecutorService的子类
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
// 默认只是用LinkedBlockingQueue进行结果的存放
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

/**
* 传入线程池的构造方法和使用的队列
*/

public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue)
{
// 线程池为null或者使用的队列为空,直接抛出异常
if (executor == null || completionQueue == null)
throw new NullPointerException();
// 把使用的线程池赋值
this.executor = executor;
// 判断是不是AbstractExecutorService的子类
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
// 使用传入的队列进行结果的存放
this.completionQueue = completionQueue;
}

CompletionService的submit方法源码分析

/**
* CompletionService的任务提交方法
*/

public Future<V> submit(Runnable task, V result) {
// 任务为空,抛出空指针异常
if (task == null) throw new NullPointerException();
// 创建一个具体的任务
RunnableFuture<V> f = newTaskFor(task, result);
// 执行这个任务:封装一下,重写了done方法,将运行结果放入队列中。
executor.execute(new QueueingFuture(f));
// 返回执行的结果
return f;
}

/**
* QueueingFuture类:他重写了done方法
*/

private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
// 执行完成的方法:运行结果放入队列中。
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}

CompletionService的take方法源码分析

/**
* CompletionService的出队take方法:直接调用队列的阻塞出队方法。
*/

public Future<V> take() throws InterruptedException {
return completionQueue.take();
}

结束语

  • 关注我:所有前置知识都可以在我的百家号中找到!

举报/反馈

绽放苍穹的程序员

95获赞 359粉丝
java程序员的修炼之路!
关注
0
0
收藏
分享