Java 异步任务优化 CompletionService

Future的缺陷

Future通过get方法来获取异步任务的结果,如果任务还没有完成则阻塞线程,因为我们需要它的结果,所以等待是应该的。

如果需要处理一批这类任务,提交到线程池我们最终会得到多个Future,但是每个任务执行的时间可能并不相同,那么我们应该优先调用哪个Future的get呢?示例如下图:



可以看到有些任务因为执行时间较长,而在他后面的任务可能执行任务的时间较短,已经提前完成了,但是并不能得到及时的处理,只能等到前面的任务执行完成后才能处理,这样设计是非常不合理的。

CompletionService解决

线程Java中提供了CompletionService来解决这个问题的,同样可以把一批任务提交到CompletionService,CompletionService可以把先执行完成的任务通过take方法获取到。使用方法如下图:



ExecutorCompletionService是CompletionService的具体实现,我们把线程池设置给ExecutorCompletionService后,也可以通过submit提交任务到ExecutorCompletionService,最后通过take方法获取到已经完成的Future

从上面例子可以看到因为id等于3的任务先执行完成,所以也先处理了这个任务。最先提交的任务id等于0的因为休眠时间较长,所以先完成的任务就可以先执行处理

ExecutorCompletionService源码

从上面的例子可以看到使用ExecutorCompletionService有三个关键步骤:设置一个线程池、submit提交任务、take获取完成的Future

看源码首先看他的属性,查看源码得到他有两个关键属性:

Executor executor:执行线程的线程池;

BlockingQueue<Future<V>> completionQueue:阻塞队列,保存完成的Future;

看到这两个属性就大概能够猜到它的实现方案:利用线程池去执行任务,利用阻塞队列来保存完成的Future

在submit方法肯定调用的是线程池的submit方法这个很简单,唯一的问题是如何把完成的Future放到阻塞队列中的?

上一篇文章中我们知道最终实现有返回的异步任务的类是FutureTask,最终运行的是FutureTask的run方法,run方法完成后会调用finishCompletion方法去唤醒那些因为调用FutureTask.get()方法而阻塞的线程,在finishCompletion的最后调用了done()方法,只不过在FutureTask中done()方法是一个空方法

而ExecutorCompletionService就是利用FutureTask的done()实现了把任务放到阻塞队列中。

ExecutorCompletionService有一个私有的内部类QueueingFuture,它继承FutureTask,并实现了done()方法,done()方法把任务放到了ExecutorCompletionService的阻塞队列中(所以QueueingFuture是私有类)。done()方法只有在任务执行完成后才会调用。

图解ExecutorCompletionService功能

通过以上分析基本上弄懂了ExecutorCompletionService的执行流程,分析如下图:



展开阅读全文

本文系作者在时代Java发表,未经许可,不得转载。

如有侵权,请联系nowjava@qq.com删除。

编辑于

关注时代Java

关注时代Java