App下載

解析Java語言中如何使用Future類的方法

猿友 2021-07-22 09:44:11 瀏覽數(shù) (1879)
反饋

前言

在高性能編程中,并發(fā)編程已經(jīng)成為了極為重要的一部分。在單核CPU性能已經(jīng)趨于極限時(shí),我們只能通過多核來進(jìn)一步提升系統(tǒng)的性能,因此就催生了并發(fā)編程。

由于并發(fā)編程比串行編程更困難,也更容易出錯(cuò),因此,我們就更需要借鑒一些前人優(yōu)秀的,成熟的設(shè)計(jì)模式,使得我們的設(shè)計(jì)更加健壯,更加完美。

而Future模式,正是其中使用最為廣泛,也是極為重要的一種設(shè)計(jì)模式。

生活中的Future模式

為了更快的了解Future模式,我們先來看一個(gè)生活中的例子。

場景1:

午飯時(shí)間到了,同學(xué)們要去吃飯了,小王下樓,走了20分鐘,來到了肯德基,點(diǎn)餐,排隊(duì),吃飯一共花了20分鐘,又花了20分鐘走回公司繼續(xù)工作,合計(jì)1小時(shí)。

場景2

午飯時(shí)間到了,同學(xué)們要去吃飯了,小王點(diǎn)了個(gè)肯德基外賣,很快,它就拿到了一個(gè)訂單(雖然訂單不能當(dāng)飯吃,但是有了訂單,還怕吃不上飯嘛)。接著小王可以繼續(xù)干活,30分鐘后,外賣到了,接著小王花了10分鐘吃飯,接著又可以繼續(xù)工作了,成功的卷到了隔壁的小汪。

很明顯,在這2個(gè)場景中,小王的工作時(shí)間更加緊湊,特別是那些排隊(duì)的時(shí)間都可以讓外賣員去干,因此可以更加專注于自己的本職工作。聰明的你應(yīng)該也已經(jīng)體會到了,場景1就是典型的函數(shù)同步調(diào)用,而場景2是典型的異步調(diào)用。

而場景2的異步調(diào)用,還有一個(gè)特點(diǎn),就是它擁有一個(gè)返回值,這個(gè)返回值就是我們的訂單。這個(gè)訂單很重要,憑借著這個(gè)訂單,我們才能夠取得當(dāng)前這個(gè)調(diào)用所對應(yīng)的結(jié)果。

這里的訂單就如同F(xiàn)uture模式中的Future,這是一個(gè)合約,一份承諾。雖然訂單不能吃,但是手握訂單,不怕沒吃的,雖然Future不是我們想要的結(jié)果,但是拿著Future就能在將來得到我們想要的結(jié)果。

因此,F(xiàn)uture模式很好的解決了那些需要返回值的異步調(diào)用。

Future模式中的主要角色

一個(gè)典型的Future模式由以下幾個(gè)部分組成:

  • Main:系統(tǒng)啟動,調(diào)用Client發(fā)出請求
  • Client:返回Data對象,立即返回FutureData,并開啟ClientThread線程裝配RealData
  • Data:返回?cái)?shù)據(jù)的接口
  • FutureData:Future數(shù)據(jù),構(gòu)造很快,但是是一個(gè)虛擬的數(shù)據(jù),需要裝配RealData,好比一個(gè)訂單
  • RealData:真實(shí)數(shù)據(jù),其構(gòu)造是比較慢的,好比上面例子中的肯德基午餐。

它們之間的相互關(guān)系如下圖:

其中,值得注意是Data,RealData和FutureData。這是一組典型的代理模式,Data接口表示對外數(shù)據(jù),RealData表示真實(shí)的數(shù)據(jù),就好比午餐,獲得它的成本比較高,需要很多時(shí)間;相對的FutureData作為RealData的代理,類似于一個(gè)訂單/契約,通過FutureData,可以在將來獲得RealData。

因此,F(xiàn)uture模式本質(zhì)上是代理模式的一種實(shí)際應(yīng)用。

實(shí)現(xiàn)一個(gè)簡單的Future模式

根據(jù)上面的設(shè)計(jì),讓我們來實(shí)現(xiàn)一個(gè)簡單的代理模式吧!

首先是Data接口,代表數(shù)據(jù):

public interface Data {
    public String getResult ();
}

接著是FutureData,也是整個(gè)Future模式的核心:

public class FutureData implements Data {
    // 內(nèi)部需要維護(hù)RealData
    protected RealData realdata = null;          
    protected boolean isReady = false;
    public synchronized void setRealData(RealData realdata) {
        if (isReady) { 
            return;
        }
        this.realdata = realdata;
        isReady = true;
        //RealData已經(jīng)被注入,通知getResult()
        notifyAll();                            			
    }
    //會等待RealData構(gòu)造完成
    public synchronized String getResult() {        	
        while (!isReady) {
            try {
                //一直等待,直到RealData被注入
                wait();                        			
            } catch (InterruptedException e) {
            }
        }
        //真正需要的數(shù)據(jù)從RealData獲取
        return realdata.result;                    		
    }
}

下面是RealData:

public class RealData implements Data {
    protected final String result;
    public RealData(String para) {
        StringBuffer sb=new StringBuffer();
        //假設(shè)這里很慢很慢,構(gòu)造RealData不是一個(gè)容易的事
        result =sb.toString();
    }
    public String getResult() {
        return result;
    }
}

然后從Client得到Data:

public class Client {
    //這是一個(gè)異步方法,返回的Data接口是一個(gè)Future
    public Data request(final String queryStr) {
        final FutureData future = new FutureData();
        new Thread() {                                      
            public void run() {                    	
                // RealData的構(gòu)建很慢,所以在單獨(dú)的線程中進(jìn)行
                RealData realdata = new RealData(queryStr);
                //setRealData()的時(shí)候會notify()等待在這個(gè)future上的對象
                future.setRealData(realdata);
            }                                               
        }.start();
        // FutureData會被立即返回,不會等待RealData被構(gòu)造完
        return future;                        		
    }
}

最后一個(gè)Main函數(shù),把所有一切都串起來:

public static void main(String[] args) {
    Client client = new Client();
    //這里會立即返回,因?yàn)榈玫降氖荈utureData而不是RealData
    Data data = client.request("name");
    System.out.println("請求完畢");
    try {
        //這里可以用一個(gè)sleep代替了對其他業(yè)務(wù)邏輯的處理
        //在處理這些業(yè)務(wù)邏輯的過程中,RealData被創(chuàng)建,從而充分利用了等待時(shí)間
        Thread.sleep(2000);
    } catch (InterruptedException e) {
    }
    //使用真實(shí)的數(shù)據(jù),如果到這里數(shù)據(jù)還沒有準(zhǔn)備好,getResult()會等待數(shù)據(jù)準(zhǔn)備完,再返回
    System.out.println("數(shù)據(jù) = " + data.getResult());
}

這是一個(gè)最簡單的Future模式的實(shí)現(xiàn),雖然簡單,但是已經(jīng)包含了Future模式中最精髓的部分。對大家理解JDK內(nèi)部的Future對象,有著非常重要的作用。

Java中的Future模式

Future模式是如此常用,在JDK內(nèi)部已經(jīng)有了比較全面的實(shí)現(xiàn)和支持。下面,讓我們一起看看JDK內(nèi)部的Future實(shí)現(xiàn):

首先,JDK內(nèi)部有一個(gè)Future接口,這就是類似前面提到的訂單,當(dāng)然了,作為一個(gè)完整的商業(yè)化產(chǎn)品,這里的Future的功能更加豐富了,除了get()方法來獲得真實(shí)數(shù)據(jù)以外,還提供一組輔助方法,比如:

  • cancel():如果等太久,你可以直接取消這個(gè)任務(wù)
  • isCancelled():任務(wù)是不是已經(jīng)取消了
  • isDone():任務(wù)是不是已經(jīng)完成了
  • get():有2個(gè)get()方法,不帶參數(shù)的表示無窮等待,或者你可以只等待給定時(shí)間

下面代碼演示了這個(gè)Future的使用方法:

        //異步操作 可以用一個(gè)線程池
        ExecutorService executor = Executors.newFixedThreadPool(1);
        //執(zhí)行FutureTask,相當(dāng)于上例中的 client.request("name") 發(fā)送請求
        //在這里開啟線程進(jìn)行RealData的call()執(zhí)行
        Future<String> future = executor.submit(new RealData("name"));
        System.out.println("請求完畢,數(shù)據(jù)準(zhǔn)備中");
        try {
            //這里依然可以做額外的數(shù)據(jù)操作,這里使用sleep代替其他業(yè)務(wù)邏輯的處理
            Thread.sleep(2000);
        } catch (InterruptedException e) {
        }
        //如果此時(shí)call()方法沒有執(zhí)行完成,則依然會等待
        System.out.println("數(shù)據(jù) = " + future.get());

整個(gè)使用過程非常簡單,下面我們來分析一下executor.submit()里面究竟發(fā)生了什么:

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        // 根據(jù)Callable對象,創(chuàng)建一個(gè)RunnableFuture,這里其實(shí)就是FutureTask
        RunnableFuture<T> ftask = newTaskFor(task);
        //將ftask推送到線程池
        //在新線程中執(zhí)行的,就是run()方法,在下面的代碼中有給出
        execute(ftask);
        //返回這個(gè)Future,將來通過這個(gè)Future就可以得到執(zhí)行的結(jié)果
        return ftask;
    }
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

