成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

使用CompletableFuture怎么實(shí)現(xiàn)并發(fā)編程

今天就跟大家聊聊有關(guān)使用CompletableFuture怎么實(shí)現(xiàn)并發(fā)編程,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。

成都創(chuàng)新互聯(lián)主營施甸網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,手機(jī)APP定制開發(fā),施甸h5重慶小程序開發(fā)搭建,施甸網(wǎng)站營銷推廣歡迎施甸等地區(qū)企業(yè)咨詢

實(shí)例化 首先,不管我們要做什么,我們第一步是需要構(gòu)造出 CompletableFuture 實(shí)例。

最簡單的,我們可以通過構(gòu)造函數(shù)來進(jìn)行實(shí)例化:

CompletableFuture<String> cf = new CompletableFuture<String>(); 這個(gè)實(shí)例此時(shí)還沒有什么用,因?yàn)樗鼪]有實(shí)際的任務(wù),我們選擇結(jié)束這個(gè)任務(wù):

// 可以選擇在當(dāng)前線程結(jié)束,也可以在其他線程結(jié)束 cf.complete("coding..."); 因?yàn)?CompletableFuture 是一個(gè) Future,我們用 String result = cf.get() 就能獲取到結(jié)果了。

CompletableFuture 提供了 join() 方法,它的功能和 get() 方法是一樣的,都是阻塞獲取值,它們的區(qū)別在于 join() 拋出的是 unchecked Exception。

上面的代碼確實(shí)沒什么用,下面介紹幾個(gè) static 方法,它們使用任務(wù)來實(shí)例化一個(gè) CompletableFuture 實(shí)例。

CompletableFuture.runAsync(Runnable runnable); CompletableFuture.runAsync(Runnable runnable, Executor executor);

CompletableFuture.supplyAsync(Supplier<U> supplier); CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor) runAsync 方法接收的是 Runnable 的實(shí)例,意味著它沒有返回值 supplyAsync 方法對應(yīng)的是有返回值的情況 這兩個(gè)方法的帶 executor 的變種,表示讓任務(wù)在指定的線程池中執(zhí)行,不指定的話,通常任務(wù)是在 ForkJoinPool.commonPool() 線程池中執(zhí)行的。 好的,現(xiàn)在我們已經(jīng)有了第一個(gè) CompletableFuture 實(shí)例了,我們來看接下來的內(nèi)容。

任務(wù)之間的順序執(zhí)行 我們先來看執(zhí)行兩個(gè)任務(wù)的情況,首先執(zhí)行任務(wù) A,然后將任務(wù) A 的結(jié)果傳遞給任務(wù) B。

其實(shí)這里有很多種情況,任務(wù) A 是否有返回值,任務(wù) B 是否需要任務(wù) A 的返回值,任務(wù) B 是否有返回值,等等。有個(gè)明確的就是,肯定是任務(wù) A 執(zhí)行完后再執(zhí)行任務(wù) B。

我們用下面的 6 行代碼來說:

CompletableFuture.runAsync(() -> {}).thenRun(() -> {}); CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {}); CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB");

CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}); CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}); CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB"); 前面 3 行代碼演示的是,任務(wù) A 無返回值,所以對應(yīng)的,第 2 行和第 3 行代碼中,resultA 其實(shí)是 null。

第 4 行用的是 thenRun(Runnable runnable),任務(wù) A 執(zhí)行完執(zhí)行 B,并且 B 不需要 A 的結(jié)果。

第 5 行用的是 thenAccept(Consumer action),任務(wù) A 執(zhí)行完執(zhí)行 B,B 需要 A 的結(jié)果,但是任務(wù) B 不返回值。

第 6 行用的是 thenApply(Function fn),任務(wù) A 執(zhí)行完執(zhí)行 B,B 需要 A 的結(jié)果,同時(shí)任務(wù) B 有返回值。

這一小節(jié)說完了,如果任務(wù) B 后面還有任務(wù) C,往下繼續(xù)調(diào)用 .thenXxx() 即可。

異常處理 說到這里,我們順便來說下 CompletableFuture 的異常處理。這里我們要介紹兩個(gè)方法:

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn); public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn); 看下面的代碼:

CompletableFuture.supplyAsync(() -> "resultA") .thenApply(resultA -> resultA + " resultB") .thenApply(resultB -> resultB + " resultC") .thenApply(resultC -> resultC + " resultD"); 上面的代碼中,任務(wù) A、B、C、D 依次執(zhí)行,如果任務(wù) A 拋出異常(當(dāng)然上面的代碼不會(huì)拋出異常),那么后面的任務(wù)都得不到執(zhí)行。如果任務(wù) C 拋出異常,那么任務(wù) D 得不到執(zhí)行。

那么我們怎么處理異常呢?看下面的代碼,我們在任務(wù) A 中拋出異常,并對其進(jìn)行處理:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { throw new RuntimeException(); }) .exceptionally(ex -> "errorResultA") .thenApply(resultA -> resultA + " resultB") .thenApply(resultB -> resultB + " resultC") .thenApply(resultC -> resultC + " resultD");

System.out.println(future.join()); 上面的代碼中,任務(wù) A 拋出異常,然后通過 .exceptionally() 方法處理了異常,并返回新的結(jié)果,這個(gè)新的結(jié)果將傳遞給任務(wù) B。所以最終的輸出結(jié)果是:

errorResultA resultB resultC resultD 再看下面的代碼,我們來看下另一種處理方式,使用 handle(BiFunction fn) 來處理異常:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "resultA") .thenApply(resultA -> resultA + " resultB") // 任務(wù) C 拋出異常 .thenApply(resultB -> {throw new RuntimeException();}) // 處理任務(wù) C 的返回值或異常 .handle(new BiFunction<Object, Throwable, Object>() { @Override public Object apply(Object re, Throwable throwable) { if (throwable != null) { return "errorResultC"; } return re; } }) .thenApply(resultC -> resultC + " resultD");

System.out.println(future.join()); 上面的代碼使用了 handle 方法來處理任務(wù) C 的執(zhí)行結(jié)果,上面的代碼中,re 和 throwable 必然有一個(gè)是 null,它們分別代表正常的執(zhí)行結(jié)果和異常的情況。

當(dāng)然,它們也可以都為 null,因?yàn)槿绻饔玫哪莻€(gè) CompletableFuture 實(shí)例沒有返回值的時(shí)候,re 就是 null。

取兩個(gè)任務(wù)的結(jié)果 上面一節(jié),我們說的是,任務(wù) A 執(zhí)行完 -> 任務(wù) B 執(zhí)行完 -> 執(zhí)行任務(wù) C,它們之間有先后執(zhí)行關(guān)系,因?yàn)楹竺娴娜蝿?wù)依賴于前面的任務(wù)的結(jié)果。

這節(jié)我們來看怎么讓任務(wù) A 和任務(wù) B 同時(shí)執(zhí)行,然后取它們的結(jié)果進(jìn)行后續(xù)操作。這里強(qiáng)調(diào)的是任務(wù)之間的并行工作,沒有先后執(zhí)行順序。

如果使用 Future 的話,我們通常是這么寫的:

ExecutorService executorService = Executors.newCachedThreadPool();

Future<String> futureA = executorService.submit(() -> "resultA"); Future<String> futureB = executorService.submit(() -> "resultB");

String resultA = futureA.get(); String resultB = futureB.get(); 接下來,我們看看 CompletableFuture 中是怎么寫的,看下面的幾行代碼:

CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA"); CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");

cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {}); cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B"); cfA.runAfterBoth(cfB, () -> {}); 第 3 行代碼和第 4 行代碼演示了怎么使用兩個(gè)任務(wù)的結(jié)果 resultA 和 resultB,它們的區(qū)別在于,thenAcceptBoth 表示后續(xù)的處理不需要返回值,而 thenCombine 表示需要返回值。

如果你不需要 resultA 和 resultB,那么還可以使用第 5 行描述的 runAfterBoth 方法。

注意,上面的寫法和下面的寫法是沒有區(qū)別的:

CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");

cfA.thenAcceptBoth(CompletableFuture.supplyAsync(() -> "resultB"), (resultA, resultB) -> {}); 千萬不要以為這種寫法任務(wù) A 執(zhí)行完了以后再執(zhí)行任務(wù) B。

取多個(gè)任務(wù)的結(jié)果 接下來,我們將介紹兩個(gè)非常簡單的靜態(tài)方法:allOf() 和 anyOf() 方法。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs){...} public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {...} 這兩個(gè)方法都非常簡單,簡單介紹一下。

