成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

深入理解AbstractQueuedSynchronizer(AQS)

本人免費(fèi)整理了Java高級資料,涵蓋了Java、redis、MongoDB、MySQL、Zookeeper、Spring Cloud、Dubbo高并發(fā)分布式等教程,一共30G,需要自己領(lǐng)取。
傳送門:https://mp.weixin.qq.com/s/JzddfH-7yNudmkjT0IRL8Q

1. AQS簡介

在上一篇文章中我們對lock和AbstractQueuedSynchronizer(AQS)有了初步的認(rèn)識。在同步組件的實(shí)現(xiàn)中,AQS是核心部分,同步組件的實(shí)現(xiàn)者通過使用AQS提供的模板方法實(shí)現(xiàn)同步組件語義,AQS則實(shí)現(xiàn)了對同步狀態(tài)的管理,以及對阻塞線程進(jìn)行排隊(duì),等待通知等等一些底層的實(shí)現(xiàn)處理。AQS的核心也包括了這些方面:同步隊(duì)列,獨(dú)占式鎖的獲取和釋放,共享鎖的獲取和釋放以及可中斷鎖,超時等待鎖獲取這些特性的實(shí)現(xiàn),而這些實(shí)際上則是AQS提供出來的模板方法,歸納整理如下:

成都創(chuàng)新互聯(lián)是一家專業(yè)提供中衛(wèi)企業(yè)網(wǎng)站建設(shè),專注與成都做網(wǎng)站、網(wǎng)站設(shè)計(jì)、html5、小程序制作等業(yè)務(wù)。10年已為中衛(wèi)眾多企業(yè)、政府機(jī)構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)的建站公司優(yōu)惠進(jìn)行中。

獨(dú)占式鎖:

void acquire(int arg):獨(dú)占式獲取同步狀態(tài),如果獲取失敗則插入同步隊(duì)列進(jìn)行等待; void acquireInterruptibly(int arg):與acquire方法相同,但在同步隊(duì)列中進(jìn)行等待的時候可以檢測中斷; boolean tryAcquireNanos(int arg, long nanosTimeout):在acquireInterruptibly基礎(chǔ)上增加了超時等待功能,在超時時間內(nèi)沒有獲得同步狀態(tài)返回false; boolean release(int arg):釋放同步狀態(tài),該方法會喚醒在同步隊(duì)列中的下一個節(jié)點(diǎn)

共享式鎖:

void acquireShared(int arg):共享式獲取同步狀態(tài),與獨(dú)占式的區(qū)別在于同一時刻有多個線程獲取同步狀態(tài); void acquireSharedInterruptibly(int arg):在acquireShared方法基礎(chǔ)上增加了能響應(yīng)中斷的功能; boolean tryAcquireSharedNanos(int arg, long nanosTimeout):在acquireSharedInterruptibly基礎(chǔ)上增加了超時等待的功能; boolean releaseShared(int arg):共享式釋放同步狀態(tài)

要想掌握AQS的底層實(shí)現(xiàn),其實(shí)也就是對這些模板方法的邏輯進(jìn)行學(xué)習(xí)。在學(xué)習(xí)這些模板方法之前,我們得首先了解下AQS中的同步隊(duì)列是一種什么樣的數(shù)據(jù)結(jié)構(gòu),因?yàn)橥疥?duì)列是AQS對同步狀態(tài)的管理的基石。

2. 同步隊(duì)列

當(dāng)共享資源被某個線程占有,其他請求該資源的線程將會阻塞,從而進(jìn)入同步隊(duì)列。就數(shù)據(jù)結(jié)構(gòu)而言,隊(duì)列的實(shí)現(xiàn)方式無外乎兩者一是通過數(shù)組的形式,另外一種則是鏈表的形式。AQS中的同步隊(duì)列則是通過鏈?zhǔn)椒绞竭M(jìn)行實(shí)現(xiàn)。接下來,很顯然我們至少會抱有這樣的疑問:1. 節(jié)點(diǎn)的數(shù)據(jù)結(jié)構(gòu)是什么樣的?2. 是單向還是雙向?3. 是帶頭結(jié)點(diǎn)的還是不帶頭節(jié)點(diǎn)的?我們依舊先是通過看源碼的方式。

