本文未经允许禁止转载。
JDK9 中对于CompletableFuture做了新的增强,除了超时功能(orTimeout),还有面向继承、安全发布等相关方法。本文中,我们将详细分析各个新增方法,同时说明其安全发布的重要性,最后提出相关的实践原则。
public CompletableFuture newIncompleteFuture() {
return new CompletableFuture();
}
这是一个面向继承的方法,Java支持方法返回值协变,子类可以返回自己的实现。如果你不需要继承 CompletableFuture,对于使用来说,这个方法没有新增新的功能。
public Executor defaultExecutor() {
return ASYNC_POOL;
}
子类可以指定默认执行器,不过需要注意调用不显式指定执行器的async相关方法时,其执行回调所在的线程有两种可能:当前调用线程或者子类指定的执行器线程。如果使用 CompletableFuture,推荐显式指定执行器;如果使用 CFFU,无需每次都显式指定执行器。
public CompletableFuture copy() {
return uniCopyStage(this);
}
此方法等同于调用 cf.thenApply(x -> x)
, 目的是实现保护性复制,避免后续操作对于原实例的修改。
// 返回结果仅支持 CompletionStage 操作
public CompletionStage minimalCompletionStage() {
return uniAsMinimalStage();
}
private MinimalStage uniAsMinimalStage() {
Object r;
// 性能优化1:若已有结果,当前线程下直接返回stage
if ((r = result) != null)
return new MinimalStage(encodeRelay(r));
// 性能优化2:若还未有结果,注册回调
MinimalStage d = new MinimalStage();
unipush(new UniRelay(d, this));
return d;
}
// 实际实现,CompletableFuture 独有方法均不支持
static final class MinimalStage extends CompletableFuture {
MinimalStage() { }
MinimalStage(Object r) { super(r); }
@Override public CompletableFuture newIncompleteFuture() {
return new MinimalStage(); }
@Override public T get() {
throw new UnsupportedOperationException(); }
@Override public T get(long timeout, TimeUnit unit) {
throw new UnsupportedOperationException(); }
@Override public T getNow(T valueIfAbsent) {
throw new UnsupportedOperationException(); }
@Override public T join() {
throw new UnsupportedOperationException(); }
@Override public T resultNow() {
throw new UnsupportedOperationException(); }
@Override public Throwable exceptionNow() {
throw new UnsupportedOperationException(); }
@Override public boolean complete(T value) {
throw new UnsupportedOperationException(); }
@Override public boolean completeExceptionally(Throwable ex) {
throw new UnsupportedOperationException(); }
@Override public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException(); }
@Override public void obtrudeValue(T value) {
throw new UnsupportedOperationException(); }
@Override public void obtrudeException(Throwable ex) {
throw new UnsupportedOperationException(); }
@Override public boolean isDone() {
throw new UnsupportedOperationException(); }
@Override public boolean isCancelled() {
throw new UnsupportedOperationException(); }
@Override public boolean isCompletedExceptionally() {
throw new UnsupportedOperationException(); }
@Override public State state() {
throw new UnsupportedOperationException(); }
@Override public int getNumberOfDependents() {
throw new UnsupportedOperationException(); }
@Override public CompletableFuture completeAsync
(Supplier extends T> supplier, Executor executor) {
throw new UnsupportedOperationException(); }
@Override public CompletableFuture completeAsync
(Supplier extends T> supplier) {
throw new UnsupportedOperationException(); }
@Override public CompletableFuture orTimeout
(long timeout, TimeUnit unit) {
throw new UnsupportedOperationException(); }
@Override public CompletableFuture completeOnTimeout
(T value, long timeout, TimeUnit unit) {
throw new UnsupportedOperationException(); }
// 返回新的CompletableFuture, 等效于thenApply(x -> x)
@Override public CompletableFuture toCompletableFuture() {
Object r;
if ((r = result) != null)
return new CompletableFuture(encodeRelay(r));
else {
CompletableFuture d = new CompletableFuture();
unipush(new UniRelay(d, this));
return d;
}
}
}
此方法返回只读的 CompletionStage,可以实现安全发布。可以类比于一个方法返回了ImmutableList,优点是可以防止后续的错误操作,比如其他线程强制写结果(类比List#add)。
有这样一个安全发布的保证至关重要,特别是面临复杂场景时,可以减轻编程负担,放心地使用 CompletableFuture 返回的结果。
我们知道CompletionStage接口定义了链式异步回调相关方法, CompletableFuture 相关读写操作(join, complete, obtrudeValue 等),不在CompletionStage的定义下,可以通过方法toCompletableFuture进行中转。
class InventoryServiceDemo {
final Executor executor = Executors.newCachedThreadPool();
// 获取库存信息
// 方法可以返回 CompletionStage,相比于CompletableFuture更安全
CompletionStage getInventoryAsync(String productId) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(Duration.ofSeconds(2));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 42;
}, executor)
.minimalCompletionStage();
}
}
笔者曾在《深入理解 Future, CompletableFuture, ListenableFuture,回调机制》一文中质疑 CompletionStage#toCompletableFuture 方法破坏了接口设计的相关原则,CompletableFuture#minimalCompletionStage 不如直接使用接口。
- CompletionStage#toCompletableFuture 表明 CompletableFuture 可以作为 CompletionStage 的默认实现,实现两者类型的快速转换。很多情况下,两者的区别不大。
- CompletableFuture#minimalCompletionStage 底层思想是提供只读功能。
- 这里的实现有点取巧,minimalStage虽然继承自CompleatableFuture,而实际上只是实现了CompletionStage 接口方法。minimalStage 不应该是 CompletableFuture的示例,更好的实现方式是使用委托方式(组合)。
- 如果子类进行类型检查,使用 instanceOf 或者新版JDK支持的模式匹配,minimalStage 需要进行额外的处理。
- CFFU 的实现对于minimalStage做了兼容处理,你可以放心使用。
public CompletableFuture completeAsync(Supplier extends T> supplier) {
return completeAsync(supplier, defaultExecutor());
}
public CompletableFuture completeAsync(Supplier extends T> supplier,
Executor executor) {
if (supplier == null || executor == null)
throw new NullPointerException();
executor.execute(new AsyncSupply(this, supplier));
return this;
}
这两个方法很容易理解,适用于子类返回自己的实现。CompletableFuture#supplyAsync 工厂方法返回的实例是 ComplteableFuture 无法更改。子类可以自己定义自己的工厂方法;用户也可以使用这里的异步写方法。
开源项目CFFU(功夫未来)对于CompletableFuture 进行了拓展,以下仅列举和Java9新增功能相关的拓展点:
参与评论
手机查看
返回顶部