Java并發(fā)數(shù)據(jù)結(jié)構(gòu)全解析:從ConcurrentHashMap到Phaser

2024-12-03 11:44 更新

今天 V 哥聊聊并發(fā)數(shù)據(jù)結(jié)構(gòu)的問題,我們知道,并發(fā)編程中,保障數(shù)據(jù)的安全訪問是第一要務(wù),JDK 提供了一系列JUC并發(fā)數(shù)據(jù)結(jié)構(gòu),這些數(shù)據(jù)結(jié)構(gòu)是線程安全的,可以在多線程環(huán)境中使用而無需額外的同步措施。以下是一些主要的并發(fā)數(shù)據(jù)結(jié)構(gòu):

1、ConcurrentHashMap

一個線程安全的哈希表,用于存儲鍵值對。它在內(nèi)部使用了分段鎖(Segment Locking)或其他形式的并發(fā)控制機(jī)制,允許多個線程并發(fā)讀寫,同時保持較高的性能。

ConcurrentHashMap 是 Java 并發(fā)編程中非常重要的一個線程安全的哈希表實現(xiàn),它在 java.util.concurrent 包中。ConcurrentHashMap 允許并發(fā)讀和并發(fā)寫,旨在提供比同步的 HashMap 更高的并發(fā)性能。

實現(xiàn)原理:

  1. 分段鎖(Segment Locking):

在 JDK 1.7 及之前的版本中,ConcurrentHashMap 使用了分段鎖(Segment Locking)機(jī)制。整個哈希表被分割成多個段(Segment),每個段是一個小的哈希表,它們有自己的鎖。當(dāng)多個線程訪問不同段的數(shù)據(jù)時,它們可以并發(fā)執(zhí)行,因為每個段都有自己的鎖。

  1. CAS 操作:

ConcurrentHashMap 使用了無鎖的 compare-and-swap(CAS)操作來更新數(shù)據(jù),這進(jìn)一步提高了并發(fā)性能。

  1. 讀取操作無鎖:

讀取操作通常不需要加鎖,因為 ConcurrentHashMap 的設(shè)計保證了讀取數(shù)據(jù)的可見性和一致性。

  1. JDK 1.8 的改進(jìn):

在 JDK 1.8 中,ConcurrentHashMap 的實現(xiàn)發(fā)生了變化,它取消了分段鎖,轉(zhuǎn)而使用了 synchronized 關(guān)鍵字來保護(hù)哈希表的節(jié)點(diǎn)(Node)。同時,它也引入了紅黑樹來處理哈希碰撞導(dǎo)致的鏈表過長的問題,提高了最壞情況下的性能。

作用:

ConcurrentHashMap 的主要作用是在多線程環(huán)境中提供高效的并發(fā)訪問。它適用于以下場景:

  • 當(dāng)多個線程需要訪問同一個哈希表時,使用 ConcurrentHashMap 可以減少鎖競爭,提高并發(fā)性能。
  • 在需要線程安全的集合操作時,ConcurrentHashMap 是一個性能優(yōu)于同步的 HashMap 的選擇。

示例代碼:

以下是一個簡單的 ConcurrentHashMap 使用示例:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


public class ConcurrentHashMapExample {
    public static void main(String[] args) throws InterruptedException {
        // 創(chuàng)建一個ConcurrentHashMap
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();


        // 創(chuàng)建一個線程池
        ExecutorService executor = Executors.newFixedThreadPool(10);


        // 提交10個任務(wù)到線程池,每個任務(wù)都會更新ConcurrentHashMap
        for (int i = 0; i < 10; i++) {
            final int taskNumber = i;
            executor.submit(() -> {
                map.put("key" + taskNumber, taskNumber);
                System.out.println("Task " + taskNumber + " put value: " + map.get("key" + taskNumber));
            });
        }


        // 關(guān)閉線程池
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
    }
}

在這個示例中,我們創(chuàng)建了一個 ConcurrentHashMap 并使用一個線程池來并發(fā)地更新它。每個任務(wù)都會向哈希表中插入一個鍵值對,并打印出對應(yīng)的值。由于 ConcurrentHashMap 是線程安全的,所以這個程序可以正確地運(yùn)行而不會出現(xiàn)并發(fā)問題。

解釋:

  • ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();:創(chuàng)建了一個新的 ConcurrentHashMap 實例。
  • ExecutorService executor = Executors.newFixedThreadPool(10);:創(chuàng)建了一個固定大小為10的線程池。
  • executor.submit(() -> {...});:提交了一個 Runnable 任務(wù)到線程池,任務(wù)中更新了 ConcurrentHashMap。
  • executor.shutdown(); 和 executor.awaitTermination(1, TimeUnit.MINUTES);:關(guān)閉線程池并等待所有任務(wù)完成。

這個示例展示了如何在多線程環(huán)境中安全地使用 ConcurrentHashMap。

實現(xiàn)原理的代碼分析:

ConcurrentHashMap 是 Java 中的一個線程安全的哈希表實現(xiàn),用于存儲鍵值對。在 Java 1.8 之前,ConcurrentHashMap 使用了分段鎖機(jī)制,而在 Java 1.8 之后,它采用了更為高效的鎖分離技術(shù)。

Java 1.8 之前的實現(xiàn)原理:

在 Java 1.8 之前,ConcurrentHashMap 使用分段鎖(Segment Locking)機(jī)制。每個 Segment 是一個可重入的 ReentrantLock,它用于鎖定整個哈希表的一個部分。哈希表被分割成多個段,每個段有自己的鎖,因此可以同時進(jìn)行讀寫操作。

  1. 分段鎖:

ConcurrentHashMap 使用分段鎖來保護(hù)多個哈希表段。每個段有一個自己的鎖,這使得在多線程環(huán)境中可以并發(fā)地讀寫不同的段。

  1. 寫時復(fù)制:

在 Java 1.8 之前,ConcurrentHashMap 在進(jìn)行寫操作時,會復(fù)制整個段,而不是整個哈希表。這減少了加鎖的范圍,提高了并發(fā)性能。

Java 1.8 之后的實現(xiàn)原理:

在 Java 1.8 中,ConcurrentHashMap 的實現(xiàn)發(fā)生了變化,它取消了分段鎖,轉(zhuǎn)而使用了 synchronized 關(guān)鍵字來保護(hù)哈希表的節(jié)點(diǎn)(Node)。同時,它也引入了紅黑樹來處理哈希碰撞導(dǎo)致的鏈表過長的問題,提高了最壞情況下的性能。

  1. 鎖分離:

在 Java 1.8 中,ConcurrentHashMap 使用了一種稱為“鎖分離”的技術(shù)。它將鎖的范圍縮小到鏈表的頭部節(jié)點(diǎn),而不是整個哈希表或整個段。這減少了鎖競爭,提高了并發(fā)性能。

  1. 紅黑樹:

為了提高哈希表的性能,ConcurrentHashMap 引入了紅黑樹。當(dāng)鏈表的長度超過某個閾值時,鏈表會被轉(zhuǎn)換為紅黑樹,這樣可以減少搜索時間,提高最壞情況下的性能。

代碼分析:

以下是 ConcurrentHashMap 類的一些關(guān)鍵方法的代碼分析:

  • put(K key, V value):這個方法用于向 ConcurrentHashMap 中添加一個鍵值對。

  • get(Object key):這個方法用于從 ConcurrentHashMap 中獲取與指定鍵關(guān)聯(lián)的值。

  • remove(Object key):這個方法用于從 ConcurrentHashMap 中移除與指定鍵關(guān)聯(lián)的鍵值對。

這些方法都使用了 synchronized 關(guān)鍵字來保護(hù)哈希表的節(jié)點(diǎn)。在 Java 1.8 之前,這些方法會使用分段鎖來保護(hù)整個段。而在 Java 1.8 之后,這些方法會使用鎖分離技術(shù)來保護(hù)鏈表的頭部節(jié)點(diǎn)。

這個示例展示了如何在多線程環(huán)境中使用 ConcurrentHashMap 來安全地進(jìn)行鍵值對的添加、獲取和移除操作。由于 ConcurrentHashMap 是線程安全的,所以這個程序可以正確地運(yùn)行而不會出現(xiàn)并發(fā)問題。

2、CopyOnWriteArrayList

CopyOnWriteArrayList是一個線程安全的列表,它在進(jìn)行修改操作(如添加、刪除元素)時會創(chuàng)建底層數(shù)組的一個新副本,從而實現(xiàn)讀寫分離。適用于讀多寫少的場景。

實現(xiàn)原理:

