跳表(Skip List)是一種隨機(jī)化的數(shù)據(jù)結(jié)構(gòu),基于有序鏈表,通過(guò)在鏈表上增加多級(jí)索引來(lái)提高數(shù)據(jù)的查找效率。它是由 William Pugh 在 1990 年提出的。
Redis 的有序集合(Sorted Set)使用跳躍表(Skip List)作為底層實(shí)現(xiàn),主要是因?yàn)樘S表在性能、實(shí)現(xiàn)復(fù)雜度和靈活性等方面具有顯著優(yōu)勢(shì),可以替代平衡樹的數(shù)據(jù)結(jié)構(gòu)。
跳躍表是一種擴(kuò)展的有序鏈表,通過(guò)維護(hù)多個(gè)層級(jí)的索引來(lái)提高查找效率。每個(gè)節(jié)點(diǎn)包含一個(gè)數(shù)據(jù)元素和一組指向其他節(jié)點(diǎn)的指針,這些指針?lè)植荚诓煌膶蛹?jí)。最底層的鏈表包含所有元素,而每一層的節(jié)點(diǎn)數(shù)量逐漸減少。這樣,查找操作可以從高層開始,快速跳過(guò)不需要的元素,減少遍歷的節(jié)點(diǎn)數(shù),從而提高查找效率。
正是因?yàn)橛羞@些優(yōu)勢(shì),Redis 選擇使用跳躍表來(lái)實(shí)現(xiàn)有序集合,而不是紅黑樹或其他數(shù)據(jù)結(jié)構(gòu)。這使得 Redis 在處理有序集合時(shí)能夠高效地支持插入、刪除和查找操作,同時(shí)保持元素的有序性,非常適合實(shí)現(xiàn)如排行榜、范圍查詢等功能。
為了講明白跳表的原理,現(xiàn)在我們來(lái)模擬一個(gè)簡(jiǎn)單的跳表實(shí)現(xiàn)。
在 Java 中模擬實(shí)現(xiàn) Redis 的 Sorted Set 跳表,我們需要定義跳表的數(shù)據(jù)結(jié)構(gòu),包括節(jié)點(diǎn)和跳表本身。以下是一個(gè)簡(jiǎn)單的實(shí)現(xiàn):
import java.util.Random;
public class SkipList {
private static final double P = 0.5; // 節(jié)點(diǎn)晉升的概率
private static final int MAX_LEVEL = 16; // 最大層數(shù)
private int levelCount; // 當(dāng)前跳表的層數(shù)
private Node head; // 頭節(jié)點(diǎn)
private int size; // 跳表中元素的數(shù)量
private Random random; // 隨機(jī)數(shù)生成器
public SkipList() {
this.levelCount = 0;
this.size = 0;
this.head = new Node(-1, Integer.MIN_VALUE, MAX_LEVEL);
this.random = new Random();
}
// 節(jié)點(diǎn)定義
private class Node {
int value;
int key;
Node[] forward; // 向前指針數(shù)組
Node(int value, int key, int level) {
this.value = value;
this.key = key;
this.forward = new Node[level + 1];
}
}
// 隨機(jī)生成節(jié)點(diǎn)的層數(shù)
private int randomLevel() {
int level = 0;
while (random.nextFloat() < P && level < MAX_LEVEL) {
level++;
}
return level;
}
// 搜索指定值的節(jié)點(diǎn)
public Node search(int key) {
Node current = head;
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i]; // 沿著當(dāng)前層的指針移動(dòng)
}
}
current = current.forward[0]; // 移動(dòng)到第0層
return current;
}
// 插入節(jié)點(diǎn)
public void insert(int key, int value) {
Node[] update = new Node[MAX_LEVEL + 1];
Node current = head;
// 查找插入位置
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
update[i] = current;
}
int level = randomLevel(); // 隨機(jī)生成層數(shù)
if (level > levelCount) {
levelCount = level;
for (int i = levelCount - 1; i >= 0; i--) {
update[i] = head;
}
}
current = new Node(value, key, level);
for (int i = 0; i < level; i++) {
current.forward[i] = update[i].forward[i];
update[i].forward[i] = current;
}
size++;
}
// 刪除節(jié)點(diǎn)
public void delete(int key) {
Node[] update = new Node[MAX_LEVEL + 1];
Node current = head;
// 查找要?jiǎng)h除的節(jié)點(diǎn)
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
update[i] = current;
}
current = current.forward[0];
if (current != null && current.key == key) {
for (int i = 0; i < levelCount; i++) {
if (update[i].forward[i] != current) {
break;
}
update[i].forward[i] = current.forward[i];
}
size--;
while (levelCount > 0 && head.forward[levelCount - 1] == null) {
levelCount--;
}
}
}
// 打印跳表
public void printList() {
Node current = head.forward[0];
while (current != null) {
System.out.println(current.key + ":" + current.value);
current = current.forward[0];
}
}
// 主函數(shù)測(cè)試跳表
public static void main(String[] args) {
SkipList list = new SkipList();
list.insert(3, 100);
list.insert(6, 300);
list.insert(4, 400);
list.insert(7, 700);
list.insert(5, 500);
list.printList();
list.delete(3);
list.printList();
}
}
下面,V 哥來(lái)解釋一下:
P
隨機(jī)生成的,這樣可以模擬出不同高度的索引層。從這個(gè)簡(jiǎn)化版的跳表實(shí)現(xiàn)可以看到跳表的基本操作??梢詭椭覀兝斫馓韺?shí)現(xiàn)的原理。
在 Java 中實(shí)現(xiàn)跳表的并發(fā)操作需要考慮線程安全問(wèn)題。可以通過(guò)以下方法來(lái)實(shí)現(xiàn):
synchronized
關(guān)鍵字來(lái)確保同一時(shí)間只有一個(gè)線程可以執(zhí)行方法。ReentrantLock
或其他并發(fā)鎖來(lái)控制對(duì)跳表的并發(fā)訪問(wèn)。AtomicReference
或 AtomicReferenceArray
來(lái)確保節(jié)點(diǎn)的原子更新。ConcurrentHashMap
等并發(fā)集合作為輔助工具來(lái)實(shí)現(xiàn)線程安全的跳表。
以下是一個(gè)使用 synchronized
關(guān)鍵字實(shí)現(xiàn)線程安全的跳表的示例:
import java.util.Random;
public class ConcurrentSkipList {
private static final double P = 0.5; // 節(jié)點(diǎn)晉升的概率
private static final int MAX_LEVEL = 16; // 最大層數(shù)
private int levelCount; // 當(dāng)前跳表的層數(shù)
private Node head; // 頭節(jié)點(diǎn)
private int size; // 跳表中元素的數(shù)量
private Random random; // 隨機(jī)數(shù)生成器
public ConcurrentSkipList() {
this.levelCount = 0;
this.size = 0;
this.head = new Node(-1, Integer.MIN_VALUE, MAX_LEVEL);
this.random = new Random();
}
private class Node {
int value;
int key;
Node[] forward; // 向前指針數(shù)組
Node(int value, int key, int level) {
this.value = value;
this.key = key;
this.forward = new Node[level + 1];
}
}
private int randomLevel() {
int level = 0;
while (random.nextFloat() < P && level < MAX_LEVEL) {
level++;
}
return level;
}
public synchronized Node search(int key) {
Node current = head;
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
}
current = current.forward[0];
return current;
}
public synchronized void insert(int key, int value) {
Node[] update = new Node[MAX_LEVEL + 1];
Node current = head;
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
update[i] = current;
}
int level = randomLevel();
if (level > levelCount) {
for (int i = levelCount; i < level; i++) {
update[i] = head;
}
levelCount = level;
}
current = new Node(value, key, level);
for (int i = 0; i < level; i++) {
current.forward[i] = update[i].forward[i];
update[i].forward[i] = current;
}
size++;
}
public synchronized void delete(int key) {
Node[] update = new Node[MAX_LEVEL + 1];
Node current = head;
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
update[i] = current;
}
current = current.forward[0];
if (current != null && current.key == key) {
for (int i = 0; i < levelCount; i++) {
if (update[i].forward[i] != current) {
break;
}
update[i].forward[i] = current.forward[i];
}
size--;
while (levelCount > 0 && head.forward[levelCount - 1] == null) {
levelCount--;
}
}
}
public synchronized void printList() {
Node current = head.forward[0];
while (current != null) {
System.out.println(current.key + ":" + current.value);
current = current.forward[0];
}
}
public static void main(String[] args) {
ConcurrentSkipList list = new ConcurrentSkipList();
list.insert(3, 100);
list.insert(6, 300);
list.insert(4, 400);
list.insert(7, 700);
list.insert(5, 500);
list.printList();
list.delete(3);
list.printList();
}
}
在這個(gè)示例中,我們使用了 synchronized
關(guān)鍵字來(lái)確保 search
、insert
和 delete
方法是線程安全的。這會(huì)鎖定當(dāng)前對(duì)象,確保同一時(shí)間只有一個(gè)線程可以執(zhí)行這些方法。
請(qǐng)注意,雖然 synchronized
可以提供線程安全,但它也可能導(dǎo)致性能瓶頸,特別是在高并發(fā)環(huán)境下。在實(shí)際應(yīng)用中,可以考慮使用更細(xì)粒度的鎖,如 ReentrantLock
,或者使用原子引用和其他并發(fā)工具來(lái)提高性能。
ReentrantLock
的實(shí)現(xiàn)方式
使用 ReentrantLock
實(shí)現(xiàn)跳表的并發(fā)操作可以提供比 synchronized
更細(xì)粒度的鎖定,從而提高并發(fā)性能。ReentrantLock
允許您在不同的方法中鎖定和解鎖,甚至可以在不同的類中使用同一個(gè)鎖對(duì)象。
以下是使用 ReentrantLock
實(shí)現(xiàn)線程安全的跳表的示例:
import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConcurrentSkipList {
private static final double P = 0.5; // 節(jié)點(diǎn)晉升的概率
private static final int MAX_LEVEL = 16; // 最大層數(shù)
private final Lock lock = new ReentrantLock(); // 創(chuàng)建一個(gè) ReentrantLock 對(duì)象
private int levelCount; // 當(dāng)前跳表的層數(shù)
private Node head; // 頭節(jié)點(diǎn)
private int size; // 跳表中元素的數(shù)量
private Random random; // 隨機(jī)數(shù)生成器
public ConcurrentSkipList() {
this.levelCount = 0;
this.size = 0;
this.head = new Node(Integer.MIN_VALUE, MAX_LEVEL);
this.random = new Random();
}
private class Node {
int value;
int key;
Node[] forward; // 向前指針數(shù)組
Node(int key, int level) {
this.key = key;
this.forward = new Node[level + 1];
}
}
private int randomLevel() {
int level = 0;
while (random.nextFloat() < P && level < MAX_LEVEL) {
level++;
}
return level;
}
public void search(int key) {
lock.lock(); // 加鎖
try {
Node current = head;
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
}
current = current.forward[0];
// ... 處理找到的節(jié)點(diǎn)
} finally {
lock.unlock(); // 釋放鎖
}
}
public void insert(int key, int value) {
lock.lock(); // 加鎖
try {
Node[] update = new Node[MAX_LEVEL + 1];
Node current = head;
for (int i = levelCount - 1; i >= 0; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
update[i] = current;
}
int level = randomLevel();
if (level > levelCount) {
levelCount = level;
}
current = new Node(value, key, level);
for (int i = 0; i < level; i++) {
current.forward[i] = update[i].forward[i];
update[i].forward[i] = current;
}
size++;
} finally {
lock.unlock(); // 釋放鎖
}
}
public void delete(int key) {
lock.lock(); // 加鎖
try {
Node[] update = new Node[MAX_LEVEL + 1];
Node current = head;
for (int i = levelCount - 1; i >= 1; i--) {
while (current.forward[i] != null && current.forward[i].key < key) {
current = current.forward[i];
}
update[i] = current;
}
current = current.forward[0];
if (current != null && current.key == key) {
for (int i = 0; i < levelCount; i++) {
if (update[i].forward[i] != current) {
break;
}
update[i].forward[i] = current.forward[i];
}
size--;
while (levelCount > 0 && head.forward[levelCount - 1] == null) {
levelCount--;
}
}
} finally {
lock.unlock(); // 釋放鎖
}
}
public void printList() {
lock.lock(); // 加鎖
try {
Node current = head.forward[0];
while (current != null) {
System.out.println(current.key + ":" + current.value);
current = current.forward[0];
}
} finally {
lock.unlock(); // 釋放鎖
}
}
public static void main(String[] args) {
ConcurrentSkipList list = new ConcurrentSkipList();
list.insert(3, 100);
list.insert(6, 300);
list.insert(4, 400);
list.insert(7, 700);
list.insert(5, 500);
list.printList();
list.delete(3);
list.printList();
}
}
在這個(gè)示例中,我們使用了 ReentrantLock
對(duì)象來(lái)控制對(duì)跳表的并發(fā)訪問(wèn)。每個(gè)公共方法在執(zhí)行之前都會(huì)調(diào)用 lock.lock()
加鎖,并在執(zhí)行完畢后(包括正常執(zhí)行和異常退出)調(diào)用 lock.unlock()
釋放鎖。
使用 ReentrantLock
的好處是它提供了比 synchronized
更靈活的鎖定機(jī)制,例如:
ReentrantLock
允許線程在嘗試獲取鎖時(shí)被中斷。ReentrantLock
提供了 tryLock()
方法,允許線程嘗試獲取鎖而不無(wú)限等待。ReentrantLock
還提供了 tryLock(long timeout, TimeUnit unit)
方法,允許線程在指定時(shí)間內(nèi)等待鎖。ReentrantLock
可以選擇是否使用公平鎖,公平鎖會(huì)按照線程請(qǐng)求鎖的順序來(lái)分配鎖。ReentrantLock
可以與多個(gè) Condition
對(duì)象配合使用,而 synchronized
只能與一個(gè)條件變量配合使用。理解了以上代碼實(shí)現(xiàn)原理后,我們?cè)賮?lái)理解 Redis Sorted Set 就不難了。
Redis 的 Sorted Set 是一種包含元素和關(guān)聯(lián)分?jǐn)?shù)的數(shù)據(jù)結(jié)構(gòu),它能夠根據(jù)分?jǐn)?shù)對(duì)元素進(jìn)行排序。Sorted Set 在 Redis 中的內(nèi)部實(shí)現(xiàn)可以是跳躍表(Skip List)和字典(Hash Table)的組合,或者是壓縮列表(Zip List),具體使用哪種實(shí)現(xiàn)取決于 Sorted Set 的大小和元素的長(zhǎng)度。
當(dāng) Sorted Set 的元素?cái)?shù)量較多或者元素長(zhǎng)度較長(zhǎng)時(shí),Redis 使用跳躍表和字典來(lái)實(shí)現(xiàn) Sorted Set。跳躍表是一種概率平衡的數(shù)據(jù)結(jié)構(gòu),它通過(guò)多級(jí)索引來(lái)提高搜索效率,類似于二分查找。字典則用于快速查找和更新操作。
跳躍表的每個(gè)節(jié)點(diǎn)包含以下信息:
字典則存儲(chǔ)了元素到分?jǐn)?shù)的映射,以便快速訪問(wèn)。
當(dāng) Sorted Set 的元素?cái)?shù)量較少(默認(rèn)小于128個(gè)),并且所有元素的長(zhǎng)度都小于64字節(jié)時(shí),Redis 使用壓縮列表來(lái)存儲(chǔ) Sorted Set。壓縮列表是一種連續(xù)內(nèi)存塊的順序存儲(chǔ)結(jié)構(gòu),它將所有的元素和分?jǐn)?shù)緊湊地存儲(chǔ)在一起,以節(jié)省內(nèi)存空間。
Sorted Set 常用于以下場(chǎng)景:
在 Redis 源碼中,Sorted Set 的實(shí)現(xiàn)主要在 t_zset.c
文件中。插入操作(zaddCommand
)會(huì)根據(jù) Sorted Set 的編碼類型(跳躍表或壓縮列表)來(lái)執(zhí)行不同的邏輯。如果是跳躍表編碼,那么插入操作會(huì)涉及到字典的更新和跳躍表節(jié)點(diǎn)的添加。如果是壓縮列表編碼,則會(huì)檢查是否需要轉(zhuǎn)換為跳躍表編碼。
Sorted Set 是 Redis 提供的一種強(qiáng)大的有序數(shù)據(jù)結(jié)構(gòu),它結(jié)合了跳躍表和字典的優(yōu)點(diǎn),提供了高效的插入、刪除、更新和范圍查詢操作。通過(guò)合理的使用 Sorted Set,可以有效地解決許多實(shí)際問(wèn)題。如何以上內(nèi)容對(duì)你有幫助,懇請(qǐng)點(diǎn)贊轉(zhuǎn)發(fā),V 哥在此感謝各位兄弟的支持。88,洗洗睡了。
更多建議: