深入了解跳表(Skip List):Redis Sorted Set 的高效數(shù)據(jù)結(jié)構(gòu)

2024-12-17 12:00 更新

跳表(Skip List)是一種隨機(jī)化的數(shù)據(jù)結(jié)構(gòu),基于有序鏈表,通過(guò)在鏈表上增加多級(jí)索引來(lái)提高數(shù)據(jù)的查找效率。它是由 William Pugh 在 1990 年提出的。

為什么 Redis 中的 Sorted Set 使用跳躍表

Redis 的有序集合(Sorted Set)使用跳躍表(Skip List)作為底層實(shí)現(xiàn),主要是因?yàn)樘S表在性能、實(shí)現(xiàn)復(fù)雜度和靈活性等方面具有顯著優(yōu)勢(shì),可以替代平衡樹的數(shù)據(jù)結(jié)構(gòu)。

跳躍表的原理

跳躍表是一種擴(kuò)展的有序鏈表,通過(guò)維護(hù)多個(gè)層級(jí)的索引來(lái)提高查找效率。每個(gè)節(jié)點(diǎn)包含一個(gè)數(shù)據(jù)元素和一組指向其他節(jié)點(diǎn)的指針,這些指針?lè)植荚诓煌膶蛹?jí)。最底層的鏈表包含所有元素,而每一層的節(jié)點(diǎn)數(shù)量逐漸減少。這樣,查找操作可以從高層開始,快速跳過(guò)不需要的元素,減少遍歷的節(jié)點(diǎn)數(shù),從而提高查找效率。

  • 查找過(guò)程:從最高層的頭節(jié)點(diǎn)開始,沿著索引節(jié)點(diǎn)向右查找,如果當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)的值大于或等于查找的目標(biāo)值,則向下移動(dòng)到下一層繼續(xù)查找;否則,向右移動(dòng)到下一個(gè)索引節(jié)點(diǎn)。這個(gè)過(guò)程一直持續(xù)到找到目標(biāo)節(jié)點(diǎn)或到達(dá)最底層鏈表。

  • 插入和刪除操作:跳躍表支持高效的動(dòng)態(tài)插入和刪除,時(shí)間復(fù)雜度均為 O(log n)。插入時(shí),首先找到插入位置,然后根據(jù)隨機(jī)函數(shù)決定新節(jié)點(diǎn)的層數(shù),最后在相應(yīng)的層中插入節(jié)點(diǎn)。

跳躍表的優(yōu)勢(shì)

  1. 簡(jiǎn)單性:跳躍表的實(shí)現(xiàn)相對(duì)簡(jiǎn)單,易于理解和維護(hù),而平衡樹(如紅黑樹)的實(shí)現(xiàn)較為復(fù)雜,容易出錯(cuò)。

  1. 高效的范圍查詢:跳躍表在進(jìn)行范圍查詢時(shí)效率更高,因?yàn)樗怯行虻逆湵?,可以直接遍歷后繼節(jié)點(diǎn),而平衡樹需要中序遍歷,復(fù)雜度更高。

  1. 靈活性:跳躍表的層數(shù)可以根據(jù)需要?jiǎng)討B(tài)調(diào)整,適應(yīng)不同的查詢需求。

  1. 高并發(fā)性能:跳躍表的節(jié)點(diǎn)可以支持多個(gè)線程同時(shí)插入或刪除節(jié)點(diǎn),而平衡樹和哈希表通常需要復(fù)雜的并發(fā)控制。

  1. 空間效率:跳躍表的空間復(fù)雜度為 O(n),并且通過(guò)調(diào)整節(jié)點(diǎn)的抽取間隔,可以靈活平衡空間和時(shí)間復(fù)雜度。

正是因?yàn)橛羞@些優(yōu)勢(shì),Redis 選擇使用跳躍表來(lái)實(shí)現(xiàn)有序集合,而不是紅黑樹或其他數(shù)據(jù)結(jié)構(gòu)。這使得 Redis 在處理有序集合時(shí)能夠高效地支持插入、刪除和查找操作,同時(shí)保持元素的有序性,非常適合實(shí)現(xiàn)如排行榜、范圍查詢等功能。

