Java并發(fā)編程必備之Future機(jī)制
Java 5在concurrency包中引入了java.util.concurrent.Callable 接口,它和Runnable接口很相似,但它可以返回一個對象或者拋出一個異常。
Callable接口使用泛型去定義它的返回類型。Executors類提供了一些有用的方法在線程池中執(zhí)行Callable內(nèi)的任務(wù)。由于Callable任務(wù)是并行的,我們必須等待它返回的結(jié)果。而線程是屬于異步計算模型,所以不可能直接從別的線程中得到函數(shù)返回值。
java.util.concurrent.Future對象為我們解決了這個問題。在線程池提交Callable任務(wù)后返回了一個Future對象,使用它可以知道Callable任務(wù)的狀態(tài)和得到Callable返回的執(zhí)行結(jié)果。Future提供了get()方法讓我們可以等待Callable結(jié)束并獲取它的執(zhí)行結(jié)果。
Future的作用當(dāng)做一定運算的時候,運算過程可能比較耗時,有時會去查數(shù)據(jù)庫,或是繁重的計算,比如壓縮、加密等,在這種情況下,如果我們一直在原地等待方法返回,顯然是不明智的,整體程序的運行效率會大大降低。
我們可以把運算的過程放到子線程去執(zhí)行,再通過 Future 去控制子線程執(zhí)行的計算過程,最后獲取到計算結(jié)果。
這樣一來就可以把整個程序的運行效率提高,是一種異步的思想。
同時在JDK 1.8的doc中,對Future的描述如下:
A Future represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation.
大概意思就是Future是一個用于異步計算的接口。
舉個例子:
比如去吃早點時,點了包子和涼菜,包子需要等3分鐘,涼菜只需1分鐘,如果是串行的一個執(zhí)行,在吃上早點的時候需要等待4分鐘,但是如果你在準(zhǔn)備包子的時候,可以同時準(zhǔn)備涼菜,這樣只需要等待3分鐘。
Future就是后面這種執(zhí)行模式。
創(chuàng)建Future線程池
class Task implements Callable<String> { public String call() throws Exception { return longTimeCalculation(); } }
ExecutorService executor = Executors.newFixedThreadPool(4); // 定義任務(wù):Callable<String> task = new Task(); // 提交任務(wù)并獲得Future: Future<String> future = executor.submit(task); // 從Future獲取異步執(zhí)行返回的結(jié)果: String result = future.get(); // 可能阻塞
當(dāng)我們提交一個Callable任務(wù)后,我們會同時獲得一個Future對象,然后,我們在主線程某個時刻調(diào)用Future對象的get()方法,就可以獲得異步執(zhí)行的結(jié)果。
在調(diào)用get()時,如果異步任務(wù)已經(jīng)完成,我們就直接獲得結(jié)果。如果異步任務(wù)還沒有完成,那么get()會阻塞,直到任務(wù)完成后才返回結(jié)果
FutureTask
除了用線程池的 submit 方法會返回一個 future 對象之外,同樣還可以用 FutureTask 來獲取 Future 類和任務(wù)的結(jié)果。
我們來看一下 FutureTask 的代碼實現(xiàn):
public class FutureTask<V> implements RunnableFuture<V>{ ...}
可以看到,它實現(xiàn)了一個接口,這個接口叫作 RunnableFuture。
我們再來看一下 RunnableFuture 接口的代碼實現(xiàn):
public interface RunnableFuture<V> extends Runnable, Future<V> { void run();}
既然 RunnableFuture 繼承了 Runnable 接口和 Future 接口,而 FutureTask 又實現(xiàn)了 RunnableFuture 接口,所以 FutureTask 既可以作為 Runnable 被線程執(zhí)行,又可以作為 Future 得到 Callable 的返回值。
典型用法是,把 Callable 實例當(dāng)作 FutureTask 構(gòu)造函數(shù)的參數(shù),生成 FutureTask 的對象,然后把這個對象當(dāng)作一個 Runnable 對象,放到線程池中或另起線程去執(zhí)行,最后還可以通過 FutureTask 獲取任務(wù)執(zhí)行的結(jié)果。
下面我們就用代碼來演示一下:
public class FutureTaskDemo { public static void main(String[] args) {Task task = new Task();FutureTask<Integer> integerFutureTask = new FutureTask<>(task);new Thread(integerFutureTask).start();try { System.out.println('task運行結(jié)果:'+integerFutureTask.get());} catch (InterruptedException e) { e.printStackTrace();} catch (ExecutionException e) { e.printStackTrace();} }}class Task implements Callable<Integer> { @Override public Integer call() throws Exception {System.out.println('子線程正在計算');int sum = 0;for (int i = 0; i < 100; i++) { sum += i;}return sum; }}
在這段代碼中可以看出,首先創(chuàng)建了一個實現(xiàn)了 Callable 接口的 Task,然后把這個 Task 實例傳入到 FutureTask 的構(gòu)造函數(shù)中去,創(chuàng)建了一個 FutureTask 實例,并且把這個實例當(dāng)作一個 Runnable 放到 new Thread() 中去執(zhí)行,最后再用 FutureTask 的 get 得到結(jié)果,并打印出來。
Future常用方法get方法最主要的作用就是獲取任務(wù)執(zhí)行的結(jié)果
我們來看一個代碼示例:
public class FutureTest { public static void main(String[] args) {ExecutorService service = Executors.newFixedThreadPool(10);Future<Integer> future = service.submit(new CallableTask());try { System.out.println(future.get());} catch (InterruptedException e) { e.printStackTrace();} catch (ExecutionException e) { e.printStackTrace();}service.shutdown(); } static class CallableTask implements Callable<Integer> {@Overridepublic Integer call() throws Exception { Thread.sleep(3000); return new Random().nextInt();} }}
在這段代碼中,main 方法新建了一個 10 個線程的線程池,并且用 submit 方法把一個任務(wù)提交進(jìn)去。
這個任務(wù)它所做的內(nèi)容就是先休眠三秒鐘,然后返回一個隨機(jī)數(shù)。
接下來我們就直接把future.get結(jié)果打印出來,其結(jié)果是正常打印出一個隨機(jī)數(shù),比如 9527 等。
isDone()方法該方法是用來判斷當(dāng)前這個任務(wù)是否執(zhí)行完畢了。
需要注意的是,這個方法如果返回 true 則代表執(zhí)行完成了;如果返回 false 則代表還沒完成。
但這里如果返回 true,并不代表這個任務(wù)是成功執(zhí)行的,比如說任務(wù)執(zhí)行到一半拋出了異常。那么在這種情況下,對于這個 isDone 方法而言,它其實也是會返回 true 的,因為對它來說,雖然有異常發(fā)生了,但是這個任務(wù)在未來也不會再被執(zhí)行,它確實已經(jīng)執(zhí)行完畢了。
所以 isDone 方法在返回 true 的時候,不代表這個任務(wù)是成功執(zhí)行的,只代表它執(zhí)行完畢了。
我們用一個代碼示例來看一看,代碼如下所示:
public class GetException { public static void main(String[] args) {ExecutorService service = Executors.newFixedThreadPool(20);Future<Integer> future = service.submit(new CallableTask());try { for (int i = 0; i < 5; i++) {System.out.println(i);Thread.sleep(500); } System.out.println(future.isDone()); future.get();} catch (InterruptedException e) { e.printStackTrace();} catch (ExecutionException e) { e.printStackTrace();} } static class CallableTask implements Callable<Integer> {@Overridepublic Integer call() throws Exception { throw new IllegalArgumentException('Callable拋出異常');} }}
在這段代碼中,可以看到有一個線程池,并且往線程池中去提交任務(wù),這個任務(wù)會直接拋出一個異常。
那么接下來我們就用一個 for 循環(huán)去休眠,同時讓它慢慢打印出 0 ~ 4 這 5 個數(shù)字,這樣做的目的是起到了一定的延遲作用。
在這個執(zhí)行完畢之后,再去調(diào)用 isDone() 方法,并且把這個結(jié)果打印出來,然后再去調(diào)用 future.get()
cancel方法如果不想執(zhí)行某個任務(wù)了,則可以使用 cancel 方法,會有以下三種情況:
第一種情況最簡單,那就是當(dāng)任務(wù)還沒有開始執(zhí)行時,一旦調(diào)用 cancel,這個任務(wù)就會被正常取消,未來也不會被執(zhí)行,那么 cancel 方法返回 true。 第二種情況也比較簡單。如果任務(wù)已經(jīng)完成,或者之前已經(jīng)被取消過了,那么執(zhí)行 cancel 方法則代表取消失敗,返回 false。因為任務(wù)無論是已完成還是已經(jīng)被取消過了,都不能再被取消了。 第三種情況就是這個任務(wù)正在執(zhí)行,這個時候會根據(jù)我們傳入的參數(shù)mayInterruptIfRunning做判斷,如果傳入的參數(shù)是 true,執(zhí)行任務(wù)的線程就會收到一個中斷的信號,正在執(zhí)行的任務(wù)可能會有一些處理中斷的邏輯,進(jìn)而停止,如果傳入的是 false 則就代表不中斷正在運行的任務(wù)isCancelled()方法判斷是否被取消,它和 cancel 方法配合使用,比較簡單。
應(yīng)用場景目前對于Future方式,我們經(jīng)常使用的有這么幾類:
Guava
ListenableFutrue,通過增加監(jiān)聽器的方式,計算完成時立即得到結(jié)果,而無需一直循環(huán)查詢
CompletableFuture
Java8的CompletableFuture,使用thenApply,thenApplyAsync可以達(dá)到和Guava類似的鏈?zhǔn)秸{(diào)用效果。
不同的是,對于Java8,如果thenApplyAsync不傳入線程池,則會使用ForkJoinPools線程池來執(zhí)行對應(yīng)的方法,如此可以避免對其他線程產(chǎn)生影響。
Netty
Netty解決的問題:
原生Future的isDone()方法判斷一個異步操作是否完成,但是定義比較模糊:正常終止、拋出異常、用戶取消都會使isDone方法返回true。 對于一個異步操作,我們有些時候更關(guān)注的是這個異步操作觸發(fā)或者結(jié)束后能否再執(zhí)行一系列的動作。與JDK相比,增加了完成狀態(tài)的細(xì)分,增加了監(jiān)聽者,異步線程結(jié)束之后能夠觸發(fā)一系列的動作。
注意事項添加超時機(jī)制假設(shè)一共有四個任務(wù)需要執(zhí)行,我們都把它放到線程池中,然后它獲取的時候是按照從 1 到 4 的順序,也就是執(zhí)行 get() 方法來獲取的
代碼如下所示:
public class FutureDemo { public static void main(String[] args) {//創(chuàng)建線程池ExecutorService service = Executors.newFixedThreadPool(10);//提交任務(wù),并用 Future 接收返回結(jié)果ArrayList<Future> allFutures = new ArrayList<>();for (int i = 0; i < 4; i++) { Future<String> future; if (i == 0 || i == 1) {future = service.submit(new SlowTask()); } else {future = service.submit(new FastTask()); } allFutures.add(future);}for (int i = 0; i < 4; i++) { Future<String> future = allFutures.get(i); try {String result = future.get();System.out.println(result); } catch (InterruptedException e) {e.printStackTrace(); } catch (ExecutionException e) {e.printStackTrace(); }}service.shutdown(); } static class SlowTask implements Callable<String> {@Overridepublic String call() throws Exception { Thread.sleep(5000); return '速度慢的任務(wù)';} } static class FastTask implements Callable<String> {@Overridepublic String call() throws Exception { return '速度快的任務(wù)';} }}
可以看出,在代碼中我們新建了線程池,并且用一個 list 來保存 4 個 Future。
其中,前兩個 Future 所對應(yīng)的任務(wù)是慢任務(wù),也就是代碼下方的 SlowTask,而后兩個 Future 對應(yīng)的任務(wù)是快任務(wù)。
慢任務(wù)在執(zhí)行的時候需要 5 秒鐘的時間才能執(zhí)行完畢,而快任務(wù)很快就可以執(zhí)行完畢,幾乎不花費時間。
在提交完這 4 個任務(wù)之后,我們用 for 循環(huán)對它們依次執(zhí)行 get 方法,來獲取它們的執(zhí)行結(jié)果,然后再把這個結(jié)果打印出來。
實際上在執(zhí)行的時候會先等待 5 秒,然后再很快打印出這 4 行語句。
所以問題是:
第三個的任務(wù)量是比較小的,它可以很快返回結(jié)果,緊接著第四個任務(wù)也會返回結(jié)果。
但是由于前兩個任務(wù)速度很慢,所以我們在利用 get 方法執(zhí)行時,會卡在第一個任務(wù)上。也就是說,雖然此時第三個和第四個任務(wù)很早就得到結(jié)果了,但我們在此時使用這種 for 循環(huán)的方式去獲取結(jié)果,依然無法及時獲取到第三個和第四個任務(wù)的結(jié)果。直到 5 秒后,第一個任務(wù)出結(jié)果了,我們才能獲取到,緊接著也可以獲取到第二個任務(wù)的結(jié)果,然后才輪到第三、第四個任務(wù)。
假設(shè)由于網(wǎng)絡(luò)原因,第一個任務(wù)可能長達(dá) 1 分鐘都沒辦法返回結(jié)果,那么這個時候,我們的主線程會一直卡著,影響了程序的運行效率。
此時我們就可以用 Future 的帶超時參數(shù)的get(long timeout, TimeUnit unit)方法來解決這個問題。
這個方法的作用是,如果在限定的時間內(nèi)沒能返回結(jié)果的話,那么便會拋出一個 TimeoutException 異常,隨后就可以把這個異常捕獲住,或者是再往上拋出去,這樣就不會一直卡著了。
源碼分析超時實現(xiàn)原理具體實現(xiàn)類:FutureTask
get()方法可以分為兩步:
判斷當(dāng)前任務(wù)的執(zhí)行狀態(tài),如果不是COMPLETING,就調(diào)用awaitDone()方法開始進(jìn)行死循環(huán)輪旋,如果任務(wù)還沒有執(zhí)行完成會使用nanos = deadline - System.nanoTime()檢查是否超時,如果方法已經(jīng)超時,則會返回,在返回后如果任務(wù)的狀態(tài)仍然<=COMPLETING,就會拋出TimeoutException()。 如果調(diào)用時任務(wù)沒有執(zhí)行完成,會調(diào)用parkNanos(),調(diào)用線程會阻塞在這里。接下來分兩種情況:
在阻塞時間完以后任務(wù)的執(zhí)行狀態(tài)仍然沒有改變?yōu)橥瓿桑M(jìn)入下一次循環(huán),直接返回。 如果在輪詢中狀態(tài)已經(jīng)改變,任務(wù)完成,則會中斷死循環(huán),返回任務(wù)執(zhí)行的返回值。到此這篇關(guān)于Java并發(fā)編程必備之Future機(jī)制的文章就介紹到這了,更多相關(guān)Java Future機(jī)制內(nèi)容請搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持好吧啦網(wǎng)!
相關(guān)文章:
