今天 V 哥聊聊并發(fā)數(shù)據(jù)結(jié)構(gòu)的問題,我們知道,并發(fā)編程中,保障數(shù)據(jù)的安全訪問是第一要務(wù),JDK 提供了一系列JUC并發(fā)數(shù)據(jù)結(jié)構(gòu),這些數(shù)據(jù)結(jié)構(gòu)是線程安全的,可以在多線程環(huán)境中使用而無需額外的同步措施。以下是一些主要的并發(fā)數(shù)據(jù)結(jié)構(gòu):
一個線程安全的哈希表,用于存儲鍵值對。它在內(nèi)部使用了分段鎖(Segment Locking)或其他形式的并發(fā)控制機(jī)制,允許多個線程并發(fā)讀寫,同時保持較高的性能。
ConcurrentHashMap 是 Java 并發(fā)編程中非常重要的一個線程安全的哈希表實現(xiàn),它在 java.util.concurrent 包中。ConcurrentHashMap 允許并發(fā)讀和并發(fā)寫,旨在提供比同步的 HashMap 更高的并發(fā)性能。
實現(xiàn)原理:
在 JDK 1.7 及之前的版本中,ConcurrentHashMap 使用了分段鎖(Segment Locking)機(jī)制。整個哈希表被分割成多個段(Segment),每個段是一個小的哈希表,它們有自己的鎖。當(dāng)多個線程訪問不同段的數(shù)據(jù)時,它們可以并發(fā)執(zhí)行,因為每個段都有自己的鎖。
ConcurrentHashMap 使用了無鎖的 compare-and-swap(CAS)操作來更新數(shù)據(jù),這進(jìn)一步提高了并發(fā)性能。
讀取操作通常不需要加鎖,因為 ConcurrentHashMap 的設(shè)計保證了讀取數(shù)據(jù)的可見性和一致性。
在 JDK 1.8 中,ConcurrentHashMap 的實現(xiàn)發(fā)生了變化,它取消了分段鎖,轉(zhuǎn)而使用了 synchronized 關(guān)鍵字來保護(hù)哈希表的節(jié)點(diǎn)(Node)。同時,它也引入了紅黑樹來處理哈希碰撞導(dǎo)致的鏈表過長的問題,提高了最壞情況下的性能。
作用:
ConcurrentHashMap 的主要作用是在多線程環(huán)境中提供高效的并發(fā)訪問。它適用于以下場景:
示例代碼:
以下是一個簡單的 ConcurrentHashMap 使用示例:
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ConcurrentHashMapExample {
public static void main(String[] args) throws InterruptedException {
// 創(chuàng)建一個ConcurrentHashMap
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 創(chuàng)建一個線程池
ExecutorService executor = Executors.newFixedThreadPool(10);
// 提交10個任務(wù)到線程池,每個任務(wù)都會更新ConcurrentHashMap
for (int i = 0; i < 10; i++) {
final int taskNumber = i;
executor.submit(() -> {
map.put("key" + taskNumber, taskNumber);
System.out.println("Task " + taskNumber + " put value: " + map.get("key" + taskNumber));
});
}
// 關(guān)閉線程池
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
}
}
在這個示例中,我們創(chuàng)建了一個 ConcurrentHashMap 并使用一個線程池來并發(fā)地更新它。每個任務(wù)都會向哈希表中插入一個鍵值對,并打印出對應(yīng)的值。由于 ConcurrentHashMap 是線程安全的,所以這個程序可以正確地運(yùn)行而不會出現(xiàn)并發(fā)問題。
解釋:
這個示例展示了如何在多線程環(huán)境中安全地使用 ConcurrentHashMap。
實現(xiàn)原理的代碼分析:
ConcurrentHashMap 是 Java 中的一個線程安全的哈希表實現(xiàn),用于存儲鍵值對。在 Java 1.8 之前,ConcurrentHashMap 使用了分段鎖機(jī)制,而在 Java 1.8 之后,它采用了更為高效的鎖分離技術(shù)。
Java 1.8 之前的實現(xiàn)原理:
在 Java 1.8 之前,ConcurrentHashMap 使用分段鎖(Segment Locking)機(jī)制。每個 Segment 是一個可重入的 ReentrantLock,它用于鎖定整個哈希表的一個部分。哈希表被分割成多個段,每個段有自己的鎖,因此可以同時進(jìn)行讀寫操作。
ConcurrentHashMap 使用分段鎖來保護(hù)多個哈希表段。每個段有一個自己的鎖,這使得在多線程環(huán)境中可以并發(fā)地讀寫不同的段。
在 Java 1.8 之前,ConcurrentHashMap 在進(jìn)行寫操作時,會復(fù)制整個段,而不是整個哈希表。這減少了加鎖的范圍,提高了并發(fā)性能。
Java 1.8 之后的實現(xiàn)原理:
在 Java 1.8 中,ConcurrentHashMap 的實現(xiàn)發(fā)生了變化,它取消了分段鎖,轉(zhuǎn)而使用了 synchronized 關(guān)鍵字來保護(hù)哈希表的節(jié)點(diǎn)(Node)。同時,它也引入了紅黑樹來處理哈希碰撞導(dǎo)致的鏈表過長的問題,提高了最壞情況下的性能。
在 Java 1.8 中,ConcurrentHashMap 使用了一種稱為“鎖分離”的技術(shù)。它將鎖的范圍縮小到鏈表的頭部節(jié)點(diǎn),而不是整個哈希表或整個段。這減少了鎖競爭,提高了并發(fā)性能。
為了提高哈希表的性能,ConcurrentHashMap 引入了紅黑樹。當(dāng)鏈表的長度超過某個閾值時,鏈表會被轉(zhuǎn)換為紅黑樹,這樣可以減少搜索時間,提高最壞情況下的性能。
代碼分析:
以下是 ConcurrentHashMap 類的一些關(guān)鍵方法的代碼分析:
put(K key, V value)
:這個方法用于向 ConcurrentHashMap 中添加一個鍵值對。get(Object key)
:這個方法用于從 ConcurrentHashMap 中獲取與指定鍵關(guān)聯(lián)的值。remove(Object key)
:這個方法用于從 ConcurrentHashMap 中移除與指定鍵關(guān)聯(lián)的鍵值對。這些方法都使用了 synchronized 關(guān)鍵字來保護(hù)哈希表的節(jié)點(diǎn)。在 Java 1.8 之前,這些方法會使用分段鎖來保護(hù)整個段。而在 Java 1.8 之后,這些方法會使用鎖分離技術(shù)來保護(hù)鏈表的頭部節(jié)點(diǎn)。
這個示例展示了如何在多線程環(huán)境中使用 ConcurrentHashMap 來安全地進(jìn)行鍵值對的添加、獲取和移除操作。由于 ConcurrentHashMap 是線程安全的,所以這個程序可以正確地運(yùn)行而不會出現(xiàn)并發(fā)問題。
CopyOnWriteArrayList是一個線程安全的列表,它在進(jìn)行修改操作(如添加、刪除元素)時會創(chuàng)建底層數(shù)組的一個新副本,從而實現(xiàn)讀寫分離。適用于讀多寫少的場景。
實現(xiàn)原理:
CopyOnWriteArrayList 是一個線程安全的變體,其中所有對列表的修改(添加、刪除、設(shè)置元素等)都是在底層數(shù)組的一個副加上進(jìn)行的。這意味著在修改操作發(fā)生時,會創(chuàng)建一個新的數(shù)組,并將現(xiàn)有的所有元素復(fù)制到新數(shù)組中,然后在新數(shù)組上進(jìn)行修改。完成修改后,再將內(nèi)部引用指向新數(shù)組。由于寫操作是在新數(shù)組上進(jìn)行的,讀操作可以安全地訪問舊數(shù)組,而不會受到寫操作的干擾。
這是 CopyOnWriteArrayList 的核心原理。在發(fā)生寫操作時,不直接修改原有數(shù)組,而是復(fù)制出一個新數(shù)組,修改完成后,再將內(nèi)部引用指向新數(shù)組。
由于讀操作不需要修改數(shù)組,它們可以安全地讀取當(dāng)前的數(shù)組,而不需要任何鎖。這提高了讀取操作的并發(fā)性能。
為了確保寫操作的原子性和一致性,寫操作需要加鎖。這是通過在修改方法(如 add, remove, set)中使用 ReentrantLock 實現(xiàn)的。
作用:
CopyOnWriteArrayList 的主要作用是在讀多寫少的場景中提供線程安全的列表操作,同時盡量減少讀操作的鎖競爭。它適用于以下場景:
示例代碼:
以下是一個簡單的 CopyOnWriteArrayList 使用示例:
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CopyOnWriteArrayListExample {
public static void main(String[] args) {
// 創(chuàng)建一個CopyOnWriteArrayList
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
// 添加元素
list.add("A");
list.add("B");
list.add("C");
// 創(chuàng)建一個線程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交一個任務(wù)到線程池,該任務(wù)會修改CopyOnWriteArrayList
executor.submit(() -> {
list.add("D");
list.remove("A");
});
// 提交另一個任務(wù)到線程池,該任務(wù)會讀取CopyOnWriteArrayList
executor.submit(() -> {
for (String element : list) {
System.out.println(element);
}
});
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個示例中,我們創(chuàng)建了一個 CopyOnWriteArrayList 并使用一個線程池來并發(fā)地修改和讀取它。一個任務(wù)嘗試添加和刪除元素,而另一個任務(wù)遍歷列表并打印所有元素。由于 CopyOnWriteArrayList 是線程安全的,所以這個程序可以正確地運(yùn)行而不會出現(xiàn)并發(fā)問題。
解釋:
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
:創(chuàng)建了一個新的 CopyOnWriteArrayList 實例。list.add("A");
和 list.remove("A");
:添加和刪除元素的操作。ExecutorService executor = Executors.newFixedThreadPool(2);
:創(chuàng)建了一個大小為2的線程池。executor.submit(() -> {...});
:提交了一個 Runnable 任務(wù)到線程池,任務(wù)中修改了 CopyOnWriteArrayList。for (String element : list) {...}
:遍歷列表并打印元素。executor.shutdown();
:關(guān)閉線程池。這個示例展示了如何在多線程環(huán)境中使用 CopyOnWriteArrayList 來安全地進(jìn)行讀寫操作。由于寫操作相對昂貴(因為需要復(fù)制數(shù)組),所以 CopyOnWriteArrayList 適用于讀多寫少的場景。
代碼分析:
以下是 CopyOnWriteArrayList 類的一些關(guān)鍵方法的代碼分析:
add(E e)
:這個方法用于向 CopyOnWriteArrayList 中添加一個元素。get(int index)
:這個方法用于從 CopyOnWriteArrayList 中獲取指定索引的元素。set(int index, E element)
:這個方法用于將指定索引的元素設(shè)置為新元素。remove(int index)
:這個方法用于從 CopyOnWriteArrayList 中移除指定索引的元素。這些方法都使用了原子操作來更新數(shù)據(jù),并確保了線程安全。在 add、set 和 remove 方法中,會創(chuàng)建一個新數(shù)組,并將現(xiàn)有元素復(fù)制到新數(shù)組中,然后修改新數(shù)組。完成修改后,再將內(nèi)部引用指向新數(shù)組。
與 CopyOnWriteArrayList 類似,但它存儲的是不包含重復(fù)元素的集合。
實現(xiàn)原理:
CopyOnWriteArraySet 是一個線程安全的變體,其中所有對集合的修改(添加、刪除元素等)都是在底層數(shù)組的一個副加上進(jìn)行的。這意味著在修改操作發(fā)生時,會創(chuàng)建一個新的數(shù)組,然后將現(xiàn)有的所有元素復(fù)制到新數(shù)組中,并在新數(shù)組上進(jìn)行修改。完成修改后,再將內(nèi)部引用指向新數(shù)組。由于寫操作是在新數(shù)組上進(jìn)行的,讀操作可以安全地訪問舊數(shù)組,而不會受到寫操作的干擾。
這是 CopyOnWriteArraySet 的核心原理。在發(fā)生寫操作時,不直接修改原有數(shù)組,而是復(fù)制出一個新數(shù)組,修改完成后,再將內(nèi)部引用指向新數(shù)組。
由于讀操作不需要修改數(shù)組,它們可以安全地讀取當(dāng)前的數(shù)組,而不需要任何鎖。這提高了讀取操作的并發(fā)性能。
為了確保寫操作的原子性和一致性,寫操作需要加鎖。這是通過在修改方法(如 add, remove)中使用 ReentrantLock 實現(xiàn)的。
作用:
CopyOnWriteArraySet 的主要作用是在讀多寫少的場景中提供線程安全的集合操作,同時盡量減少讀操作的鎖競爭。它適用于以下場景:
示例代碼:
以下是一個簡單的 CopyOnWriteArraySet 使用示例:
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CopyOnWriteArraySetExample {
public static void main(String[] args) {
// 創(chuàng)建一個CopyOnWriteArraySet
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
// 添加元素
set.add("A");
set.add("B");
set.add("C");
// 創(chuàng)建一個線程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交一個任務(wù)到線程池,該任務(wù)會修改CopyOnWriteArraySet
executor.submit(() -> {
set.add("D");
set.remove("A");
});
// 提交另一個任務(wù)到線程池,該任務(wù)會讀取CopyOnWriteArraySet
executor.submit(() -> {
for (String element : set) {
System.out.println(element);
}
});
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個示例中,我們創(chuàng)建了一個 CopyOnWriteArraySet 并使用一個線程池來并發(fā)地修改和讀取它。一個任務(wù)嘗試添加和刪除元素,而另一個任務(wù)遍歷集合并打印所有元素。由于 CopyOnWriteArraySet 是線程安全的,所以這個程序可以正確地運(yùn)行而不會出現(xiàn)并發(fā)問題。
解釋:
CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
:創(chuàng)建了一個新的 CopyOnWriteArraySet 實例。set.add("A");
和 set.remove("A");
:添加和刪除元素的操作。ExecutorService executor = Executors.newFixedThreadPool(2);
:創(chuàng)建了一個大小為2的線程池。executor.submit(() -> {...});
:提交了一個 Runnable 任務(wù)到線程池,任務(wù)中修改了 CopyOnWriteArraySet。for (String element : set) {...}
:遍歷集合并打印元素。executor.shutdown();
:關(guān)閉線程池。這個示例展示了如何在多線程環(huán)境中使用 CopyOnWriteArraySet 來安全地進(jìn)行讀寫操作。由于寫操作相對昂貴(因為需要復(fù)制數(shù)組),所以 CopyOnWriteArraySet 適用于讀多寫少的場景,并且需要集合中元素不重復(fù)的特性。
代碼分析:
以下是 CopyOnWriteArraySet 類的一些關(guān)鍵方法的代碼分析:
add(E e)
:這個方法用于向 CopyOnWriteArraySet 中添加一個元素。contains(Object o)
:這個方法用于檢查集合中是否包含指定元素。size()
:這個方法用于獲取集合中元素的數(shù)量。clear()
:這個方法用于清空集合中的所有元素。這些方法都使用了原子操作來更新數(shù)據(jù),并確保了線程安全。在 add 方法中,會創(chuàng)建一個新數(shù)組,并將現(xiàn)有元素復(fù)制到新數(shù)組中。如果新數(shù)組中已經(jīng)存在相同的元素,則不會添加該元素。完成修改后,再將內(nèi)部引用指向新數(shù)組。
一個線程安全的無界非阻塞隊列,基于鏈表實現(xiàn)。它使用原子操作來保證線程安全,適合在高并發(fā)環(huán)境下使用。
實現(xiàn)原理:
ConcurrentLinkedQueue 是一個基于鏈表實現(xiàn)的線程安全的無界非阻塞隊列。它使用原子操作來保證線程安全,適合在高并發(fā)環(huán)境下使用。
ConcurrentLinkedQueue 實現(xiàn)了 Queue 接口,提供了一組原子操作來支持隊列的基本功能,如入隊(offer)、出隊(poll)等,這些操作都是非阻塞的。
ConcurrentLinkedQueue 沒有容量限制,理論上可以無限增長,直到耗盡內(nèi)存。
ConcurrentLinkedQueue 使用了 compare-and-swap(CAS)操作來更新鏈表節(jié)點(diǎn),這保證了在多線程環(huán)境下的線程安全。
ConcurrentLinkedQueue 使用了無鎖算法,避免了鎖競爭帶來的性能開銷。
作用:
ConcurrentLinkedQueue 的主要作用是在多線程環(huán)境中提供一個高效且線程安全的隊列。它適用于以下場景:
示例代碼:
以下是一個簡單的 ConcurrentLinkedQueue 使用示例:
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConcurrentLinkedQueueExample {
public static void main(String[] args) {
// 創(chuàng)建一個ConcurrentLinkedQueue
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
// 創(chuàng)建一個線程池
ExecutorService executor = Executors.newFixedThreadPool(10);
// 提交任務(wù)到線程池,生產(chǎn)者線程
for (int i = 0; i < 5; i++) {
final int taskNumber = i;
executor.submit(() -> {
queue.offer(taskNumber);
System.out.println("Task " + taskNumber + " added to queue");
});
}
// 提交任務(wù)到線程池,消費(fèi)者線程
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
Integer element = queue.poll();
if (element != null) {
System.out.println("Task " + element + " removed from queue");
}
});
}
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個示例中,我們創(chuàng)建了一個 ConcurrentLinkedQueue 并使用一個線程池來模擬生產(chǎn)者和消費(fèi)者。生產(chǎn)者線程向隊列中添加元素,而消費(fèi)者線程從隊列中移除元素。由于 ConcurrentLinkedQueue 是線程安全的,所以這個程序可以正確地運(yùn)行而不會出現(xiàn)并發(fā)問題。
解釋:
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
:創(chuàng)建了一個新的 ConcurrentLinkedQueue 實例。queue.offer(taskNumber);
:生產(chǎn)者線程將元素添加到隊列尾部。Integer element = queue.poll();
:消費(fèi)者線程從隊列頭部移除元素。ExecutorService executor = Executors.newFixedThreadPool(10);
:創(chuàng)建了一個大小為10的線程池。executor.submit(() -> {...});
:提交了生產(chǎn)者和消費(fèi)者任務(wù)到線程池。executor.shutdown();
:關(guān)閉線程池。這個示例展示了如何在多線程環(huán)境中使用 ConcurrentLinkedQueue 來安全地進(jìn)行生產(chǎn)者和消費(fèi)者操作。由于 ConcurrentLinkedQueue 是無界的且非阻塞的,它適合用于生產(chǎn)者和消費(fèi)者數(shù)量不固定,或者需要高并發(fā)處理的場景。
代碼分析:
以下是 ConcurrentLinkedQueue 類的一些關(guān)鍵方法的代碼分析:
offer(E e)
:這個方法用于向 ConcurrentLinkedQueue 中添加一個元素。如果隊列已滿,該方法將返回 false。poll()
:這個方法用于從 ConcurrentLinkedQueue 中移除并返回第一個元素。如果隊列為空,該方法將返回 null。peek()
:這個方法用于返回 ConcurrentLinkedQueue 中第一個元素,但不從隊列中移除它。如果隊列為空,該方法將返回 null。size()
:這個方法用于返回 ConcurrentLinkedQueue 中元素的數(shù)量。這些方法都使用了原子操作來更新鏈表節(jié)點(diǎn),并確保了線程安全。在 offer 方法中,會使用 CAS 操作將新元素添加到鏈表的尾部。在 poll 方法中,會使用 CAS 操作從鏈表的頭部移除元素。
一個線程安全的雙端隊列,也是基于鏈表實現(xiàn),適用于需要從兩端插入和刪除元素的場景。
實現(xiàn)原理:
ConcurrentLinkedDeque 是一個基于鏈表實現(xiàn)的線程安全的雙端隊列。它支持在隊列的首尾進(jìn)行插入和刪除操作,并且是線程安全的。
ConcurrentLinkedDeque 實現(xiàn)了 Deque 接口,提供了一組原子操作來支持雙端隊列的基本功能,如從頭部插入(addFirst)、從尾部插入(addLast)、從頭部移除(removeFirst)、從尾部移除(removeLast)等。
ConcurrentLinkedDeque 沒有容量限制,理論上可以無限增長,直到耗盡內(nèi)存。
ConcurrentLinkedDeque 使用了 compare-and-swap(CAS)操作來更新鏈表節(jié)點(diǎn),這保證了在多線程環(huán)境下的線程安全。
ConcurrentLinkedDeque 使用了無鎖算法,避免了鎖競爭帶來的性能開銷。
作用:
ConcurrentLinkedDeque 的主要作用是在多線程環(huán)境中提供一個高效且線程安全的雙端隊列。它適用于以下場景:
示例代碼:
以下是一個簡單的 ConcurrentLinkedDeque 使用示例:
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConcurrentLinkedDequeExample {
public static void main(String[] args) {
// 創(chuàng)建一個ConcurrentLinkedDeque
ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<>();
// 創(chuàng)建一個線程池
ExecutorService executor = Executors.newFixedThreadPool(10);
// 提交任務(wù)到線程池,生產(chǎn)者線程
for (int i = 0; i < 5; i++) {
final int taskNumber = i;
executor.submit(() -> {
deque.addFirst("Task " + taskNumber);
System.out.println("Task " + taskNumber + " added to the front of the deque");
});
}
// 提交任務(wù)到線程池,消費(fèi)者線程
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
String element = deque.removeLast();
if (element != null) {
System.out.println(element + " removed from the end of the deque");
}
});
}
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個示例中,我們創(chuàng)建了一個 ConcurrentLinkedDeque 并使用一個線程池來模擬生產(chǎn)者和消費(fèi)者。生產(chǎn)者線程向隊列的頭部添加元素,而消費(fèi)者線程從隊列的尾部移除元素。由于 ConcurrentLinkedDeque 是線程安全的,所以這個程序可以正確地運(yùn)行而不會出現(xiàn)并發(fā)問題。
解釋:
ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<>();
:創(chuàng)建了一個新的 ConcurrentLinkedDeque 實例。deque.addFirst("Task " + taskNumber);
:生產(chǎn)者線程將元素添加到隊列的頭部。String element = deque.removeLast();
:消費(fèi)者線程從隊列的尾部移除元素。ExecutorService executor = Executors.newFixedThreadPool(10);
:創(chuàng)建了一個大小為10的線程池。executor.submit(() -> {...});
:提交了生產(chǎn)者和消費(fèi)者任務(wù)到線程池。executor.shutdown();
:關(guān)閉線程池。這個示例展示了如何在多線程環(huán)境中使用 ConcurrentLinkedDeque 來安全地進(jìn)行生產(chǎn)者和消費(fèi)者操作。由于 ConcurrentLinkedDeque 是無界的且非阻塞的,它適合用于生產(chǎn)者和消費(fèi)者數(shù)量不固定,或者需要高并發(fā)處理的場景,并且需要雙端隊列的特性。
代碼分析:
以下是 ConcurrentLinkedDeque 類的一些關(guān)鍵方法的代碼分析:
addFirst(E e)
:這個方法用于在 ConcurrentLinkedDeque 的頭部添加一個元素。addLast(E e)
:這個方法用于在 ConcurrentLinkedDeque 的尾部添加一個元素。removeFirst()
:這個方法用于從 ConcurrentLinkedDeque 的頭部移除并返回第一個元素。如果隊列為空,該方法將返回 null。removeLast()
:這個方法用于從 ConcurrentLinkedDeque 的尾部移除并返回最后一個元素。如果隊列為空,該方法將返回 null。這些方法都使用了原子操作來更新鏈表節(jié)點(diǎn),并確保了線程安全。在 addFirst 和 addLast 方法中,會使用 CAS 操作將新元素添加到鏈表的頭部或尾部。在 removeFirst 和 removeLast 方法中,會使用 CAS 操作從鏈表的頭部或尾部移除元素。
(如 ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue 等)提供了線程安全的隊列操作,支持阻塞的插入和獲取操作。當(dāng)隊列滿時,插入操作會阻塞;當(dāng)隊列空時,獲取操作會阻塞。
實現(xiàn)原理:
BlockingQueue 是一個支持阻塞操作的隊列。當(dāng)隊列滿時,插入操作會阻塞;當(dāng)隊列空時,獲取操作會阻塞。它實現(xiàn)了生產(chǎn)者-消費(fèi)者模式,用于線程間的數(shù)據(jù)共享。
BlockingQueue 提供了阻塞的 put 和 take 方法,這些方法在隊列滿或空時會使線程進(jìn)入等待狀態(tài),直到隊列有空閑空間或數(shù)據(jù)可用。
BlockingQueue 的實現(xiàn)類通常使用鎖(如 ReentrantLock)和條件變量(如 Condition)來實現(xiàn)線程同步。
BlockingQueue 通常有固定的容量限制,但也有一些實現(xiàn)(如 LinkedBlockingQueue)允許指定最大容量,如果沒有指定,則默認(rèn)為 Integer.MAX_VALUE。
作用:
BlockingQueue 的主要作用是在多線程環(huán)境中提供一個線程安全的隊列,用于生產(chǎn)者和消費(fèi)者之間的數(shù)據(jù)傳遞。它適用于以下場景:
示例代碼:
以下是一個簡單的 BlockingQueue 使用示例,使用 ArrayBlockingQueue 作為實現(xiàn):
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BlockingQueueExample {
public static void main(String[] args) {
// 創(chuàng)建一個容量為10的ArrayBlockingQueue
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
// 創(chuàng)建一個線程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交生產(chǎn)者任務(wù)到線程池
executor.submit(() -> {
try {
for (int i = 0; i < 20; i++) {
queue.put("Task " + i);
System.out.println("Task " + i + " added to the queue");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 提交消費(fèi)者任務(wù)到線程池
executor.submit(() -> {
try {
for (int i = 0; i < 20; i++) {
String task = queue.take();
System.out.println(task + " removed from the queue");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個示例中,我們創(chuàng)建了一個 ArrayBlockingQueue 并使用一個線程池來模擬生產(chǎn)者和消費(fèi)者。生產(chǎn)者線程向隊列中添加元素,而消費(fèi)者線程從隊列中移除元素。由于 ArrayBlockingQueue 是線程安全的,所以這個程序可以正確地運(yùn)行而不會出現(xiàn)并發(fā)問題。
解釋:
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
:創(chuàng)建了一個容量為10的 ArrayBlockingQueue 實例。queue.put("Task " + i);
:生產(chǎn)者線程將元素添加到隊列中,如果隊列已滿,則線程會阻塞。String task = queue.take();
:消費(fèi)者線程從隊列中移除元素,如果隊列空,則線程會阻塞。ExecutorService executor = Executors.newFixedThreadPool(2);
:創(chuàng)建了一個大小為2的線程池。executor.submit(() -> {...});
:提交了生產(chǎn)者和消費(fèi)者任務(wù)到線程池。executor.shutdown();
:關(guān)閉線程池。這個示例展示了如何在多線程環(huán)境中使用 BlockingQueue 來安全地進(jìn)行生產(chǎn)者和消費(fèi)者操作。由于 ArrayBlockingQueue 是有界的,它在隊列滿時會阻塞生產(chǎn)者,在隊列空時會阻塞消費(fèi)者,適合用于需要阻塞隊列的場景。
代碼分析:
以下是 BlockingQueue 接口的一些關(guān)鍵方法的代碼分析:
put(E e)
:這個方法用于向 BlockingQueue 中添加一個元素。如果隊列已滿,該方法會阻塞,直到隊列有空閑空間。take()
:這個方法用于從 BlockingQueue 中移除并返回第一個元素。如果隊列為空,該方法會阻塞,直到隊列有數(shù)據(jù)可用。offer(E e)
:這個方法用于向 BlockingQueue 中添加一個元素。如果隊列已滿,該方法將返回 false。poll(long timeout, TimeUnit unit)
:這個方法用于從 BlockingQueue 中移除并返回第一個元素。如果隊列為空,該方法將在指定的時間內(nèi)阻塞,如果超時則返回 null。這些方法都使用了鎖和條件變量來實現(xiàn)線程同步。在 put 和 offer 方法中,會使用鎖來保護(hù)隊列,并使用條件變量來阻塞線程。在 take 和 poll 方法中,會使用鎖來保護(hù)隊列,并使用條件變量來喚醒等待的線程。
分別是線程安全的有序映射和有序集,基于跳表(Skip List)實現(xiàn),提供了高效的查找、插入和刪除操作。
實現(xiàn)原理:
ConcurrentSkipListMap 是一個線程安全的有序映射,它基于跳表(Skip List)數(shù)據(jù)結(jié)構(gòu)實現(xiàn)。跳表是一種平衡樹結(jié)構(gòu),它結(jié)合了紅黑樹和有序鏈表的特點(diǎn),提供了一種高效的數(shù)據(jù)結(jié)構(gòu),可以進(jìn)行快速的查找、插入和刪除操作。
跳表包含多層索引,每一層索引都是有序的鏈表。最底層是最簡單的有序鏈表,高層索引包含指向下一層索引的指針。通過這些指針,可以快速跳過大量節(jié)點(diǎn),從而提高查找、插入和刪除操作的效率。
ConcurrentSkipListMap 使用 ReentrantLock 來保證線程安全。當(dāng)多個線程同時進(jìn)行修改操作時,它們會競爭鎖。
讀取操作(如 get)通常不需要加鎖,因為跳表的結(jié)構(gòu)保證了讀取操作的可見性和一致性。 作用:
ConcurrentSkipListMap 的主要作用是在多線程環(huán)境中提供高效的并發(fā)訪問,同時保持元素的自然順序。它適用于以下場景:
示例代碼:
以下是一個簡單的 ConcurrentSkipListMap 使用示例:
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConcurrentSkipListMapExample {
public static void main(String[] args) {
// 創(chuàng)建一個ConcurrentSkipListMap
ConcurrentSkipListMap<String, Integer> map = new ConcurrentSkipListMap<>();
// 創(chuàng)建一個線程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交任務(wù)到線程池,生產(chǎn)者線程
executor.submit(() -> {
for (int i = 0; i < 20; i++) {
map.put("Key " + i, i);
System.out.println("Task " + i + " added to the map");
}
});
// 提交任務(wù)到線程池,消費(fèi)者線程
executor.submit(() -> {
for (int i = 0; i < 20; i++) {
Integer value = map.get("Key " + i);
if (value != null) {
System.out.println("Task " + value + " found in the map");
}
}
});
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個示例中,我們創(chuàng)建了一個 ConcurrentSkipListMap 并使用一個線程池來模擬生產(chǎn)者和消費(fèi)者。生產(chǎn)者線程向映射中添加元素,而消費(fèi)者線程從映射中查找元素。由于 ConcurrentSkipListMap 是線程安全的,所以這個程序可以正確地運(yùn)行而不會出現(xiàn)并發(fā)問題。
解釋:
ConcurrentSkipListMap<String, Integer> map = new ConcurrentSkipListMap<>();
:創(chuàng)建了一個新的 ConcurrentSkipListMap 實例。map.put("Key " + i, i);
:生產(chǎn)者線程將鍵值對添加到映射中。Integer value = map.get("Key " + i);
:消費(fèi)者線程從映射中查找特定鍵對應(yīng)的值。ExecutorService executor = Executors.newFixedThreadPool(2);
:創(chuàng)建了一個大小為2的線程池。executor.submit(() -> {...});
:提交了生產(chǎn)者和消費(fèi)者任務(wù)到線程池。executor.shutdown();
:關(guān)閉
代碼分析:
以下是 ConcurrentSkipListMap 和 ConcurrentSkipListSet 類的一些關(guān)鍵方法的代碼分析:
put(K key, V value)
:這個方法用于向 ConcurrentSkipListMap 中添加一個鍵值對。get(Object key)
:這個方法用于從 ConcurrentSkipListMap 中獲取與指定鍵關(guān)聯(lián)的值。remove(Object key)
:這個方法用于從 ConcurrentSkipListMap 中移除與指定鍵關(guān)聯(lián)的鍵值對。這些方法都使用了 ReentrantLock 來保護(hù)跳表的修改操作,并確保線程安全。在 put、get 和 remove 方法中,會使用跳表的數(shù)據(jù)結(jié)構(gòu)進(jìn)行查找、插入和刪除操作。
這些方法同樣使用了 ReentrantLock 來保護(hù)跳表的修改操作,并確保線程安全。在 add、contains 和 remove 方法中,會使用跳表的數(shù)據(jù)結(jié)構(gòu)進(jìn)行查找、插入和刪除操作。
一個同步輔助類,允許一個或多個線程等待其他線程完成操作,可用于實現(xiàn)并發(fā)同步。
實現(xiàn)原理:
CountDownLatch 是一個同步輔助類,用于實現(xiàn)線程之間的等待/通知模式。它允許一個或多個線程等待直到一系列操作在其他線程中完成。
CountDownLatch 使用一個計數(shù)器來跟蹤完成操作的線程數(shù)量。初始時,計數(shù)器的值等于線程的數(shù)量。
當(dāng)調(diào)用 CountDownLatch 的 await 方法時,當(dāng)前線程會阻塞,直到計數(shù)器值為零。
其他線程通過調(diào)用 CountDownLatch 的 countDown 方法來遞減計數(shù)器的值。每個線程在完成自己的操作后調(diào)用此方法。
作用:
CountDownLatch 的主要作用是在多線程環(huán)境中提供一個同步點(diǎn),使得主線程可以等待其他線程完成各自的任務(wù)后再繼續(xù)執(zhí)行。它適用于以下場景:
示例代碼:
以下是一個簡單的 CountDownLatch 使用示例:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchExample {
public static void main(String[] args) {
// 創(chuàng)建一個CountDownLatch,初始值為5
CountDownLatch latch = new CountDownLatch(5);
// 創(chuàng)建一個線程池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交任務(wù)到線程池,每個任務(wù)都會遞減latch的計數(shù)器
for (int i = 0; i < 5; i++) {
final int taskNumber = i;
executor.submit(() -> {
System.out.println("Task " + taskNumber + " is running");
latch.countDown();
System.out.println("Task " + taskNumber + " is completed");
});
}
// 主線程等待latch的計數(shù)器歸零后繼續(xù)執(zhí)行
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("All tasks are completed, main thread continues");
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個示例中,我們創(chuàng)建了一個 CountDownLatch 并使用一個線程池來模擬5個并發(fā)任務(wù)。每個任務(wù)在完成自己的操作后都會遞減 CountDownLatch 的計數(shù)器。主線程在調(diào)用 latch.await() 方法時會阻塞,直到計數(shù)器歸零。當(dāng)所有任務(wù)完成后,主線程繼續(xù)執(zhí)行。
解釋:
CountDownLatch latch = new CountDownLatch(5);
:創(chuàng)建了一個初始值為5的 CountDownLatch 實例。latch.countDown();
:每個線程在完成任務(wù)后調(diào)用此方法遞減計數(shù)器的值。latch.await();
:主線程在調(diào)用此方法時會阻塞,直到計數(shù)器的值為零。ExecutorService executor = Executors.newFixedThreadPool(5);
:創(chuàng)建了一個大小為5的線程池。executor.submit(() -> {...});
:提交了5個任務(wù)到線程池。executor.shutdown();
:關(guān)閉線程池。這個示例展示了如何在多線程環(huán)境中使用 CountDownLatch 來確保主線程等待所有子線程完成任務(wù)后再繼續(xù)執(zhí)行。由于 CountDownLatch 提供了線程間的同步點(diǎn),它適合用于需要等待多個線程完成任務(wù)的場景。
代碼分析:
以下是 CountDownLatch 類的一些關(guān)鍵方法的代碼分析:
CountDownLatch(int count)
: 這個構(gòu)造方法用于創(chuàng)建一個 CountDownLatch 對象,并初始化計數(shù)器的值。await()
: 這個方法用于使當(dāng)前線程等待,直到計數(shù)器的值為零。如果計數(shù)器的值不為零,當(dāng)前線程會阻塞。countDown()
: 這個方法用于遞減計數(shù)器的值。每個線程在完成自己的操作后調(diào)用此方法。getCount()
: 這個方法用于獲取當(dāng)前計數(shù)器的值。isLatchOpen()
: 這個方法用于檢查計數(shù)器的值是否為零。如果計數(shù)器的值為零,則返回 true,否則返回 false。這些方法都使用了 Object 類的 wait() 和 notify() 方法來實現(xiàn)線程間的同步。當(dāng)線程到達(dá) await 方法時,它會調(diào)用 wait() 方法,這會導(dǎo)致線程進(jìn)入等待狀態(tài)。當(dāng)其他線程調(diào)用 countDown() 方法并遞減計數(shù)器的值時,會調(diào)用 notify() 方法來喚醒等待的線程。
一個允許一組線程互相等待的同步輔助類,直到所有線程都達(dá)到某個屏障點(diǎn)后才繼續(xù)執(zhí)行。
實現(xiàn)原理:
CyclicBarrier 是一個同步輔助類,用于讓一組線程到達(dá)一個屏障(barrier)時被阻塞,直到最后一個線程到達(dá)屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續(xù)運(yùn)行。CyclicBarrier 的名稱中的 “Cyclic” 指的是它可以被重用。當(dāng)所有參與者到達(dá)屏障時,它們會執(zhí)行 barrierAction 指定的動作,這個動作只會被最后一個到達(dá)屏障的線程執(zhí)行。
CyclicBarrier 使用一個內(nèi)部計數(shù)器來跟蹤到達(dá)屏障的線程數(shù)量。
當(dāng)線程到達(dá)屏障時,它會阻塞,直到計數(shù)器的值為零。
CyclicBarrier 允許在屏障打開后重新使用它,而不是每次使用后都必須創(chuàng)建一個新的。
作用:
CyclicBarrier 的主要作用是在多線程環(huán)境中提供一個線程間的同步點(diǎn),使得一組線程在完成各自的任務(wù)后,能夠同時繼續(xù)執(zhí)行后續(xù)操作。它適用于以下場景:
示例代碼:
以下是一個簡單的 CyclicBarrier 使用示例:
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierExample {
public static void main(String[] args) {
// 創(chuàng)建一個CyclicBarrier,初始值為5
CyclicBarrier barrier = new CyclicBarrier(5);
// 創(chuàng)建一個線程池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交任務(wù)到線程池,每個任務(wù)都會到達(dá)屏障并執(zhí)行后續(xù)操作
for (int i = 0; i < 5; i++) {
final int taskNumber = i;
executor.submit(() -> {
System.out.println("Task " + taskNumber + " is running");
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
System.out.println("Task " + taskNumber + " is completed");
});
}
// 主線程等待所有任務(wù)完成后繼續(xù)執(zhí)行
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
System.out.println("All tasks are completed, main thread continues");
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個示例中,我們創(chuàng)建了一個 CyclicBarrier 并使用一個線程池來模擬5個并發(fā)任務(wù)。每個任務(wù)在完成自己的操作后會到達(dá)屏障并等待其他任務(wù)也到達(dá)屏障。當(dāng)所有任務(wù)都到達(dá)屏障時,它們會繼續(xù)執(zhí)行后續(xù)操作。主線程在調(diào)用 barrier.await() 方法時會阻塞,直到所有子線程都到達(dá)屏障。
解釋:
CyclicBarrier barrier = new CyclicBarrier(5);
:創(chuàng)建了一個初始值為5的 CyclicBarrier 實例。barrier.await();
:每個線程在完成任務(wù)后調(diào)用此方法到達(dá)屏障并等待其他線程。ExecutorService executor = Executors.newFixedThreadPool(5);
:創(chuàng)建了一個大小為5的線程池。executor.submit(() -> {...});
:提交了5個任務(wù)到線程池。executor.shutdown();
:關(guān)閉線程池。這個示例展示了如何在多線程環(huán)境中使用 CyclicBarrier 來確保一組線程在完成各自的任務(wù)后,能夠同時繼續(xù)執(zhí)行后續(xù)操作。由于 CyclicBarrier 提供了線程間的同步點(diǎn),它適合用于需要線程同步執(zhí)行的場景。
代碼分析:
以下是 CyclicBarrier 類的一些關(guān)鍵方法的代碼分析:
CyclicBarrier(int parties)
: 這個構(gòu)造方法用于創(chuàng)建一個 CyclicBarrier 對象,并指定屏障的參與者數(shù)量。await()
: 這個方法用于使當(dāng)前線程等待,直到計數(shù)器的值為零。如果計數(shù)器的值不為零,當(dāng)前線程會阻塞。await(long timeout, TimeUnit unit)
: 這個方法與 await() 類似,但它允許設(shè)置一個超時時間。如果其他線程在超時時間內(nèi)還沒有到達(dá)屏障,當(dāng)前線程將返回 false。reset()
: 這個方法用于重置屏障,將其計數(shù)器的值重置為初始值。getNumberWaiting()
: 這個方法用于獲取當(dāng)前等待在屏障上的線程數(shù)量。getParties()
: 這個方法用于獲取屏障的參與者數(shù)量。isBroken()
: 這個方法用于檢查屏障是否被破壞。如果屏障被破壞,所有等待的線程都會被中斷。這些方法都使用了 Object 類的 wait() 和 notify() 方法來實現(xiàn)線程間的同步。當(dāng)線程到達(dá) await 方法時,它會調(diào)用 wait() 方法,這會導(dǎo)致線程進(jìn)入等待狀態(tài)。當(dāng)其他線程到達(dá)屏障時,會調(diào)用 notify() 方法來喚醒等待的線程。
一個計數(shù)信號量,可以用來限制可以同時訪問某個特定資源的線程數(shù)量。
實現(xiàn)原理:
Exchanger 是一個同步輔助類,用于實現(xiàn)兩個線程間的數(shù)據(jù)交換。當(dāng)兩個線程都到達(dá) Exchanger 指定的交換點(diǎn)時,它們可以交換彼此的數(shù)據(jù)。如果只有一個線程到達(dá)交換點(diǎn),它會阻塞,直到另一個線程也到達(dá)交換點(diǎn)。
Exchanger 使用一個內(nèi)部同步機(jī)制來跟蹤到達(dá)交換點(diǎn)的線程數(shù)量。
當(dāng)線程到達(dá)交換點(diǎn)時,它會阻塞,直到另一個線程也到達(dá)交換點(diǎn)。
當(dāng)兩個線程都到達(dá)交換點(diǎn)時,它們可以交換彼此的數(shù)據(jù)。
作用:
Exchanger 的主要作用是在多線程環(huán)境中提供一個線程間的同步點(diǎn),使得兩個線程可以交換數(shù)據(jù)。它適用于以下場景:
示例代碼:
以下是一個簡單的 Exchanger 使用示例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Exchanger;
public class ExchangerExample {
public static void main(String[] args) {
// 創(chuàng)建一個Exchanger
Exchanger<String> exchanger = new Exchanger<>();
// 創(chuàng)建一個線程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交任務(wù)到線程池,第一個線程會生成數(shù)據(jù)并等待交換
executor.submit(() -> {
String data = "Data from the first thread";
try {
String receivedData = exchanger.exchange(data);
System.out.println("Received data from the second thread: " + receivedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 提交任務(wù)到線程池,第二個線程會生成數(shù)據(jù)并交換
executor.submit(() -> {
String data = "Data from the second thread";
try {
String receivedData = exchanger.exchange(data);
System.out.println("Received data from the first thread: " + receivedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個示例中,我們創(chuàng)建了一個 Exchanger 并使用一個線程池來模擬兩個線程。第一個線程在交換點(diǎn)等待,并準(zhǔn)備好交換數(shù)據(jù)。第二個線程在交換點(diǎn)準(zhǔn)備好數(shù)據(jù),并等待第一個線程到達(dá)交換點(diǎn)。當(dāng)兩個線程都到達(dá)交換點(diǎn)時,它們可以交換數(shù)據(jù)。
解釋:
Exchanger<String> exchanger = new Exchanger<>();
:創(chuàng)建了一個新的 Exchanger 實例。String data = "Data from the first thread";
:第一個線程準(zhǔn)備的數(shù)據(jù)。exchanger.exchange(data);
:第一個線程在交換點(diǎn)等待,并準(zhǔn)備好交換數(shù)據(jù)。String receivedData = exchanger.exchange(data);
:第二個線程在交換點(diǎn)準(zhǔn)備好數(shù)據(jù),并等待第一個線程到達(dá)交換點(diǎn)。ExecutorService executor = Executors.newFixedThreadPool(2);
:創(chuàng)建了一個大小為2的線程池。executor.submit(() -> {...});
:提交了兩個任務(wù)到線程池。executor.shutdown();
:關(guān)閉線程池。這個示例展示了如何在多線程環(huán)境中使用 Exchanger 來確保兩個線程在某個點(diǎn)同步執(zhí)行并交換數(shù)據(jù)。由于 Exchanger 提供了線程間的同步點(diǎn),它適合用于需要線程間數(shù)據(jù)交換的場景。
代碼分析:
以下是 Semaphore 類的一些關(guān)鍵方法的代碼分析:
Semaphore(int permits)
: 這個構(gòu)造方法用于創(chuàng)建一個 Semaphore 對象,并指定信號量的初始值。acquire()
: 這個方法用于嘗試獲取一個資源。如果信號量的值大于零,當(dāng)前線程可以繼續(xù)執(zhí)行;如果信號量的值等于零,當(dāng)前線程將阻塞。acquire(int permits)
: 這個方法與 acquire() 類似,但它允許指定要獲取的資源數(shù)量。release()
: 這個方法用于釋放一個資源。它會增加信號量的值,從而允許其他被阻塞的線程繼續(xù)執(zhí)行。tryAcquire()
: 這個方法用于嘗試獲取一個資源,但不阻塞。如果信號量的值大于零,當(dāng)前線程可以繼續(xù)執(zhí)行;如果信號量的值等于零,該方法將返回 false。tryAcquire(int permits)
: 這個方法與 tryAcquire() 類似,但它允許指定要獲取的資源數(shù)量。這些方法都使用了 Object 類的 wait() 和 notify() 方法來實現(xiàn)線程間的同步。當(dāng)線程到達(dá) acquire 方法時,它會調(diào)用 wait() 方法,這會導(dǎo)致線程進(jìn)入等待狀態(tài)。當(dāng)其他線程調(diào)用 release 方法并釋放資源時,會調(diào)用 notify() 方法來喚醒等待的線程。
一個用于在并發(fā)線程之間交換數(shù)據(jù)的工具,適用于遺傳算法、流水線設(shè)計等場景。
Exchanger 是一個用于線程間交換數(shù)據(jù)的同步輔助類,它允許兩個線程在某個點(diǎn)交換它們的數(shù)據(jù)。當(dāng)一個線程準(zhǔn)備好數(shù)據(jù)時,它會等待另一個線程準(zhǔn)備好數(shù)據(jù),然后它們可以交換數(shù)據(jù)。如果一個線程準(zhǔn)備好數(shù)據(jù)而另一個線程還沒有準(zhǔn)備好,那么第一個線程會阻塞,直到第二個線程準(zhǔn)備好數(shù)據(jù)。
實現(xiàn)原理:
Exchanger 的實現(xiàn)原理是基于 Object 類的 wait() 和 notify() 方法。當(dāng)一個線程到達(dá)交換點(diǎn)時,它會調(diào)用 exchange() 方法,該方法會嘗試將該線程的數(shù)據(jù)與另一個線程的數(shù)據(jù)交換。如果另一個線程還沒有準(zhǔn)備好數(shù)據(jù),那么第一個線程會阻塞,直到第二個線程到達(dá)交換點(diǎn)并準(zhǔn)備好數(shù)據(jù)。
Exchanger 使用一個內(nèi)部同步機(jī)制來跟蹤到達(dá)交換點(diǎn)的線程數(shù)量。
當(dāng)線程到達(dá)交換點(diǎn)時,它會阻塞,直到另一個線程也到達(dá)交換點(diǎn)。
當(dāng)兩個線程都到達(dá)交換點(diǎn)時,它們可以交換彼此的數(shù)據(jù)。
作用:
Exchanger 的主要作用是在多線程環(huán)境中提供一個線程間的同步點(diǎn),使得兩個線程可以交換數(shù)據(jù)。它適用于以下場景:
示例代碼:
以下是一個簡單的 Exchanger 使用示例:
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExchangerExample {
public static void main(String[] args) {
// 創(chuàng)建一個Exchanger
Exchanger<String> exchanger = new Exchanger<>();
// 創(chuàng)建一個線程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交任務(wù)到線程池,第一個線程會生成數(shù)據(jù)并等待交換
executor.submit(() -> {
String data = "Data from the first thread";
try {
String receivedData = exchanger.exchange(data);
System.out.println("Received data from the second thread: " + receivedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 提交任務(wù)到線程池,第二個線程會生成數(shù)據(jù)并交換
executor.submit(() -> {
String data = "Data from the second thread";
try {
String receivedData = exchanger.exchange(data);
System.out.println("Received data from the first thread: " + receivedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個示例中,我們創(chuàng)建了一個 Exchanger 并使用一個線程池來模擬兩個線程。第一個線程在交換點(diǎn)等待,并準(zhǔn)備好交換數(shù)據(jù)。第二個線程在交換點(diǎn)準(zhǔn)備好數(shù)據(jù),并等待第一個線程到達(dá)交換點(diǎn)。當(dāng)兩個線程都到達(dá)交換點(diǎn)時,它們可以交換數(shù)據(jù)。
解釋:
Exchanger<String> exchanger = new Exchanger<>();
:創(chuàng)建了一個新的 Exchanger 實例。String data = "Data from the first thread";
:第一個線程準(zhǔn)備的數(shù)據(jù)。String receivedData = exchanger.exchange(data);
:第一個線程在交換點(diǎn)等待,并準(zhǔn)備好交換數(shù)據(jù)。String receivedData = exchanger.exchange(data);
:第二個線程在交換點(diǎn)準(zhǔn)備好數(shù)據(jù),并等待第一個線程到達(dá)交換點(diǎn)。ExecutorService executor = Executors.newFixedThreadPool(2);
:創(chuàng)建了一個大小為2的線程池。executor.submit(() -> {...});
:提交了兩個任務(wù)到線程池。executor.shutdown();
:關(guān)閉線程池。這個示例展示了如何在多線程環(huán)境中使用 Exchanger 來確保兩個線程在某個點(diǎn)同步執(zhí)行并交換數(shù)據(jù)。由于 Exchanger 提供了線程間的同步點(diǎn),它適合用于需要線程間數(shù)據(jù)交換的場景。
在這個示例中,兩個線程分別準(zhǔn)備了一些數(shù)據(jù),并嘗試通過 Exchanger 進(jìn)行交換。如果一個線程到達(dá)交換點(diǎn)時,另一個線程還沒有準(zhǔn)備好數(shù)據(jù),那么第一個線程會阻塞,直到第二個線程也到達(dá)交換點(diǎn)并準(zhǔn)備好數(shù)據(jù)。當(dāng)兩個線程都到達(dá)交換點(diǎn)時,它們可以交換數(shù)據(jù),然后繼續(xù)執(zhí)行。
代碼分析:
在 Java 中,Exchanger 類的實現(xiàn)原理基于 Object 類的 wait() 和 notify() 方法。以下是 Exchanger 類的一些關(guān)鍵方法的實現(xiàn)原理:
boolean exchange(V x)
:這個方法允許一個線程嘗試交換數(shù)據(jù)。如果另一個線程已經(jīng)準(zhǔn)備好數(shù)據(jù),那么這兩個線程將交換數(shù)據(jù)。如果另一個線程還沒有準(zhǔn)備好數(shù)據(jù),那么當(dāng)前線程會阻塞,直到另一個線程到達(dá)交換點(diǎn)并準(zhǔn)備好數(shù)據(jù)。boolean exchange(V x, long timeout, TimeUnit unit)
:這個方法與 exchange(V x)
類似,但它允許設(shè)置一個超時時間。如果另一個線程在超時時間內(nèi)還沒有準(zhǔn)備好數(shù)據(jù),那么當(dāng)前線程將返回 false。V exchange(V x, Phaser phaser)
:這個方法允許一個線程嘗試交換數(shù)據(jù),并且使用一個 Phaser 來管理線程的同步。如果另一個線程已經(jīng)準(zhǔn)備好數(shù)據(jù),那么這兩個線程將交換數(shù)據(jù)。如果另一個線程還沒有準(zhǔn)備好數(shù)據(jù),那么當(dāng)前線程會阻塞,直到另一個線程到達(dá)交換點(diǎn)并準(zhǔn)備好數(shù)據(jù)。
這些方法都使用了 Object 類的 wait()
和 notify()
方法來實現(xiàn)線程間的同步。當(dāng)一個線程到達(dá)交換點(diǎn)時,它會調(diào)用 wait()
方法,這會導(dǎo)致線程進(jìn)入等待狀態(tài)。當(dāng)另一個線程到達(dá)交換點(diǎn)并調(diào)用 notify()
方法時,第一個線程會被喚醒,并且可以繼續(xù)執(zhí)行。
一個可重用的同步屏障,適用于類似于 CyclicBarrier 的場景,但提供了更靈活的注冊和注銷機(jī)制。
實現(xiàn)原理:
Phaser 是一個可重用的同步屏障,它允許一組線程互相等待,直到它們都到達(dá)某個屏障點(diǎn)。與 CyclicBarrier 類似,Phaser 允許重用同一個屏障,這意味著在所有線程到達(dá)屏障點(diǎn)后,可以重新使用該屏障。
Phaser 使用一個內(nèi)部計數(shù)器來跟蹤到達(dá)屏障點(diǎn)的線程數(shù)量。
當(dāng)線程到達(dá)屏障點(diǎn)時,它會阻塞,直到所有線程都到達(dá)屏障點(diǎn)。
Phaser 允許在屏障點(diǎn)被觸發(fā)后重新使用它,而不是每次使用后都必須創(chuàng)建一個新的。
作用:
Phaser 的主要作用是在多線程環(huán)境中提供一個線程間的同步點(diǎn),使得一組線程在完成各自的任務(wù)后,能夠同時繼續(xù)執(zhí)行后續(xù)操作。它適用于以下場景:
示例代碼:
以下是一個簡單的 Phaser 使用示例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
public class PhaserExample {
public static void main(String[] args) {
// 創(chuàng)建一個Phaser,初始值為5
Phaser phaser = new Phaser(5);
// 創(chuàng)建一個線程池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交任務(wù)到線程池,每個任務(wù)都會到達(dá)屏障點(diǎn)并執(zhí)行后續(xù)操作
for (int i = 0; i < 5; i++) {
final int taskNumber = i;
executor.submit(() -> {
System.out.println("Task " + taskNumber + " is running");
try {
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Task " + taskNumber + " is completed");
});
}
// 主線程等待所有任務(wù)完成后繼續(xù)執(zhí)行
try {
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("All tasks are completed, main thread continues");
// 關(guān)閉線程池
executor.shutdown();
}
}
在這個示例中,我們創(chuàng)建了一個 Phaser 并使用一個線程池來模擬5個并發(fā)任務(wù)。每個任務(wù)在完成自己的操作后會到達(dá)屏障點(diǎn)并等待其他任務(wù)也到達(dá)屏障點(diǎn)。當(dāng)所有任務(wù)都到達(dá)屏障點(diǎn)時,它們會繼續(xù)執(zhí)行后續(xù)操作。主線程在調(diào)用 phaser.arriveAndAwaitAdvance() 方法時會阻塞,直到所有子線程都到達(dá)屏障點(diǎn)。
解釋:
Phaser phaser = new Phaser(5);
:創(chuàng)建了一個初始值為5的 Phaser 實例。phaser.arriveAndAwaitAdvance();
:每個線程在完成任務(wù)后調(diào)用此方法到達(dá)屏障點(diǎn)并等待其他線程。ExecutorService executor = Executors.newFixedThreadPool(5);
:創(chuàng)建了一個大小為5的線程池。executor.submit(() -> {...});
:提交了5個任務(wù)到線程池。executor.shutdown();
:關(guān)閉線程池。這個示例展示了如何在多線程環(huán)境中使用 Phaser 來確保一組線程在完成各自的任務(wù)后,能夠同時繼續(xù)執(zhí)行后續(xù)操作。由于 Phaser 提供了線程間的同步點(diǎn),它適合用于需要線程同步執(zhí)行的場景。
代碼分析:
以下是 Phaser 類的一些關(guān)鍵方法的代碼分析:
Phaser(int parties)
: 這個構(gòu)造方法用于創(chuàng)建一個 Phaser 對象,并指定屏障的參與者數(shù)量。arrive()
: 這個方法用于使當(dāng)前線程到達(dá)屏障點(diǎn)。每次調(diào)用此方法時,計數(shù)器的值會遞減。arriveAndAwaitAdvance()
: 這個方法與 arrive() 類似,但它會阻塞當(dāng)前線程,直到計數(shù)器的值變?yōu)榱恪?/li>
awaitAdvance()
: 這個方法用于阻塞當(dāng)前線程,直到屏障點(diǎn)被觸發(fā)。getRegisteredParties()
: 這個方法用于獲取當(dāng)前注冊在屏障上的線程數(shù)量。getArrivedParties()
: 這個方法用于獲取已經(jīng)到達(dá)屏障的線程數(shù)量。isTerminated()
: 這個方法用于檢查屏障是否已經(jīng)終止。如果屏障已經(jīng)終止,返回 true;否則返回 false。forceTermination()
: 這個方法用于強(qiáng)制終止屏障。它將計數(shù)器的值設(shè)置為零,并喚醒所有等待的線程。
這些方法都使用了 Object 類的 wait()
和 notify()
方法來實現(xiàn)線程間的同步。當(dāng)線程到達(dá) arriveAndAwaitAdvance()
方法時,它會調(diào)用 wait()
方法,這會導(dǎo)致線程進(jìn)入等待狀態(tài)。當(dāng)其他線程到達(dá)屏障點(diǎn)并調(diào)用 arrive()
方法時,會調(diào)用 notify()
方法來喚醒等待的線程。
以上就是 V哥給大家整理的12個并發(fā)相關(guān)的數(shù)據(jù)結(jié)構(gòu),這些并發(fā)數(shù)據(jù)結(jié)構(gòu)是 Java 并發(fā)編程的基礎(chǔ),它們在 java.util.concurrent
包(J.U.C)中提供。使用這些數(shù)據(jù)結(jié)構(gòu)可以幫助開發(fā)者編寫出高效且線程安全的并發(fā)程序,分布式應(yīng)用開發(fā)的項目中,你會使用到的。Java并發(fā)
更多建議: