English 简体中文 繁體中文 한국 사람 日本語 Deutsch русский بالعربية TÜRKÇE português คนไทย french
查看: 11|回复: 0

如何安全发布 CompletableFuture ?Java9新增方法分析

[复制链接]
查看: 11|回复: 0

如何安全发布 CompletableFuture ?Java9新增方法分析

[复制链接]
查看: 11|回复: 0

341

主题

0

回帖

1033

积分

金牌会员

积分
1033
okkkk

341

主题

0

回帖

1033

积分

金牌会员

积分
1033
2025-2-5 12:11:21 | 显示全部楼层 |阅读模式
如何安全发布 CompletableFuture ?Java9新增方法分析

本文未经允许禁止转载。
JDK9 中对于CompletableFuture做了新的增强,除了超时功能(orTimeout),还有面向继承、安全发布等相关方法。本文中,我们将详细分析各个新增方法,同时说明其安全发布的重要性,最后提出相关的实践原则。
1. newIncompleteFuture

public <U> CompletableFuture<U> newIncompleteFuture() {    return new CompletableFuture<U>();}这是一个面向继承的方法,Java支持方法返回值协变,子类可以返回自己的实现。如果你不需要继承 CompletableFuture,对于使用来说,这个方法没有新增新的功能。
2. 默认执行器

public Executor defaultExecutor() {    return ASYNC_POOL;}子类可以指定默认执行器,不过需要注意调用不显式指定执行器的async相关方法时,其执行回调所在的线程有两种可能:当前调用线程或者子类指定的执行器线程。如果使用 CompletableFuture,推荐显式指定执行器;如果使用 CFFU,无需每次都显式指定执行器。
3. 复制方法

public CompletableFuture<T> copy() {    return uniCopyStage(this);}此方法等同于调用 cf.thenApply(x -> x), 目的是实现保护性复制,避免后续操作对于原实例的修改。
4. 只读形式与安全发布

// 返回结果仅支持 CompletionStage 操作public CompletionStage<T> minimalCompletionStage() {    return uniAsMinimalStage();}private MinimalStage<T> uniAsMinimalStage() {    Object r;          // 性能优化1:若已有结果,当前线程下直接返回stage    if ((r = result) != null)        return new MinimalStage<T>(encodeRelay(r));          // 性能优化2:若还未有结果,注册回调    MinimalStage<T> d = new MinimalStage<T>();    unipush(new UniRelay<T,T>(d, this));    return d;}// 实际实现,CompletableFuture 独有方法均不支持static final class MinimalStage<T> extends CompletableFuture<T> {        MinimalStage() { }        MinimalStage(Object r) { super(r); }        @Override public <U> CompletableFuture<U> newIncompleteFuture() {            return new MinimalStage<U>(); }        @Override public T get() {            throw new UnsupportedOperationException(); }        @Override public T get(long timeout, TimeUnit unit) {            throw new UnsupportedOperationException(); }        @Override public T getNow(T valueIfAbsent) {            throw new UnsupportedOperationException(); }        @Override public T join() {            throw new UnsupportedOperationException(); }        @Override public T resultNow() {            throw new UnsupportedOperationException(); }        @Override public Throwable exceptionNow() {            throw new UnsupportedOperationException(); }        @Override public boolean complete(T value) {            throw new UnsupportedOperationException(); }        @Override public boolean completeExceptionally(Throwable ex) {            throw new UnsupportedOperationException(); }        @Override public boolean cancel(boolean mayInterruptIfRunning) {            throw new UnsupportedOperationException(); }        @Override public void obtrudeValue(T value) {            throw new UnsupportedOperationException(); }        @Override public void obtrudeException(Throwable ex) {            throw new UnsupportedOperationException(); }        @Override public boolean isDone() {            throw new UnsupportedOperationException(); }        @Override public boolean isCancelled() {            throw new UnsupportedOperationException(); }        @Override public boolean isCompletedExceptionally() {            throw new UnsupportedOperationException(); }        @Override public State state() {            throw new UnsupportedOperationException(); }        @Override public int getNumberOfDependents() {            throw new UnsupportedOperationException(); }        @Override public CompletableFuture<T> completeAsync            (Supplier<? extends T> supplier, Executor executor) {            throw new UnsupportedOperationException(); }        @Override public CompletableFuture<T> completeAsync            (Supplier<? extends T> supplier) {            throw new UnsupportedOperationException(); }        @Override public CompletableFuture<T> orTimeout            (long timeout, TimeUnit unit) {            throw new UnsupportedOperationException(); }        @Override public CompletableFuture<T> completeOnTimeout            (T value, long timeout, TimeUnit unit) {            throw new UnsupportedOperationException(); }                          // 返回新的CompletableFuture, 等效于thenApply(x -> x)        @Override public CompletableFuture<T> toCompletableFuture() {            Object r;            if ((r = result) != null)                return new CompletableFuture<T>(encodeRelay(r));            else {                CompletableFuture<T> d = new CompletableFuture<>();                unipush(new UniRelay<T,T>(d, this));                return d;            }        }    }此方法返回只读的 CompletionStage,可以实现安全发布。可以类比于一个方法返回了ImmutableList,优点是可以防止后续的错误操作,比如其他线程强制写结果(类比List#add)。
有这样一个安全发布的保证至关重要,特别是面临复杂场景时,可以减轻编程负担,放心地使用 CompletableFuture 返回的结果。
我们知道CompletionStage接口定义了链式异步回调相关方法, CompletableFuture 相关读写操作(join, complete,  obtrudeValue 等),不在CompletionStage的定义下,可以通过方法toCompletableFuture进行中转。
class InventoryServiceDemo {    final Executor executor = Executors.newCachedThreadPool();    // 获取库存信息          // 方法可以返回 CompletionStage,相比于CompletableFuture更安全    CompletionStage<Integer> getInventoryAsync(String productId) {        return CompletableFuture.supplyAsync(() -> {                    try {                        Thread.sleep(Duration.ofSeconds(2));                    } catch (InterruptedException e) {                        throw new RuntimeException(e);                    }                    return 42;                }, executor)                .minimalCompletionStage();    }}
笔者曾在《深入理解 Future, CompletableFuture, ListenableFuture,回调机制》一文中质疑 CompletionStage#toCompletableFuture 方法破坏了接口设计的相关原则,CompletableFuture#minimalCompletionStage 不如直接使用接口。