CopyOnWriteArrayList 是一個線程安全的變體,其中所有對列表的修改(添加、刪除、設(shè)置元素等)都是在底層數(shù)組的一個副加上進(jìn)行的。這意味著在修改操作發(fā)生時,會創(chuàng)建一個新的數(shù)組,并將現(xiàn)有的所有元素復(fù)制到新數(shù)組中,然后在新數(shù)組上進(jìn)行修改。完成修改后,再將內(nèi)部引用指向新數(shù)組。由于寫操作是在新數(shù)組上進(jìn)行的,讀操作可以安全地訪問舊數(shù)組,而不會受到寫操作的干擾。

  1. 寫時復(fù)制(Copy-On-Write):

這是 CopyOnWriteArrayList 的核心原理。在發(fā)生寫操作時,不直接修改原有數(shù)組,而是復(fù)制出一個新數(shù)組,修改完成后,再將內(nèi)部引用指向新數(shù)組。

  1. 讀取操作無鎖:

由于讀操作不需要修改數(shù)組,它們可以安全地讀取當(dāng)前的數(shù)組,而不需要任何鎖。這提高了讀取操作的并發(fā)性能。

  1. 寫操作加鎖:

為了確保寫操作的原子性和一致性,寫操作需要加鎖。這是通過在修改方法(如 add, remove, set)中使用 ReentrantLock 實現(xiàn)的。

作用:

CopyOnWriteArrayList 的主要作用是在讀多寫少的場景中提供線程安全的列表操作,同時盡量減少讀操作的鎖競爭。它適用于以下場景:

  • 當(dāng)你需要一個列表,其中大多數(shù)操作是讀取操作,而寫操作相對較少時。
  • 當(dāng)你可以在發(fā)生寫操作時接受一定的性能開銷,因為你需要復(fù)制整個底層數(shù)組。

示例代碼:

以下是一個簡單的 CopyOnWriteArrayList 使用示例:

import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class CopyOnWriteArrayListExample {
    public static void main(String[] args) {
        // 創(chuàng)建一個CopyOnWriteArrayList
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();


        // 添加元素
        list.add("A");
        list.add("B");
        list.add("C");


        // 創(chuàng)建一個線程池
        ExecutorService executor = Executors.newFixedThreadPool(2);


        // 提交一個任務(wù)到線程池,該任務(wù)會修改CopyOnWriteArrayList
        executor.submit(() -> {
            list.add("D");
            list.remove("A");
        });


        // 提交另一個任務(wù)到線程池,該任務(wù)會讀取CopyOnWriteArrayList
        executor.submit(() -> {
            for (String element : list) {
                System.out.println(element);
            }
        });


        // 關(guān)閉線程池
        executor.shutdown();
    }
}

在這個示例中,我們創(chuàng)建了一個 CopyOnWriteArrayList 并使用一個線程池來并發(fā)地修改和讀取它。一個任務(wù)嘗試添加和刪除元素,而另一個任務(wù)遍歷列表并打印所有元素。由于 CopyOnWriteArrayList 是線程安全的,所以這個程序可以正確地運(yùn)行而不會出現(xiàn)并發(fā)問題。

解釋:

  • CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();:創(chuàng)建了一個新的 CopyOnWriteArrayList 實例。
  • list.add("A");list.remove("A");:添加和刪除元素的操作。
  • ExecutorService executor = Executors.newFixedThreadPool(2);:創(chuàng)建了一個大小為2的線程池。
  • executor.submit(() -> {...});:提交了一個 Runnable 任務(wù)到線程池,任務(wù)中修改了 CopyOnWriteArrayList。
  • for (String element : list) {...}:遍歷列表并打印元素。
  • executor.shutdown();:關(guān)閉線程池。

這個示例展示了如何在多線程環(huán)境中使用 CopyOnWriteArrayList 來安全地進(jìn)行讀寫操作。由于寫操作相對昂貴(因為需要復(fù)制數(shù)組),所以 CopyOnWriteArrayList 適用于讀多寫少的場景。

代碼分析:

以下是 CopyOnWriteArrayList 類的一些關(guān)鍵方法的代碼分析:

  • add(E e):這個方法用于向 CopyOnWriteArrayList 中添加一個元素。

  • get(int index):這個方法用于從 CopyOnWriteArrayList 中獲取指定索引的元素。

  • set(int index, E element):這個方法用于將指定索引的元素設(shè)置為新元素。

  • remove(int index):這個方法用于從 CopyOnWriteArrayList 中移除指定索引的元素。

這些方法都使用了原子操作來更新數(shù)據(jù),并確保了線程安全。在 add、set 和 remove 方法中,會創(chuàng)建一個新數(shù)組,并將現(xiàn)有元素復(fù)制到新數(shù)組中,然后修改新數(shù)組。完成修改后,再將內(nèi)部引用指向新數(shù)組。

3、CopyOnWriteArraySet

與 CopyOnWriteArrayList 類似,但它存儲的是不包含重復(fù)元素的集合。

實現(xiàn)原理:

CopyOnWriteArraySet 是一個線程安全的變體,其中所有對集合的修改(添加、刪除元素等)都是在底層數(shù)組的一個副加上進(jìn)行的。這意味著在修改操作發(fā)生時,會創(chuàng)建一個新的數(shù)組,然后將現(xiàn)有的所有元素復(fù)制到新數(shù)組中,并在新數(shù)組上進(jìn)行修改。完成修改后,再將內(nèi)部引用指向新數(shù)組。由于寫操作是在新數(shù)組上進(jìn)行的,讀操作可以安全地訪問舊數(shù)組,而不會受到寫操作的干擾。

  1. 寫時復(fù)制(Copy-On-Write):

這是 CopyOnWriteArraySet 的核心原理。在發(fā)生寫操作時,不直接修改原有數(shù)組,而是復(fù)制出一個新數(shù)組,修改完成后,再將內(nèi)部引用指向新數(shù)組。

  1. 讀取操作無鎖:

由于讀操作不需要修改數(shù)組,它們可以安全地讀取當(dāng)前的數(shù)組,而不需要任何鎖。這提高了讀取操作的并發(fā)性能。

  1. 寫操作加鎖:

為了確保寫操作的原子性和一致性,寫操作需要加鎖。這是通過在修改方法(如 add, remove)中使用 ReentrantLock 實現(xiàn)的。

作用:

CopyOnWriteArraySet 的主要作用是在讀多寫少的場景中提供線程安全的集合操作,同時盡量減少讀操作的鎖競爭。它適用于以下場景:

  • 當(dāng)你需要一個集合,其中大多數(shù)操作是讀取操作,而寫操作相對較少時。
  • 當(dāng)你可以在發(fā)生寫操作時接受一定的性能開銷,因為你需要復(fù)制整個底層數(shù)組。
  • 當(dāng)你需要一個不允許有重復(fù)元素的集合時。

示例代碼:

以下是一個簡單的 CopyOnWriteArraySet 使用示例:

import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class CopyOnWriteArraySetExample {
    public static void main(String[] args) {
        // 創(chuàng)建一個CopyOnWriteArraySet
        CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();


        // 添加元素
        set.add("A");
        set.add("B");
        set.add("C");


        // 創(chuàng)建一個線程池
        ExecutorService executor = Executors.newFixedThreadPool(2);


        // 提交一個任務(wù)到線程池,該任務(wù)會修改CopyOnWriteArraySet
        executor.submit(() -> {
            set.add("D");
            set.remove("A");
        });


        // 提交另一個任務(wù)到線程池,該任務(wù)會讀取CopyOnWriteArraySet
        executor.submit(() -> {
            for (String element : set) {
                System.out.println(element);
            }
        });


        // 關(guān)閉線程池
        executor.shutdown();
    }
}

在這個示例中,我們創(chuàng)建了一個 CopyOnWriteArraySet 并使用一個線程池來并發(fā)地修改和讀取它。一個任務(wù)嘗試添加和刪除元素,而另一個任務(wù)遍歷集合并打印所有元素。由于 CopyOnWriteArraySet 是線程安全的,所以這個程序可以正確地運(yùn)行而不會出現(xiàn)并發(fā)問題。

解釋:

  • CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();:創(chuàng)建了一個新的 CopyOnWriteArraySet 實例。
  • set.add("A");set.remove("A");:添加和刪除元素的操作。
  • ExecutorService executor = Executors.newFixedThreadPool(2);:創(chuàng)建了一個大小為2的線程池。
  • executor.submit(() -> {...});:提交了一個 Runnable 任務(wù)到線程池,任務(wù)中修改了 CopyOnWriteArraySet。
  • for (String element : set) {...}:遍歷集合并打印元素。
  • executor.shutdown();:關(guān)閉線程池。

