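CompletableFuture is a class for asynchronous processing that Java 8 added to java.util.concurrent.
The class offers two main static methods for starting asynchronous work, runAsync and supplyAsync. The difference is whether the task returns a result: runAsync takes a Runnable and returns nothing, while supplyAsync takes a Supplier and returns its value. When no executor is passed, the task runs on the default thread pool, normally ForkJoinPool.commonPool().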
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
java sample
private static void test1() {
    try {
        CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
            System.out.println("supplyAsync START");
            System.out.println("say supplyAsync Hello World");
            return "supplyAsync Hello World";
        });
        // get() blocks until the result is available
        System.out.println(hello.get());
        CompletableFuture<Void> world = CompletableFuture.runAsync(() -> {
            System.out.println("runAsync START");
            System.out.println("say runAsync Hello World");
        });
        // wait for the task; otherwise the method may return before it has printed anything
        world.get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    // console output
    // supplyAsync START
    // say supplyAsync Hello World
    // supplyAsync Hello World
    // runAsync START
    // say runAsync Hello World
}
Use Executors to create a custom thread pool
ExecutorService threadPool = Executors.newFixedThreadPool(10);
CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> System.out.println("runAsync"), threadPool);
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "supplyAsync", threadPool);
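To confirm that a task really runs on the pool you passed in, one option is to give the pool's threads a recognizable name and return the name of the executing thread. The following is only a minimal sketch: the class name CustomPoolExample and the thread name my-pool-worker are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CustomPoolExample {

    // Runs a task on a custom pool and returns the name of the thread that executed it.
    static String runOnCustomPool() {
        // Hypothetical thread name, chosen only to make the worker easy to spot.
        ExecutorService threadPool = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "my-pool-worker");
            t.setDaemon(true);
            return t;
        });
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), threadPool)
                    .join();
        } finally {
            // A pool you create yourself must also be shut down by you.
            threadPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("executed on: " + runOnCustomPool());
    }
}
```

Unlike the common pool, a pool created this way is not shared with the rest of the JVM, so slow or blocking tasks only tie up threads you allocated for them.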
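Handling exceptions
Once a task completes you can obtain its result. If an exception is thrown during processing, you can intercept it and run specific handling logic.
The relevant methods are: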
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
// catch exception
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
// use your own thread pool
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
sample code:
// After the task completes, read its result; if an exception was thrown
// during processing, intercept it and run specific handling logic
private static void test3() {
    ExecutorService threadPool = Executors.newFixedThreadPool(10);
    CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
        String doSomething = "Hello World";
        return doSomething;
    }, threadPool);
    supplyAsync.whenCompleteAsync((result, ex) -> {
        System.out.println("---supplyAsync---");
        System.out.println("supplyAsync result: " + result);
        System.out.println("exception: " + ex);
    }, threadPool).exceptionally(ex -> {
        // not called here, because the task completed normally
        System.out.println("exceptionally: " + ex.getMessage());
        return ex.getMessage();
    }).join();
    CompletableFuture<String> supplyAsyncException = CompletableFuture.supplyAsync(() -> {
        throw new CompletionException(new Exception("throw exception"));
    }, threadPool);
    supplyAsyncException.whenCompleteAsync((result, ex) -> {
        System.out.println("---supplyAsyncException---");
        System.out.println("supplyAsyncException result: " + result);
        System.out.println("exception: " + ex);
    }, threadPool).exceptionally(ex -> {
        // whenCompleteAsync passes the exception through, so it arrives here
        System.out.println("exceptionally: " + ex.getMessage());
        return ex.getMessage();
    }).join();
    // output:
    // ---supplyAsync---
    // supplyAsync result: Hello World
    // exception: null
    // ---supplyAsyncException---
    // supplyAsyncException result: null
    // exception: java.util.concurrent.CompletionException: java.lang.Exception: throw exception
    // exceptionally: java.lang.Exception: throw exception
    threadPool.shutdown();
}
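You can also use handle to deal with exceptions. The difference is that handle returns a value: whatever the function returns becomes the result of the new future, whereas whenComplete only observes the outcome and passes the original result or exception through.
<U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
// use your own thread pool
<U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)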
sample code:
private static void test4() {
    ExecutorService threadPool = Executors.newFixedThreadPool(2);
    CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
        throw new CompletionException(new Exception("throw exception"));
    }, threadPool);
    // handle receives both the result and the exception; exactly one of them is non-null here
    String res = supplyAsync.handle((result, ex) -> (null != ex) ? ex.getMessage() : result).join();
    System.out.println("res: " + res);
    // output
    // res: java.lang.Exception: throw exception
    threadPool.shutdown();
}
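The difference between whenComplete and handle is easiest to see side by side. The sketch below is an illustration only: the class name HandleVsWhenComplete, the helper failing(), and the "fallback" value are made up, and the default pool is used for brevity. With handle the failure is replaced by a value and join() succeeds; with whenComplete the returned future still fails.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class HandleVsWhenComplete {

    // Hypothetical task that always fails.
    static CompletableFuture<String> failing() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // handle() replaces the failure with a fallback value, so join() succeeds.
    static String recoverWithHandle() {
        return failing()
                .handle((result, ex) -> ex != null ? "fallback" : result)
                .join();
    }

    // whenComplete() only observes the outcome; the returned future still fails.
    static boolean whenCompleteStillFails() {
        try {
            failing().whenComplete((result, ex) -> { /* observe only */ }).join();
            return false;
        } catch (CompletionException e) {
            // join() wraps the original exception in a CompletionException
            return e.getCause() instanceof IllegalStateException;
        }
    }

    public static void main(String[] args) {
        System.out.println("handle: " + recoverWithHandle());
        System.out.println("whenComplete still fails: " + whenCompleteStillFails());
    }
}
```

In short, use whenComplete for side effects such as logging, and handle (or exceptionally) when the caller should receive a substitute value.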
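Transforming data
Use thenApply to transform a result. thenCompose chains a function that itself returns another CompletionStage, so the stages are not nested, and thenAccept consumes the result without returning a value.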
<U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn)
// use your own thread pool
<U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor)
//-----------
<U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
<U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
//-----------
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
// use your own thread pool
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
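The three methods can be compared in a small sketch. Everything here is illustrative: the class name TransformExample and the helper lookupName are made up, and the default pool is used to keep the code short.

```java
import java.util.concurrent.CompletableFuture;

public class TransformExample {

    // Hypothetical asynchronous lookup, used only for illustration.
    static CompletableFuture<String> lookupName(int id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    // thenApply: maps the result with a plain function (String -> Integer).
    static int lengthOf(String text) {
        return CompletableFuture.supplyAsync(() -> text)
                .thenApply(String::length)
                .join();
    }

    // thenCompose: chains a function that returns another future,
    // giving CompletableFuture<String> instead of a nested future.
    static String nameFor(int id) {
        return CompletableFuture.supplyAsync(() -> id)
                .thenCompose(TransformExample::lookupName)
                .join();
    }

    // thenAccept: consumes the result; the resulting future is CompletableFuture<Void>.
    static void printUpper(String text) {
        CompletableFuture.supplyAsync(() -> text)
                .thenApply(String::toUpperCase)
                .thenAccept(s -> System.out.println("upper: " + s))
                .join();
    }

    public static void main(String[] args) {
        System.out.println("length: " + lengthOf("hello")); // length: 5
        System.out.println("name: " + nameFor(42));         // name: user-42
        printUpper("hello");                                 // upper: HELLO
    }
}
```

A rule of thumb: if the function passed in returns a plain value, use thenApply; if it returns a CompletableFuture, use thenCompose.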
"兩個"獨立 CompletableFuture 要整合結果
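When the results of two independent CompletableFutures need to be combined, you can use:
thenCombine, which merges the results of two independent CompletableFutures into one value
thenAcceptBoth, which is similar to thenCombine but does not return a value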
//-----------
<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
// use your own thread pool
<U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn, Executor executor)
///////
<U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
<U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
// use your own thread pool
<U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)
java sample
// Combine the results of supplyAsync1 and supplyAsync2
public static void test5() {
    ExecutorService threadPool = Executors.newFixedThreadPool(2);
    CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> "supplyAsync 1", threadPool);
    CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> "supplyAsync 2", threadPool);
    String ans = supplyAsync1.thenCombine(supplyAsync2, (result1, result2) -> result1 + ", " + result2).join();
    System.out.println("ans: " + ans);
    threadPool.shutdown();
    // ans: supplyAsync 1, supplyAsync 2
    //-----------
    // No return value needed
    ExecutorService threadPool2 = Executors.newFixedThreadPool(10);
    CompletableFuture<String> supplyAsync21 = CompletableFuture.supplyAsync(() -> "supplyAsync 1", threadPool2);
    CompletableFuture<String> supplyAsync22 = CompletableFuture.supplyAsync(() -> "supplyAsync 2", threadPool2);
    supplyAsync21.thenAcceptBothAsync(supplyAsync22, (result1, result2) -> System.out.println(result1 + ", " + result2), threadPool2).join();
    threadPool2.shutdown();
    // supplyAsync 1, supplyAsync 2
}
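runAfterBoth appears in the signature list above but not in the sample. It runs a Runnable once both futures have completed and ignores their results, so the two futures may even have different types. A minimal sketch, with the class name RunAfterBothExample and the flag ran made up for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class RunAfterBothExample {

    // Returns true if the Runnable ran after both futures completed.
    static boolean bothFinished() {
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "first");
        CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 2);
        AtomicBoolean ran = new AtomicBoolean(false);
        // The Runnable receives neither result; it is only a completion signal.
        first.runAfterBoth(second, () -> ran.set(true)).join();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("ran after both: " + bothFinished());
    }
}
```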
Multiple CompletableFutures
When there are more than two CompletableFutures, use allOf to wait for all of them to complete, then chain a thenRun to process the results.
public static void test6() {
    ExecutorService threadPool = Executors.newFixedThreadPool(10);
    CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> "supplyAsync 1", threadPool);
    CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> "supplyAsync 2", threadPool);
    CompletableFuture<String> supplyAsync3 = CompletableFuture.supplyAsync(() -> "supplyAsync 3", threadPool);
    CompletableFuture.allOf(supplyAsync1, supplyAsync2, supplyAsync3).thenRun(() -> {
        // all three futures have completed here, so join() returns immediately
        // and no checked exceptions need to be caught
        StringBuilder ans = new StringBuilder();
        ans.append(supplyAsync1.join()).append(", ")
           .append(supplyAsync2.join()).append(", ")
           .append(supplyAsync3.join());
        System.out.println("ans: " + ans);
    }).join();
    threadPool.shutdown();
    // ans: supplyAsync 1, supplyAsync 2, supplyAsync 3
}
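allOf returns CompletableFuture<Void>, so the individual results have to be read from the original futures. When the number of futures is not fixed, a common pattern is to collect them into a list. The sketch below is one way to do it; the class name AllOfExample and the helper collectAll are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllOfExample {

    // Waits for every future, then gathers the results in the original order.
    static List<String> collectAll(List<CompletableFuture<String>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                // every future is complete at this point, so join() does not block
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .join();
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "supplyAsync 1"),
                CompletableFuture.supplyAsync(() -> "supplyAsync 2"),
                CompletableFuture.supplyAsync(() -> "supplyAsync 3"));
        System.out.println("ans: " + collectAll(futures));
        // ans: [supplyAsync 1, supplyAsync 2, supplyAsync 3]
    }
}
```

If any of the futures fails, the future returned by allOf fails as well, and the final join() throws a CompletionException.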
If you only need one of them to complete before continuing, use applyToEither, or acceptEither when no return value is needed. For more than two CompletableFutures, use anyOf.
<U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
<U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
// use your own thread pool
<U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
///////
CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
// use your own thread pool
CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
///////
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
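Which future finishes first normally depends on timing. To keep the outcome predictable, the sketch below pairs an already completed future with one that never completes. The class name EitherExample and the values are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class EitherExample {

    // applyToEither: transforms the result of whichever future completes first.
    static String firstOf() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("done");
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed
        return pending.applyToEither(done, s -> "winner: " + s).join();
    }

    // anyOf: accepts any number of futures of any type, so the result is an Object.
    static Object anyOfResult() {
        CompletableFuture<String> done = CompletableFuture.completedFuture("done");
        CompletableFuture<Integer> pending = new CompletableFuture<>(); // never completed
        return CompletableFuture.anyOf(pending, done).join();
    }

    public static void main(String[] args) {
        System.out.println(firstOf());               // winner: done
        System.out.println("anyOf: " + anyOfResult()); // anyOf: done

        // acceptEither: same idea, but only consumes the value.
        CompletableFuture<String> done = CompletableFuture.completedFuture("done");
        CompletableFuture<String> pending = new CompletableFuture<>();
        pending.acceptEither(done, s -> System.out.println("accepted: " + s)).join();
    }
}
```

Because anyOf returns CompletableFuture<Object>, the caller has to cast or inspect the result when the input futures have different types.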