為了講明白跳表的原理,現(xiàn)在我們來(lái)模擬一個(gè)簡(jiǎn)單的跳表實(shí)現(xiàn)。

在 Java 中模擬實(shí)現(xiàn) Redis 的 Sorted Set 跳表,我們需要定義跳表的數(shù)據(jù)結(jié)構(gòu),包括節(jié)點(diǎn)和跳表本身。以下是一個(gè)簡(jiǎn)單的實(shí)現(xiàn):

import java.util.Random;


public class SkipList {


    private static final double P = 0.5; // 節(jié)點(diǎn)晉升的概率
    private static final int MAX_LEVEL = 16; // 最大層數(shù)
    private int levelCount; // 當(dāng)前跳表的層數(shù)
    private Node head; // 頭節(jié)點(diǎn)
    private int size; // 跳表中元素的數(shù)量
    private Random random; // 隨機(jī)數(shù)生成器


    public SkipList() {
        this.levelCount = 0;
        this.size = 0;
        this.head = new Node(-1, Integer.MIN_VALUE, MAX_LEVEL);
        this.random = new Random();
    }


    // 節(jié)點(diǎn)定義
    private class Node {
        int value;
        int key;
        Node[] forward; // 向前指針數(shù)組


        Node(int value, int key, int level) {
            this.value = value;
            this.key = key;
            this.forward = new Node[level + 1];
        }
    }


    // 隨機(jī)生成節(jié)點(diǎn)的層數(shù)
    private int randomLevel() {
        int level = 0;
        while (random.nextFloat() < P && level < MAX_LEVEL) {
            level++;
        }
        return level;
    }


    // 搜索指定值的節(jié)點(diǎn)
    public Node search(int key) {
        Node current = head;
        for (int i = levelCount - 1; i >= 0; i--) {
            while (current.forward[i] != null && current.forward[i].key < key) {
                current = current.forward[i]; // 沿著當(dāng)前層的指針移動(dòng)
            }
        }
        current = current.forward[0]; // 移動(dòng)到第0層
        return current;
    }


    // 插入節(jié)點(diǎn)
    public void insert(int key, int value) {
        Node[] update = new Node[MAX_LEVEL + 1];
        Node current = head;


        // 查找插入位置
        for (int i = levelCount - 1; i >= 0; i--) {
            while (current.forward[i] != null && current.forward[i].key < key) {
                current = current.forward[i];
            }
            update[i] = current;
        }


        int level = randomLevel(); // 隨機(jī)生成層數(shù)
        if (level > levelCount) {
            levelCount = level;
            for (int i = levelCount - 1; i >= 0; i--) {
                update[i] = head;
            }
        }


        current = new Node(value, key, level);
        for (int i = 0; i < level; i++) {
            current.forward[i] = update[i].forward[i];
            update[i].forward[i] = current;
        }
        size++;
    }


    // 刪除節(jié)點(diǎn)
    public void delete(int key) {
        Node[] update = new Node[MAX_LEVEL + 1];
        Node current = head;


        // 查找要?jiǎng)h除的節(jié)點(diǎn)
        for (int i = levelCount - 1; i >= 0; i--) {
            while (current.forward[i] != null && current.forward[i].key < key) {
                current = current.forward[i];
            }
            update[i] = current;
        }


        current = current.forward[0];
        if (current != null && current.key == key) {
            for (int i = 0; i < levelCount; i++) {
                if (update[i].forward[i] != current) {
                    break;
                }
                update[i].forward[i] = current.forward[i];
            }
            size--;
            while (levelCount > 0 && head.forward[levelCount - 1] == null) {
                levelCount--;
            }
        }
    }


    // 打印跳表
    public void printList() {
        Node current = head.forward[0];
        while (current != null) {
            System.out.println(current.key + ":" + current.value);
            current = current.forward[0];
        }
    }


    // 主函數(shù)測(cè)試跳表
    public static void main(String[] args) {
        SkipList list = new SkipList();
        list.insert(3, 100);
        list.insert(6, 300);
        list.insert(4, 400);
        list.insert(7, 700);
        list.insert(5, 500);
        list.printList();
        list.delete(3);
        list.printList();
    }
}

下面,V 哥來(lái)解釋一下:

  1. 跳表節(jié)點(diǎn):每個(gè)節(jié)點(diǎn)包含一個(gè)值、一個(gè)鍵和一個(gè)向前指針數(shù)組。向前指針數(shù)組存儲(chǔ)了指向同一層下一個(gè)節(jié)點(diǎn)的引用。

  1. 隨機(jī)層數(shù):每個(gè)節(jié)點(diǎn)的層數(shù)是根據(jù)預(yù)設(shè)的概率 P 隨機(jī)生成的,這樣可以模擬出不同高度的索引層。

  1. 搜索操作:從最高層開始,沿著當(dāng)前層的指針移動(dòng),直到找到插入點(diǎn)或到達(dá)底層。

  1. 插入操作:首先找到插入位置,然后根據(jù)隨機(jī)生成的層數(shù)創(chuàng)建新節(jié)點(diǎn),并更新每一層的指針。

  1. 刪除操作:找到要?jiǎng)h除的節(jié)點(diǎn),然后逐層更新指針,最后減少跳表的層數(shù)。

  1. 打印跳表:從底層的頭節(jié)點(diǎn)開始,遍歷打印每個(gè)節(jié)點(diǎn)的鍵和值。

從這個(gè)簡(jiǎn)化版的跳表實(shí)現(xiàn)可以看到跳表的基本操作??梢詭椭覀兝斫馓韺?shí)現(xiàn)的原理。

如何在 Java 中實(shí)現(xiàn)跳表的并發(fā)操作?

在 Java 中實(shí)現(xiàn)跳表的并發(fā)操作需要考慮線程安全問(wèn)題。可以通過(guò)以下方法來(lái)實(shí)現(xiàn):

  1. 使用同步塊:在每個(gè)公共方法中使用 synchronized 關(guān)鍵字來(lái)確保同一時(shí)間只有一個(gè)線程可以執(zhí)行方法。

  1. 使用鎖:使用 ReentrantLock 或其他并發(fā)鎖來(lái)控制對(duì)跳表的并發(fā)訪問(wèn)。

  1. 使用原子引用:使用 AtomicReferenceAtomicReferenceArray 來(lái)確保節(jié)點(diǎn)的原子更新。

  1. 使用并發(fā)集合:使用 ConcurrentHashMap 等并發(fā)集合作為輔助工具來(lái)實(shí)現(xiàn)線程安全的跳表。

以下是一個(gè)使用 synchronized 關(guān)鍵字實(shí)現(xiàn)線程安全的跳表的示例:

import java.util.Random;


public class ConcurrentSkipList {


    private static final double P = 0.5; // 節(jié)點(diǎn)晉升的概率
    private static final int MAX_LEVEL = 16; // 最大層數(shù)
    private int levelCount; // 當(dāng)前跳表的層數(shù)
    private Node head; // 頭節(jié)點(diǎn)
    private int size; // 跳表中元素的數(shù)量
    private Random random; // 隨機(jī)數(shù)生成器


    public ConcurrentSkipList() {
        this.levelCount = 0;
        this.size = 0;
        this.head = new Node(-1, Integer.MIN_VALUE, MAX_LEVEL);
        this.random = new Random();
    }


    private class Node {
        int value;
        int key;
        Node[] forward; // 向前指針數(shù)組


        Node(int value, int key, int level) {
            this.value = value;
            this.key = key;
            this.forward = new Node[level + 1];
        }
    }


    private int randomLevel() {
        int level = 0;
        while (random.nextFloat() < P && level < MAX_LEVEL) {
            level++;
        }
        return level;
    }


    public synchronized Node search(int key) {
        Node current = head;
        for (int i = levelCount - 1; i >= 0; i--) {
            while (current.forward[i] != null && current.forward[i].key < key) {
                current = current.forward[i];
            }
        }
        current = current.forward[0];
        return current;
    }


    public synchronized void insert(int key, int value) {
        Node[] update = new Node[MAX_LEVEL + 1];
        Node current = head;


        for (int i = levelCount - 1; i >= 0; i--) {
            while (current.forward[i] != null && current.forward[i].key < key) {
                current = current.forward[i];
            }
            update[i] = current;
        }


        int level = randomLevel();
        if (level > levelCount) {
            for (int i = levelCount; i < level; i++) {
                update[i] = head;
            }
            levelCount = level;
        }


        current = new Node(value, key, level);
        for (int i = 0; i < level; i++) {
            current.forward[i] = update[i].forward[i];
            update[i].forward[i] = current;
        }
        size++;
    }


    public synchronized void delete(int key) {
        Node[] update = new Node[MAX_LEVEL + 1];
        Node current = head;


        for (int i = levelCount - 1; i >= 0; i--) {
            while (current.forward[i] != null && current.forward[i].key < key) {
                current = current.forward[i];
            }
            update[i] = current;
        }


        current = current.forward[0];
        if (current != null && current.key == key) {
            for (int i = 0; i < levelCount; i++) {
                if (update[i].forward[i] != current) {
                    break;
                }
                update[i].forward[i] = current.forward[i];
            }
            size--;
            while (levelCount > 0 && head.forward[levelCount - 1] == null) {
                levelCount--;
            }
        }
    }


    public synchronized void printList() {
        Node current = head.forward[0];
        while (current != null) {
            System.out.println(current.key + ":" + current.value);
            current = current.forward[0];
        }
    }


    public static void main(String[] args) {
        ConcurrentSkipList list = new ConcurrentSkipList();
        list.insert(3, 100);
        list.insert(6, 300);
        list.insert(4, 400);
        list.insert(7, 700);
        list.insert(5, 500);
        list.printList();
        list.delete(3);
        list.printList();
    }
}

在這個(gè)示例中,我們使用了 synchronized 關(guān)鍵字來(lái)確保 search、insertdelete 方法是線程安全的。這會(huì)鎖定當(dāng)前對(duì)象,確保同一時(shí)間只有一個(gè)線程可以執(zhí)行這些方法。

請(qǐng)注意,雖然 synchronized 可以提供線程安全,但它也可能導(dǎo)致性能瓶頸,特別是在高并發(fā)環(huán)境下。在實(shí)際應(yīng)用中,可以考慮使用更細(xì)粒度的鎖,如 ReentrantLock,或者使用原子引用和其他并發(fā)工具來(lái)提高性能。

使用 ReentrantLock 的實(shí)現(xiàn)方式

使用 ReentrantLock 實(shí)現(xiàn)跳表的并發(fā)操作可以提供比 synchronized 更細(xì)粒度的鎖定,從而提高并發(fā)性能。ReentrantLock 允許您在不同的方法中鎖定和解鎖,甚至可以在不同的類中使用同一個(gè)鎖對(duì)象。

以下是使用 ReentrantLock 實(shí)現(xiàn)線程安全的跳表的示例:

import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


public class ConcurrentSkipList {


    private static final double P = 0.5; // 節(jié)點(diǎn)晉升的概率
    private static final int MAX_LEVEL = 16; // 最大層數(shù)
    private final Lock lock = new ReentrantLock(); // 創(chuàng)建一個(gè) ReentrantLock 對(duì)象
    private int levelCount; // 當(dāng)前跳表的層數(shù)
    private Node head; // 頭節(jié)點(diǎn)
    private int size; // 跳表中元素的數(shù)量
    private Random random; // 隨機(jī)數(shù)生成器


    public ConcurrentSkipList() {
        this.levelCount = 0;
        this.size = 0;
        this.head = new Node(Integer.MIN_VALUE, MAX_LEVEL);
        this.random = new Random();
    }


    private class Node {
        int value;
        int key;
        Node[] forward; // 向前指針數(shù)組