在AQS有一個靜態(tài)內(nèi)部類Node,其中有這樣一些屬性:

volatile int waitStatus //節(jié)點(diǎn)狀態(tài) volatile Node prev //當(dāng)前節(jié)點(diǎn)/線程的前驅(qū)節(jié)點(diǎn) volatile Node next; //當(dāng)前節(jié)點(diǎn)/線程的后繼節(jié)點(diǎn) volatile Thread thread;//加入同步隊(duì)列的線程引用 Node nextWaiter;//等待隊(duì)列中的下一個節(jié)點(diǎn)

節(jié)點(diǎn)的狀態(tài)有以下這些:

int CANCELLED = 1//節(jié)點(diǎn)從同步隊(duì)列中取消 int SIGNAL = -1//后繼節(jié)點(diǎn)的線程處于等待狀態(tài),如果當(dāng)前節(jié)點(diǎn)釋放同步狀態(tài)會通知后繼節(jié)點(diǎn),使得后繼節(jié)點(diǎn)的線程能夠運(yùn)行; int CONDITION = -2//當(dāng)前節(jié)點(diǎn)進(jìn)入等待隊(duì)列中 int PROPAGATE = -3//表示下一次共享式同步狀態(tài)獲取將會無條件傳播下去 int INITIAL = 0;//初始狀態(tài)

現(xiàn)在我們知道了節(jié)點(diǎn)的數(shù)據(jù)結(jié)構(gòu)類型,并且每個節(jié)點(diǎn)擁有其前驅(qū)和后繼節(jié)點(diǎn),很顯然這是一個雙向隊(duì)列。同樣的我們可以用一段demo看一下。

public class LockDemo {
    private static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(() -> {
                lock.lock();
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            });
            thread.start();
        }
    }
}

實(shí)例代碼中開啟了5個線程,先獲取鎖之后再睡眠10S中,實(shí)際上這里讓線程睡眠是想模擬出當(dāng)線程無法獲取鎖時進(jìn)入同步隊(duì)列的情況。通過debug,當(dāng)Thread-4(在本例中最后一個線程)獲取鎖失敗后進(jìn)入同步時,AQS時現(xiàn)在的同步隊(duì)列如圖所示:

Thread-0先獲得鎖后進(jìn)行睡眠,其他線程(Thread-1,Thread-2,Thread-3,Thread-4)獲取鎖失敗進(jìn)入同步隊(duì)列,同時也可以很清楚的看出來每個節(jié)點(diǎn)有兩個域:prev(前驅(qū))和next(后繼),并且每個節(jié)點(diǎn)用來保存獲取同步狀態(tài)失敗的線程引用以及等待狀態(tài)等信息。另外AQS中有兩個重要的成員變量:

private transient volatile Node head;
private transient volatile Node tail;

也就是說AQS實(shí)際上通過頭尾指針來管理同步隊(duì)列,同時實(shí)現(xiàn)包括獲取鎖失敗的線程進(jìn)行入隊(duì),釋放鎖時對同步隊(duì)列中的線程進(jìn)行通知等核心方法。其示意圖如下:

深入理解AbstractQueuedSynchronizer(AQS)

通過對源碼的理解以及做實(shí)驗(yàn)的方式,現(xiàn)在我們可以清楚的知道這樣幾點(diǎn):

  1. 節(jié)點(diǎn)的數(shù)據(jù)結(jié)構(gòu),即AQS的靜態(tài)內(nèi)部類Node,節(jié)點(diǎn)的等待狀態(tài)等信息;

  2. 同步隊(duì)列是一個雙向隊(duì)列,AQS通過持有頭尾指針管理同步隊(duì)列;

