|
CompletableFuture 超时功能有大坑!使用不当直接生产事故!
本文未经允许禁止转载!
上一篇文章《如何实现超时功能(以CompletableFuture为例)》中我们讨论了 CompletableFuture 超时功能的具体实现,从整体实现来说,JDK21前的版本有着内存泄露的bug,不过很少对实际生产有影响,因为任务的编排涉及的对象并不多,少量内存泄露最终会被回收掉。从单一功能内聚的角度来说,超时功能的实现是没有问题;然而由于并发编程的复杂性,可能会出现 Delayer 线程延迟执行的情况。本文将详细复现与讨论 CompletableFuture 超时功能的大坑,同时提供一些最佳实践指导。
2024年9月8日更新:CFFU 开源项目负责人李鼎(Jerry Lee) 更新了代码示例,点击这里查看。
1. 问题复现
感谢 CFFU 开源项目负责人李鼎(Jerry Lee) 提供代码:
public class CfDelayDysfunctionDemo { public static void main(String[] args) { dysfunctionDemo(); System.out.println(); cffuOrTimeoutFixDysfunctionDemo(); } private static void dysfunctionDemo() { logWithTimeAndThread("dysfunctionDemo begin"); final long tick = System.currentTimeMillis(); final List<CompletableFuture<?>> sequentCfs = new ArrayList<>(); CompletableFuture<Integer> incomplete = new CompletableFuture<>(); CompletableFuture<?> cf = incomplete.orTimeout(100, TimeUnit.MILLISECONDS) .handle((v, ex) -> { logWithTimeAndThread("[1] timout"); sleep(1000); return null; }); sequentCfs.add(cf); cf = incomplete.orTimeout(100, TimeUnit.MILLISECONDS) .handle((v, ex) -> { logWithTimeAndThread("[2] timout"); sleep(1000); return null; }); sequentCfs.add(cf); cf = incomplete.orTimeout(100, TimeUnit.MILLISECONDS) .handle((v, ex) -> { logWithTimeAndThread("[3] timout"); sleep(1000); return null; }); sequentCfs.add(cf); CompletableFuture.allOf(sequentCfs.toArray(CompletableFuture[]::new)).join(); logWithTimeAndThread("dysfunctionDemo end in " + (System.currentTimeMillis() - tick) + "ms"); } private static void cffuOrTimeoutFixDysfunctionDemo() { logWithTimeAndThread("cffuOrTimeoutFixDysfunctionDemo begin"); final long tick = System.currentTimeMillis(); final List<CompletableFuture<?>> sequentCfs = new ArrayList<>(); CompletableFuture<Integer> incomplete = new CompletableFuture<>(); CompletableFuture<?> cf = CompletableFutureUtils.cffuOrTimeout(incomplete, 100, TimeUnit.MILLISECONDS) .handle((v, ex) -> { logWithTimeAndThread("[1] timout"); sleep(1000); return null; }); sequentCfs.add(cf); cf = CompletableFutureUtils.cffuOrTimeout(incomplete, 100, TimeUnit.MILLISECONDS) .handle((v, ex) -> { logWithTimeAndThread("[2] timout"); sleep(1000); return null; }); sequentCfs.add(cf); cf = CompletableFutureUtils.cffuOrTimeout(incomplete, 100, TimeUnit.MILLISECONDS) .handle((v, ex) -> { logWithTimeAndThread("[3] timout"); sleep(1000); return null; }); sequentCfs.add(cf); CompletableFuture.allOf(sequentCfs.toArray(CompletableFuture[]::new)).join(); logWithTimeAndThread("cffuOrTimeoutFixDysfunctionDemo end in " + (System.currentTimeMillis() - tick) + "ms"); } private static void logWithTimeAndThread(String msg) { System.out.printf("%tF %<tT.%<tL [%s] %s%n", System.currentTimeMillis(), Thread.currentThread().getName(), msg); }}执行结果如下:
代码思路是这样的:有3个运行1秒的任务,在超时之后运行,不切线程池(都在 Delayer 线程运行),运行了3秒,不能在设置100ms的超时后运行,因为单线程排队了。handle 方法传入的回调函数在 Delayer 线程中执行了。
示例代码中解决超时线程延迟执行的方法是使用CFFU提供的安全 timeout 方法,本文后面会分析相关源码。
2. 问题分析
为什么handle方法里的回调会在 CompletableFutureDelayScheduler 中执行?
// 这里的代码逐步深入到调用栈内部public <U> CompletableFuture<U> handle( BiFunction<? super T, Throwable, ? extends U> fn) { return uniHandleStage(null, fn);}private <V> CompletableFuture<V> uniHandleStage( Executor e, BiFunction<? super T, Throwable, ? extends V> f) { if (f == null) throw new NullPointerException(); CompletableFuture<V> d = newIncompleteFuture(); Object r; if ((r = result) == null) // 加入回调栈中后续再执行 unipush(new UniHandle<T,V>(e, d, this, f)); else if (e == null) // 有结果,直接执行 d.uniHandle(r, f, null); else { try { e.execute(new UniHandle<T,V>(null, d, this, f)); } catch (Throwable ex) { d.result = encodeThrowable(ex); } } return d;}final <S> boolean uniHandle(Object r, BiFunction<? super S, Throwable, ? extends T> f, UniHandle<S,T> c) { S s; Throwable x; if (result == null) { try { // 此次调用中 c 为空,无需关注UniHandle,甚至不需要知道UniHandle的具体职责 if (c != null && !c.claim()) return false; if (r instanceof AltResult) { x = ((AltResult)r).ex; s = null; } else { x = null; @SuppressWarnings("unchecked") S ss = (S) r; s = ss; } // 执行回调 completeValue(f.apply(s, x)); } catch (Throwable ex) { completeThrowable(ex); } } return true; }我们把出现问题的原因简单总结一下:
CompletionStage 中不带 async 的方法可能会在不同的线程中执行。一般情况下,如果CF的结果已经计算出来,后续的回调在调用线程中执行,如果结果没有计算出来,后续的回调在上一步计算的线程中执行。
以下是一个简化的代码示例:
@Slf4jpublic class TimeoutBugDemo { public static void main(String[] args) { new CompletableFuture<Integer>() .orTimeout(1, TimeUnit.SECONDS) .handle((v, ex) -> { log.info("v: {}", v, ex); return -1; }).join(); }}handle 方法传入的回调方法会在delayer线程中执行,从执行日志看也确实如此:
Task :TimeoutBugDemo.main()11:58:53.465 [CompletableFutureDelayScheduler] INFO com.example.demo.cftimeout.TimeoutBugDemo -- v: nulljava.util.concurrent.TimeoutException: nullat java.base/java.util.concurrent.CompletableFuture$Timeout.run(CompletableFuture.java:2920)at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)at java.base/java.lang.Thread.run(Thread.java:1583)3. CFFU 是如何解决线程传导的?
// CFFU 代码实现public static <C extends CompletableFuture<?>> C cffuOrTimeout( C cfThis, Executor executorWhenTimeout, long timeout, TimeUnit unit) { requireNonNull(cfThis, "cfThis is null"); requireNonNull(executorWhenTimeout, "executorWhenTimeout is null"); requireNonNull(unit, "unit is null"); return hopExecutorIfAtCfDelayerThread(orTimeout(cfThis, timeout, unit), executorWhenTimeout);}// 核心实现代码private static <C extends CompletableFuture<?>> C hopExecutorIfAtCfDelayerThread(C cf, Executor executor) { CompletableFuture<Object> ret = newIncompleteFuture(cf); // use `cf.handle` method(instead of `cf.whenComplete`) and return null in order to // prevent reporting the handled exception argument of this `action` at subsequent `exceptionally` cf.handle((v, ex) -> { if (!atCfDelayerThread()) completeCf(ret, v, ex); // 使用 executor 后,CF的后续回调操作就不会在Dalayer 线程中执行了 else executor.execute(() -> completeCf(ret, v, ex)); return null; }).exceptionally(ex -> reportUncaughtException("handle of executor hop", ex)); return (C) ret;}private static void completeCf(CompletableFuture<Object> cf, Object value, @Nullable Throwable ex) { try { // 写入到新CF中 if (ex == null) cf.complete(value); else cf.completeExceptionally(ex); } catch (Throwable t) { if (ex != null) t.addSuppressed(ex); reportUncaughtException("completeCf", t); throw t; // rethrow exception, report to caller }}基本思路将结果写入到新的 CompletableFuture 中,为了避免后续回调使用 Delayer 线程,改用新增的线程,保证线程传导的安全性。
提示:有时我们需要关注链式调用返回的是新值还是原有对象,比如 CompletableFuture#orTimeout 返回的是当前对象this, CFFU中返回的是新的 CompletableFuture。
4. 最佳实践的启示
|
|