這個示例展示了如何在多線程環(huán)境中使用 CopyOnWriteArraySet 來安全地進(jìn)行讀寫操作。由于寫操作相對昂貴(因為需要復(fù)制數(shù)組),所以 CopyOnWriteArraySet 適用于讀多寫少的場景,并且需要集合中元素不重復(fù)的特性。

代碼分析:

以下是 CopyOnWriteArraySet 類的一些關(guān)鍵方法的代碼分析:

  • add(E e):這個方法用于向 CopyOnWriteArraySet 中添加一個元素。

  • contains(Object o):這個方法用于檢查集合中是否包含指定元素。

  • size():這個方法用于獲取集合中元素的數(shù)量。

  • clear():這個方法用于清空集合中的所有元素。

這些方法都使用了原子操作來更新數(shù)據(jù),并確保了線程安全。在 add 方法中,會創(chuàng)建一個新數(shù)組,并將現(xiàn)有元素復(fù)制到新數(shù)組中。如果新數(shù)組中已經(jīng)存在相同的元素,則不會添加該元素。完成修改后,再將內(nèi)部引用指向新數(shù)組。

4、ConcurrentLinkedQueue

一個線程安全的無界非阻塞隊列,基于鏈表實現(xiàn)。它使用原子操作來保證線程安全,適合在高并發(fā)環(huán)境下使用。

實現(xiàn)原理:

ConcurrentLinkedQueue 是一個基于鏈表實現(xiàn)的線程安全的無界非阻塞隊列。它使用原子操作來保證線程安全,適合在高并發(fā)環(huán)境下使用。

  1. 非阻塞隊列:

ConcurrentLinkedQueue 實現(xiàn)了 Queue 接口,提供了一組原子操作來支持隊列的基本功能,如入隊(offer)、出隊(poll)等,這些操作都是非阻塞的。

  1. 無界隊列:

ConcurrentLinkedQueue 沒有容量限制,理論上可以無限增長,直到耗盡內(nèi)存。

  1. 原子操作:

ConcurrentLinkedQueue 使用了 compare-and-swap(CAS)操作來更新鏈表節(jié)點(diǎn),這保證了在多線程環(huán)境下的線程安全。

  1. 無鎖算法:

ConcurrentLinkedQueue 使用了無鎖算法,避免了鎖競爭帶來的性能開銷。

作用:

ConcurrentLinkedQueue 的主要作用是在多線程環(huán)境中提供一個高效且線程安全的隊列。它適用于以下場景:

  • 當(dāng)你需要一個高并發(fā)隊列,且隊列的容量不需要事先確定時。
  • 當(dāng)你希望在隊列操作中避免鎖競爭和阻塞時。

示例代碼:

以下是一個簡單的 ConcurrentLinkedQueue 使用示例:

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class ConcurrentLinkedQueueExample {
    public static void main(String[] args) {
        // 創(chuàng)建一個ConcurrentLinkedQueue
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();


        // 創(chuàng)建一個線程池
        ExecutorService executor = Executors.newFixedThreadPool(10);


        // 提交任務(wù)到線程池,生產(chǎn)者線程
        for (int i = 0; i < 5; i++) {
            final int taskNumber = i;
            executor.submit(() -> {
                queue.offer(taskNumber);
                System.out.println("Task " + taskNumber + " added to queue");
            });
        }


        // 提交任務(wù)到線程池,消費(fèi)者線程
        for (int i = 0; i < 5; i++) {
            executor.submit(() -> {
                Integer element = queue.poll();
                if (element != null) {
                    System.out.println("Task " + element + " removed from queue");
                }
            });
        }


        // 關(guān)閉線程池
        executor.shutdown();
    }
}

在這個示例中,我們創(chuàng)建了一個 ConcurrentLinkedQueue 并使用一個線程池來模擬生產(chǎn)者和消費(fèi)者。生產(chǎn)者線程向隊列中添加元素,而消費(fèi)者線程從隊列中移除元素。由于 ConcurrentLinkedQueue 是線程安全的,所以這個程序可以正確地運(yùn)行而不會出現(xiàn)并發(fā)問題。

解釋:

  • ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();:創(chuàng)建了一個新的 ConcurrentLinkedQueue 實例。
  • queue.offer(taskNumber);:生產(chǎn)者線程將元素添加到隊列尾部。
  • Integer element = queue.poll();:消費(fèi)者線程從隊列頭部移除元素。
  • ExecutorService executor = Executors.newFixedThreadPool(10);:創(chuàng)建了一個大小為10的線程池。
  • executor.submit(() -> {...});:提交了生產(chǎn)者和消費(fèi)者任務(wù)到線程池。
  • executor.shutdown();:關(guān)閉線程池。

這個示例展示了如何在多線程環(huán)境中使用 ConcurrentLinkedQueue 來安全地進(jìn)行生產(chǎn)者和消費(fèi)者操作。由于 ConcurrentLinkedQueue 是無界的且非阻塞的,它適合用于生產(chǎn)者和消費(fèi)者數(shù)量不固定,或者需要高并發(fā)處理的場景。

代碼分析:

以下是 ConcurrentLinkedQueue 類的一些關(guān)鍵方法的代碼分析:

  • offer(E e):這個方法用于向 ConcurrentLinkedQueue 中添加一個元素。如果隊列已滿,該方法將返回 false。

  • poll():這個方法用于從 ConcurrentLinkedQueue 中移除并返回第一個元素。如果隊列為空,該方法將返回 null。

  • peek():這個方法用于返回 ConcurrentLinkedQueue 中第一個元素,但不從隊列中移除它。如果隊列為空,該方法將返回 null。

  • size():這個方法用于返回 ConcurrentLinkedQueue 中元素的數(shù)量。

這些方法都使用了原子操作來更新鏈表節(jié)點(diǎn),并確保了線程安全。在 offer 方法中,會使用 CAS 操作將新元素添加到鏈表的尾部。在 poll 方法中,會使用 CAS 操作從鏈表的頭部移除元素。

5、ConcurrentLinkedDeque

一個線程安全的雙端隊列,也是基于鏈表實現(xiàn),適用于需要從兩端插入和刪除元素的場景。

實現(xiàn)原理:

ConcurrentLinkedDeque 是一個基于鏈表實現(xiàn)的線程安全的雙端隊列。它支持在隊列的首尾進(jìn)行插入和刪除操作,并且是線程安全的。

  1. 雙端隊列:

ConcurrentLinkedDeque 實現(xiàn)了 Deque 接口,提供了一組原子操作來支持雙端隊列的基本功能,如從頭部插入(addFirst)、從尾部插入(addLast)、從頭部移除(removeFirst)、從尾部移除(removeLast)等。

  1. 無界隊列:

ConcurrentLinkedDeque 沒有容量限制,理論上可以無限增長,直到耗盡內(nèi)存。

  1. 原子操作:

ConcurrentLinkedDeque 使用了 compare-and-swap(CAS)操作來更新鏈表節(jié)點(diǎn),這保證了在多線程環(huán)境下的線程安全。

  1. 無鎖算法:

ConcurrentLinkedDeque 使用了無鎖算法,避免了鎖競爭帶來的性能開銷。

作用:

ConcurrentLinkedDeque 的主要作用是在多線程環(huán)境中提供一個高效且線程安全的雙端隊列。它適用于以下場景:

  • 當(dāng)你需要一個雙端隊列,且隊列的容量不需要事先確定時。
  • 當(dāng)你希望在隊列操作中避免鎖競爭和阻塞時。
  • 當(dāng)你需要從隊列的首尾都可以進(jìn)行插入和刪除操作時。

示例代碼:

以下是一個簡單的 ConcurrentLinkedDeque 使用示例:



import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class ConcurrentLinkedDequeExample {
    public static void main(String[] args) {
        // 創(chuàng)建一個ConcurrentLinkedDeque
        ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<>();


        // 創(chuàng)建一個線程池
        ExecutorService executor = Executors.newFixedThreadPool(10);


        // 提交任務(wù)到線程池,生產(chǎn)者線程
        for (int i = 0; i < 5; i++) {
            final int taskNumber = i;
            executor.submit(() -> {
                deque.addFirst("Task " + taskNumber);
                System.out.println("Task " + taskNumber + " added to the front of the deque");
            });
        }


        // 提交任務(wù)到線程池,消費(fèi)者線程
        for (int i = 0; i < 5; i++) {
            executor.submit(() -> {
                String element = deque.removeLast();
                if (element != null) {
                    System.out.println(element + " removed from the end of the deque");
                }
            });
        }


        // 關(guān)閉線程池
        executor.shutdown();
    }
}

在這個示例中,我們創(chuàng)建了一個 ConcurrentLinkedDeque 并使用一個線程池來模擬生產(chǎn)者和消費(fèi)者。生產(chǎn)者線程向隊列的頭部添加元素,而消費(fèi)者線程從隊列的尾部移除元素。由于 ConcurrentLinkedDeque 是線程安全的,所以這個程序可以正確地運(yùn)行而不會出現(xiàn)并發(fā)問題。

解釋:

  • ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<>();:創(chuàng)建了一個新的 ConcurrentLinkedDeque 實例。
  • deque.addFirst("Task " + taskNumber);:生產(chǎn)者線程將元素添加到隊列的頭部。
  • String element = deque.removeLast();:消費(fèi)者線程從隊列的尾部移除元素。
  • ExecutorService executor = Executors.newFixedThreadPool(10);:創(chuàng)建了一個大小為10的線程池。
  • executor.submit(() -> {...});:提交了生產(chǎn)者和消費(fèi)者任務(wù)到線程池。
  • executor.shutdown();:關(guān)閉線程池。

這個示例展示了如何在多線程環(huán)境中使用 ConcurrentLinkedDeque 來安全地進(jìn)行生產(chǎn)者和消費(fèi)者操作。由于 ConcurrentLinkedDeque 是無界的且非阻塞的,它適合用于生產(chǎn)者和消費(fèi)者數(shù)量不固定,或者需要高并發(fā)處理的場景,并且需要雙端隊列的特性。

代碼分析:

以下是 ConcurrentLinkedDeque 類的一些關(guān)鍵方法的代碼分析:

  • addFirst(E e):這個方法用于在 ConcurrentLinkedDeque 的頭部添加一個元素。

  • addLast(E e):這個方法用于在 ConcurrentLinkedDeque 的尾部添加一個元素。

  • removeFirst():這個方法用于從 ConcurrentLinkedDeque 的頭部移除并返回第一個元素。如果隊列為空,該方法將返回 null。

  • removeLast():這個方法用于從 ConcurrentLinkedDeque 的尾部移除并返回最后一個元素。如果隊列為空,該方法將返回 null。

這些方法都使用了原子操作來更新鏈表節(jié)點(diǎn),并確保了線程安全。在 addFirst 和 addLast 方法中,會使用 CAS 操作將新元素添加到鏈表的頭部或尾部。在 removeFirst 和 removeLast 方法中,會使用 CAS 操作從鏈表的頭部或尾部移除元素。

6、BlockingQueue 接口及其實現(xiàn)類

(如 ArrayBlockingQueue, LinkedBlockingQueue, PriorityBlockingQueue 等)提供了線程安全的隊列操作,支持阻塞的插入和獲取操作。當(dāng)隊列滿時,插入操作會阻塞;當(dāng)隊列空時,獲取操作會阻塞。

實現(xiàn)原理:

BlockingQueue 是一個支持阻塞操作的隊列。當(dāng)隊列滿時,插入操作會阻塞;當(dāng)隊列空時,獲取操作會阻塞。它實現(xiàn)了生產(chǎn)者-消費(fèi)者模式,用于線程間的數(shù)據(jù)共享。

  1. 阻塞操作:

BlockingQueue 提供了阻塞的 put 和 take 方法,這些方法在隊列滿或空時會使線程進(jìn)入等待狀態(tài),直到隊列有空閑空間或數(shù)據(jù)可用。

  1. 同步機(jī)制:

BlockingQueue 的實現(xiàn)類通常使用鎖(如 ReentrantLock)和條件變量(如 Condition)來實現(xiàn)線程同步。

  1. 容量限制:

BlockingQueue 通常有固定的容量限制,但也有一些實現(xiàn)(如 LinkedBlockingQueue)允許指定最大容量,如果沒有指定,則默認(rèn)為 Integer.MAX_VALUE。

作用:

BlockingQueue 的主要作用是在多線程環(huán)境中提供一個線程安全的隊列,用于生產(chǎn)者和消費(fèi)者之間的數(shù)據(jù)傳遞。它適用于以下場景:

  • 當(dāng)你需要一個有界或無界隊列,并且希望在隊列滿或空時阻塞線程時。
  • 當(dāng)你希望在生產(chǎn)者和消費(fèi)者之間實現(xiàn)同步時。

示例代碼:

以下是一個簡單的 BlockingQueue 使用示例,使用 ArrayBlockingQueue 作為實現(xiàn):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class BlockingQueueExample {
    public static void main(String[] args) {
        // 創(chuàng)建一個容量為10的ArrayBlockingQueue
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);


        // 創(chuàng)建一個線程池
        ExecutorService executor = Executors.newFixedThreadPool(2);


        // 提交生產(chǎn)者任務(wù)到線程池
        executor.submit(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    queue.put("Task " + i);
                    System.out.println("Task " + i + " added to the queue");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });


        // 提交消費(fèi)者任務(wù)到線程池
        executor.submit(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    String task = queue.take();
                    System.out.println(task + " removed from the queue");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });


        // 關(guān)閉線程池
        executor.shutdown();
    }
}

在這個示例中,我們創(chuàng)建了一個 ArrayBlockingQueue 并使用一個線程池來模擬生產(chǎn)者和消費(fèi)者。生產(chǎn)者線程向隊列中添加元素,而消費(fèi)者線程從隊列中移除元素。由于 ArrayBlockingQueue 是線程安全的,所以這個程序可以正確地運(yùn)行而不會出現(xiàn)并發(fā)問題。

解釋:

  • BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);:創(chuàng)建了一個容量為10的 ArrayBlockingQueue 實例。
  • queue.put("Task " + i);:生產(chǎn)者線程將元素添加到隊列中,如果隊列已滿,則線程會阻塞。
  • String task = queue.take();:消費(fèi)者線程從隊列中移除元素,如果隊列空,則線程會阻塞。
  • ExecutorService executor = Executors.newFixedThreadPool(2);:創(chuàng)建了一個大小為2的線程池。
  • executor.submit(() -> {...});:提交了生產(chǎn)者和消費(fèi)者任務(wù)到線程池。
  • executor.shutdown();:關(guān)閉線程池。

這個示例展示了如何在多線程環(huán)境中使用 BlockingQueue 來安全地進(jìn)行生產(chǎn)者和消費(fèi)者操作。由于 ArrayBlockingQueue 是有界的,它在隊列滿時會阻塞生產(chǎn)者,在隊列空時會阻塞消費(fèi)者,適合用于需要阻塞隊列的場景。

代碼分析:

以下是 BlockingQueue 接口的一些關(guān)鍵方法的代碼分析:

  • put(E e):這個方法用于向 BlockingQueue 中添加一個元素。如果隊列已滿,該方法會阻塞,直到隊列有空閑空間。

  • take():這個方法用于從 BlockingQueue 中移除并返回第一個元素。如果隊列為空,該方法會阻塞,直到隊列有數(shù)據(jù)可用。

  • offer(E e):這個方法用于向 BlockingQueue 中添加一個元素。如果隊列已滿,該方法將返回 false。

  • poll(long timeout, TimeUnit unit):這個方法用于從 BlockingQueue 中移除并返回第一個元素。如果隊列為空,該方法將在指定的時間內(nèi)阻塞,如果超時則返回 null。

這些方法都使用了鎖和條件變量來實現(xiàn)線程同步。在 put 和 offer 方法中,會使用鎖來保護(hù)隊列,并使用條件變量來阻塞線程。在 take 和 poll 方法中,會使用鎖來保護(hù)隊列,并使用條件變量來喚醒等待的線程。

7、ConcurrentSkipListMap 和 ConcurrentSkipListSet

分別是線程安全的有序映射和有序集,基于跳表(Skip List)實現(xiàn),提供了高效的查找、插入和刪除操作。

實現(xiàn)原理:

ConcurrentSkipListMap 是一個線程安全的有序映射,它基于跳表(Skip List)數(shù)據(jù)結(jié)構(gòu)實現(xiàn)。跳表是一種平衡樹結(jié)構(gòu),它結(jié)合了紅黑樹和有序鏈表的特點(diǎn),提供了一種高效的數(shù)據(jù)結(jié)構(gòu),可以進(jìn)行快速的查找、插入和刪除操作。

  1. 跳表結(jié)構(gòu):