那么,節(jié)點(diǎn)如何進(jìn)行入隊(duì)和出隊(duì)是怎樣做的了?實(shí)際上這對應(yīng)著鎖的獲取和釋放兩個操作:獲取鎖失敗進(jìn)行入隊(duì)操作,獲取鎖成功進(jìn)行出隊(duì)操作。

3. 獨(dú)占鎖

3.1 獨(dú)占鎖的獲取(acquire方法)

我們繼續(xù)通過看源碼和debug的方式來看,還是以上面的demo為例,調(diào)用lock()方法是獲取獨(dú)占式鎖,獲取失敗就將當(dāng)前線程加入同步隊(duì)列,成功則線程執(zhí)行。而lock()方法實(shí)際上會調(diào)用AQS的acquire()方法,源碼如下

public final void acquire(int arg) {
        //先看同步狀態(tài)是否獲取成功,如果成功則方法結(jié)束返回
        //若失敗則先調(diào)用addWaiter()方法再調(diào)用acquireQueued()方法
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

關(guān)鍵信息請看注釋,acquire根據(jù)當(dāng)前獲得同步狀態(tài)成功與否做了兩件事情:1. 成功,則方法結(jié)束返回,2. 失敗,則先調(diào)用addWaiter()然后在調(diào)用acquireQueued()方法。

獲取同步狀態(tài)失敗,入隊(duì)操作

當(dāng)線程獲取獨(dú)占式鎖失敗后就會將當(dāng)前線程加入同步隊(duì)列,那么加入隊(duì)列的方式是怎樣的了?我們接下來就應(yīng)該去研究一下addWaiter()和acquireQueued()。addWaiter()源碼如下:

private Node addWaiter(Node mode) {
        // 1\. 將當(dāng)前線程構(gòu)建成Node類型
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 2\. 當(dāng)前尾節(jié)點(diǎn)是否為null?
        Node pred = tail;
        if (pred != null) {
            // 2.2 將當(dāng)前節(jié)點(diǎn)尾插入的方式插入同步隊(duì)列中
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 2.1\. 當(dāng)前同步隊(duì)列尾節(jié)點(diǎn)為null,說明當(dāng)前線程是第一個加入同步隊(duì)列進(jìn)行等待的線程
        enq(node);
        return node;
}

分析可以看上面的注釋。程序的邏輯主要分為兩個部分:1. 當(dāng)前同步隊(duì)列的尾節(jié)點(diǎn)為null,調(diào)用方法enq()插入;2. 當(dāng)前隊(duì)列的尾節(jié)點(diǎn)不為null,則采用尾插入(compareAndSetTail()方法)的方式入隊(duì)。另外還會有另外一個問題:如果 if (compareAndSetTail(pred, node))為false怎么辦?會繼續(xù)執(zhí)行到enq()方法,同時很明顯compareAndSetTail是一個CAS操作,通常來說如果CAS操作失敗會繼續(xù)自旋(死循環(huán))進(jìn)行重試。因此,經(jīng)過我們這樣的分析,enq()方法可能承擔(dān)兩個任務(wù):1. 處理當(dāng)前同步隊(duì)列尾節(jié)點(diǎn)為null時進(jìn)行入隊(duì)操作;2. 如果CAS尾插入節(jié)點(diǎn)失敗后負(fù)責(zé)自旋進(jìn)行嘗試。那么是不是真的就像我們分析的一樣了?只有源碼會告訴我們答案:),enq()源碼如下:

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                //1\. 構(gòu)造頭結(jié)點(diǎn)
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 2\. 尾插入,CAS操作失敗自旋嘗試
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
}