        Node(int key, int level) {
            this.key = key;
            this.forward = new Node[level + 1];
        }
    }


    private int randomLevel() {
        int level = 0;
        while (random.nextFloat() < P && level < MAX_LEVEL) {
            level++;
        }
        return level;
    }


    public void search(int key) {
        lock.lock(); // 加鎖
        try {
            Node current = head;
            for (int i = levelCount - 1; i >= 0; i--) {
                while (current.forward[i] != null && current.forward[i].key < key) {
                    current = current.forward[i];
                }
            }
            current = current.forward[0];
            // ... 處理找到的節(jié)點(diǎn)
        } finally {
            lock.unlock(); // 釋放鎖
        }
    }


    public void insert(int key, int value) {
        lock.lock(); // 加鎖
        try {
            Node[] update = new Node[MAX_LEVEL + 1];
            Node current = head;
            for (int i = levelCount - 1; i >= 0; i--) {
                while (current.forward[i] != null && current.forward[i].key < key) {
                    current = current.forward[i];
                }
                update[i] = current;
            }


            int level = randomLevel();
            if (level > levelCount) {
                levelCount = level;
            }


            current = new Node(value, key, level);
            for (int i = 0; i < level; i++) {
                current.forward[i] = update[i].forward[i];
                update[i].forward[i] = current;
            }
            size++;
        } finally {
            lock.unlock(); // 釋放鎖
        }
    }


    public void delete(int key) {
        lock.lock(); // 加鎖
        try {
            Node[] update = new Node[MAX_LEVEL + 1];
            Node current = head;
            for (int i = levelCount - 1; i >= 1; i--) {
                while (current.forward[i] != null && current.forward[i].key < key) {
                    current = current.forward[i];
                }
                update[i] = current;
            }


            current = current.forward[0];
            if (current != null && current.key == key) {
                for (int i = 0; i < levelCount; i++) {
                    if (update[i].forward[i] != current) {
                        break;
                    }
                    update[i].forward[i] = current.forward[i];
                }
                size--;
                while (levelCount > 0 && head.forward[levelCount - 1] == null) {
                    levelCount--;
                }
            }
        } finally {
            lock.unlock(); // 釋放鎖
        }
    }


    public void printList() {
        lock.lock(); // 加鎖
        try {
            Node current = head.forward[0];
            while (current != null) {
                System.out.println(current.key + ":" + current.value);
                current = current.forward[0];
            }
        } finally {
            lock.unlock(); // 釋放鎖
        }
    }


    public static void main(String[] args) {
        ConcurrentSkipList list = new ConcurrentSkipList();
        list.insert(3, 100);
        list.insert(6, 300);
        list.insert(4, 400);
        list.insert(7, 700);
        list.insert(5, 500);
        list.printList();
        list.delete(3);
        list.printList();
    }
}

在這個(gè)示例中,我們使用了 ReentrantLock 對(duì)象來(lái)控制對(duì)跳表的并發(fā)訪問(wèn)。每個(gè)公共方法在執(zhí)行之前都會(huì)調(diào)用 lock.lock() 加鎖,并在執(zhí)行完畢后(包括正常執(zhí)行和異常退出)調(diào)用 lock.unlock() 釋放鎖。

使用 ReentrantLock 的好處是它提供了比 synchronized 更靈活的鎖定機(jī)制,例如:

  1. 可中斷的鎖定ReentrantLock 允許線程在嘗試獲取鎖時(shí)被中斷。

  1. 嘗試非阻塞鎖定ReentrantLock 提供了 tryLock() 方法,允許線程嘗試獲取鎖而不無(wú)限等待。

  1. 超時(shí)獲取鎖ReentrantLock 還提供了 tryLock(long timeout, TimeUnit unit) 方法,允許線程在指定時(shí)間內(nèi)等待鎖。

  1. 公平鎖ReentrantLock 可以選擇是否使用公平鎖,公平鎖會(huì)按照線程請(qǐng)求鎖的順序來(lái)分配鎖。

  1. 多個(gè)條件變量ReentrantLock 可以與多個(gè) Condition 對(duì)象配合使用,而 synchronized 只能與一個(gè)條件變量配合使用。