跳表包含多層索引,每一層索引都是有序的鏈表。最底層是最簡單的有序鏈表,高層索引包含指向下一層索引的指針。通過這些指針,可以快速跳過大量節(jié)點(diǎn),從而提高查找、插入和刪除操作的效率。

  1. 線程安全:

ConcurrentSkipListMap 使用 ReentrantLock 來保證線程安全。當(dāng)多個線程同時進(jìn)行修改操作時,它們會競爭鎖。

  1. 讀取操作無鎖:

讀取操作(如 get)通常不需要加鎖,因為跳表的結(jié)構(gòu)保證了讀取操作的可見性和一致性。 作用:

ConcurrentSkipListMap 的主要作用是在多線程環(huán)境中提供高效的并發(fā)訪問,同時保持元素的自然順序。它適用于以下場景:

  • 當(dāng)你需要一個線程安全的有序映射,且需要在多線程環(huán)境中進(jìn)行頻繁的讀寫操作時。
  • 當(dāng)你需要根據(jù)元素的自然順序進(jìn)行快速查找、插入和刪除操作時。

示例代碼:

以下是一個簡單的 ConcurrentSkipListMap 使用示例:



import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class ConcurrentSkipListMapExample {
    public static void main(String[] args) {
        // 創(chuàng)建一個ConcurrentSkipListMap
        ConcurrentSkipListMap<String, Integer> map = new ConcurrentSkipListMap<>();


        // 創(chuàng)建一個線程池
        ExecutorService executor = Executors.newFixedThreadPool(2);


        // 提交任務(wù)到線程池,生產(chǎn)者線程
        executor.submit(() -> {
            for (int i = 0; i < 20; i++) {
                map.put("Key " + i, i);
                System.out.println("Task " + i + " added to the map");
            }
        });


        // 提交任務(wù)到線程池,消費(fèi)者線程
        executor.submit(() -> {
            for (int i = 0; i < 20; i++) {
                Integer value = map.get("Key " + i);
                if (value != null) {
                    System.out.println("Task " + value + " found in the map");
                }
            }
        });


        // 關(guān)閉線程池
        executor.shutdown();
    }
}

在這個示例中,我們創(chuàng)建了一個 ConcurrentSkipListMap 并使用一個線程池來模擬生產(chǎn)者和消費(fèi)者。生產(chǎn)者線程向映射中添加元素,而消費(fèi)者線程從映射中查找元素。由于 ConcurrentSkipListMap 是線程安全的,所以這個程序可以正確地運(yùn)行而不會出現(xiàn)并發(fā)問題。

解釋:

  • ConcurrentSkipListMap<String, Integer> map = new ConcurrentSkipListMap<>();:創(chuàng)建了一個新的 ConcurrentSkipListMap 實例。
  • map.put("Key " + i, i);:生產(chǎn)者線程將鍵值對添加到映射中。
  • Integer value = map.get("Key " + i);:消費(fèi)者線程從映射中查找特定鍵對應(yīng)的值。
  • ExecutorService executor = Executors.newFixedThreadPool(2);:創(chuàng)建了一個大小為2的線程池。
  • executor.submit(() -> {...});:提交了生產(chǎn)者和消費(fèi)者任務(wù)到線程池。
  • executor.shutdown();:關(guān)閉

代碼分析:

以下是 ConcurrentSkipListMap 和 ConcurrentSkipListSet 類的一些關(guān)鍵方法的代碼分析:

  1. ConcurrentSkipListMap 類:
    • put(K key, V value):這個方法用于向 ConcurrentSkipListMap 中添加一個鍵值對。
    • get(Object key):這個方法用于從 ConcurrentSkipListMap 中獲取與指定鍵關(guān)聯(lián)的值。
    • remove(Object key):這個方法用于從 ConcurrentSkipListMap 中移除與指定鍵關(guān)聯(lián)的鍵值對。

這些方法都使用了 ReentrantLock 來保護(hù)跳表的修改操作,并確保線程安全。在 put、get 和 remove 方法中,會使用跳表的數(shù)據(jù)結(jié)構(gòu)進(jìn)行查找、插入和刪除操作。

  1. ConcurrentSkipListSet 類:
    • add(E e):這個方法用于向 ConcurrentSkipListSet 中添加一個元素。
    • contains(Object o):這個方法用于檢查集合中是否包含指定元素。
    • remove(Object o):這個方法用于從 ConcurrentSkipListSet 中移除指定元素。

這些方法同樣使用了 ReentrantLock 來保護(hù)跳表的修改操作,并確保線程安全。在 add、contains 和 remove 方法中,會使用跳表的數(shù)據(jù)結(jié)構(gòu)進(jìn)行查找、插入和刪除操作。

8、CountDownLatch

一個同步輔助類,允許一個或多個線程等待其他線程完成操作,可用于實現(xiàn)并發(fā)同步。

實現(xiàn)原理:

CountDownLatch 是一個同步輔助類,用于實現(xiàn)線程之間的等待/通知模式。它允許一個或多個線程等待直到一系列操作在其他線程中完成。

  1. 計數(shù)器:

CountDownLatch 使用一個計數(shù)器來跟蹤完成操作的線程數(shù)量。初始時,計數(shù)器的值等于線程的數(shù)量。

  1. 阻塞等待:

當(dāng)調(diào)用 CountDownLatch 的 await 方法時,當(dāng)前線程會阻塞,直到計數(shù)器值為零。

  1. 計數(shù)器遞減:

其他線程通過調(diào)用 CountDownLatch 的 countDown 方法來遞減計數(shù)器的值。每個線程在完成自己的操作后調(diào)用此方法。

作用:

CountDownLatch 的主要作用是在多線程環(huán)境中提供一個同步點(diǎn),使得主線程可以等待其他線程完成各自的任務(wù)后再繼續(xù)執(zhí)行。它適用于以下場景:

  • 當(dāng)你需要等待多個線程完成各自的任務(wù)后,才能繼續(xù)執(zhí)行后續(xù)操作時。
  • 當(dāng)你需要確保所有線程都完成了自己的任務(wù),再進(jìn)行下一步操作時。

示例代碼:

以下是一個簡單的 CountDownLatch 使用示例:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class CountDownLatchExample {
    public static void main(String[] args) {
        // 創(chuàng)建一個CountDownLatch,初始值為5
        CountDownLatch latch = new CountDownLatch(5);


        // 創(chuàng)建一個線程池
        ExecutorService executor = Executors.newFixedThreadPool(5);


        // 提交任務(wù)到線程池,每個任務(wù)都會遞減latch的計數(shù)器
        for (int i = 0; i < 5; i++) {
            final int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " is running");
                latch.countDown();
                System.out.println("Task " + taskNumber + " is completed");
            });
        }


        // 主線程等待latch的計數(shù)器歸零后繼續(xù)執(zhí)行
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }


        System.out.println("All tasks are completed, main thread continues");


        // 關(guān)閉線程池
        executor.shutdown();
    }
}

在這個示例中,我們創(chuàng)建了一個 CountDownLatch 并使用一個線程池來模擬5個并發(fā)任務(wù)。每個任務(wù)在完成自己的操作后都會遞減 CountDownLatch 的計數(shù)器。主線程在調(diào)用 latch.await() 方法時會阻塞,直到計數(shù)器歸零。當(dāng)所有任務(wù)完成后,主線程繼續(xù)執(zhí)行。

解釋:

  • CountDownLatch latch = new CountDownLatch(5);:創(chuàng)建了一個初始值為5的 CountDownLatch 實例。
  • latch.countDown();:每個線程在完成任務(wù)后調(diào)用此方法遞減計數(shù)器的值。
  • latch.await();:主線程在調(diào)用此方法時會阻塞,直到計數(shù)器的值為零。
  • ExecutorService executor = Executors.newFixedThreadPool(5);:創(chuàng)建了一個大小為5的線程池。
  • executor.submit(() -> {...});:提交了5個任務(wù)到線程池。
  • executor.shutdown();:關(guān)閉線程池。

這個示例展示了如何在多線程環(huán)境中使用 CountDownLatch 來確保主線程等待所有子線程完成任務(wù)后再繼續(xù)執(zhí)行。由于 CountDownLatch 提供了線程間的同步點(diǎn),它適合用于需要等待多個線程完成任務(wù)的場景。

代碼分析:

以下是 CountDownLatch 類的一些關(guān)鍵方法的代碼分析:

  • CountDownLatch(int count): 這個構(gòu)造方法用于創(chuàng)建一個 CountDownLatch 對象,并初始化計數(shù)器的值。

  • await(): 這個方法用于使當(dāng)前線程等待,直到計數(shù)器的值為零。如果計數(shù)器的值不為零,當(dāng)前線程會阻塞。

  • countDown(): 這個方法用于遞減計數(shù)器的值。每個線程在完成自己的操作后調(diào)用此方法。

  • getCount(): 這個方法用于獲取當(dāng)前計數(shù)器的值。

  • isLatchOpen(): 這個方法用于檢查計數(shù)器的值是否為零。如果計數(shù)器的值為零,則返回 true,否則返回 false。

這些方法都使用了 Object 類的 wait() 和 notify() 方法來實現(xiàn)線程間的同步。當(dāng)線程到達(dá) await 方法時,它會調(diào)用 wait() 方法,這會導(dǎo)致線程進(jìn)入等待狀態(tài)。當(dāng)其他線程調(diào)用 countDown() 方法并遞減計數(shù)器的值時,會調(diào)用 notify() 方法來喚醒等待的線程。

9、CyclicBarrier

一個允許一組線程互相等待的同步輔助類,直到所有線程都達(dá)到某個屏障點(diǎn)后才繼續(xù)執(zhí)行。

實現(xiàn)原理:

CyclicBarrier 是一個同步輔助類,用于讓一組線程到達(dá)一個屏障(barrier)時被阻塞,直到最后一個線程到達(dá)屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續(xù)運(yùn)行。CyclicBarrier 的名稱中的 “Cyclic” 指的是它可以被重用。當(dāng)所有參與者到達(dá)屏障時,它們會執(zhí)行 barrierAction 指定的動作,這個動作只會被最后一個到達(dá)屏障的線程執(zhí)行。

  1. 屏障計數(shù)器:

CyclicBarrier 使用一個內(nèi)部計數(shù)器來跟蹤到達(dá)屏障的線程數(shù)量。

  1. 阻塞線程:

當(dāng)線程到達(dá)屏障時,它會阻塞,直到計數(shù)器的值為零。

  1. 重用性:

CyclicBarrier 允許在屏障打開后重新使用它,而不是每次使用后都必須創(chuàng)建一個新的。

作用:

CyclicBarrier 的主要作用是在多線程環(huán)境中提供一個線程間的同步點(diǎn),使得一組線程在完成各自的任務(wù)后,能夠同時繼續(xù)執(zhí)行后續(xù)操作。它適用于以下場景:

  • 當(dāng)你需要讓一組線程等待直到所有線程都完成某個任務(wù)后,才能繼續(xù)執(zhí)行后續(xù)操作時。
  • 當(dāng)你需要確保一組線程同時開始執(zhí)行某個操作時。

示例代碼:

以下是一個簡單的 CyclicBarrier 使用示例:

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class CyclicBarrierExample {
    public static void main(String[] args) {
        // 創(chuàng)建一個CyclicBarrier,初始值為5
        CyclicBarrier barrier = new CyclicBarrier(5);


        // 創(chuàng)建一個線程池
        ExecutorService executor = Executors.newFixedThreadPool(5);


        // 提交任務(wù)到線程池,每個任務(wù)都會到達(dá)屏障并執(zhí)行后續(xù)操作
        for (int i = 0; i < 5; i++) {
            final int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " is running");
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("Task " + taskNumber + " is completed");
            });
        }


        // 主線程等待所有任務(wù)完成后繼續(xù)執(zhí)行
        try {
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            Thread.currentThread().interrupt();
        }


        System.out.println("All tasks are completed, main thread continues");


        // 關(guān)閉線程池
        executor.shutdown();
    }
}

在這個示例中,我們創(chuàng)建了一個 CyclicBarrier 并使用一個線程池來模擬5個并發(fā)任務(wù)。每個任務(wù)在完成自己的操作后會到達(dá)屏障并等待其他任務(wù)也到達(dá)屏障。當(dāng)所有任務(wù)都到達(dá)屏障時,它們會繼續(xù)執(zhí)行后續(xù)操作。主線程在調(diào)用 barrier.await() 方法時會阻塞,直到所有子線程都到達(dá)屏障。

解釋:

  • CyclicBarrier barrier = new CyclicBarrier(5);:創(chuàng)建了一個初始值為5的 CyclicBarrier 實例。
  • barrier.await();:每個線程在完成任務(wù)后調(diào)用此方法到達(dá)屏障并等待其他線程。
  • ExecutorService executor = Executors.newFixedThreadPool(5);:創(chuàng)建了一個大小為5的線程池。
  • executor.submit(() -> {...});:提交了5個任務(wù)到線程池。
  • executor.shutdown();:關(guān)閉線程池。

這個示例展示了如何在多線程環(huán)境中使用 CyclicBarrier 來確保一組線程在完成各自的任務(wù)后,能夠同時繼續(xù)執(zhí)行后續(xù)操作。由于 CyclicBarrier 提供了線程間的同步點(diǎn),它適合用于需要線程同步執(zhí)行的場景。

代碼分析:

以下是 CyclicBarrier 類的一些關(guān)鍵方法的代碼分析:

  • CyclicBarrier(int parties): 這個構(gòu)造方法用于創(chuàng)建一個 CyclicBarrier 對象,并指定屏障的參與者數(shù)量。

  • await(): 這個方法用于使當(dāng)前線程等待,直到計數(shù)器的值為零。如果計數(shù)器的值不為零,當(dāng)前線程會阻塞。

  • await(long timeout, TimeUnit unit): 這個方法與 await() 類似,但它允許設(shè)置一個超時時間。如果其他線程在超時時間內(nèi)還沒有到達(dá)屏障,當(dāng)前線程將返回 false。

  • reset(): 這個方法用于重置屏障,將其計數(shù)器的值重置為初始值。

  • getNumberWaiting(): 這個方法用于獲取當(dāng)前等待在屏障上的線程數(shù)量。

  • getParties(): 這個方法用于獲取屏障的參與者數(shù)量。

  • isBroken(): 這個方法用于檢查屏障是否被破壞。如果屏障被破壞,所有等待的線程都會被中斷。

這些方法都使用了 Object 類的 wait() 和 notify() 方法來實現(xiàn)線程間的同步。當(dāng)線程到達(dá) await 方法時,它會調(diào)用 wait() 方法,這會導(dǎo)致線程進(jìn)入等待狀態(tài)。當(dāng)其他線程到達(dá)屏障時,會調(diào)用 notify() 方法來喚醒等待的線程。

10、Semaphore

一個計數(shù)信號量,可以用來限制可以同時訪問某個特定資源的線程數(shù)量。

實現(xiàn)原理:

Exchanger 是一個同步輔助類,用于實現(xiàn)兩個線程間的數(shù)據(jù)交換。當(dāng)兩個線程都到達(dá) Exchanger 指定的交換點(diǎn)時,它們可以交換彼此的數(shù)據(jù)。如果只有一個線程到達(dá)交換點(diǎn),它會阻塞,直到另一個線程也到達(dá)交換點(diǎn)。

  1. 交換點(diǎn):

Exchanger 使用一個內(nèi)部同步機(jī)制來跟蹤到達(dá)交換點(diǎn)的線程數(shù)量。

  1. 阻塞等待:

當(dāng)線程到達(dá)交換點(diǎn)時,它會阻塞,直到另一個線程也到達(dá)交換點(diǎn)。

  1. 數(shù)據(jù)交換:

當(dāng)兩個線程都到達(dá)交換點(diǎn)時,它們可以交換彼此的數(shù)據(jù)。

作用:

Exchanger 的主要作用是在多線程環(huán)境中提供一個線程間的同步點(diǎn),使得兩個線程可以交換數(shù)據(jù)。它適用于以下場景:

  • 當(dāng)你需要實現(xiàn)兩個線程間的數(shù)據(jù)交換時。
  • 當(dāng)你需要確保兩個線程在某個點(diǎn)同步執(zhí)行時。

示例代碼:

以下是一個簡單的 Exchanger 使用示例:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Exchanger;


