FutureTask
我们在使用线程池时, 发现线程池除了execute方法, 还有一个submit方法
, submit方法会返回一个Future类型, 然后能够使用这个返回值的get方法, 得到异步任务的返回值
, 当然, 这时候传递给线程池的对象应该是一个Callable的对象.
ExecutorService executorService = Executors.newFixedThreadPool(2);Future<?> submit = executorService.submit(() -> { System.out.println(123); TimeUnit.SECONDS.sleep(3); return "hello";});System.out.println(submit.get());
上面这段代码将会阻塞在submit.get()处3秒, 直到submit提交的任务完成之后, get()方法才会放行, 并且get()方法将会得到submit提交的任务的返回值, 也就是"hello".
Runnable和Callable
submit方法与execute方法很类似, 只是传递的对象不同, execute需要一个Runnable, 而submit需要一个Callable
:
@FunctionalInterfacepublic interface Callable<V> { V call() throws Exception;}@FunctionalInterfacepublic interface Runnable { public abstract void run();}
Callable是一个带泛型的类, 返回值的类型就是其泛型
, 通过看代码可以知道, Runnable和Callable主要有如下几种不同点:
1. Callable有泛型, 并且call方法有返回值, 返回值类型是其泛型.
2. Callable的call方法可以抛出异常, 而Runnable的run方法只能人为自己在所实现的run方法中自己捕捉异常, 或者抛出RuntimeException.
因此, Callable的作用是比Runnable要更加强大一点.
Future接口
submit方法返回的是一个Future类型, 那么我们首先看看Future这个接口到底是什么样的:
public interface Future<V> { // 取消当前任务 boolean cancel(boolean mayInterruptIfRunning); // 当前任务是否被取消 boolean isCancelled(); // 当前任务是否已完成 boolean isDone(); // 拿到当前任务的处理结果, 一直阻塞 V get() throws InterruptedException, ExecutionException; // 拿到当前任务的处理结果, 超过超时时间则抛异常 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}
Future接口在jdk中提供了如下几个继承/实现接口/类, 当然并不是所有都是常用的, 实际上比较常用的也就是FutureTask, ScheduledFutureTask, CompletableFuture
这几个类.
而在线程池的submit方法中, 返回的类型就是FutureTask类型.
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 将从外部接收的Callable类型直接传递给FutureTask的构造函数, 得到一个FutureTask对象 RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable);}
在使用submit方法的时候, 我们自己构造了一个Callable对象传递进去, 这个Callable对象直接被拿去创建一个FutureTask对象, 然后execute(task)去执行这个FutureTask, 从这里我们就可以看出来, FutureTask肯定是实现了Runnable的
.
FutureTask实现了RunnableFuture接口, 而RunnableFuture接口既继承了Runnable也继承了Future接口, 因此FutureTask实际上就是既实现Future接口, 也要实现Runnable接口
.
FutureTask原理
FutureTask简述
想要获得一个FutureTask, 首先要提供一个Callable对象, FutureTask提供了一个单参数的构造函数, 支持传递一个Callable对象, 内部还设置了一个状态属性state, 初始化为NEW(int类型, 值为0)
, 并且这个state是一个volatile的, 以供其它同时调用get()方法的线程第一时间知道任务完成了
.
// volatile的state状态属性, 使得任务执行完之后能让其它get阻塞的线程第一时间知道当前任务已经完成了private volatile int state;public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable}// FutureTask提供的几个状态枚举值private static final int NEW = 0;private static final int COMPLETING = 1;private static final int NORMAL = 2;private static final int EXCEPTIONAL = 3;private static final int CANCELLED = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED = 6;
我们知道, 在线程池中, execute方法是一个异步的, 直接将Runnable交给线程去执行, 自己不会阻塞
, 因此在submit方法中的execute(task)这行, 也仅仅就是将这个FutureTask当做一个Runnable, 使用线程池中的线程去执行它的run方法, 然后就直接返回这个futureTask.
submit返回后, 我们会调用这个返回的futureTask的get方法, 如果此时任务还没执行完, 这里将会阻塞, 直到任务执行完才会放行, 并且得到Callable的返回值. 因此, 可以确定, 这个阻塞的原因, 就发生在FutureTask的run()方法和get()方法中了
.
get方法
public V get() throws InterruptedException, ExecutionException { int s = state; // 如果任务没完成, 将会进入awaitDone方法 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s);}
上面我们得知, 如果任务完成了, 这时候将会直接进入report(s)方法中, 返回任务处理结果, 而如果任务没有完成, state的状态将会是NEW, 也就是0, 这时是小于COMPLETING(1)的, 将会进入awaitDone方法
. get方法的阻塞正是阻塞在awaitDone方法中
.
awaitDone方法
首先我们只看主线, 不看超时的情况, 也不看打断情况, 在无超时的get()方法中, 传递给awaitDone方法的两个参数分别为false和0, 因此awaitDone方法简化如下:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { WaitNode q = null; boolean queued = false; for (;;) { int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) // 这里的写法很精髓, q.next = waiters, 既保证了传入的第三个参数是waiters的原始值 // 还把waiters属性赋值给了q的next属性, 因此这是个头插法 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else LockSupport.park(this); }}
首先这是一个for的死循环
, 我们假设调用get()方法的此时, 线程池的execute还没执行FutureTask的run方法的地方(或者说还没执行到更改state状态的地方), 那么这时候int s = state得到的s是0, 肯定小于COMPLETING
, 于是进入了q == null的判断, 此时q肯定是null, 于是就调用WaitNode的构造方法创建WaitNode对象, 创建WaitNode的时候, 将WaitNode的thread设置为当前执行get()方法的线程
. WaitNode类的代码如下所示:
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); }}
于是q中就包含了当前调用get()方法的线程, 是为thread属性. 初始化完毕q之后, 回到循环入口处
, 继续往下.
接下来又进入!queued
的判断中. queued的含义是q是否入队
, 第一次进入循环中, queued是false, 表示未入队, 于是就要入队了. 入队使用了cas操作, 因为可能有多个线程调用了get()方法
, 为了线程安全地入队, 就必须使用cas+死循环保证线程安全. 在这个cas方法中, 参数传递了this, waitersOffset, 实际上waitersOffset就是FutureTask类中waiters属性的偏移量
, 这个操作结束后, 将会线程安全地把q这个节点加入到waiters属性中. 如果多个线程同时调用get()方法导致某次的cas失败, 将会再次回到for循环的入口处, 继续尝试cas, 直到所有调用get()方法的线程创建的WaitNode节点都加入到waiters属性中为止
.
入队成功后, queued被置为true, 于是又回到循环的入口处
, 继续往下.
这次将会进入到最终的else块中, 这时候将直接把调用get()方法的线程park. 于是get方法就阻塞在这里了
.
run方法
同样的规矩, 不看异常, 不看中断, 只看主线, 将run方法简化成如下代码:
public void run() { // FutureTask只能执行一次 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { } if (ran) set(result); } } finally { }}
run方法很简单, 直接调用传递过来的Callable对象的call()方法, 得到result, 如果没有出异常, ran变量将被置为true, 然后进入if(ran)的判断中, 判断成立, 调用set(result)方法, 传入call方法的返回值
.
set方法
set方法要完成的任务就是, 让之前调用了get()方法被阻塞的线程全部唤醒, 并且将Callable的call()方法的返回值设置到这个FutureTask对象的属性中
, 我们知道,一个FutureTask对象是只能执行一次任务的
, 于是这个执行结果也必然只有一个, 那么就可以使用一个属性去保存这个执行结果, 这个属性就是outcome
.
// FutureTask的属性, 用于保存本FutureTask对象的执行结果private Object outcome;protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); }}
首先使用cas修改state的状态值, 从NEW改为COMPLETING
, 然后再将outcome属性赋值为call()方法的返回值之后, 又将state属性设置为NORMAL
, 再调用finishCompletion()方法, 去唤醒之前调用get()方法被阻塞的线程
.
finishCompletion方法
@FunctionalInterfacepublic interface Callable<V> { V call() throws Exception;}@FunctionalInterfacepublic interface Runnable { public abstract void run();}0
该方法的逻辑大概就是, 先拿到waiters的头节点, 再使用cas将waiters属性置为null, 然遍历之前拿到的头结点, 依次唤醒头结点中的线程
, 操作也就是简单的链表操作, 内外for循环都执行完后, 所有之前调用get()方法并且阻塞在awaitDone方法中的线程都将会被解阻塞
.
最后的done()方法是一个空方法
.
截止目前, outcome属性是call()方法的返回值, state=NORMAL(2)
.
awaitDone方法解阻塞
调用get()方法阻塞的线程, 之前是阻塞在最后的else块中的LockSupport.park(this)中
, 被finishCompletion方法唤醒之后, 依然会从park的位置开始执行代码
, 于是又回到for循环的入口处
, 这时候由于state是NORMAL(2)
, 于是就会进入这个if块中.
@FunctionalInterfacepublic interface Callable<V> { V call() throws Exception;}@FunctionalInterfacepublic interface Runnable { public abstract void run();}1
判断如果q不为空, 正常情况下肯定不是空, 再将q包含的线程也取消引用, 最终返回状态值s, 正常情况下, 返回的s就是NORMAL, 也就是2.
get()方法返回
@FunctionalInterfacepublic interface Callable<V> { V call() throws Exception;}@FunctionalInterfacepublic interface Runnable { public abstract void run();}2
awaitDone方法返回了状态s为2, 于是get方法调用report(s), 返回call()方法的返回值.
@FunctionalInterfacepublic interface Callable<V> { V call() throws Exception;}@FunctionalInterfacepublic interface Runnable { public abstract void run();}3
直接返回先前设置的outcome
.
至此, FutureTask的原理就分析完毕了.
FutureTask原理总结
1. get()方法阻塞, 阻塞的是调用get()方法的线程.
2. 每个调用get()方法阻塞的线程, 都被包装在一个WaitNode的对象中, 并且形成一个链表, FutureTask对象的waiters属性保存头结点(头插法, 头结点是最后一个调用get()方法的线程所在节点).
3. 每个调用get()方法阻塞的线程, 都会被阻塞在awaitDone的最后一个else块中: LockSupport.park(this).
4. run()方法内部调用call()方法, 并将call()方法的返回值保存到FutureTask的outcome属性中, 并修改本FutureTask对象的状态为NORMAL.
5. run()方法中调用finishCompletion()方法, 该方法将会遍历waiters属性, unpark每个WaitNode节点的线程.
6. 调用get()方法阻塞的线程被unpark之后, 回到for循环入口, 此时由于状态已经是NORMAL, awaitDone方法将返回, 顺理成章地get()方法也会返回之前赋值的outcome属性.
7. FutureTask生命周期无法后退, 只能执行一次, 执行成功之后, 状态就是确定的了, 不会再改变.
FutureTask局限性
1. 无法并发执行多个任务
: 由于get方法是阻塞的, 因此只能等待所有任务执行完成.
2. 无法组合多个任务
: 如果需要对多个任务的执行结果进行操作, 只能一个一个的来.
3. 没有异常处理
: Future接口中没有对异常进行处理的方法.
4. 使用不够优雅
: 如果在执行完任务想做一些事情, 就必须在调用get()方法的这个方法里面去写.
CompletionService
相比于FutureTask, CompletionService增强了一些性能, 比如有一组任务, 分别从商品服务中获取10个商品的信息, 然后存储. 为了效率, 我们使用FutureTask, 我们就只能分别使用10次get()方法去获取每个商品的返回值. 但这并不是核心问题所在, 问题是我们的get()将会一直阻塞在耗时最长的商品获取的位置, 那么我们开始存储的动作, 一定是要在所有商品都获取成功后才能执行, 这样的话, 效率并不是最优的.
如果有一种方案, 能让先完成任务的FutureTask先去做后续的操作
, 比如10个商品里面, 前面8个商品都获取成功, 不需要管后面2个是否获取成功, 前面8个成功的依次按照完成顺序去执行存储动作, 这样等最慢的那个商品获取成功后, 可能前面的9个商品早已保存成功, 这样对比上面的方式快了9次的存储, 基本属于最优效率了. 那么这样的方式, 就是CompletionService实现的
.
CompletionService概述
CompletionService只有一个实现类
, 叫做ExecutorCompletionService
.
@FunctionalInterfacepublic interface Callable<V> { V call() throws Exception;}@FunctionalInterfacepublic interface Runnable { public abstract void run();}4
通过poll()和take()方法的返回值可以猜出来, 在CompletionService的实现类中, 应该要提供一个阻塞队列, 这个阻塞队列的元素类型是Future类型
.
ExecutorCompletionService应用
下面看看示例
@FunctionalInterfacepublic interface Callable<V> { V call() throws Exception;}@FunctionalInterfacepublic interface Runnable { public abstract void run();}5
可以看到, 提交任务的顺序是id从大到小(这样可以更大概率让id更大的先去执行), 然后在获取商品名字的方法中, 根据id休眠id秒, 但是最终打印的结果, 依然是每隔1秒打印一次获取结果, 这样就完成了先执行完查询操作的任务先去执行下一步所做的事情, 而不必要去等待其它任务的执行完成
.
ExecutorCompletionService原理
在创建ExecutorCompletionService的时候, 需要提供一个线程池. 在它的构造函数中, 核心是创建了一个无界队列completionQueue
, 泛型为Future. Future的泛型与创建ExecutorCompletionService时提供的泛型一致.
@FunctionalInterfacepublic interface Callable<V> { V call() throws Exception;}@FunctionalInterfacepublic interface Runnable { public abstract void run();}6
submit方法
@FunctionalInterfacepublic interface Callable<V> { V call() throws Exception;}@FunctionalInterfacepublic interface Runnable { public abstract void run();}7
先将传递过来的Callable构造成一个FutureTask, 然后再执行线程池的execute方法, 传入了一个QueueingFuture
, 看看QueueingFuture是什么:
@FunctionalInterfacepublic interface Callable<V> { V call() throws Exception;}@FunctionalInterfacepublic interface Runnable { public abstract void run();}8
QueueingFuture继承了FutureTask, 封装了一个task, 也就是前面根据传入的Callable创建的那个FutureTask
. 重写了done()方法, 前面在分析FutureTask的时候, 我们看到过, 在FutureTask中, done()方法是一个空方法, 在执行完唤醒所有阻塞在get()方法的线程之后会调用. 但是在QueueingFuture中, 重写了它, 内容就是往无界阻塞队列中填入执行完的FutureTask
.
其它内容与FutureTask基本一致, 只是有一些小出入, 在构造QueueingFuture的时候, 使用了super(Runnable, V)这个构造函数, 但是其实也没有什么难以理解的, 这只不过是一个套娃而已, 对于FutureTask来说, 只要能执行它的run()方法, 就是完成了这个FutureTask的任务了
.
对于FutureTask的执行过程, 前面部分已经详细介绍了, 这里就是多了个done()方法, 在唤醒所有阻塞线程之后, 将已经执行完成的FutureTask加入到无界阻塞队列中
.
take方法
@FunctionalInterfacepublic interface Callable<V> { V call() throws Exception;}@FunctionalInterfacepublic interface Runnable { public abstract void run();}9
在我们的应用代码的末尾, 使用for循环不断地从这个无界阻塞队列中获取元素, 这些元素就是已经完成了的FutureTask. 获取到FutureTask之后, 再调用get()获取返回值, 这时候调用get()方法一定不会阻塞, 因为FutureTask已经完成了.
CompletionService总结
1. CompletionService在构造时需要传入一个线程池, 这样能够自己定义, 降低任务执行风险
.
2. CompletionService在异步执行任务时, 借助了阻塞队列, 控制任务的有序性, 避免无用的等待
.
CompletableFuture
前面介绍的FutureTask和CompletionService, 都只是在处理简单的任务的时候, 表现比较好, 但是涉及到多任务合并, 多任务互相依赖, 多任务串行等任务复杂编排的情况, 这样就只能手动控制了. 而CompletableFuture则就是为了解决这个问题而出现的.
CompletableFuture是Future接口的拓展, CompletableFuture实现了Future接口, 并在此基础上完善了Future的不足之处, 实现了对任务的编排能力, 借助CompletableFuture, 我们可以轻松的进行对任务的编排
, 而不需要像在使用CountDownLatch等工具类那样还要进行大量的其它操作.
由于所实现的CompletionStage方法实在太多, 就不贴出来了, CompletableFuture为了实现任务编排, api众多, 只要能够使用就可以了, 对于Future原理上的研究, 我认为只要能够理解FutureTask的原理就可以了, 至于CompletableFuture, 核心的几个api能够使用就行了. 下面将介绍几个CompletableFuture的核心api.
执行任务runAsync和supplyAsync
runAsync是执行一个不带返回值的异步任务, 返回值是一个泛型为Void的CompletableFuture
.
supplyAsync是执行一个带返回值的任务, 返回值是一个泛型为Supplier的泛型一样的CompletableFuture
.
public interface Future<V> { // 取消当前任务 boolean cancel(boolean mayInterruptIfRunning); // 当前任务是否被取消 boolean isCancelled(); // 当前任务是否已完成 boolean isDone(); // 拿到当前任务的处理结果, 一直阻塞 V get() throws InterruptedException, ExecutionException; // 拿到当前任务的处理结果, 超过超时时间则抛异常 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}0
runAsync需要提供一个runnable, 那么它真正执行的任务的逻辑就是Runnable的run方法的逻辑.
supplyAsync需要提供一个Supplier, 那么它真正执行的任务的逻辑就是Supplier的get方法的逻辑, 并且Future中的包装结果就是get方法的返回结果.
两种方法都支持传入一个线程池exector, 如果不传递线程池, 则使用默认的一个公共线程池
:
public interface Future<V> { // 取消当前任务 boolean cancel(boolean mayInterruptIfRunning); // 当前任务是否被取消 boolean isCancelled(); // 当前任务是否已完成 boolean isDone(); // 拿到当前任务的处理结果, 一直阻塞 V get() throws InterruptedException, ExecutionException; // 拿到当前任务的处理结果, 超过超时时间则抛异常 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}1
get()和join()
runAsync和supplyAsync都是异步方法, 执行后不会等待返回结果, 因此这样肯定不行, CompletableFuture提供了join方法和get方法, 用来等待异步任务的执行结束
. 示例如下:
public interface Future<V> { // 取消当前任务 boolean cancel(boolean mayInterruptIfRunning); // 当前任务是否被取消 boolean isCancelled(); // 当前任务是否已完成 boolean isDone(); // 拿到当前任务的处理结果, 一直阻塞 V get() throws InterruptedException, ExecutionException; // 拿到当前任务的处理结果, 超过超时时间则抛异常 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}2
使用get()或者join()将会得到异步任务执行的结果, 泛型类型保证了结果的类型不需要强制转换
.
whenComplete
whenComplete支持3种模式, 代码如下:
public interface Future<V> { // 取消当前任务 boolean cancel(boolean mayInterruptIfRunning); // 当前任务是否被取消 boolean isCancelled(); // 当前任务是否已完成 boolean isDone(); // 拿到当前任务的处理结果, 一直阻塞 V get() throws InterruptedException, ExecutionException; // 拿到当前任务的处理结果, 超过超时时间则抛异常 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}3
一种是同步执行, 也就是使用前面使用的同一个线程执行业务逻辑, 另一种是异步执行, 支持传入一个线程池, 如果不传入线程池, 则使用默认的common线程池
.
whenComplete
whenComplete作用于前一个任务执行完成后, whenComplete中的逻辑会执行
, whenComplete支持同步和async两种方式, whenComplete同步就是使用主线程执行whenComplete中的业务
.
而whenCompleteAsync则支持传递一个线程池executor, 如果不传入, 则使用默认的common线程池执行whenComplete中的业务逻辑, 如果传入, 就使用传入的自定义线程池执行
.
后面很多api都带有async模式, 意义是一样的
, 后面将不再赘述, 直接提供api示例.
public interface Future<V> { // 取消当前任务 boolean cancel(boolean mayInterruptIfRunning); // 当前任务是否被取消 boolean isCancelled(); // 当前任务是否已完成 boolean isDone(); // 拿到当前任务的处理结果, 一直阻塞 V get() throws InterruptedException, ExecutionException; // 拿到当前任务的处理结果, 超过超时时间则抛异常 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}4
whenCompleteAsync
public interface Future<V> { // 取消当前任务 boolean cancel(boolean mayInterruptIfRunning); // 当前任务是否被取消 boolean isCancelled(); // 当前任务是否已完成 boolean isDone(); // 拿到当前任务的处理结果, 一直阻塞 V get() throws InterruptedException, ExecutionException; // 拿到当前任务的处理结果, 超过超时时间则抛异常 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}5
thenApply
thenApply
public interface Future<V> { // 取消当前任务 boolean cancel(boolean mayInterruptIfRunning); // 当前任务是否被取消 boolean isCancelled(); // 当前任务是否已完成 boolean isDone(); // 拿到当前任务的处理结果, 一直阻塞 V get() throws InterruptedException, ExecutionException; // 拿到当前任务的处理结果, 超过超时时间则抛异常 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}6
thenApplyAsync
public interface Future<V> { // 取消当前任务 boolean cancel(boolean mayInterruptIfRunning); // 当前任务是否被取消 boolean isCancelled(); // 当前任务是否已完成 boolean isDone(); // 拿到当前任务的处理结果, 一直阻塞 V get() throws InterruptedException, ExecutionException; // 拿到当前任务的处理结果, 超过超时时间则抛异常 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}7
thenCompose
thenCompose需要提供一个CompletableFuture, 业务逻辑也是在这个supplyAsync中执行的, 因此本身就不可能使用主线程执行业务, 但是提交任务的线程却是主线程
. 根据我的研究, 我也就只发现了thenCompose和thenApply就这一点不同点, 其它的感觉差不多.
thenComposeAsync支持传入一个线程池, 那么提交任务的线程就不会再是主线程了, 而是传入的这个线程池中的线程, 如果没传入, 则使用默认的common线程池
.
thenCompose
public interface Future<V> { // 取消当前任务 boolean cancel(boolean mayInterruptIfRunning); // 当前任务是否被取消 boolean isCancelled(); // 当前任务是否已完成 boolean isDone(); // 拿到当前任务的处理结果, 一直阻塞 V get() throws InterruptedException, ExecutionException; // 拿到当前任务的处理结果, 超过超时时间则抛异常 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}8
thenComposeAsync
public interface Future<V> { // 取消当前任务 boolean cancel(boolean mayInterruptIfRunning); // 当前任务是否被取消 boolean isCancelled(); // 当前任务是否已完成 boolean isDone(); // 拿到当前任务的处理结果, 一直阻塞 V get() throws InterruptedException, ExecutionException; // 拿到当前任务的处理结果, 超过超时时间则抛异常 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}9
thenCombine
thenCombine
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 将从外部接收的Callable类型直接传递给FutureTask的构造函数, 得到一个FutureTask对象 RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable);}0
thenCombineAsync
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 将从外部接收的Callable类型直接传递给FutureTask的构造函数, 得到一个FutureTask对象 RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable);}1
thenAccept
对单个结果进行消费, 不具有返回值, 但是可以使用前面的任务的处理结果
.
thenAccept
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 将从外部接收的Callable类型直接传递给FutureTask的构造函数, 得到一个FutureTask对象 RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable);}2
thenAcceptAsync
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 将从外部接收的Callable类型直接传递给FutureTask的构造函数, 得到一个FutureTask对象 RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable);}3
thenAcceptBoth
对两个结果进行消费, 不具有返回值, 但是要使用两个异步任务的结果
.
thenAcceptBoth
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 将从外部接收的Callable类型直接传递给FutureTask的构造函数, 得到一个FutureTask对象 RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable);}4
thenAcceptBothAsync
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 将从外部接收的Callable类型直接传递给FutureTask的构造函数, 得到一个FutureTask对象 RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable);}5
thenRun
某个任务执行完之后执行一个操作, 这个操作不依赖那个任务的返回值, 并且这个操作也不会返回任何东西. 传入的是一个Runnable对象
.
thenRun
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 将从外部接收的Callable类型直接传递给FutureTask的构造函数, 得到一个FutureTask对象 RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable);}6
thenRunAsync
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 将从外部接收的Callable类型直接传递给FutureTask的构造函数, 得到一个FutureTask对象 RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable);}7
AllOf
Allof是一个静态方法:
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 将从外部接收的Callable类型直接传递给FutureTask的构造函数, 得到一个FutureTask对象 RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable);}8
这个方法需要传递多个CompletableFuture, 顾名思义, 就是要等待所有传入的CompletableFuture对象的任务都执行完, 用法是得到allOf的返回值后, 调用get()/join()方法, 就可以等待所有传入的异步任务执行完成了
.
示例如下: 本示例中传入了3个CompletableFuture, 每个CompletableFuture都是先生产出一个User对象, 然后再使用thenAccept将前面的任务得到的User对象放入users列表中.
调用join()方法对allOf阻塞
.
最终解阻塞后, 得到users列表中正是这3个User对象.
get()/join()方法返回的是Void
.
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 将从外部接收的Callable类型直接传递给FutureTask的构造函数, 得到一个FutureTask对象 RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask;}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable);}9
AnyOf
anyOf用法与allOf差不多, 也是传入多个CompletableFuture, 使用join()/get()等待传入的异步任务执行完成.
区别在于anyOf只要有一个返回, 就会解阻塞, 并且join()/get()得到的结果是一个Object类型, 正是传入的这些CompletableFuture中执行最快的那个任务的返回值
.
下面代码同样传入3个生产User对象的CompletableFuture, 循环执行5次, 随机得到了不同的User对象.
// volatile的state状态属性, 使得任务执行完之后能让其它get阻塞的线程第一时间知道当前任务已经完成了private volatile int state;public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable}// FutureTask提供的几个状态枚举值private static final int NEW = 0;private static final int COMPLETING = 1;private static final int NORMAL = 2;private static final int EXCEPTIONAL = 3;private static final int CANCELLED = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED = 6;0原文:https://juejin.cn/post/7097144840642625566