最關(guān)鍵的部分在下面,F(xiàn)utureTask作為一個(gè)線程單獨(dú)執(zhí)行時(shí),會將結(jié)果保存到outcome中,并設(shè)置任務(wù)的狀態(tài),下面是FutureTask的run()方法:

從FutureTask中獲得結(jié)果的實(shí)現(xiàn)如下:

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        //如果沒有完成,就等待,回到用park()方法阻塞線程
        //同時(shí),所有等待線程會在FutureTask的waiters字段中排隊(duì)等待
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    private V report(int s) throws ExecutionException {
        //outcome里保存的就是最終的計(jì)算結(jié)果
        Object x = outcome;
        if (s == NORMAL)
            //正常完成,就返回outcome
            return (V)x;
        //如果沒有正常完成, 比如被用戶取消了,或者有異常了,就拋出異常
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

Future模式的高階版本—— CompletableFuture

Future模式雖然好用,但也有一個(gè)問題,那就是將任務(wù)提交給線程后,調(diào)用線程并不知道這個(gè)任務(wù)什么時(shí)候執(zhí)行完,如果執(zhí)行調(diào)用get()方法或者isDone()方法判斷,可能會進(jìn)行不必要的等待,那么系統(tǒng)的吞吐量很難提高。

為了解決這個(gè)問題,JDK對Future模式又進(jìn)行了加強(qiáng),創(chuàng)建了一個(gè)CompletableFuture,它可以理解為Future模式的升級版本,它最大的作用是提供了一個(gè)回調(diào)機(jī)制,可以在任務(wù)完成后,自動回調(diào)一些后續(xù)的處理,這樣,整個(gè)程序可以把“結(jié)果等待”完全給移除了。

下面來看一個(gè)簡單的例子:

在這個(gè)例子中,首先以getPrice()為基礎(chǔ)創(chuàng)建一個(gè)異步調(diào)用,接著,使用thenAccept()方法,設(shè)置了一個(gè)后續(xù)的操作,也就是當(dāng)getPrice()執(zhí)行完成后的后續(xù)處理。

不難看到,CompletableFuture比一般的Future更具有實(shí)用性,因?yàn)樗梢栽贔uture執(zhí)行成功后,自動回調(diào)進(jìn)行下一步的操作,因此整個(gè)程序不會有任何阻塞的地方(也就是說你不用去到處等待Future的執(zhí)行,而是讓Future執(zhí)行成功后,自動來告訴你)。

以上面的代碼為例,CompletableFuture之所有會有那么神奇的功能,完全得益于AsyncSupply類(由上述代碼中的supplyAsync()方法創(chuàng)建)。

AsyncSupply在執(zhí)行時(shí),如下所示:

        public void run() {
            CompletableFuture<T> d; Supplier<T> f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                if (d.result == null) {
                    try {
                        //這里就是你要執(zhí)行的異步方法
                        //結(jié)果會被保存下來,放到d.result字段中
                        d.completeValue(f.get());
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                //執(zhí)行成功了,進(jìn)行后續(xù)處理,在這個(gè)后續(xù)處理中,就會調(diào)用thenAccept()中的消費(fèi)者
                //這里就相當(dāng)于Future完成后的通知
                d.postComplete();
            }
        }

繼續(xù)看d.postComplete(),這里會調(diào)用后續(xù)一系列操作

   final void postComplete() {
                //省略部分代碼,重點(diǎn)在tryFire()里
                //在tryFire()里,真正觸發(fā)了后續(xù)的調(diào)用,也就是thenAccept()中的部分
                f = (d = h.tryFire(NESTED)) == null ? this : d;
            }
        }
    }

絮叨

今天,我們主要介紹Future模式,我們從一個(gè)最簡單的Future模式開始,逐步深入,先后介紹了JDK內(nèi)部的Future模式實(shí)現(xiàn),以及對Future模式的進(jìn)化版本CompletableFuture做了簡單的介紹。對于多線程開發(fā)而言,F(xiàn)uture模式的應(yīng)用極其廣泛,可以說這個(gè)模式已經(jīng)成為了異步開發(fā)的基礎(chǔ)設(shè)施。

以上就是關(guān)于 Java 中使用 Future 類的具體內(nèi)容,想要了解更多關(guān)于 Java Future類的其他資料請關(guān)注W3Cschool其它相關(guān)文章!如果本篇文章對您的學(xué)習(xí)有所幫助,還希望大家能夠多多地支持我們!


0 人點(diǎn)贊