理解了以上代碼實(shí)現(xiàn)原理后,我們?cè)賮?lái)理解 Redis Sorted Set 就不難了。

Redis 的 Sorted Set 是一種包含元素和關(guān)聯(lián)分?jǐn)?shù)的數(shù)據(jù)結(jié)構(gòu),它能夠根據(jù)分?jǐn)?shù)對(duì)元素進(jìn)行排序。Sorted Set 在 Redis 中的內(nèi)部實(shí)現(xiàn)可以是跳躍表(Skip List)和字典(Hash Table)的組合,或者是壓縮列表(Zip List),具體使用哪種實(shí)現(xiàn)取決于 Sorted Set 的大小和元素的長(zhǎng)度。

跳躍表 + 字典實(shí)現(xiàn)

當(dāng) Sorted Set 的元素?cái)?shù)量較多或者元素長(zhǎng)度較長(zhǎng)時(shí),Redis 使用跳躍表和字典來(lái)實(shí)現(xiàn) Sorted Set。跳躍表是一種概率平衡的數(shù)據(jù)結(jié)構(gòu),它通過(guò)多級(jí)索引來(lái)提高搜索效率,類似于二分查找。字典則用于快速查找和更新操作。

跳躍表的每個(gè)節(jié)點(diǎn)包含以下信息:

  • 元素(member)
  • 分?jǐn)?shù)(score)
  • 后退指針(backward)
  • 多層前進(jìn)指針(forward),每一層都是一個(gè)有序鏈表

字典則存儲(chǔ)了元素到分?jǐn)?shù)的映射,以便快速訪問(wèn)。

壓縮列表實(shí)現(xiàn)

當(dāng) Sorted Set 的元素?cái)?shù)量較少(默認(rèn)小于128個(gè)),并且所有元素的長(zhǎng)度都小于64字節(jié)時(shí),Redis 使用壓縮列表來(lái)存儲(chǔ) Sorted Set。壓縮列表是一種連續(xù)內(nèi)存塊的順序存儲(chǔ)結(jié)構(gòu),它將所有的元素和分?jǐn)?shù)緊湊地存儲(chǔ)在一起,以節(jié)省內(nèi)存空間。

應(yīng)用場(chǎng)景

Sorted Set 常用于以下場(chǎng)景:

  1. 排行榜:例如游戲中的玩家分?jǐn)?shù)排名。
  2. 范圍查詢:例如獲取一定分?jǐn)?shù)范圍內(nèi)的用戶。
  3. 任務(wù)調(diào)度:例如按照任務(wù)的優(yōu)先級(jí)執(zhí)行。
  4. 實(shí)時(shí)排名:例如股票價(jià)格的實(shí)時(shí)排名。

代碼分析

在 Redis 源碼中,Sorted Set 的實(shí)現(xiàn)主要在 t_zset.c 文件中。插入操作(zaddCommand)會(huì)根據(jù) Sorted Set 的編碼類型(跳躍表或壓縮列表)來(lái)執(zhí)行不同的邏輯。如果是跳躍表編碼,那么插入操作會(huì)涉及到字典的更新和跳躍表節(jié)點(diǎn)的添加。如果是壓縮列表編碼,則會(huì)檢查是否需要轉(zhuǎn)換為跳躍表編碼。

總結(jié)

Sorted Set 是 Redis 提供的一種強(qiáng)大的有序數(shù)據(jù)結(jié)構(gòu),它結(jié)合了跳躍表和字典的優(yōu)點(diǎn),提供了高效的插入、刪除、更新和范圍查詢操作。通過(guò)合理的使用 Sorted Set,可以有效地解決許多實(shí)際問(wèn)題。如何以上內(nèi)容對(duì)你有幫助,懇請(qǐng)點(diǎn)贊轉(zhuǎn)發(fā),V 哥在此感謝各位兄弟的支持。88,洗洗睡了。

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)