并發(fā)是Java語(yǔ)言中的一個(gè)很重要的概念,而說(shuō)起并發(fā)就繞不過(guò)AQS。AQS定義了一套多線程訪問(wèn)共享資源的同步器框架,許多同步類實(shí)現(xiàn)都依賴于它。接下來(lái)將和大家簡(jiǎn)單地介紹一下 AQS。
引言
從本篇文章開(kāi)始,我們將介紹 Java AQS 的實(shí)現(xiàn)方式,本文先介紹 AQS 的內(nèi)部數(shù)據(jù)是如何組織的,后面的文章中再分別介紹 AQS 的各個(gè)部門實(shí)現(xiàn)。
AQS
通過(guò)前面的介紹,大家一定看出來(lái)了,上述的各種類型的鎖和一些線程控制接口(CountDownLatch 等),最終都是通過(guò) AQS 來(lái)實(shí)現(xiàn)的,不同之處只在于 tryAcquire 等抽象函數(shù)如何實(shí)現(xiàn)。從這個(gè)角度來(lái)看,AQS(AbstractQueuedSynchronizer) 這個(gè)基類設(shè)計(jì)的真的很不錯(cuò),能夠包容各種同步控制方案,并提供了必須的下層依賴:比如阻塞,隊(duì)列等。接下來(lái)我們就來(lái)揭開(kāi)它神秘的面紗。
內(nèi)部數(shù)據(jù)
AQS 顧名思義,就是通過(guò)隊(duì)列來(lái)組織修改互斥資源的請(qǐng)求。當(dāng)這個(gè)資源空閑時(shí)間,那么修改請(qǐng)求可以直接進(jìn)行,而當(dāng)這個(gè)資源處于鎖定狀態(tài)時(shí),就需要等待,AQS 會(huì)將所有等待的請(qǐng)求維護(hù)在一個(gè)類似于 CLH 的隊(duì)列中。CLH:Craig、Landin and Hagersten隊(duì)列,是單向鏈表,AQS中的隊(duì)列是CLH變體的虛擬雙向隊(duì)列(FIFO),AQS是通過(guò)將每條請(qǐng)求共享資源的線程封裝成一個(gè)節(jié)點(diǎn)來(lái)實(shí)現(xiàn)鎖的分配。主要原理圖如下:
圖中的 state 是一個(gè)用 volatile 修飾的 int 變量,它的使用都是通過(guò) CAS 來(lái)進(jìn)行的,而 FIFO 隊(duì)列完成請(qǐng)求排隊(duì)的工作,隊(duì)列的操作也是通過(guò) CAS 來(lái)進(jìn)行的,正因如此該隊(duì)列的操作才能達(dá)到理想的性能要求。
通過(guò) CAS 修改 state 比較容易,大家應(yīng)該都能理解,但是如果要通過(guò) CAS 維護(hù)一個(gè)雙向隊(duì)列要怎么做呢?這里我們看一下 AQS 中 CLH 隊(duì)列的實(shí)現(xiàn)。在 AQS 中有兩個(gè)指針一個(gè)指針指向了隊(duì)列頭,一個(gè)指向了隊(duì)列尾。它們都是懶初始化的,也就是說(shuō)最初都為null。
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
隊(duì)列中的每個(gè)節(jié)點(diǎn),都是一個(gè) Node 實(shí)例,該實(shí)例的第一個(gè)關(guān)鍵字段是 waitState,它表述了當(dāng)前節(jié)點(diǎn)所處的狀態(tài),通過(guò) CAS 進(jìn)行修改:
- SIGNAL:表示當(dāng)前節(jié)點(diǎn)承擔(dān)喚醒后繼節(jié)點(diǎn)的責(zé)任
- CANCELLED:表示當(dāng)前節(jié)點(diǎn)已經(jīng)超時(shí)或者被打斷
- CONDITION:表示當(dāng)前節(jié)點(diǎn)正在 Condition 上等待(通過(guò)鎖可以創(chuàng)建 Condition 對(duì)象)
- PROPAGATE:只會(huì)設(shè)置在 head 節(jié)點(diǎn)上,用于表明釋放共享鎖時(shí),需要將這個(gè)行為傳播到其他節(jié)點(diǎn)上,這個(gè)我們稍后詳細(xì)介紹。
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
//...
}
因?yàn)槭请p向隊(duì)列,所以 Node 實(shí)例中勢(shì)必有 prev 和 next 指針,此外 Node 中還會(huì)保存與其對(duì)應(yīng)的線程。最后是 nextWaiter,當(dāng)一個(gè)節(jié)點(diǎn)對(duì)應(yīng)了共享請(qǐng)求時(shí),nextWaiter 指向了 Node. SHARED 而當(dāng)一個(gè)節(jié)點(diǎn)是排他請(qǐng)求時(shí),nextWaiter 默認(rèn)指向了 Node. EXCLUSIVE 也就是 null。我們知道 AQS 也提供了 Condition 功能,該功能就是通過(guò) nextWaiter 來(lái)維護(hù)在 Condition 上等待的線程。也就是說(shuō)這里的 nextWaiter 在鎖的實(shí)現(xiàn)部分中,扮演者共享鎖和排它鎖的標(biāo)志位,而在條件等待隊(duì)列中,充當(dāng)鏈表的 next 指針。
同步隊(duì)列
接下來(lái),我們由最常見(jiàn)的入隊(duì)操作出發(fā),介紹 AQS 框架的實(shí)現(xiàn)與使用。從下面的代碼中可以看到入隊(duì)操作支持兩種模式,一種是排他模式,一種是共享模式,分別對(duì)應(yīng)了排它鎖場(chǎng)景和共享鎖場(chǎng)景。
- 當(dāng)任意一種請(qǐng)求,要入隊(duì)時(shí),先會(huì)構(gòu)建一個(gè) Node 實(shí)例,然后獲取當(dāng)前 AQS 隊(duì)列的尾結(jié)點(diǎn),如果尾結(jié)點(diǎn)為空,就是說(shuō)隊(duì)列還沒(méi)初始化,初始化過(guò)程在后面 enq 函數(shù)中實(shí)現(xiàn)
- 這里我們先看初始化之后的情況,即 tail != null,先將當(dāng)前 Node 的前向指針 prev 更新,然后通過(guò) CAS 將尾結(jié)點(diǎn)修改為當(dāng)前 Node,修改成功時(shí),再更新前一個(gè)節(jié)點(diǎn)的后向指針 next,因?yàn)橹挥行薷奈仓羔樳^(guò)程是原子的,所以這里會(huì)出現(xiàn)新插入一個(gè)節(jié)點(diǎn)時(shí),之前的尾節(jié)點(diǎn) previousTail 的 next 指針為null的情況,也就是說(shuō)會(huì)存在短暫的正向指針和反向指針不同步的情況,不過(guò)在后面的介紹中,你會(huì)發(fā)現(xiàn) AQS 很完備地避開(kāi)了這種不同步帶來(lái)的風(fēng)險(xiǎn)(通過(guò)從后往前遍歷)
- 如果上述操作成功,則當(dāng)前線程已經(jīng)進(jìn)入同步隊(duì)列,否則,可能存在多個(gè)線程的競(jìng)爭(zhēng),其他線程設(shè)置尾結(jié)點(diǎn)成功了,而當(dāng)前線程失敗了,這時(shí)候會(huì)和尾結(jié)點(diǎn)未初始化一樣進(jìn)入 enq 函數(shù)中。
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
// 已經(jīng)進(jìn)行了初始化
node.prev = pred;
// CAS 修改尾節(jié)點(diǎn)
if (compareAndSetTail(pred, node)) {
// 成功之后再修改后向指針
pred.next = node;
return node;
}
}
// 循環(huán) CAS 過(guò)程和初始化過(guò)程
enq(node);
return node;
}
正常通過(guò) CAS 修改數(shù)據(jù)都會(huì)在一個(gè)循環(huán)中進(jìn)行,而這里的 addWaiter 只是在一個(gè) if 中進(jìn)行,這是為什么呢?實(shí)際上,大家看到的 addWaiter 的這部分 CAS 過(guò)程是一個(gè)快速執(zhí)行線,在沒(méi)有競(jìng)爭(zhēng)時(shí),這種方式能省略不少判斷過(guò)程。當(dāng)發(fā)生競(jìng)爭(zhēng)時(shí),會(huì)進(jìn)入 enq 函數(shù)中,那里才是循環(huán) CAS 的地方。
- 整個(gè) enq 的工作在一個(gè)循環(huán)中進(jìn)行
- 先會(huì)檢查是否未進(jìn)行初始化,是的話,就設(shè)置一個(gè)虛擬節(jié)點(diǎn) Node 作為 head 和 tail,也就是說(shuō)同步隊(duì)列的第一個(gè)節(jié)點(diǎn)并不保存實(shí)際數(shù)據(jù),只是一個(gè)保存指針的地方
- 初始化完成后,通過(guò) CAS 修改尾節(jié)點(diǎn),直到修改成功為止,最后修復(fù)后向指針
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {// 在一個(gè)循環(huán)中進(jìn)行 CAS 操作
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// CAS 修改尾節(jié)點(diǎn)
if (compareAndSetTail(t, node)) {
// 成功之后再修改后向指針
t.next = node;
return t;
}
}
以上就是通過(guò)AQS實(shí)現(xiàn)數(shù)據(jù)組織的詳細(xì)內(nèi)容,更多關(guān)于AQS數(shù)據(jù)組織的資料請(qǐng)關(guān)注W3Cschool其它相關(guān)文章!