CompletableFuture cfA = CompletableFuture.supplyAsync(() -> "resultA"); CompletableFuture cfB = CompletableFuture.supplyAsync(() -> 123); CompletableFuture cfC = CompletableFuture.supplyAsync(() -> "resultC");

CompletableFuture<Void> future = CompletableFuture.allOf(cfA, cfB, cfC); // 所以這里的 join() 將阻塞,直到所有的任務(wù)執(zhí)行結(jié)束 future.join(); 由于 allOf 聚合了多個(gè) CompletableFuture 實(shí)例,所以它是沒有返回值的。這也是它的一個(gè)缺點(diǎn)。

anyOf 也非常容易理解,就是只要有任意一個(gè) CompletableFuture 實(shí)例執(zhí)行完成就可以了,看下面的例子:

CompletableFuture cfA = CompletableFuture.supplyAsync(() -> "resultA"); CompletableFuture cfB = CompletableFuture.supplyAsync(() -> 123); CompletableFuture cfC = CompletableFuture.supplyAsync(() -> "resultC");

CompletableFuture<Object> future = CompletableFuture.anyOf(cfA, cfB, cfC); Object result = future.join(); 最后一行的 join() 方法會(huì)返回最先完成的任務(wù)的結(jié)果,所以它的泛型用的是 Object,因?yàn)槊總€(gè)任務(wù)可能返回的類型不同。

either 方法 如果你的 anyOf(...) 只需要處理兩個(gè) CompletableFuture 實(shí)例,那么也可以使用 xxxEither() 來處理,

cfA.acceptEither(cfB, result -> {}); cfA.acceptEitherAsync(cfB, result -> {}); cfA.acceptEitherAsync(cfB, result -> {}, executorService);

cfA.applyToEither(cfB, result -> {return result;}); cfA.applyToEitherAsync(cfB, result -> {return result;}); cfA.applyToEitherAsync(cfB, result -> {return result;}, executorService);

cfA.runAfterEither(cfA, () -> {}); cfA.runAfterEitherAsync(cfB, () -> {}); cfA.runAfterEitherAsync(cfB, () -> {}, executorService); 上面的各個(gè)帶 either 的方法,表達(dá)的都是一個(gè)意思,指的是兩個(gè)任務(wù)中的其中一個(gè)執(zhí)行完成,就執(zhí)行指定的操作。它們幾組的區(qū)別也很明顯,分別用于表達(dá)是否需要任務(wù) A 和任務(wù) B 的執(zhí)行結(jié)果,是否需要返回值。

大家可能會(huì)對這里的幾個(gè)變種有盲區(qū),這里順便說幾句。

1、cfA.acceptEither(cfB, result -> {}); 和 cfB.acceptEither(cfA, result -> {}); 是一個(gè)意思;

2、第二個(gè)變種,加了 Async 后綴的方法,代表將需要執(zhí)行的任務(wù)放到 ForkJoinPool.commonPool() 中執(zhí)行(非完全嚴(yán)謹(jǐn));第三個(gè)變種很好理解,將任務(wù)放到指定線程池中執(zhí)行;

3、難道第一個(gè)變種是同步的?不是的,而是說,它由任務(wù) A 或任務(wù) B 所在的執(zhí)行線程來執(zhí)行,取決于哪個(gè)任務(wù)先結(jié)束。

compose update on 2019-07-26

這里我們簡單來說說 CompletableFuture 的最后一塊拼圖,compose 方法。

前面我們介紹了 thenAcceptBoth 和 thenCombine 用于聚合兩個(gè)任務(wù),其實(shí) compose 也是一樣的功能,它們本質(zhì)上都是為了讓多個(gè) CompletableFuture 實(shí)例形成一個(gè)鏈。

我們還是用代碼來說吧:

CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> { System.out.println("processing a..."); return "hello"; });

CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> { System.out.println("processing b..."); return " world"; });

CompletableFuture<String> cfC = CompletableFuture.supplyAsync(() -> { System.out.println("processing c..."); return ", I'm robot!"; }); 我們示例三個(gè)實(shí)例的情況,這邊不介紹 thenAcceptBoth 了,我們來看下 thenCombine:

cfA.thenCombine(cfB, (resultA, resultB) -> { System.out.println(resultA + resultB); // hello world return resultA + resultB; }).thenCombine(cfC, (resultAB, resultC) -> { System.out.println(resultAB + resultC); // hello world, I'm robot! return resultAB + resultC; }); 我們先有 cfA,然后和 cfB 組成一個(gè)鏈:cfA -> cfB,然后又組合了 cfC,形成鏈:cfA -> cfB -> cfC。

這里有個(gè)隱藏的點(diǎn):cfA、cfB、cfC 它們完全沒有數(shù)據(jù)依賴關(guān)系,我們只不過是聚合了它們的結(jié)果。

這下看 compose 就清楚了:

CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> { // 第一個(gè)實(shí)例的結(jié)果 return "hello"; }).thenCompose(resultA -> CompletableFuture.supplyAsync(() -> { // 把上一個(gè)實(shí)例的結(jié)果傳遞到這里 return resultA + " world"; })).thenCompose(resultAB -> CompletableFuture.supplyAsync(() -> { // 到這里大家應(yīng)該很清楚了 return resultAB + ", I'm robot"; }));

System.out.println(result.join()); // hello world, I'm robot 前面一個(gè) CompletableFuture 實(shí)例的結(jié)果可以傳遞到下一個(gè)實(shí)例中,這就是 compose 和 combine 的主要區(qū)別。

combine 是把結(jié)果進(jìn)行聚合,但是 compose 更像是把多個(gè)已有的 cf 實(shí)例組合成一個(gè)整體的實(shí)例。

thenCompose 和 thenApply 的區(qū)別 評論區(qū)有同學(xué)關(guān)注到了 thenApply 和 thenCompose,這里簡單說說。

我們來看看它們的方法貼到一起對比一下:

public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn) { return uniApplyStage(null, fn); } public <U> CompletableFuture<U> thenCompose( Function<? super T, ? extends CompletionStage<U>> fn) { return uniComposeStage(null, fn); } 使用示例:

CompletableFuture<String> future1 = CompletableFuture .supplyAsync(() -> "hello") .thenApply(cfA -> cfA + " world");

CompletableFuture<String> future2 = CompletableFuture .supplyAsync(() -> "hello") .thenCompose(cfA -> CompletableFuture.supplyAsync(() -> cfA + " world")); 它們都需要接收一個(gè) Function,這個(gè)函數(shù)的主要的區(qū)別在于 thenApply 中返回一個(gè)具體的值,而 thenCompose 返回一個(gè)新的 cf 實(shí)例。

thenApply 類似于 map 操作,把 cf 實(shí)例的結(jié)果加工成另一個(gè)值,像 Stream 里面的 map() 方法。它還有一個(gè)很重要的特征,這里是同步的操作。

如果你希望執(zhí)行一個(gè)異步的 map 操作,那么就應(yīng)該使用 thenCompose 了,比如上面的第二個(gè)例子。

我們來繼續(xù)較真一下,我們可以讓 thenApply 的 Function 也返回 CompletableFuture 實(shí)例,不就實(shí)現(xiàn)了異步的需求:

CompletableFuture<CompletableFuture<String>> future = CompletableFuture .supplyAsync(() -> "hello") .thenApply(cfA -> CompletableFuture.supplyAsync(() -> cfA + " world")); 可是,返回值我們可就不太喜歡了。說到這里,大家可能會(huì)想到 Stream 里面的 flatMap() 了

看完上述內(nèi)容,你們對使用CompletableFuture怎么實(shí)現(xiàn)并發(fā)編程有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。

網(wǎng)頁標(biāo)題:使用CompletableFuture怎么實(shí)現(xiàn)并發(fā)編程
轉(zhuǎn)載源于:http://jinyejixie.com/article4/gpgooe.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站排名、App開發(fā)、網(wǎng)站制作網(wǎng)站策劃、服務(wù)器托管軟件開發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

搜索引擎優(yōu)化
南漳县| 鄂伦春自治旗| 深泽县| 广宁县| 西城区| 五常市| 蓬莱市| 博爱县| 望奎县| 定兴县| 伊宁县| 油尖旺区| 山西省| 紫阳县| 昭平县| 盐城市| 东乌| 汨罗市| 临泽县| 大庆市| 云南省| 石嘴山市| 吉首市| 新宁县| 余庆县| 班玛县| 泾源县| 渝中区| 思茅市| 靖西县| 黄山市| 乌兰浩特市| 河池市| 长子县| 武山县| 荔波县| 宜阳县| 乌兰浩特市| 诏安县| 三台县| 忻州市|