在上面的分析中我們可以看出在第1步中會先創(chuàng)建頭結(jié)點(diǎn),說明同步隊(duì)列是帶頭結(jié)點(diǎn)的鏈?zhǔn)酱鎯Y(jié)構(gòu)。帶頭結(jié)點(diǎn)與不帶頭結(jié)點(diǎn)相比,會在入隊(duì)和出隊(duì)的操作中獲得更大的便捷性,因此同步隊(duì)列選擇了帶頭結(jié)點(diǎn)的鏈?zhǔn)酱鎯Y(jié)構(gòu)。那么帶頭節(jié)點(diǎn)的隊(duì)列初始化時機(jī)是什么?自然而然是在tail為null時,即當(dāng)前線程是第一次插入同步隊(duì)列。compareAndSetTail(t, node)方法會利用CAS操作設(shè)置尾節(jié)點(diǎn),如果CAS操作失敗會在for (;;)for死循環(huán)中不斷嘗試,直至成功return返回為止。因此,對enq()方法可以做這樣的總結(jié):

  1. 在當(dāng)前線程是第一個加入同步隊(duì)列時,調(diào)用compareAndSetHead(new Node())方法,完成鏈?zhǔn)疥?duì)列的頭結(jié)點(diǎn)的初始化;

  2. 自旋不斷嘗試CAS尾插入節(jié)點(diǎn)直至成功為止。

現(xiàn)在我們已經(jīng)很清楚獲取獨(dú)占式鎖失敗的線程包裝成Node然后插入同步隊(duì)列的過程了?那么緊接著會有下一個問題?在同步隊(duì)列中的節(jié)點(diǎn)(線程)會做什么事情了來保證自己能夠有機(jī)會獲得獨(dú)占式鎖了?帶著這樣的問題我們就來看看acquireQueued()方法,從方法名就可以很清楚,這個方法的作用就是排隊(duì)獲取鎖的過程,源碼如下:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 1\. 獲得當(dāng)前節(jié)點(diǎn)的先驅(qū)節(jié)點(diǎn)
                final Node p = node.predecessor();
                // 2\. 當(dāng)前節(jié)點(diǎn)能否獲取獨(dú)占式鎖                 
                // 2.1 如果當(dāng)前節(jié)點(diǎn)的先驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)并且成功獲取同步狀態(tài),即可以獲得獨(dú)占式鎖
                if (p == head && tryAcquire(arg)) {
                    //隊(duì)列頭指針用指向當(dāng)前節(jié)點(diǎn)
                    setHead(node);
                    //釋放前驅(qū)節(jié)點(diǎn)
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 2.2 獲取鎖失敗,線程進(jìn)入等待狀態(tài)等待獲取獨(dú)占式鎖
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

程序邏輯通過注釋已經(jīng)標(biāo)出,整體來看這是一個這又是一個自旋的過程(for (;;)),代碼首先獲取當(dāng)前節(jié)點(diǎn)的先驅(qū)節(jié)點(diǎn),如果先驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)的并且成功獲得同步狀態(tài)的時候(if (p == head && tryAcquire(arg))),當(dāng)前節(jié)點(diǎn)所指向的線程能夠獲取鎖。反之,獲取鎖失敗進(jìn)入等待狀態(tài)。整體示意圖為下圖:

獲取鎖成功,出隊(duì)操作

獲取鎖的節(jié)點(diǎn)出隊(duì)的邏輯是:

//隊(duì)列頭結(jié)點(diǎn)引用指向當(dāng)前節(jié)點(diǎn)
setHead(node);
//釋放前驅(qū)節(jié)點(diǎn)
p.next = null; // help GC
failed = false;
return interrupted;

setHead()方法為:

private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
}

將當(dāng)前節(jié)點(diǎn)通過setHead()方法設(shè)置為隊(duì)列的頭結(jié)點(diǎn),然后將之前的頭結(jié)點(diǎn)的next域設(shè)置為null并且pre域也為null,即與隊(duì)列斷開,無任何引用方便GC時能夠?qū)?nèi)存進(jìn)行回收。示意圖如下:

深入理解AbstractQueuedSynchronizer(AQS)

那么當(dāng)獲取鎖失敗的時候會調(diào)用shouldParkAfterFailedAcquire()方法和parkAndCheckInterrupt()方法,看看他們做了什么事情。shouldParkAfterFailedAcquire()方法源碼為:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

