English 简体中文 繁體中文 한국 사람 日本語 Deutsch русский بالعربية TÜRKÇE português คนไทย french
查看: 2|回复: 0

JUC并发—14.Future模式和异步编程分析

[复制链接]
查看: 2|回复: 0

JUC并发—14.Future模式和异步编程分析

[复制链接]
查看: 2|回复: 0

232

主题

0

回帖

706

积分

高级会员

积分
706
tua4w9Lq

232

主题

0

回帖

706

积分

高级会员

积分
706
2025-2-27 00:45:12 | 显示全部楼层 |阅读模式
大纲
1.FutureTask(Future/Callable)的使用例子
2.FutureTask(Future/Callable)的实现原理
3.FutureTask(Future/Callable)的源码分析
4.CompletableFuture的基本介绍
5.CompletionStage方法及作用说明
6.CompletableFuture的实现原理分析
7.CompletableFuture的核心源码分析
 
1.FutureTask(Future/Callable)的使用例子
Future/Callable实现了一个异步执行并带有返回结果的功能。Future表示获取一个异步执行的结果,Callable表示一个异步执行的任务,Callable会产生一个结果并给到Future。
 
Future/Callable的使用例子如下:
public class FutureCallableExample {    static class CalculationCallable implements Callable<Integer> {        private int x;        private int y;        public CalculationCallable(int x, int y) {            this.x = x;            this.y = y;        }        @Override        public Integer call() throws Exception {            System.out.println("开始执行:" + new Date());            TimeUnit.SECONDS.sleep(2);//模拟任务执行的耗时            return x + y;        }    }    public static void main(String[] args) throws ExecutionException, InterruptedException {        CalculationCallable calculationCallable = new CalculationCallable(1, 2);        FutureTask<Integer> futureTask = new FutureTask<>(calculationCallable);        new Thread(futureTask).start();        System.out.println("开始执行futureTask:" + new Date());        Integer rs = futureTask.get();        System.out.println("执行结果:" + rs);        System.out.println("结束执行futureTask:" + new Date());    }}首先定义一个CalculationCallable类。该类实现了Callable接口,并重写了call()方法,它的功能就是定义一个具有返回值的任务。
 
然后用FutureTask声明一个带有返回值的任务,把CalculationCallable作为构造参数传递进去。
 
FutureTask实现了Future接口和Runnable接口。我们知道线程执行完之后是不可能获得一个返回值的。Future之所以能够获得返回值,是因为在线程执行中做了相关处理。FutureTask就是用来获得线程执行结果的。
 
接着把FutureTask作为一个任务传入Thread的构造方法,让线程去执行。FutureTask既然实现了Runnable接口,创建FutureTask时又把实现了Callable接口的任务传递到其构造方法中,那么FutureTask的run()方法中会调用Callable接口的call()方法的实现,最终在获得返回值之后保存到某个属性中。
 
最后使用FutureTask.get()方法来获得返回值,这个get()方法是个阻塞方法。当线程还没有执行完FutureTask之前,主线程会阻塞在get()方法中。直到FutureTask执行结束,主线程才会被唤醒。
 
2.FutureTask(Future/Callable)的实现原理
(1)FutureTask的类关系
(2)FutureTask的实现核心
 
(1)FutureTask的类关系
Runnable接口的实现可以被线程执行,Future接口提供了获取线程执行结果的方法。RunnableFuture接口同时继承了Runnable接口和Future接口,而FutureTask类则实现了RunnableFuture接口。
 
创建FutureTask类实例时,会传入Callable接口的实现类实例作为构造参数,也就是FutureTask类会封装Callable接口的实现类。这样在启动线程后执行FutureTask类重写Runnable接口的run()方法时,FutureTask类实例就会把执行Callable接口call()方法的运行结果保存起来,然后通过Future接口提供的get()方法来获取运行结果。
 
一.FutureTask的类关系源码
//A cancellable asynchronous computation.//This class provides a base implementation of Future, with methods to start and cancel a computation, //query to see if the computation is complete, and retrieve the result of the computation.//The result can only be retrieved when the computation has completed; //the get methods will block if the computation has not yet completed.//Once the computation has completed, the computation cannot be restarted or cancelled //(unless the computation is invoked using #runAndReset).//A FutureTask can be used to wrap a Callable or Runnable object.  //Because FutureTask implements Runnable, //a FutureTask can be submitted to an Executor for execution.//In addition to serving as a standalone class, //this class provides protected functionality that may be useful when creating customized task classes.public class FutureTask<V> implements RunnableFuture<V> {    ...    //Creates a FutureTask that will, upon running, execute the given Callable.    public FutureTask(Callable<V> callable) {        if (callable == null) {            throw new NullPointerException();        }        this.callable = callable;        this.state = NEW;//ensure visibility of callable    }    ...}//A Future that is Runnable.//Successful execution of the run method causes completion of the Future and allows access to its results.public interface RunnableFuture<V> extends Runnable, Future<V> {    //Sets this Future to the result of its computation unless it has been cancelled.    void run();}//A Future represents the result of an asynchronous computation.//Methods are provided to check if the computation is complete, //to wait for its completion, and to retrieve the result of the computation.  //The result can only be retrieved using method get when the computation has completed, //blocking if necessary until it is ready.//Cancellation is performed by the cancel method.  //Additional methods are provided to determine if the task completed normally or was cancelled. //Once a computation has completed, the computation cannot be cancelled.//If you would like to use a Future for the sake of cancellability but not provide a usable result, //you can declare types of the form Future<?> and return null as a result of the underlying task.public interface Future<V> {    //用来取消任务,取消成功则返回true,取消失败则返回false    //mayInterruptIfRunning参数表示是否允许取消正在执行却没有执行完毕的任务,设为true,则表示可以取消正在执行过程中的任务    //如果任务已完成,则无论mayInterruptIfRunning为true还是false,此方法都返回false,即如果取消已经完成的任务会返回false    //如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false    //如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true    boolean cancel(boolean mayInterruptIfRunning);    //表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true    boolean isCancelled();    //表示任务是否已经完成,若任务完成,则返回true    boolean isDone();    //获取执行结果,如果最终结果还没得出该方法会产生阻塞,直到任务执行完毕返回结果    V get() throws InterruptedException, ExecutionException;    //获取执行结果,如果在指定时间内,还没获取到结果,则抛出TimeoutException    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;}@FunctionalInterfacepublic interface Runnable {    public abstract void run();}二.FutureTask的类关系图

(2)FutureTask的实现核心
一.FutureTask本身是一个线程
通过new Thread(new FutureTask(callable)).start()来启动,必然会执行FutureTask实现Runnable接口的run()方法。
 
二.Runnable接口的run()方法是没有返回值的
实际上返回值是由Callable接口的call()方法提供,所以调用FutureTask的run()方法,会触发调用Callable的call()方法。
 
三.通过Future接口的get()方法阻塞式获得返回值
如果在FutureTask的run()方法中调用Callable接口的call()方法执行任务时,需要比较长的时间,那么为了能够正确获得返回值,Future接口的get()方法必须实现阻塞,直到call()方法执行完毕。
 
四.需要一个队列来保存阻塞的线程
涉及线程阻塞和唤醒,要使用LockSupport来阻塞和唤醒队列中的线程。
 
3.FutureTask(Future/Callable)的源码分析
(1)FutureTask的核心属性
(2)FutureTask的run()方法
(3)FutureTask的get()方法
(4)FutureTask的finishCompletion()方法
(5)FutureTask的实现原理总结
 
(1)FutureTask的核心属性
一.state
代表任务在运行过程中的状态(7种)。
二.callable
当前要执行的任务。
三.outcome
任务的执行结果,通过Future.get()获取的值。
四.runner
当前执行callable任务的线程。
五.waiter
用来保存所有等待任务执行结束的线程的单向链表。
public class FutureTask<V> implements RunnableFuture<V> {    //The run state of this task, initially NEW.     //The run state transitions to a terminal state only in methods set, setException, and cancel.    //During completion, state may take on transient values of COMPLETING (while outcome is being set)     //or INTERRUPTING (only while interrupting the runner to satisfy a cancel(true)).     //Transitions from these intermediate to final states use cheaper ordered/lazy writes     //because values are unique and cannot be further modified.    //Possible state transitions:    //NEW(初始状态) -> COMPLETING(正在设置任务结果) -> NORMAL,这是任务正常执行完毕时状态的变更流程    //NEW(初始状态) -> COMPLETING(正在设置任务结果) -> EXCEPTIONAL,这是任务执行异常时状态的变更流程    //NEW(初始状态) -> CANCELLED(任务被取消),这是调用了Future.cancel()方法    //NEW(初始状态) -> INTERRUPTING(正在中断执行任务的线程) -> INTERRUPTED(任务被中断)    //代表任务在运行过程中的状态(7种)    private volatile int state;    private static final int NEW          = 0;    private static final int COMPLETING   = 1;    private static final int NORMAL       = 2;    private static final int EXCEPTIONAL  = 3;    private static final int CANCELLED    = 4;    private static final int INTERRUPTING = 5;    private static final int INTERRUPTED  = 6;    //The underlying callable; nulled out after running    //当前要执行的任务    private Callable<V> callable;        //The result to return or exception to throw from get()    //任务的执行结果,通过Future.get()获取的值    private Object outcome;        //The thread running the callable; CASed during run()    //当前执行callable任务的线程    private volatile Thread runner;        //Treiber stack of waiting threads    //用来保存所有等待任务执行结束的线程的单向链表    private volatile WaitNode waiters;    ...}(2)FutureTask的run()方法
使用线程来执行FutureTask任务时,比如new Thread(new FutureTask(callable)).start(),会回调FutureTask的run()方法。
 
FutureTask的run()方法的执行流程如下:
首先判断当前状态是否为NEW,并使用CAS设置runner属性为当前线程。如果当前状态不是NEW或者CAS设置失败,则说明已经有其他线程正在执行当前任务了,于是直接返回。然后获取通过构造方法传入的Callable接口的实现类实例callable,接着调用Callable接口的实现类实例callable中的call()方法获得执行结果,最后调用FutureTask的set()方法把执行结果保存到outcome属性中。
public class FutureTask<V> implements RunnableFuture<V> {    ...    //代表任务在运行过程中的状态(7种)    private volatile int state;        //当前要执行的任务    private Callable<V> callable;        //任务的执行结果,通过Future.get()获取的值    private Object outcome;        //当前执行callable任务的线程    private volatile Thread runner;        //用来保存所有等待任务执行结束的线程的单向链表    private volatile WaitNode waiters;        private static final sun.misc.Unsafe UNSAFE;    private static final long stateOffset;    private static final long runnerOffset;    private static final long waitersOffset;    static {        try {            UNSAFE = sun.misc.Unsafe.getUnsafe();            Class<?> k = FutureTask.class;            stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));            runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));            waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));        } catch (Exception e) {            throw new Error(e);        }    }        public void run() {        //首先判断当前状态是否为NEW,并使用CAS把runner属性设置为当前线程        //如果当前状态不是NEW或者CAS设置失败,说明已经有其他线程正在执行当前任务了,于是直接返回        if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) {             return;        }        try {            //获取通过构造方法传入的Callable接口的实现类实例callable            Callable<V> c = callable;            if (c != null && state == NEW) {                V result;                boolean ran;                try {                    //然后调用callable中的call()方法获得执行结果                    result = c.call();                    ran = true;                } catch (Throwable ex) {                    result = null;                    ran = false;                    setException(ex);                }                if (ran) {                    //调用set()方法把执行结果保存到outcome属性中                    set(result);                }            }        } finally {            //runner must be non-null until state is settled to prevent concurrent calls to run()            runner = null;            //state must be re-read after nulling runner to prevent leaked interrupts            int s = state;            if (s >= INTERRUPTING) {                handlePossibleCancellationInterrupt(s);            }        }    }        //Sets the result of this future to the given value unless this future has already been set or has been cancelled.    //This method is invoked internally by the run method upon successful completion of the computation.    protected void set(V v) {        //CAS修改任务状态为COMPLETING        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {            //把调用call()方法获取到的结果保存到outcome            outcome = v;            //CAS修改任务状态为NORMAL            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state            finishCompletion();        }    }    ...}(3)FutureTask的get()方法
FutureTask的get()方法的逻辑很简单,如果当前状态不是COMPLETING,就调用awaitDone()方法让当前线程阻塞等待,直到任务执行完成。其中awaitDone()方法的返回值表示任务的状态,当任务进入终止状态后,会调用reports()方法,根据状态类型来决定是返回运行结果还是抛异常。
 
在FutureTask的awaitDone()方法中会进行自旋。首先如果检测到线程被中断,则把加入等待队列中的线程移除。然后如果发现任务已经进入终止状态,则直接返回任务状态。如果任务正在设置执行结果,则通过Thread.yield()让出当前线程的CPU资源。
 
当FutureTask.awaitDone()方法第一次调用时,在第一次for循环中会初始化一个WaitNode结点,这个WaitNode结点便保存了调用FutureTask.get()方法的线程。在第二次for循环中会通过CAS按头插法将WaitNode结点插入waiters链表。在之后的for循环中,也就是当前线程已经加入了等待队列后,如果发现任务还没有执行完成,则通过LockSupport的方法阻塞线程。
 
注意,被阻塞的线程在如下两种情况下会被唤醒:
一.任务执行完成后,在set()方法中调用finishCompletion()方法
二.线程被中断,在awaitDone()方法中执行中断检测if(Thread.interrupted())
public class FutureTask<V> implements RunnableFuture<V> {    ...    //代表任务在运行过程中的状态(7种)    private volatile int state;    //当前要执行的任务    private Callable<V> callable;    //任务的执行结果,通过Future.get()获取的值    private Object outcome;    //当前执行callable任务的线程    private volatile Thread runner;    //用来保存所有等待任务执行结束的线程的单向链表    private volatile WaitNode waiters;        public V get() throws InterruptedException, ExecutionException {        int s = state;        if (s <= COMPLETING) {            s = awaitDone(false, 0L);        }        return report(s);    }        //Awaits completion or aborts on interrupt or timeout.    //@param timed true if use timed waits    //@param nanos time to wait, if timed    //@return state upon completion    private int awaitDone(boolean timed, long nanos) throws InterruptedException {        //阻塞超时时间,timed表示是否传递阻塞时间的参数        final long deadline = timed ? System.nanoTime() + nanos : 0L;        WaitNode q = null;        boolean queued = false;        for (;;) {//自旋            //如果检测到线程被中断,则把加入等待队列中的线程移除            if (Thread.interrupted()) {                removeWaiter(q);                //抛出中断异常                throw new InterruptedException();            }            int s = state;            //如果任务已经进入终止状态,则直接返回任务状态            if (s > COMPLETING) {                if (q != null) {                    q.thread = null;                }                return s;            } else if (s == COMPLETING) {// cannot time out yet                //如果任务正在设置执行结果,则通过Thread.yield()让出当前线程的CPU资源                Thread.yield();            } else if (q == null) {                //awaitDone()方法被第一次调用时,q == null为true                //此时会初始化一个WaitNode结点并赋值给q,这个WaitNode结点保存了调用FutureTask.get()的线程                 q = new WaitNode();            } else if (!queued) {                //awaitDone()方法被第一次调用时,进入的第二次for循环                //便会通过CAS将q结点按头插法插入waiters单向链表中                queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);            } else if (timed) {                //如果当前线程加入等待队列后,任务还没有执行完成,则通过LockSupport的方法阻塞线程                nanos = deadline - System.nanoTime();                if (nanos <= 0L) {                    removeWaiter(q);                    return state;                }                LockSupport.parkNanos(this, nanos);            } else {                LockSupport.park(this);            }        }    }        //Returns result or throws exception for completed task.    //@param s completed state value    @SuppressWarnings("unchecked")    private V report(int s) throws ExecutionException {        Object x = outcome;        if (s == NORMAL) {            return (V)x;        }        if (s >= CANCELLED) {            throw new CancellationException();        }        throw new ExecutionException((Throwable)x);    }        static final class WaitNode {        volatile Thread thread;        volatile WaitNode next;        WaitNode() {            thread = Thread.currentThread();        }    }        private void removeWaiter(WaitNode node) {        if (node != null) {            node.thread = null;            retry:            for (;;) {//restart on removeWaiter race                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {                    s = q.next;                    if (q.thread != null) {                        pred = q;                    } else if (pred != null) {                        pred.next = s;                        if (pred.thread == null) {// check for race                            continue retry;                        }                    } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) {                        continue retry;                    }                }                break;            }        }    }    ...}(4)FutureTask的finishCompletion()方法
当Callable任务执行完成后,FutureTask的set()方法会调用finishCompletion()方法唤醒链表中的阻塞线程。
public class FutureTask<V> implements RunnableFuture<V> {    ...    //代表任务在运行过程中的状态(7种)    private volatile int state;    //当前要执行的任务    private Callable<V> callable;    //任务的执行结果,通过Future.get()获取的值    private Object outcome;    //当前执行callable任务的线程    private volatile Thread runner;    //用来保存所有等待任务执行结束的线程的单向链表    private volatile WaitNode waiters;        ...    //Removes and signals all waiting threads, invokes done(), and nulls out callable.    private void finishCompletion() {        for (WaitNode q; (q = waiters) != null;) {            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {                for (;;) {                    Thread t = q.thread;                    if (t != null) {                        q.thread = null;                        LockSupport.unpark(t);                    }                    WaitNode next = q.next;                    if (next == null) {                        break;                    }                    q.next = null; // unlink to help gc                    q = next;                }                break;            }        }        done();        callable = null;    }    ...}(5)FutureTask的实现原理总结
FutureTask实现了Runnable和Future接口,FutureTask表示一个带有状态及执行结果的任务,而任务执行结果的获取是基于阻塞的方式来实现的。
 
在Callable接口的call()方法没有返回结果之前,其他线程调用FutureTask的get()方法获取结果时,FutureTask会构建一个waiters链表,把当前线程存储到链表中并通过LockSupport进行阻塞,直到call()方法返回后把结果设置到outcome属性以及唤醒阻塞的线程。

(6)FutureTask的局限性
局限性一:
在获取异步任务的执行结果时,要么调用get()方法阻塞等待返回结果,要么耗费CPU资源通过轮询调用FutureTask.isDone()方法来判断任务的执行状态,然后再调用get()方法获取返回结果。
 
局限性二:
FutureTask没有提供通知机制,没有办法知道任务什么时候执行完成。
 
4.CompletableFuture的基本介绍
(1)CompletableFuture的介绍
(2)CompletableFuture的类关系图
(3)CompletableFuture的方法说明
 
(1)CompletableFuture的介绍
CompletableFuture针对Future做了改进,也就是在异步任务执行完成后,主线程如果需要依赖该任务的执行结果来继续后面的操作,则可以不用通过等待来实现,只需向CompletableFuture传入一个回调对象。当异步任务执行完毕后,便会自动调用该回调对象(异步回调通知功能)。
 
CompletableFuture还提供了非常强大的功能。对于回调对象的执行,可以放到非任务线程中,也可以放到任务线程中。CompletableFuture提供了函数式编程能力,简化了异步编程的复杂性。还提供了多个CompletableFuture的组合与转化功能。
 
(2)CompletableFuture的类关系
CompletableFuture类实现了Future和CompletionStage这两个接口,其中Future接口提供了获取任务执行结果及任务执行状态的功能,CompletionStage接口表示任务执行的一个阶段。CompletionStage接口定义了很多方法,比如thenApply()、thenAccept()等。通过这些方法可以实现多个任务之间的时序关系,比如串行、并行、聚合等。
 
因此CompletableFuture既提供了Future阻塞式获取结果 + 任务状态的功能,也提供了CompletionStage的任务执行后触发回调 + 多个任务聚合的功能。
 
(3)CompletableFuture的方法说明
一.构建CompletableFuture的静态方法
二.runAsync()和supplyAsync()静态方法
三.allOf()和anyOf()静态方法
四.主动获取任务执行结果的方法
 
一.构建CompletableFuture的静态方法
CompletableFuture提供了4个静态方法来构建一个异步事件。由于传递进CompletableFuture这4个方法的任务需要异步执行,所以默认会使用ForkJoinPool.commonPool()提供的线程池来执行异步任务,当然也可以自定义一个线程池传入这些静态方法来执行异步任务。
//A Future that may be explicitly completed (setting its value and status), //and may be used as a CompletionStage,//supporting dependent functions and actions that trigger upon its completion.public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {    ...    //Returns a new CompletableFuture that is asynchronously completed     //by a task running in the ForkJoinPool#commonPool()     //with the value obtained by calling the given Supplier.    //@param supplier a function returning the value to be used to complete the returned CompletableFuture     //@param <U> the function's return type    //@return the new CompletableFuture    //带有返回值的异步执行方法,传入一个函数式接口,返回一个新的CompletableFuture对象    //默认使用ForkJoinPool.commonPool()作为线程池执行异步任务    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {        return asyncSupplyStage(asyncPool, supplier);    }    //Returns a new CompletableFuture that is asynchronously completed     //by a task running in the given executor with the value obtained by calling the given Supplier.    //@param supplier a function returning the value to be used to complete the returned CompletableFuture    //@param executor the executor to use for asynchronous execution    //@param <U> the function's return type    //@return the new CompletableFuture    //带有返回值的异步执行方法,传入一个函数式接口 + 一个线程池,返回一个新的CompletableFuture对象    //多了一个Executor参数,表示使用自定义线程池来执行任务    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {        return asyncSupplyStage(screenExecutor(executor), supplier);    }        static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {        if (f == null) throw new NullPointerException();        CompletableFuture<U> d = new CompletableFuture<U>();        e.execute(new AsyncSupply<U>(d, f));        return d;    }    //Returns a new CompletableFuture that is asynchronously completed    //by a task running in the ForkJoinPool#commonPool() after it runs the given action.    //@param runnable the action to run before completing the returned CompletableFuture    //@return the new CompletableFuture    //不带返回值的异步执行方法,传入一个Runnable参数,返回一个新的CompletableFuture对象    //默认使用ForkJoinPool.commonPool()作为线程池执行异步任务    public static CompletableFuture<Void> runAsync(Runnable runnable) {        return asyncRunStage(asyncPool, runnable);    }    //Returns a new CompletableFuture that is asynchronously completed     //by a task running in the given executor after it runs the given action.    //@param runnable the action to run before completing the returned CompletableFuture    //@param executor the executor to use for asynchronous execution    //@return the new CompletableFuture    //不带返回值的异步执行方法,传入一个Runnable参数 + 一个线程池,返回一个新的CompletableFuture对象    //多了一个Executor参数,表示使用自定义线程池来执行任务    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {        return asyncRunStage(screenExecutor(executor), runnable);    }        static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {        if (f == null) throw new NullPointerException();        CompletableFuture<Void> d = new CompletableFuture<Void>();        e.execute(new AsyncRun(d, f));        return d;    }    ...}二.runAsync()和supplyAsync()静态方法
下面使用runAsync()方法来构建一个异步执行事件,由于runAsync()方法是没有返回值的,所以get()这个阻塞等待任务执行完成的方法返回的还是null。
CompletableFuture cf = CompletableFuture.runAsync(() -> {    System.out.println(Thread.currentThread().getName() + ":异步执行一个任务");});cf.get();//阻塞等待任务执行完成下面使用supplyAsync()方法来构建一个异步执行事件,由于supplyAsync()方法具有返回值,所以get()方法会返回"supplyAsync"。
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {    System.out.println("supplyAsync");});cf.get();//阻塞等待任务执行完成三.allOf()和anyOf()静态方法
allOf()方法接收多个CompletableFuture的无返回值的任务。当所有的任务都执行结束后,返回一个新的CompletableFuture对象。
 
allOf()方法相当于实现了等待多个任务执行结束后再返回的功能,并且接收的CompletableFuture任务是通过runAsync()方法构建的。之所以无返回值,是因为当多个任务都具有返回值时get()方法不知取哪个。
public class CompletableFutureExample {    public static void main(String[] args) throws ExecutionException, InterruptedException {        CompletableFuture<Void> v1 = CompletableFuture.runAsync(() -> {            System.out.println("任务v1没有返回值");        });        CompletableFuture<Void> v2 = CompletableFuture.runAsync(() -> {            System.out.println("任务v2没有返回值");        });        //通过join()方法让主线程阻塞等待allOf()方法中的所有任务都执行完成后再继续执行        CompletableFuture.allOf(v1, v2).join();    }}anyOf()方法接收多个CompletableFuture的带有返回值的任务。当任何一个任务执行完成后,返回一个新的CompletableFuture对象。
 
anyOf()方法实现了等待多个任务中任何一个任务执行结束便返回的功能,接收的CompletableFuture任务是通过supplyAsync()方法构建的。
public class CompletableFutureExample {    public static void main(String[] args) throws ExecutionException, InterruptedException {        CompletableFuture<String> v1 = CompletableFuture.supplyAsync(() -> {            return "任务v1的返回值";        });        CompletableFuture<String> v2 = CompletableFuture.supplyAsync(() -> {            return "任务v2的返回值";        });        //通过join()方法让主线程阻塞等待anyOf()方法中的任何一个任务执行完成后再继续执行        CompletableFuture.anyOf(v1, v2).thenAccept(value -> System.out.println(value)).join();    }}四.主动获取任务执行结果的方法
由于CompletableFuture实现了Future接口,所以它可以像Future那样主动通过阻塞或轮询的方式来获得执行结果。比如可以通过get()方法阻塞式获取异步任务的执行结果(可中断),比如也可以通过join()方法阻塞式获取异步任务的执行结果(不可中断),此外通过complete()方法可实现线程间的数据传递 + 唤醒被get()阻塞的线程。
public class CompleteMethodExample {    public static class ClientThread implements Runnable {        private CompletableFuture completableFuture;                public ClientThread(CompletableFuture completableFuture) {            this.completableFuture = completableFuture;        }                @Override        public void run() {            log.info(Thread.currentThread().getName() + ":" + completableFuture.get());        }    }        public static void main(String[] args) {        //在ClientThread线程中使用completableFuture.get()获取返回值时,        //由于传入的cf并没有使用runAsync()等方法构建具体的异步任务,        //所以ClientThread线程中的completableFuture.get()方法必然会阻塞;        CompletableFuture cf = new CompletableFuture();        new Thread(new ClientThread(cf)).start();        new Thread(new ClientThread(cf)).start();        System.out.println("此时两个客户端线程正在被get()方法阻塞");        //通过compelete()方法来完成cf任务,并且设置了任务的返回结果为"finish"        cf.complete("finish");//此时会将值为"finish"传入两个线程,并唤醒这两个线程    }} 
5.CompletionStage方法及作用说明
(1)CompletionStage示例
(2)CompletionStage的方法概述
(3)有传参但没返回值的方法
(4)有传参且有返回值的方法
(5)没传参也没返回值的方法
(6)组合起来串行执行的方法
(7)异常处理方法
 
(1)CompletionStage示例
CompletionStage表示任务执行的一个阶段,每个异步任务都会返回一个新的CompletionStage对象,可针对多个CompletionStage对象进行串行、并行、聚合等操作。简单来说,CompletionStage就是实现异步任务执行后的自动回调功能。
 
下面的CompletionStage例子:首先需要调用一个远程方法获得结果,然后把返回结果保存到数据库。所以代码中先定义一个异步任务处理远程调用,并返回CompletionStage,接着调用thenAccept()方法把第一步的执行结果保存到数据库中。
public class CompletionStageExample {    public static void main(String[] args) {        CompletionStage<String> cf = CompletableFuture.supplyAsync(() -> "远程调用的返回结果");        cf.thenAccept(result -> {            System.out.println("第一个异步任务的返回值是:" + result);            System.out.println("把result保存到数据库");        });    }}可以看见和Future明显不一样的地方就是:thenAccept()方法中传入的回调对象是第一个异步任务执行完后自动触发的,不需要像Future那样去阻塞当前线程等待返回结果,还可以使用thenAcceptAsync()方法让保存到数据库的任务使用独立线程池。
 
(2)CompletionStage的方法概述
CompletionStage总共提供了38个方法来实现多个任务的串行、并行、聚合等功能,这些方法可以按功能进行如下的分类:
一.有传参但没返回值的方法
二.有传参且有返回值的方法
三.没传参也没返回值的方法
四.组合起来串行执行的方法
五.异常处理的方法
 
注意:Accept关键字有传参没有返回值,Run关键字没传参没返回值。
 
(3)有传参但没返回值的方法
有传参但没返回值的方法就是:用上一个异步任务的结果作为当前方法的参数进行下一步运算,并且当前方法会产生一个新的没有返回值的CompletionStage对象。有传参但没返回值的方法都包含Accept关键字。
 
一.依赖单个CompletionStage任务完成
thenAccept()相关方法用上一个任务的执行结果作为参数执行当前的action,这些方法接收的参数是一个函数式接口Consumer,表示一个待执行的任务。这些方法的返回值是CompletionStage,表示没有返回值。
 
注意:方法以Async结尾,表示使用单独的线程池来执行action,否则使用执行当前任务的线程来执行action。
public interface CompletionStage<T> {    ...    //Returns a new CompletionStage that, when this stage completes normally,     //is executed with this stage's result as the argument to the supplied action.    //@param action the action to perform before completing the returned CompletionStage    //@return the new CompletionStage    public CompletionStage<Void> thenAccept(Consumer<? super T> action);    //Returns a new CompletionStage that, when this stage completes normally,     //is executed using this stage's default asynchronous execution facility,     //with this stage's result as the argument to the supplied action.    //@param action the action to perform before completing the returned CompletionStage    //@return the new CompletionStage    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);    //Returns a new CompletionStage that, when this stage completes normally,     //is executed using the supplied Executor,     //with this stage's result as the argument to the supplied action.    //@param action the action to perform before completing the returned CompletionStage    //@param executor the executor to use for asynchronous execution    //@return the new CompletionStage    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);    ...}public class CompletionStageExample {    //当cf实例的任务执行完成后,会回调传入thenAcceptAsync()方法中的回调函数    //其中回调函数的result表示cf异步任务的返回结果    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {         CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "thenAccept message");        cf.thenAcceptAsync((result) -> {            System.out.println(Thread.currentThread().getName() + "第一个异步任务的返回值:" + result);        });    }}二.依赖两个CompletionStage任务都完成
thenAcceptBoth()相关方法提供了与thenAccept()相关方法类似的功能。不同点在于thenAcceptBoth()相关方法多了一个CompletionStage参数,表示当两个CompletionStage任务都完成后,才执行后面的action。而且这个action可以接收两个参数,这两个参数分别表示两个任务的返回值。thenAcceptBoth()相关方法相当于实现了两个异步任务的组合。
public interface CompletionStage<T> {    ...    //Returns a new CompletionStage that,     //when this and the other given stage both complete normally,     //is executed with the two results as arguments to the supplied action.    //@param other the other CompletionStage    //@param action the action to perform before completing the returned CompletionStage    //@param <U> the type of the other CompletionStage's result    //@return the new CompletionStage    public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);     //Returns a new CompletionStage that,     //when this and the other given stage complete normally,     //is executed using this stage's default asynchronous execution facility,     //with the two results as arguments to the supplied action.    //@param other the other CompletionStage    //@param action the action to perform before completing the returned CompletionStage    //@param <U> the type of the other CompletionStage's result    //@return the new CompletionStage    public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);     //Returns a new CompletionStage that,     //when this and the other given stage complete normally,     //is executed using the supplied executor,     //with the two results as arguments to the supplied function.    //@param other the other CompletionStage    //@param action the action to perform before completing the returned CompletionStage    //@param executor the executor to use for asynchronous execution    //@param <U> the type of the other CompletionStage's result    //@return the new CompletionStage    public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor);     ...}public class ThenAcceptBothExample {    //task1和task2都执行完成后,会得到两个任务的返回值AcceptBoth和message,    //接着开始执行thenAcceptBoth()中的action,    //这个action会接收前面两个任务的执行结果r1和r2,并最终打印出:执行结果为"AcceptBoth+message"    public static void main(String[] args) {        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "AcceptBoth");        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "message");        task1.thenAcceptBoth(task2, (r1, r2) -> {            System.out.println("执行结果" + r1 + "+" + r2);        });               //或者采用Fluent风格来写        //CompletableFuture.supplyAsync(() -> "AcceptBoth").thenAcceptBoth(        //    CompletableFuture.supplyAsync(() -> "message"), (r1, r2) -> {        //      System.out.println("执行结果:" + r1 + ", " + r2);        //    }        //);    }}三.依赖两个CompletionStage任务中的任何一个完成
acceptEither()相关方法和thenAcceptBoth()相关方法几乎一样。它同样接收两个CompletionStage任务,但是只需要保证其中一个任务完成,就会回调acceptEither()方法中传入的action任务。这两个CompletionStage任务谁先完成就会获得谁的返回值,作为参数传给后续的action任务。
public interface CompletionStage<T> {    ...    //Returns a new CompletionStage that,     //when either this or the other given stage complete normally,     //is executed with the corresponding result as argument to the supplied action.    //@param other the other CompletionStage    //@param action the action to perform before completing the returned CompletionStage    //@return the new CompletionStage    public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);    //Returns a new CompletionStage that,     //when either this or the other given stage complete normally,     //is executed using this stage's default asynchronous execution facility,     //with the corresponding result as argument to the supplied action.    //@param other the other CompletionStage    //@param action the action to perform before completing the returned CompletionStage    //@return the new CompletionStage    public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);    //Returns a new CompletionStage that,     //when either this or the other given stage complete normally,     //is executed using the supplied executor,     //with the corresponding result as argument to the supplied function.    //@param other the other CompletionStage    //@param action the action to perform before completing the returned CompletionStage    //@param executor the executor to use for asynchronous execution    //@return the new CompletionStage    public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor);     ...}(4)有传参且有返回值的方法
有传参且有返回值的方法就是:用上一个异步任务的执行结果作为当前方法的参数进行下一步计算,并且当前方法会产生一个新的有返回值的CompletionStage对象。
 
一.依赖单个CompletionStage任务完成
thenApply()这一组方法的功能是等上一个CompletionStage任务执行完后,就会把执行结果传递给函数fn,将函数fn作为一个新的执行任务去执行,最后返回一个新的有返回值的CompletionStage对象。
 
其中以Async结尾的方法表示函数fn这个任务将采用单独的线程池来执行。
public interface CompletionStage<T> {    ...    //Returns a new CompletionStage that, when this stage completes normally,     //is executed with this stage's result as the argument to the supplied function.    //@param fn the function to use to compute the value of the returned CompletionStage    //@param <U> the function's return type    //@return the new CompletionStage    public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);    //Returns a new CompletionStage that, when this stage completes normally,     //is executed using this stage's default asynchronous execution facility,     //with this stage's result as the argument to the supplied function.    //@param fn the function to use to compute the value of the returned CompletionStage    //@param <U> the function's return type    //@return the new CompletionStage    public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);    //Returns a new CompletionStage that, when this stage completes normally,     //is executed using the supplied Executor,     //with this stage's result as the argument to the supplied function.    //@param fn the function to use to compute the value of the returned CompletionStage    //@param executor the executor to use for asynchronous execution    //@param <U> the function's return type    //@return the new CompletionStage    public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);     ...}二.依赖两个CompletionStage任务都完成
thenCombine()这一组方法的功能类似于thenAcceptBoth()方法。它表示两个CompletionStage任务并行执行结束后,把这两个CompletionStage任务的执行结果传递给函数fn,函数fn执行后返回一个新的有返回值的CompletionStage对象。
public interface CompletionStage<T> {    ...    //Returns a new CompletionStage that,     //when this and the other given stage both complete normally,     //is executed with the two results as arguments to the supplied function.    //@param other the other CompletionStage    //@param fn the function to use to compute the value of the returned CompletionStage    //@param <U> the type of the other CompletionStage's result    //@param <V> the function's return type    //@return the new CompletionStage    public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);    //Returns a new CompletionStage that,     //when this and the other given stage complete normally,     //is executed using this stage's default asynchronous execution facility,     //with the two results as arguments to the supplied function.    //@param other the other CompletionStage    //@param fn the function to use to compute the value of the returned CompletionStage    //@param <U> the type of the other CompletionStage's result    //@param <V> the function's return type    //@return the new CompletionStage    public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);    //Returns a new CompletionStage that,     //when this and the other given stage complete normally,     //is executed using the supplied executor,     //with the two results as arguments to the supplied function.      //@param other the other CompletionStage    //@param fn the function to use to compute the value of the returned CompletionStage    //@param executor the executor to use for asynchronous execution    //@param <U> the type of the other CompletionStage's result    //@param <V> the function's return type    //@return the new CompletionStage    public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor);     ...}public class ThenCombineExample {    public static void main(String[] args) {        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "Combine");        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "message");        CompletableFuture<String> cf = task1.thenCombineAsync(task2, (r1, r2) -> {            System.out.println("执行结果:" + r1 + ", " + r2);            return r1 + r2;        });        System.out.println(cf.get());              //或者采用Fluent风格来写        //CompletableFuture cf = CompletableFuture.supplyAsync(() -> "Combine").thenCombineAsync(        //    CompletableFuture.supplyAsync(() -> "message"), (r1, r2) -> {        //        System.out.println("执行结果:" + r1 + ", " + r2);        //        return r1 + r2;        //    }        //);        //System.out.println(cf.get());    }}三.依赖两个CompletionStage任务中的任何一个完成
applyToEither()方法表示两个CompletionStage任务中任意一个任务完成后,都执行传入applyToEither()方法中的函数fn,函数fn执行后返回一个新的有返回值的CompletionStage对象。
public interface CompletionStage<T> {    ...    //Returns a new CompletionStage that,     //when either this or the other given stage complete normally,     //is executed with the corresponding result as argument to the supplied function.    //@param other the other CompletionStage    //@param fn the function to use to compute the value of the returned CompletionStage    //@param <U> the function's return type    //@return the new CompletionStage    public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);    //Returns a new CompletionStage that,     //when either this or the other given stage complete normally,     //is executed using this stage's default asynchronous execution facility,     //with the corresponding result as argument to the supplied function.    //@param other the other CompletionStage    //@param fn the function to use to compute the value of the returned CompletionStage    //@param <U> the function's return type    //@return the new CompletionStage    public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);    //Returns a new CompletionStage that,     //when either this or the other given stage complete normally,     //is executed using the supplied executor,     //with the corresponding result as argument to the supplied function.    //@param other the other CompletionStage    //@param fn the function to use to compute the value of the returned CompletionStage    //@param executor the executor to use for asynchronous execution    //@param <U> the function's return type    //@return the new CompletionStage    public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor);     ...}(5)没传参也没返回值的方法
没传参也没返回值的方法就是:当前方法不依赖上一个异步任务的执行结果,只要上一个异步任务执行完成就执行当前方法,并且当前方法会产生一个新的没有返回值的CompletionStage对象,没传参也没返回值的方法都包含Run关键字。
 
一.依赖单个CompletionStage任务完成
thenRun()方法只要上一个阶段的任务执行完成后,便立即执行指定action。thenRunAsync()表示采用ForkjoinPool.commonPool()线程池来执行action,action执行完成后会返回一个新的没有返回值的CompletionStage对象。
public interface CompletionStage<T> {    ...    //Returns a new CompletionStage that,     //when this stage completes normally, executes the given action.    //@param action the action to perform before completing the returned CompletionStage    //@return the new CompletionStage    public CompletionStage<Void> thenRun(Runnable action);    //Returns a new CompletionStage that, when this stage completes normally,     //executes the given action using this stage's default asynchronous execution facility.    //@param action the action to perform before completing the returned CompletionStage    //@return the new CompletionStage    public CompletionStage<Void> thenRunAsync(Runnable action);    //Returns a new CompletionStage that, when this stage completes normally,     //executes the given action using the supplied Executor.    //@param action the action to perform before completing the returned CompletionStage    //@param executor the executor to use for asynchronous execution    //@return the new CompletionStage    public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor);    ...}二.依赖两个CompletionStage任务都完成
runAfterBoth()方法接收一个CompletionStage任务。该方法要保证两个CompletionStage任务都完成,再执行指定的action。action执行完成后会返回一个新的没有返回值的CompletionStage对象。
public interface CompletionStage<T> {    ...    //Returns a new CompletionStage that,     //when this and the other given stage both complete normally, executes the given action.    //@param other the other CompletionStage    //@param action the action to perform before completing the returned CompletionStage    //@return the new CompletionStage    public CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action);    //Returns a new CompletionStage that,     //when this and the other given stage complete normally,     //executes the given action using this stage's default asynchronous execution facility.    //@param other the other CompletionStage    //@param action the action to perform before completing the returned CompletionStage    //@return the new CompletionStage    public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action);    //Returns a new CompletionStage that,     //when this and the other given stage complete normally,     //executes the given action using the supplied executor.    //@param other the other CompletionStage    //@param action the action to perform before completing the returned CompletionStage    //@param executor the executor to use for asynchronous execution    //@return the new CompletionStage    public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor);     ...}三.依赖两个CompletionStage任务中的任何一个完成
runAfterEither()方法接收一个CompletionStage任务。它只需要保证两个任务中任意一个任务执行完成,即可执行指定的action,action执行完成后会返回一个新的没有返回值的CompletionStage对象。
public interface CompletionStage<T> {    ...    //Returns a new CompletionStage that,     //when either this or the other given stage complete normally, executes the given action.    //@param other the other CompletionStage    //@param action the action to perform before completing the returned CompletionStage    //@return the new CompletionStage    public CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action);    //Returns a new CompletionStage that,     //when either this or the other given stage complete normally,     //executes the given action using this stage's default asynchronous execution facility.    //@param other the other CompletionStage    //@param action the action to perform before completing the returned CompletionStage    //@return the new CompletionStage    public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);    //Returns a new CompletionStage that,     //when either this or the other given stage complete normally,     //executes the given action using the supplied executor.    //@param other the other CompletionStage    //@param action the action to perform before completing the returned CompletionStage    //@param executor the executor to use for asynchronous execution    //@return the new CompletionStage    public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor);     ...}(6)组合起来串行执行的方法
thenCompose()是多任务组合方法,它的作用是把两个CompletionStage任务进行组合达到串行执行的目的,也就是把第一个任务的执行结果作为参数传递给第二个任务执行。
 
thenCompose()方法有点类似于thenCombine()方法,但thenCompose()方法中的两个任务存在先后关系,而thenCombine()方法中的两个任务是并行执行的。
public interface CompletionStage<T> {    ...    //Returns a new CompletionStage that, when this stage completes normally,     //is executed with this stage as the argument to the supplied function.    //@param fn the function returning a new CompletionStage    //@param <U> the type of the returned CompletionStage's result    //@return the CompletionStage    public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);    //Returns a new CompletionStage that, when this stage completes normally,     //is executed using this stage's default asynchronous execution facility,    //with this stage as the argument to the supplied function.    //@param fn the function returning a new CompletionStage    //@param <U> the type of the returned CompletionStage's result    //@return the CompletionStage    public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);    //Returns a new CompletionStage that, when this stage completes normally,     //is executed using the supplied Executor,     //with this stage's result as the argument to the supplied function.    //@param fn the function returning a new CompletionStage    //@param executor the executor to use for asynchronous execution    //@param <U> the type of the returned CompletionStage's result    //@return the CompletionStage    public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor);     ...}public class ThenComposeExample {    //下面使用supplyAsync()方法构建了一个异步带返回值的任务,返回值为"Compose Message";    //接着使用thenCompose()方法组合另外一个任务,并把前面任务的返回值r作为参数传递给第二个任务    //在第二个任务中同样使用supplyAsync()方法构建了一个新的任务将参数r转为大写    //最后thenCompose()方法返回一个新的没有返回值的CompletionStage对象    public static void main(String[] args) {        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "Compose Message");        CompletableFuture<String> cf = task1.thenCompose(r -> CompletableFuture.supplyAsync(() -> r.toUpperCase()));        System.out.println(cf.get());              //或者采用Fluent风格来写        //CompletableFuture cf = CompletableFuture.supplyAsync(() -> "Compose Message")        //     .thenCompose(r -> CompletableFuture.supplyAsync(() -> r.toUpperCase())        //);        //System.out.println(cf.get());    }}(7)异常处理方法
上述介绍的方法都是CompletionStage任务正常执行时的处理方法。如果依赖的前一个任务出现异常,那么会导致后续的任务无法正常执行。比如下述代码,如果前置任务cf出现异常,那么会影响后置任务的执行。
public class RunAfterBothExample {    public static void main(String[] args) {        CompletableFuture cf = CompletableFuture.supplyAsync(() -> {            throw new RuntimeException("Exception");        }).runAfterBoth(CompletableFuture.supplyAsync(() -> "Message"), () -> {            System.out.println("Done");        });        System.out.println(cf.get());    }}CompletionStage提供了3类异常处理的方法。
 
一.whenComplete()方法
whenComplete()这一组方法表示的是:不论前置的CompletionStage任务是正常执行结束还是出现异常,都能触发执行指定action,最后返回一个没返回值的CompletionStage对象。
public interface CompletionStage<T> {    ...    //Returns a new CompletionStage with the same result or exception as this stage,     //that executes the given action when this stage completes.    //@param action the action to perform    //@return the new CompletionStage    public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);    //Returns a new CompletionStage with the same result or exception as this stage,     //that executes the given action using this stage's default asynchronous execution facility when this stage completes.    //@param action the action to perform    //@return the new CompletionStage    public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);    //Returns a new CompletionStage with the same result or exception as this stage,     //that executes the given action using the supplied Executor when this stage completes.    //@param action the action to perform    //@param executor the executor to use for asynchronous execution    //@return the new CompletionStage    public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);     ...}二.handle()方法
handle()这一组方法表示的是:不论前置的CompletionStage任务是正常执行结束还是出现异常,都会执行其中的函数fn,最后返回一个有返回值的CompletionStage对象。
public interface CompletionStage<T> {    ...    //Returns a new CompletionStage that,     //when this stage completes either normally or exceptionally,     //is executed with this stage's result and exception as arguments to the supplied function.    //@param fn the function to use to compute the value of the returned CompletionStage    //@param <U> the function's return type    //@return the new CompletionStage    public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);    //Returns a new CompletionStage that,     //when this stage completes either normally or exceptionally,     //is executed using this stage's default asynchronous execution facility,     //with this stage's result and exception as arguments to the supplied function.    //@param fn the function to use to compute the value of the returned CompletionStage    //@param <U> the function's return type    //@return the new CompletionStage    public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);    //Returns a new CompletionStage that,     //when this stage completes either normally or exceptionally,     //is executed using the supplied executor,     //with this stage's result and exception as arguments to the supplied function.    //@param fn the function to use to compute the value of the returned CompletionStage    //@param executor the executor to use for asynchronous execution    //@param <U> the function's return type    //@return the new CompletionStage    public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);     ...}public class HandleExample {    public static void main(String[] args) throws ExecutionException, InterruptedException {        CompletableFuture cf = CompletableFuture.supplyAsync(() -> {            throw new RuntimeException("Exception");        }).handleAsync((r, th) -> {           return th != null ? "出现异常" : "正常执行";        });        System.out.println(cf.get());    }}三.exceptionally()方法
exceptionally()方法接收一个函数fn,当上一个CompletionStage任务出现异常时,会把该异常作为参数传递给fn,最后返回一个有返回值的CompletionStage对象。
public interface CompletionStage<T> {    ...    //Returns a new CompletionStage that,     //when this stage completes exceptionally,     //is executed with this stage's exception as the argument to the supplied function.    //Otherwise, if this stage completes normally,     //then the returned stage also completes normally with the same value.    //@param fn the function to use to compute the value of the returned CompletionStage if this CompletionStage completed exceptionally    //@return the new CompletionStage    public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);    ...}public class ExceptionallyExample {    public static void main(String[] args) throws ExecutionException, InterruptedException {        CompletableFuture cf = CompletableFuture.supplyAsync(() -> {            throw new RuntimeException("Exception");        }).exceptionally(e -> {            log.error(e);            return "ExceptionallyExample";        });        System.out.println(cf.get());    }} 
6.CompletableFuture的实现原理分析
(1)CompletableFuture实现回调的例子
(2)CompletableFuture如何存储任务
(3)Completion的几个实现类
(4)Completion的栈结构存储回调任务
(5)Completion中的回调任务的执行和总结
 
(1)CompletableFuture实现回调的例子
CompletableFuture实现了Future接口和CompletionStage接口,CompletionStage接口为CompletableFuture提供了丰富的异步回调接口,CompletableFuture可以使用这些接口来实现复杂的异步计算工作。
 
下面是一个使用CompletableFuture回调的例子。其中构建了两个CompletionStage任务,第一个任务是返回"thenAccept message"字符串,第二个任务是打印第一个任务的返回值。注意:Accept关键字有传参没有返回值,Run关键字没传参没返回值。
 
这两个任务建立了串行执行的关系,第二个任务相当于第一个任务执行结束后的异步回调,并且多个CompletionStage任务可以使用链式风格串联。
public class CompletionStageExample {    public static void main(String[] args) throws InterruptedException, ExecutionException {        CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> "thenAccept message")            .thenAcceptAsync((result) -> {                System.out.println("第一个异步任务的返回值:" + result);            });        cf.get();    }}(2)CompletableFuture如何存储任务
一.CompletableFuture的成员变量
CompletableFuture的成员变量只有两个:result和stack。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {    ...    //表示CompletionStage任务的返回结果或者一个异常的封装对象AltResult    volatile Object result;//Either the result or boxed AltResult        //表示依赖操作栈的栈顶,链式调用中传递的任务都会被压入这个stack中    volatile Completion stack;//Top of Treiber stack of dependent actions    ...}二.表示具体执行任务的Completion
成员变量stack是一个存储Completion对象的Treiber Stack结构,Treiber Stack是一种基于CAS机制实现的无锁并发栈。
 
Completion表示一个具体的执行任务。每个回调任务都会封装成Completion对象,然后放入Treiber Stack中。Completion中的成员变量next保存了栈中的下一个回调任务。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {    ...    abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {        volatile Completion next;//Treiber stack link        //Performs completion action if triggered, returning a dependent that may need propagation, if one exists.        //@param mode SYNC, ASYNC, or NESTED        abstract CompletableFuture<?> tryFire(int mode);        //Returns true if possibly still triggerable. Used by cleanStack.        abstract boolean isLive();        public final void run() {            tryFire(ASYNC);        }        public final boolean exec() {            tryFire(ASYNC);            return true;        }        public final Void getRawResult() {            return null;        }        public final void setRawResult(Void v) {              }    }    ...}(3)Completion的几个实现类
一.UniCompletion
当使用如thenRun()、thenApply()等方法处理单个任务的,那么这些任务就会封装成UniCompletion对象。
 
二.CoCompletion
当使用如thenCombine()、applyToEither()等方法处理两个任务的,那么这些任务会封装成CoCompletion对象。
 
三.Signaller
当使用如get()、join()等方法处理任务时,那么调用方也会作为任务被封装成Signaller对象。
 
(4)Completion的栈结构存储回调任务
以如下创建一个CompletableFuture任务为例,说明在CompletableFuture中是如何存储这些回调任务的。注意:该例子在Debug中没有发现baseFuture的成员变量stack的变化。
public class CompletionStackExample {    public static void main(String[] args) throws InterruptedException, ExecutionException {        //创建一个CompletableFuture任务对象        CompletableFuture<String> baseFuture = CompletableFuture.supplyAsync(() -> {            try {                System.out.println("开始执行入栈baseFuture的第一个异步任务");                Thread.sleep(5000);                System.out.println("第一个异步任务执行完毕");            } catch (Exception e) {            }            return "BaseFuture";        });        System.out.println("主线程第一次打印");        baseFuture.thenApply(r -> {            System.out.println("开始执行入栈baseFuture的第二个异步任务");            try {                Thread.sleep(5000);                System.out.println("第二个异步任务执行完毕");            } catch (Exception e) {            }            return "Then Apply";        });//输出结果中没有"Then Apply",因为没有任务使用"Then Apply"这个返回值        System.out.println("主线程第二次打印");        baseFuture.thenAccept(r -> {            System.out.println("开始执行入栈baseFuture的第三个异步任务");            try {                Thread.sleep(5000);                System.out.println("第三个异步任务执行完毕: " + r);            } catch (Exception e) {            }        }).thenAccept(Void -> {            System.out.println("baseFuture的第三个异步任务返回的新CompletableFuture,入栈第一个异步任务");            try {                Thread.sleep(5000);                System.out.println("第三个异步任务的子任务执行完毕");            } catch (Exception e) {            }        });        System.out.println("主线程第三次打印");        baseFuture.thenApply(r -> {            System.out.println("开始执行入栈baseFuture的第四个异步任务");            try {                Thread.sleep(5000);                System.out.println("第四个异步任务执行完毕");            } catch (Exception e) {            }            return "Apply Message";        }).thenAccept(r -> {            System.out.println("baseFuture的第四个异步任务返回的新CompletableFuture,入栈第一个异步任务");            try {                Thread.sleep(5000);                System.out.println("第四个异步任务的子任务执行完毕: " + r);            } catch (Exception e) {            }        });        System.out.println("主线程第四次打印");        System.out.println("finish: " + baseFuture.get());    }    //输出的结果如下:    //主线程第一次打印    //开始执行入栈baseFuture的第一个异步任务    //主线程第二次打印    //主线程第三次打印    //主线程第四次打印    //第一个异步任务执行完毕    //开始执行入栈baseFuture的第四个异步任务    //开始执行入栈baseFuture的第三个异步任务    //第四个异步任务执行完毕    //第三个异步任务执行完毕: BaseFuture    //baseFuture的第三个异步任务返回的新CompletableFuture,入栈第一个异步任务    //baseFuture的第四个异步任务返回的新CompletableFuture,入栈第一个异步任务    //第三个异步任务的子任务执行完毕    //第四个异步任务的子任务执行完毕: Apply Message    //开始执行入栈baseFuture的第二个异步任务    //第二个异步任务执行完毕    //finish: BaseFuture}一.第一阶段的Completion Stack结构
主线程第一次打印和第二次打印执行完成后,会创建如下图所示的结构。此时Completion类型是UniCompletion,因为thenApply()方法只接收一个任务。

二.第二阶段的Completion Stack结构
主线程第三次打印执行完成后,就会创建如下图所示的结构。
 
首先使用baseFuture.thenAccept()方法在baseFuture上增加一个回调,此时会把这个回调对应的Completion压入baseFuture的stack的栈顶。
 
然后会产生一个新的CompletableFuture对象实例继续执行thenAccept(),由于这个新的CompletableFuture对象实例是在栈顶的Completion中产生的,因此在栈顶的Completion中会有一个dep属性指向这个新的对象实例。
 
在新的CompletableFuture对象中又调用thenAccept()来构建一个回调任务,所以又会有一个新的Completion Stack结构。

三.第三阶段的Completion Stack结构
主线程第四次打印执行完成后,就会创建如下图所示的结构。
 
首先是在baseFuture上使用thenApply()方法创建一个带有返回值的回调,这个回调对应的Completion同样会压入baseFuture的stack的栈顶。然后同样会创建一个新的CompletableFuture对象实例。接着在这个新的对象实例中继续使用thenAccept()方法添加另外一个回调,这个回调对应的Completion会压入新的CompletableFuture的stack的栈顶

(5)Completion中的回调任务的执行和总结
从Completion Stack的栈顶中逐个出栈来执行。
 
如果当前出栈的Completion存在一个子Completion Stack,那么就优先执行这一条链路的Completion任务。
 
CompletableFuture中的回调任务,是基于Completion来实现的。针对CompletionStage中不同类型的方法,Completion有不同的子类处理。
 
Completion表示一个具体的回调任务,这些Completion采用了一种Treiber Stack结构来存储。由于每个Completion都可能会产生新的CompletableFuture,所以整个结构看起来像一棵很深的树。
 
7.CompletableFuture的核心源码分析
(1)CompletableFuture的核心源码
(2)CompletableFuture对象的创建
(3)Completion Stack的构建
(4)Completion任务的执行流程
(5)Completion任务的执行结果获取
(6)总结
 
(1)CompletableFuture的核心源码
CompletableFuture的源码主要分四部分:
一.CompletableFuture对象的创建
二.Completion Stack的构建
三.get()方法获取任务处理结果时阻塞和唤醒线程
四.当前置任务执行完成后,Completion Stack的执行流程
 
(2)CompletableFuture对象的创建
假设使用supplyAsync()方法来创建一个CompletableFuture对象。那么在执行supplyAsync()方法时触发调用的asyncSupplyStage()方法中,便会使用线程池来执行一个由AsyncSupply()构造方法构建的任务,这个线程池默认情况下是由ForkJoinPool的commonPool()方法返回的。
 
当线程池执行由AsyncSupply()构造方法构建的任务时,会调用AsyncSupply的run()方法来执行具体的任务。
 
在AsyncSupply的run()方法中:首先会使用f.get()来获得Supplier这个函数式接口的执行结果,然后通过执行CompletableFuture的completeValue()方法,把执行结果通过CAS设置到CompletableFuture的成员变量result中。最后调用CompletableFuture的postComplete()方法表示执行完成,该postComplete()方法会执行Completion Stack中的所有回调任务。
//Represents a supplier of results.//There is no requirement that a new or distinct result be returned each time the supplier is invoked.//This is a functional interface whose functional method is get().//@param <T> the type of results supplied by this supplier@FunctionalInterfacepublic interface Supplier<T> {    //Gets a result.    //@return a result    T get();}public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {    volatile Object result;       // Either the result or boxed AltResult    volatile Completion stack;    // Top of Treiber stack of dependent actions        //Returns a new CompletableFuture that is asynchronously completed by a task     //running in the ForkJoinPool#commonPool() with the value obtained by calling the given Supplier.    //@param supplier a function returning the value to be used to complete the returned CompletableFuture    //@param <U> the function's return type    //@return the new CompletableFuture    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {        return asyncSupplyStage(asyncPool, supplier);    }        static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {        if (f == null) throw new NullPointerException();        CompletableFuture<U> d = new CompletableFuture<U>();        //使用线程池来执行一个由AsyncSupply()方法构建的任务        e.execute(new AsyncSupply<U>(d, f));        //返回一个新的CompletableFuture对象        return d;    }        static final class AsyncSupply<T> extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {        CompletableFuture<T> dep;        Supplier<T> fn;        AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {            this.dep = dep;            this.fn = fn;        }               public final Void getRawResult() {            return null;        }        public final void setRawResult(Void v) {                    }        public final boolean exec() {            run();            return true;        }               public void run() {            CompletableFuture<T> d;            Supplier<T> f;            if ((d = dep) != null && (f = fn) != null) {                dep = null; fn = null;                if (d.result == null) {                    try {                        //首先使用f.get()来获得Supplier这个函数式接口中的执行结果                        //然后通过执行CompletableFuture的completeValue()方法,                        //把执行结果设置到CompletableFuture的成员变量result中;                        d.completeValue(f.get());                    } catch (Throwable ex) {                        d.completeThrowable(ex);                    }                }                //最后调用CompletableFuture的postComplete()方法执行Completion Stack中的所有回调任务                d.postComplete();            }        }    }        //Completes with a non-exceptional result, unless already completed.    final boolean completeValue(T t) {        return UNSAFE.compareAndSwapObject(this, RESULT, null, (t == null) ? NIL : t);    }        //Pops and tries to trigger all reachable dependents. Call only when known to be done.    final void postComplete() {        //On each step, variable f holds current dependents to pop and run.          //It is extended along only one path at a time, pushing others to avoid unbounded recursion.        CompletableFuture<?> f = this;        Completion h;        while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) {            CompletableFuture<?> d;            Completion t;            if (f.casStack(h, t = h.next)) {                if (t != null) {                    if (f != this) {                        pushStack(h);                        continue;                    }                    h.next = null;    // detach                }                f = (d = h.tryFire(NESTED)) == null ? this : d;            }        }    }        private static final sun.misc.Unsafe UNSAFE;    private static final long RESULT;    private static final long STACK;    private static final long NEXT;    static {        try {            final sun.misc.Unsafe u;            UNSAFE = u = sun.misc.Unsafe.getUnsafe();            Class<?> k = CompletableFuture.class;            RESULT = u.objectFieldOffset(k.getDeclaredField("result"));            STACK = u.objectFieldOffset(k.getDeclaredField("stack"));            NEXT = u.objectFieldOffset(Completion.class.getDeclaredField("next"));        } catch (Exception x) {            throw new Error(x);        }    }    ...}(3)Completion Stack的构建
假设已经使用了CompletableFuture的supplyAsync()方法创建了源任务,接着需要使用CompletionStage的thenApply()等方法来构建回调任务。
 
源任务 -> Supplier接口的实现类对象(get()方法),回调任务 -> Function接口的实现类对象(apply()方法)。
 
CompletableFuture的thenApply()方法会触发执行uniApplyStage()方法。在uniApplyStage()方法中,首先会创建一个新的CompletableFuture对象,然后根据CompletableFuture的uniApply()方法判断源任务是否已经完成。如果源任务已经完成,则不需要入栈,直接执行回调任务的apply()方法。如果源任务还没执行完成,才将回调任务封装为UniApply对象并入栈。
 
源任务还没执行完成的处理过程具体如下:首先把回调任务封装成一个UniApply对象,然后调用CompletableFuture的push()方法,把UniApply对象压入源任务所在CompletableFuture对象中的stack的栈顶,最后调用UniApply的tryFire()方法来尝试执行该回调任务。
 
注意:UniApply对象其实是一个Completion对象,因为UniApply类继承自UniCompletion类,而UniCompletion类又继承自Completion类。
//Represents a function that accepts one argument and produces a result.//This is a functional interface whose functional method is apply(Object).//@param <T> the type of the input to the function//@param <R> the type of the result of the function@FunctionalInterfacepublic interface Function<T, R> {    //Applies this function to the given argument.    //@param t the function argument    //@return the function result    R apply(T t);    ...}    public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {    volatile Object result;       // Either the result or boxed AltResult    volatile Completion stack;    // Top of Treiber stack of dependent actions        public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {        return uniApplyStage(null, fn);    }        private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T,? extends V> f) {        if (f == null) throw new NullPointerException();        //创建一个新的CompletableFuture对象        CompletableFuture<V> d =  new CompletableFuture<V>();        //根据CompletableFuture的uniApply()方法判断源任务是否已经完成        //如果源任务已经完成,则不需要入栈        if (e != null || !d.uniApply(this, f, null)) {            //首先把回调任务f封装成一个UniApply对象            UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);            //然后调用CompletableFuture的push()方法            //把UniApply对象压入源任务所在的CompletableFuture对象中的stack的栈顶            push(c);            //最后调用UniApply的tryFire()方法来尝试执行该回调任务            c.tryFire(SYNC);        }        return d;    }        final <S> boolean uniApply(CompletableFuture<S> a, Function<? super S,? extends T> f, UniApply<S,T> c) {         Object r;        Throwable x;        //如果任务还没完成(result == null),直接返回false        if (a == null || (r = a.result) == null || f == null) {            return false;        }        tryComplete: if (result == null) {            //判断result是否为异常类型            if (r instanceof AltResult) {                if ((x = ((AltResult)r).ex) != null) {                    //如果result是异常类型,则使用completeThrowable()方法处理,并返回true                    completeThrowable(x, r);                    break tryComplete;                }                r = null;            }            //如果result不为空,任务已经执行完成,并且没有出现异常            try {                if (c != null && !c.claim()) {                    return false;                }                //把源任务的执行结果s作为参数传给回调任务f                //直接执行回调任务的apply()方法,并将结果设置到CompletableFuture对象的成员变量result中                @SuppressWarnings("unchecked") S s = (S) r;                completeValue(f.apply(s));             } catch (Throwable ex) {                completeThrowable(ex);            }        }        return true;    }        //Pushes the given completion (if it exists) unless done.    final void push(UniCompletion<?,?> c) {        if (c != null) {            while (result == null && !tryPushStack(c)) {                lazySetNext(c, null); // clear on failure            }        }    }        final boolean tryPushStack(Completion c) {        Completion h = stack;        lazySetNext(c, h);        return UNSAFE.compareAndSwapObject(this, STACK, h, c);    }        static void lazySetNext(Completion c, Completion next) {        UNSAFE.putOrderedObject(c, NEXT, next);    }        ...        static final class UniApply<T,V> extends UniCompletion<T,V> {        Function<? super T,? extends V> fn;        UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T,? extends V> fn) {            super(executor, dep, src);            this.fn = fn;        }        //尝试执行当前CompletableFuture中的Completion        final CompletableFuture<V> tryFire(int mode) {            CompletableFuture<V> d;            CompletableFuture<T> a;            //执行CompletableFuture的uniApply()方法尝试执行回调任务            if ((d = dep) == null || !d.uniApply(a = src, fn, mode > 0 ? null : this)) {                return null;            }            dep = null;            src = null;            fn = null;            //执行CompletableFuture的postFire()方法            return d.postFire(a, mode);        }    }        final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {        if (a != null && a.stack != null) {            if (mode < 0 || a.result == null) {                a.cleanStack();            } else {                a.postComplete();            }        }        if (result != null && stack != null) {            if (mode < 0) {                return this;            } else {                postComplete();            }        }        return null;    }        abstract static class UniCompletion<T,V> extends Completion {        Executor executor;//执行当前任务的线程池        CompletableFuture<V> dep;//构建当前任务的CompletableFuture对象        CompletableFuture<T> src;//指向源任务        UniCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src) {            this.executor = executor;            this.dep = dep;            this.src = src;        }        //判断是否使用单独的线程池来执行任务        final boolean claim() {            Executor e = executor;            if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {                if (e == null) {                    return true;                }                executor = null; // disable                e.execute(this);            }            return false;        }        //判断任务是否存活        final boolean isLive() {            return dep != null;        }    }}(4)Completion任务的执行流程
一.CompletableFuture的postComplete()方法
CompletableFuture中的任务完成后即源任务完成后,会通过CompletableFuture.postComplete()方法来完成后置逻辑,也就是把当前CompletableFuture.stack中存储的Completion逐项出栈执行。
 
postComplete()方法会触发stack中所有可执行的回调任务Completion,该方法会遍历整个stack,并通过Completion任务的tryFire()方法来尝试执行。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {    volatile Object result;    volatile Completion stack;        public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {        return asyncSupplyStage(asyncPool, supplier);    }        static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {        if (f == null) throw new NullPointerException();        CompletableFuture<U> d = new CompletableFuture<U>();        //使用线程池来执行一个由AsyncSupply()方法构建的任务        e.execute(new AsyncSupply<U>(d, f));        //返回一个新的CompletableFuture对象        return d;    }        static final class AsyncSupply<T> extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {        CompletableFuture<T> dep;        Supplier<T> fn;        AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {            this.dep = dep;            this.fn = fn;        }        ...        public void run() {            CompletableFuture<T> d;            Supplier<T> f;            if ((d = dep) != null && (f = fn) != null) {                dep = null; fn = null;                if (d.result == null) {                    try {                        //首先使用f.get()来获得Supplier这个函数式接口中的执行结果                        //然后通过执行CompletableFuture的completeValue()方法,                        //把执行结果设置到CompletableFuture的成员变量result中;                        d.completeValue(f.get());                    } catch (Throwable ex) {                        d.completeThrowable(ex);                    }                }                //最后调用CompletableFuture的postComplete()方法执行Completion Stack中的所有回调任务                d.postComplete();            }        }    }        final boolean completeValue(T t) {        return UNSAFE.compareAndSwapObject(this, RESULT, null, (t == null) ? NIL : t);    }        final void postComplete() {        CompletableFuture<?> f = this;        Completion h;        //如果stack不为空,则不断循环从stack中出栈Completion任务        while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) {            CompletableFuture<?> d;            Completion t;            //通过CAS逐个取出stack中的Completion任务并重置stack            if (f.casStack(h, t = h.next)) {                if (t != null) {                    //表示h.tryFire()返回了另外一个CompleableFuture对象                    if (f != this) {                        pushStack(h);                        continue;                    }                    h.next = null;    // detach                }                //执行指定Completion的tryFire()方法,比如UniApply.tryFire()方法                f = (d = h.tryFire(NESTED)) == null ? this : d;            }        }    }    ...}二.Completion任务的执行流程图

(5)Completion任务的执行结果获取
可以通过get()或join()方法获取CompletableFuture的执行结果。当任务还没执行结束时(r == null),则调用waitingGet()方法进行阻塞等待。主要会先自旋256次判断执行是否结束,如果不是才挂起线程进行阻塞,从而避免直接挂起线程带来的性能开销。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {    ...    //Waits if necessary for this future to complete, and then returns its result.    public T get() throws InterruptedException, ExecutionException {        Object r;        return reportGet((r = result) == null ? waitingGet(true) : r);    }        //Returns raw result after waiting, or null if interruptible and interrupted.    private Object waitingGet(boolean interruptible) {        Signaller q = null;        boolean queued = false;        int spins = -1;        Object r;        while ((r = result) == null) {            if (spins < 0) {                spins = (Runtime.getRuntime().availableProcessors() > 1) ? 1 << 8 : 0;            } else if (spins > 0) {                if (ThreadLocalRandom.nextSecondarySeed() >= 0) {                    --spins;                }            } else if (q == null) {                q = new Signaller(interruptible, 0L, 0L);            } else if (!queued) {                queued = tryPushStack(q);            } else if (interruptible && q.interruptControl < 0) {                q.thread = null;                cleanStack();                return null;            } else if (q.thread != null && result == null) {                try {                    ForkJoinPool.managedBlock(q);                } catch (InterruptedException ie) {                    q.interruptControl = -1;                }            }        }        if (q != null) {            q.thread = null;            if (q.interruptControl < 0) {                if (interruptible) {                    r = null; // report interruption                } else {                    Thread.currentThread().interrupt();                }            }        }        postComplete();        return r;    }    ...}(6)总结
CompletableFuture的核心在于CompletionStage,CompletionStage提供了最基础的异步回调机制。也就是主线程不需要通过阻塞方式来等待异步任务的执行结果,而是当异步任务执行完成后主动通知来触发执行下一个任务。此外,CompletionStage全部采用了函数式接口的方式来实现,可以通过链式的方式来对多个CompletionStage进行组合。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

232

主题

0

回帖

706

积分

高级会员

积分
706

QQ|智能设备 | 粤ICP备2024353841号-1

GMT+8, 2025-3-10 19:20 , Processed in 0.772357 second(s), 29 queries .

Powered by 智能设备

©2025

|网站地图