  • CompletionStage#toCompletableFuture 表明 CompletableFuture 可以作为 CompletionStage 的默认实现,实现两者类型的快速转换。很多情况下,两者的区别不大。
  • CompletableFuture#minimalCompletionStage 底层思想是提供只读功能。
  • 这里的实现有点取巧,minimalStage虽然继承自CompleatableFuture,而实际上只是实现了CompletionStage 接口方法。minimalStage 不应该是 CompletableFuture的示例,更好的实现方式是使用委托方式(组合)。
  • 如果子类进行类型检查,使用 instanceOf 或者新版JDK支持的模式匹配,minimalStage 需要进行额外的处理。
  • CFFU 的实现对于minimalStage做了兼容处理,你可以放心使用。
5. 异步写操作

public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) {    return completeAsync(supplier, defaultExecutor());}public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier,                                          Executor executor) {    if (supplier == null || executor == null)        throw new NullPointerException();    executor.execute(new AsyncSupply<T>(this, supplier));    return this;}这两个方法很容易理解,适用于子类返回自己的实现。CompletableFuture#supplyAsync 工厂方法返回的实例是 ComplteableFuture 无法更改。子类可以自己定义自己的工厂方法;用户也可以使用这里的异步写方法。
6. CFFU的优化拓展

开源项目CFFU(功夫未来)对于CompletableFuture 进行了拓展,以下仅列举和Java9新增功能相关的拓展点:

  • 可以在对象创建时指定执行器,后续操作无需重复指定
  • Java8 版本下可以使用后续版本的功能
  • 提供了相关安全发布功能,比如安全的超时功能
  • 内部的很多便利方法应用了保护性复制方法
  • CffuFactoryBuilder 支持以参数形式配置是否允许强制重写结果
7. 实践原则


  • 异步方法可以返回 CompletionStage,其相比于CompletableFuture更为安全,同时可以提示用户不要使用阻塞相关方法
  • 就像推荐使用不可变对象一样,推荐默认使用 CompletableFuture#minimalCompletionStage 方法
  • 无论是对于方法入参还是返回结果,使用接口一般都优于实体类
  • 理想的 Future 设计应该是读写分离的,不可变的。虽然 CompletableFuture 并没有完全遵循以上原则,但是我们在使用时应当注意遵循。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

341

主题

0

回帖

1033

积分

金牌会员

积分
1033

QQ|智能设备 | 粤ICP备2024353841号-1

GMT+8, 2025-3-10 15:12 , Processed in 2.461452 second(s), 27 queries .

Powered by 智能设备

©2025

|网站地图