shouldParkAfterFailedAcquire()方法主要邏輯是使用compareAndSetWaitStatus(pred, ws, Node.SIGNAL)使用CAS將節(jié)點(diǎn)狀態(tài)由INITIAL設(shè)置成SIGNAL,表示當(dāng)前線程阻塞。當(dāng)compareAndSetWaitStatus設(shè)置失敗則說明shouldParkAfterFailedAcquire方法返回false,然后會在acquireQueued()方法中for (;;)死循環(huán)中會繼續(xù)重試,直至compareAndSetWaitStatus設(shè)置節(jié)點(diǎn)狀態(tài)位為SIGNAL時shouldParkAfterFailedAcquire返回true時才會執(zhí)行方法parkAndCheckInterrupt()方法,該方法的源碼為:

private final boolean parkAndCheckInterrupt() {
        //使得該線程阻塞
        LockSupport.park(this);
        return Thread.interrupted();
}

該方法的關(guān)鍵是會調(diào)用LookSupport.park()方法(關(guān)于LookSupport會在以后的文章進(jìn)行討論),該方法是用來阻塞當(dāng)前線程的。因此到這里就應(yīng)該清楚了,acquireQueued()在自旋過程中主要完成了兩件事情:

  1. 如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),并且能夠獲得同步狀態(tài)的話,當(dāng)前線程能夠獲得鎖該方法執(zhí)行結(jié)束退出;

  2. 獲取鎖失敗的話,先將節(jié)點(diǎn)狀態(tài)設(shè)置成SIGNAL,然后調(diào)用LookSupport.park方法使得當(dāng)前線程阻塞。

經(jīng)過上面的分析,獨(dú)占式鎖的獲取過程也就是acquire()方法的執(zhí)行流程如下圖所示:

深入理解AbstractQueuedSynchronizer(AQS)

3.2 獨(dú)占鎖的釋放(release()方法)

獨(dú)占鎖的釋放就相對來說比較容易理解了,廢話不多說先來看下源碼:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
}

這段代碼邏輯就比較容易理解了,如果同步狀態(tài)釋放成功(tryRelease返回true)則會執(zhí)行if塊中的代碼,當(dāng)head指向的頭結(jié)點(diǎn)不為null,并且該節(jié)點(diǎn)的狀態(tài)值不為0的話才會執(zhí)行unparkSuccessor()方法。unparkSuccessor方法源碼:

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */

    //頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        //后繼節(jié)點(diǎn)不為null時喚醒該線程
        LockSupport.unpark(s.thread);
}

源碼的關(guān)鍵信息請看注釋,首先獲取頭節(jié)點(diǎn)的后繼節(jié)點(diǎn),當(dāng)后繼節(jié)點(diǎn)的時候會調(diào)用LookSupport.unpark()方法,該方法會喚醒該節(jié)點(diǎn)的后繼節(jié)點(diǎn)所包裝的線程。因此,每一次鎖釋放后就會喚醒隊(duì)列中該節(jié)點(diǎn)的后繼節(jié)點(diǎn)所引用的線程,從而進(jìn)一步可以佐證獲得鎖的過程是一個FIFO(先進(jìn)先出)的過程。

到現(xiàn)在我們終于啃下了一塊硬骨頭了,通過學(xué)習(xí)源碼的方式非常深刻的學(xué)習(xí)到了獨(dú)占式鎖的獲取和釋放的過程以及同步隊(duì)列??梢宰鲆幌驴偨Y(jié):

  1. 線程獲取鎖失敗,線程被封裝成Node進(jìn)行入隊(duì)操作,核心方法在于addWaiter()和enq(),同時enq()完成對同步隊(duì)列的頭結(jié)點(diǎn)初始化工作以及CAS操作失敗的重試;

  2. 線程獲取鎖是一個自旋的過程,當(dāng)且僅當(dāng) 當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)并且成功獲得同步狀態(tài)時,節(jié)點(diǎn)出隊(duì)即該節(jié)點(diǎn)引用的線程獲得鎖,否則,當(dāng)不滿足條件時就會調(diào)用LookSupport.park()方法使得線程阻塞;

  3. 釋放鎖的時候會喚醒后繼節(jié)點(diǎn);