public class ExchangerExample {
    public static void main(String[] args) {
        // 創(chuàng)建一個Exchanger
        Exchanger<String> exchanger = new Exchanger<>();


        // 創(chuàng)建一個線程池
        ExecutorService executor = Executors.newFixedThreadPool(2);


        // 提交任務(wù)到線程池,第一個線程會生成數(shù)據(jù)并等待交換
        executor.submit(() -> {
            String data = "Data from the first thread";
            try {
                String receivedData = exchanger.exchange(data);
                System.out.println("Received data from the second thread: " + receivedData);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });


        // 提交任務(wù)到線程池,第二個線程會生成數(shù)據(jù)并交換
        executor.submit(() -> {
            String data = "Data from the second thread";
            try {
                String receivedData = exchanger.exchange(data);
                System.out.println("Received data from the first thread: " + receivedData);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });


        // 關(guān)閉線程池
        executor.shutdown();
    }
}

在這個示例中,我們創(chuàng)建了一個 Exchanger 并使用一個線程池來模擬兩個線程。第一個線程在交換點(diǎn)等待,并準(zhǔn)備好交換數(shù)據(jù)。第二個線程在交換點(diǎn)準(zhǔn)備好數(shù)據(jù),并等待第一個線程到達(dá)交換點(diǎn)。當(dāng)兩個線程都到達(dá)交換點(diǎn)時,它們可以交換數(shù)據(jù)。

解釋:

  • Exchanger<String> exchanger = new Exchanger<>();:創(chuàng)建了一個新的 Exchanger 實例。
  • String data = "Data from the first thread";:第一個線程準(zhǔn)備的數(shù)據(jù)。
  • exchanger.exchange(data);:第一個線程在交換點(diǎn)等待,并準(zhǔn)備好交換數(shù)據(jù)。
  • String receivedData = exchanger.exchange(data);:第二個線程在交換點(diǎn)準(zhǔn)備好數(shù)據(jù),并等待第一個線程到達(dá)交換點(diǎn)。
  • ExecutorService executor = Executors.newFixedThreadPool(2);:創(chuàng)建了一個大小為2的線程池。
  • executor.submit(() -> {...});:提交了兩個任務(wù)到線程池。
  • executor.shutdown();:關(guān)閉線程池。

這個示例展示了如何在多線程環(huán)境中使用 Exchanger 來確保兩個線程在某個點(diǎn)同步執(zhí)行并交換數(shù)據(jù)。由于 Exchanger 提供了線程間的同步點(diǎn),它適合用于需要線程間數(shù)據(jù)交換的場景。

代碼分析:

以下是 Semaphore 類的一些關(guān)鍵方法的代碼分析:

  • Semaphore(int permits): 這個構(gòu)造方法用于創(chuàng)建一個 Semaphore 對象,并指定信號量的初始值。

  • acquire(): 這個方法用于嘗試獲取一個資源。如果信號量的值大于零,當(dāng)前線程可以繼續(xù)執(zhí)行;如果信號量的值等于零,當(dāng)前線程將阻塞。

  • acquire(int permits): 這個方法與 acquire() 類似,但它允許指定要獲取的資源數(shù)量。

  • release(): 這個方法用于釋放一個資源。它會增加信號量的值,從而允許其他被阻塞的線程繼續(xù)執(zhí)行。

  • tryAcquire(): 這個方法用于嘗試獲取一個資源,但不阻塞。如果信號量的值大于零,當(dāng)前線程可以繼續(xù)執(zhí)行;如果信號量的值等于零,該方法將返回 false。

  • tryAcquire(int permits): 這個方法與 tryAcquire() 類似,但它允許指定要獲取的資源數(shù)量。

這些方法都使用了 Object 類的 wait() 和 notify() 方法來實現(xiàn)線程間的同步。當(dāng)線程到達(dá) acquire 方法時,它會調(diào)用 wait() 方法,這會導(dǎo)致線程進(jìn)入等待狀態(tài)。當(dāng)其他線程調(diào)用 release 方法并釋放資源時,會調(diào)用 notify() 方法來喚醒等待的線程。

11、Exchanger

一個用于在并發(fā)線程之間交換數(shù)據(jù)的工具,適用于遺傳算法、流水線設(shè)計等場景。

Exchanger 是一個用于線程間交換數(shù)據(jù)的同步輔助類,它允許兩個線程在某個點(diǎn)交換它們的數(shù)據(jù)。當(dāng)一個線程準(zhǔn)備好數(shù)據(jù)時,它會等待另一個線程準(zhǔn)備好數(shù)據(jù),然后它們可以交換數(shù)據(jù)。如果一個線程準(zhǔn)備好數(shù)據(jù)而另一個線程還沒有準(zhǔn)備好,那么第一個線程會阻塞,直到第二個線程準(zhǔn)備好數(shù)據(jù)。

實現(xiàn)原理:

Exchanger 的實現(xiàn)原理是基于 Object 類的 wait() 和 notify() 方法。當(dāng)一個線程到達(dá)交換點(diǎn)時,它會調(diào)用 exchange() 方法,該方法會嘗試將該線程的數(shù)據(jù)與另一個線程的數(shù)據(jù)交換。如果另一個線程還沒有準(zhǔn)備好數(shù)據(jù),那么第一個線程會阻塞,直到第二個線程到達(dá)交換點(diǎn)并準(zhǔn)備好數(shù)據(jù)。

  1. 交換點(diǎn):

Exchanger 使用一個內(nèi)部同步機(jī)制來跟蹤到達(dá)交換點(diǎn)的線程數(shù)量。

  1. 阻塞等待:

當(dāng)線程到達(dá)交換點(diǎn)時,它會阻塞,直到另一個線程也到達(dá)交換點(diǎn)。

  1. 數(shù)據(jù)交換:

當(dāng)兩個線程都到達(dá)交換點(diǎn)時,它們可以交換彼此的數(shù)據(jù)。

作用:

Exchanger 的主要作用是在多線程環(huán)境中提供一個線程間的同步點(diǎn),使得兩個線程可以交換數(shù)據(jù)。它適用于以下場景:

  • 當(dāng)你需要實現(xiàn)兩個線程間的數(shù)據(jù)交換時。
  • 當(dāng)你需要確保兩個線程在某個點(diǎn)同步執(zhí)行時。

示例代碼:

以下是一個簡單的 Exchanger 使用示例:



import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class ExchangerExample {
    public static void main(String[] args) {
        // 創(chuàng)建一個Exchanger
        Exchanger<String> exchanger = new Exchanger<>();


        // 創(chuàng)建一個線程池
        ExecutorService executor = Executors.newFixedThreadPool(2);


        // 提交任務(wù)到線程池,第一個線程會生成數(shù)據(jù)并等待交換
        executor.submit(() -> {
            String data = "Data from the first thread";
            try {
                String receivedData = exchanger.exchange(data);
                System.out.println("Received data from the second thread: " + receivedData);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });


        // 提交任務(wù)到線程池,第二個線程會生成數(shù)據(jù)并交換
        executor.submit(() -> {
            String data = "Data from the second thread";
            try {
                String receivedData = exchanger.exchange(data);
                System.out.println("Received data from the first thread: " + receivedData);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });


        // 關(guān)閉線程池
        executor.shutdown();
    }
}

在這個示例中,我們創(chuàng)建了一個 Exchanger 并使用一個線程池來模擬兩個線程。第一個線程在交換點(diǎn)等待,并準(zhǔn)備好交換數(shù)據(jù)。第二個線程在交換點(diǎn)準(zhǔn)備好數(shù)據(jù),并等待第一個線程到達(dá)交換點(diǎn)。當(dāng)兩個線程都到達(dá)交換點(diǎn)時,它們可以交換數(shù)據(jù)。

解釋:

  • Exchanger<String> exchanger = new Exchanger<>();:創(chuàng)建了一個新的 Exchanger 實例。
  • String data = "Data from the first thread";:第一個線程準(zhǔn)備的數(shù)據(jù)。
  • String receivedData = exchanger.exchange(data);:第一個線程在交換點(diǎn)等待,并準(zhǔn)備好交換數(shù)據(jù)。
  • String receivedData = exchanger.exchange(data);:第二個線程在交換點(diǎn)準(zhǔn)備好數(shù)據(jù),并等待第一個線程到達(dá)交換點(diǎn)。
  • ExecutorService executor = Executors.newFixedThreadPool(2);:創(chuàng)建了一個大小為2的線程池。
  • executor.submit(() -> {...});:提交了兩個任務(wù)到線程池。
  • executor.shutdown();:關(guān)閉線程池。

這個示例展示了如何在多線程環(huán)境中使用 Exchanger 來確保兩個線程在某個點(diǎn)同步執(zhí)行并交換數(shù)據(jù)。由于 Exchanger 提供了線程間的同步點(diǎn),它適合用于需要線程間數(shù)據(jù)交換的場景。

在這個示例中,兩個線程分別準(zhǔn)備了一些數(shù)據(jù),并嘗試通過 Exchanger 進(jìn)行交換。如果一個線程到達(dá)交換點(diǎn)時,另一個線程還沒有準(zhǔn)備好數(shù)據(jù),那么第一個線程會阻塞,直到第二個線程也到達(dá)交換點(diǎn)并準(zhǔn)備好數(shù)據(jù)。當(dāng)兩個線程都到達(dá)交換點(diǎn)時,它們可以交換數(shù)據(jù),然后繼續(xù)執(zhí)行。

代碼分析:

在 Java 中,Exchanger 類的實現(xiàn)原理基于 Object 類的 wait() 和 notify() 方法。以下是 Exchanger 類的一些關(guān)鍵方法的實現(xiàn)原理:

  • boolean exchange(V x):這個方法允許一個線程嘗試交換數(shù)據(jù)。如果另一個線程已經(jīng)準(zhǔn)備好數(shù)據(jù),那么這兩個線程將交換數(shù)據(jù)。如果另一個線程還沒有準(zhǔn)備好數(shù)據(jù),那么當(dāng)前線程會阻塞,直到另一個線程到達(dá)交換點(diǎn)并準(zhǔn)備好數(shù)據(jù)。

  • boolean exchange(V x, long timeout, TimeUnit unit):這個方法與 exchange(V x) 類似,但它允許設(shè)置一個超時時間。如果另一個線程在超時時間內(nèi)還沒有準(zhǔn)備好數(shù)據(jù),那么當(dāng)前線程將返回 false。

  • V exchange(V x, Phaser phaser):這個方法允許一個線程嘗試交換數(shù)據(jù),并且使用一個 Phaser 來管理線程的同步。如果另一個線程已經(jīng)準(zhǔn)備好數(shù)據(jù),那么這兩個線程將交換數(shù)據(jù)。如果另一個線程還沒有準(zhǔn)備好數(shù)據(jù),那么當(dāng)前線程會阻塞,直到另一個線程到達(dá)交換點(diǎn)并準(zhǔn)備好數(shù)據(jù)。

這些方法都使用了 Object 類的 wait()notify()方法來實現(xiàn)線程間的同步。當(dāng)一個線程到達(dá)交換點(diǎn)時,它會調(diào)用 wait() 方法,這會導(dǎo)致線程進(jìn)入等待狀態(tài)。當(dāng)另一個線程到達(dá)交換點(diǎn)并調(diào)用 notify() 方法時,第一個線程會被喚醒,并且可以繼續(xù)執(zhí)行。

12、Phaser

一個可重用的同步屏障,適用于類似于 CyclicBarrier 的場景,但提供了更靈活的注冊和注銷機(jī)制。

實現(xiàn)原理:

Phaser 是一個可重用的同步屏障,它允許一組線程互相等待,直到它們都到達(dá)某個屏障點(diǎn)。與 CyclicBarrier 類似,Phaser 允許重用同一個屏障,這意味著在所有線程到達(dá)屏障點(diǎn)后,可以重新使用該屏障。

  1. 屏障計數(shù)器:

Phaser 使用一個內(nèi)部計數(shù)器來跟蹤到達(dá)屏障點(diǎn)的線程數(shù)量。

  1. 阻塞等待:

當(dāng)線程到達(dá)屏障點(diǎn)時,它會阻塞,直到所有線程都到達(dá)屏障點(diǎn)。

  1. 重用性:

Phaser 允許在屏障點(diǎn)被觸發(fā)后重新使用它,而不是每次使用后都必須創(chuàng)建一個新的。

作用:

Phaser 的主要作用是在多線程環(huán)境中提供一個線程間的同步點(diǎn),使得一組線程在完成各自的任務(wù)后,能夠同時繼續(xù)執(zhí)行后續(xù)操作。它適用于以下場景:

  • 當(dāng)你需要讓一組線程等待直到所有線程都完成某個任務(wù)后,才能繼續(xù)執(zhí)行后續(xù)操作時。
  • 當(dāng)你需要確保一組線程同時開始執(zhí)行某個操作時。

示例代碼:

以下是一個簡單的 Phaser 使用示例:



import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;


public class PhaserExample {
    public static void main(String[] args) {
        // 創(chuàng)建一個Phaser,初始值為5
        Phaser phaser = new Phaser(5);


        // 創(chuàng)建一個線程池
        ExecutorService executor = Executors.newFixedThreadPool(5);


        // 提交任務(wù)到線程池,每個任務(wù)都會到達(dá)屏障點(diǎn)并執(zhí)行后續(xù)操作
        for (int i = 0; i < 5; i++) {
            final int taskNumber = i;
            executor.submit(() -> {
                System.out.println("Task " + taskNumber + " is running");
                try {
                    phaser.arriveAndAwaitAdvance();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("Task " + taskNumber + " is completed");
            });
        }


        // 主線程等待所有任務(wù)完成后繼續(xù)執(zhí)行
        try {
            phaser.arriveAndAwaitAdvance();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }


        System.out.println("All tasks are completed, main thread continues");


        // 關(guān)閉線程池
        executor.shutdown();
    }
}

在這個示例中,我們創(chuàng)建了一個 Phaser 并使用一個線程池來模擬5個并發(fā)任務(wù)。每個任務(wù)在完成自己的操作后會到達(dá)屏障點(diǎn)并等待其他任務(wù)也到達(dá)屏障點(diǎn)。當(dāng)所有任務(wù)都到達(dá)屏障點(diǎn)時,它們會繼續(xù)執(zhí)行后續(xù)操作。主線程在調(diào)用 phaser.arriveAndAwaitAdvance() 方法時會阻塞,直到所有子線程都到達(dá)屏障點(diǎn)。

解釋:

  • Phaser phaser = new Phaser(5);:創(chuàng)建了一個初始值為5的 Phaser 實例。
  • phaser.arriveAndAwaitAdvance();:每個線程在完成任務(wù)后調(diào)用此方法到達(dá)屏障點(diǎn)并等待其他線程。
  • ExecutorService executor = Executors.newFixedThreadPool(5);:創(chuàng)建了一個大小為5的線程池。
  • executor.submit(() -> {...});:提交了5個任務(wù)到線程池。
  • executor.shutdown();:關(guān)閉線程池。

這個示例展示了如何在多線程環(huán)境中使用 Phaser 來確保一組線程在完成各自的任務(wù)后,能夠同時繼續(xù)執(zhí)行后續(xù)操作。由于 Phaser 提供了線程間的同步點(diǎn),它適合用于需要線程同步執(zhí)行的場景。

代碼分析:

以下是 Phaser 類的一些關(guān)鍵方法的代碼分析:

  • Phaser(int parties): 這個構(gòu)造方法用于創(chuàng)建一個 Phaser 對象,并指定屏障的參與者數(shù)量。

  • arrive(): 這個方法用于使當(dāng)前線程到達(dá)屏障點(diǎn)。每次調(diào)用此方法時,計數(shù)器的值會遞減。

  • arriveAndAwaitAdvance(): 這個方法與 arrive() 類似,但它會阻塞當(dāng)前線程,直到計數(shù)器的值變?yōu)榱恪?/li>

  • awaitAdvance(): 這個方法用于阻塞當(dāng)前線程,直到屏障點(diǎn)被觸發(fā)。

  • getRegisteredParties(): 這個方法用于獲取當(dāng)前注冊在屏障上的線程數(shù)量。

  • getArrivedParties(): 這個方法用于獲取已經(jīng)到達(dá)屏障的線程數(shù)量。

  • isTerminated(): 這個方法用于檢查屏障是否已經(jīng)終止。如果屏障已經(jīng)終止,返回 true;否則返回 false。

  • forceTermination(): 這個方法用于強(qiáng)制終止屏障。它將計數(shù)器的值設(shè)置為零,并喚醒所有等待的線程。

這些方法都使用了 Object 類的 wait()notify() 方法來實現(xiàn)線程間的同步。當(dāng)線程到達(dá) arriveAndAwaitAdvance() 方法時,它會調(diào)用 wait() 方法,這會導(dǎo)致線程進(jìn)入等待狀態(tài)。當(dāng)其他線程到達(dá)屏障點(diǎn)并調(diào)用 arrive() 方法時,會調(diào)用 notify() 方法來喚醒等待的線程。

最后

以上就是 V哥給大家整理的12個并發(fā)相關(guān)的數(shù)據(jù)結(jié)構(gòu),這些并發(fā)數(shù)據(jù)結(jié)構(gòu)是 Java 并發(fā)編程的基礎(chǔ),它們在 java.util.concurrent 包(J.U.C)中提供。使用這些數(shù)據(jù)結(jié)構(gòu)可以幫助開發(fā)者編寫出高效且線程安全的并發(fā)程序,分布式應(yīng)用開發(fā)的項目中,你會使用到的。Java并發(fā)

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號