總體來說:在獲取同步狀態(tài)時,AQS維護(hù)一個同步隊(duì)列,獲取同步狀態(tài)失敗的線程會加入到隊(duì)列中進(jìn)行自旋;移除隊(duì)列(或停止自旋)的條件是前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)并且成功獲得了同步狀態(tài)。在釋放同步狀態(tài)時,同步器會調(diào)用unparkSuccessor()方法喚醒后繼節(jié)點(diǎn)。

獨(dú)占鎖特性學(xué)習(xí)

3.3 可中斷式獲取鎖(acquireInterruptibly方法)

我們知道lock相較于synchronized有一些更方便的特性,比如能響應(yīng)中斷以及超時等待等特性,現(xiàn)在我們依舊采用通過學(xué)習(xí)源碼的方式來看看能夠響應(yīng)中斷是怎么實(shí)現(xiàn)的??身憫?yīng)中斷式鎖可調(diào)用方法lock.lockInterruptibly();而該方法其底層會調(diào)用AQS的acquireInterruptibly方法,源碼為:

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        //線程獲取鎖失敗
        doAcquireInterruptibly(arg);
}

在獲取同步狀態(tài)失敗后就會調(diào)用doAcquireInterruptibly方法:

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    //將節(jié)點(diǎn)插入到同步隊(duì)列中
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //獲取鎖出隊(duì)
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //線程中斷拋異常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

關(guān)鍵信息請看注釋,現(xiàn)在看這段代碼就很輕松了吧:),與acquire方法邏輯幾乎一致,唯一的區(qū)別是當(dāng)parkAndCheckInterrupt返回true時即線程阻塞時該線程被中斷,代碼拋出被中斷異常。

3.4 超時等待式獲取鎖(tryAcquireNanos()方法)

通過調(diào)用lock.tryLock(timeout,TimeUnit)方式達(dá)到超時等待獲取鎖的效果,該方法會在三種情況下才會返回:

  1. 在超時時間內(nèi),當(dāng)前線程成功獲取了鎖;

  2. 當(dāng)前線程在超時時間內(nèi)被中斷;

  3. 超時時間結(jié)束,仍未獲得鎖返回false。

我們?nèi)匀煌ㄟ^采取閱讀源碼的方式來學(xué)習(xí)底層具體是怎么實(shí)現(xiàn)的,該方法會調(diào)用AQS的方法tryAcquireNanos(),源碼為:

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        //實(shí)現(xiàn)超時等待的效果
        doAcquireNanos(arg, nanosTimeout);
}

很顯然這段源碼最終是靠doAcquireNanos方法實(shí)現(xiàn)超時等待的效果,該方法源碼如下:

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    //1\. 根據(jù)超時時間和當(dāng)前時間計(jì)算出截止時間
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //2\. 當(dāng)前線程獲得鎖出隊(duì)列
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 3.1 重新計(jì)算超時時間
            nanosTimeout = deadline - System.nanoTime();
            // 3.2 已經(jīng)超時返回false
            if (nanosTimeout <= 0L)
                return false;
            // 3.3 線程阻塞等待 
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // 3.4 線程被中斷拋出被中斷異常
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

程序邏輯如圖所示:

深入理解AbstractQueuedSynchronizer(AQS)

程序邏輯同獨(dú)占鎖可響應(yīng)中斷式獲取基本一致,唯一的不同在于獲取鎖失敗后,對超時時間的處理上,在第1步會先計(jì)算出按照現(xiàn)在時間和超時時間計(jì)算出理論上的截止時間,比如當(dāng)前時間是8h20min,超時時間是10min,那么根據(jù)deadline = System.nanoTime() + nanosTimeout計(jì)算出剛好達(dá)到超時時間時的系統(tǒng)時間就是8h 10min+10min = 8h 20min。然后根據(jù)deadline - System.nanoTime()就可以判斷是否已經(jīng)超時了,比如,當(dāng)前系統(tǒng)時間是8h 30min很明顯已經(jīng)超過了理論上的系統(tǒng)時間8h 20min,deadline - System.nanoTime()計(jì)算出來就是一個負(fù)數(shù),自然而然會在3.2步中的If判斷之間返回false。如果還沒有超時即3.2步中的if判斷為true時就會繼續(xù)執(zhí)行3.3步通過LockSupport.parkNanos使得當(dāng)前線程阻塞,同時在3.4步增加了對中斷的檢測,若檢測出被中斷直接拋出被中斷異常。

4. 共享鎖

4.1 共享鎖的獲?。╝cquireShared()方法)

在聊完AQS對獨(dú)占鎖的實(shí)現(xiàn)后,我們繼續(xù)一鼓作氣的來看看共享鎖是怎樣實(shí)現(xiàn)的?共享鎖的獲取方法為acquireShared,源碼為:

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

這段源碼的邏輯很容易理解,在該方法中會首先調(diào)用tryAcquireShared方法,tryAcquireShared返回值是一個int類型,當(dāng)返回值為大于等于0的時候方法結(jié)束說明獲得成功獲取鎖,否則,表明獲取同步狀態(tài)失敗即所引用的線程獲取鎖失敗,會執(zhí)行doAcquireShared方法,該方法的源碼為:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 當(dāng)該節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)且成功獲取同步狀態(tài)
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

現(xiàn)在來看這段代碼會不會很容易了?邏輯幾乎和獨(dú)占式鎖的獲取一模一樣,這里的自旋過程中能夠退出的條件是當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)并且tryAcquireShared(arg)返回值大于等于0即能成功獲得同步狀態(tài)。

4.2 共享鎖的釋放(releaseShared()方法)

共享鎖的釋放在AQS中會調(diào)用方法releaseShared:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

當(dāng)成功釋放同步狀態(tài)之后即tryReleaseShared會繼續(xù)執(zhí)行doReleaseShared方法:

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

這段方法跟獨(dú)占式鎖釋放過程有點(diǎn)點(diǎn)不同,在共享式鎖的釋放過程中,對于能夠支持多個線程同時訪問的并發(fā)組件,必須保證多個線程能夠安全的釋放同步狀態(tài),這里采用的CAS保證,當(dāng)CAS操作失敗continue,在下一次循環(huán)中進(jìn)行重試。

4.3 可中斷(acquireSharedInterruptibly()方法),超時等待(tryAcquireSharedNanos()方法)

關(guān)于可中斷鎖以及超時等待的特性其實(shí)現(xiàn)和獨(dú)占式鎖可中斷獲取鎖以及超時等待的實(shí)現(xiàn)幾乎一致,具體的就不再說了,如果理解了上面的內(nèi)容對這部分的理解也是水到渠成的。

通過這篇,加深了對AQS的底層實(shí)現(xiàn)更加清楚了,也對了解并發(fā)組件的實(shí)現(xiàn)原理打下了基礎(chǔ),學(xué)無止境,繼續(xù)加油:);如果覺得不錯,請給贊,嘿嘿。

名稱欄目:深入理解AbstractQueuedSynchronizer(AQS)
瀏覽地址:http://jinyejixie.com/article24/ggioje.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站建設(shè)、App設(shè)計(jì)、網(wǎng)站制作、網(wǎng)站建設(shè)、Google定制開發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設(shè)公司
车险| 鄂托克前旗| 宜君县| 壶关县| 景东| 平凉市| 东阿县| 濉溪县| 余姚市| 清丰县| 古丈县| 南昌市| 行唐县| 昂仁县| 安化县| 仁怀市| 天台县| 军事| 昭通市| 胶州市| 疏附县| 封开县| 和林格尔县| 张北县| 石狮市| 三台县| 成安县| 宜章县| 玉田县| 长兴县| 临邑县| 应城市| 开平市| 三亚市| 岱山县| 郯城县| 大庆市| 石家庄市| 海淀区| 资中县| 漠河县|