同步器對象與一組線程一起使用。
它維護(hù)一個(gè)狀態(tài),根據(jù)它的狀態(tài),它讓一個(gè)線程通過或強(qiáng)迫它等待。
本節(jié)將討論四種類型的同步器:
信號量用于控制可以訪問資源的線程數(shù)。
java.util.concurrent包中的Semaphore類表示信號量同步器。
您可以使用其構(gòu)造函數(shù)創(chuàng)建信號量,如下所示:
final int MAX_PERMITS = 3; Semaphore s = new Semaphores(MAX_PERMITS);
Semaphore類的另一個(gè)構(gòu)造函數(shù)使用公平作為第二個(gè)參數(shù)
final int MAX_PERMITS = 3; Semaphore s = new Semaphores(MAX_PERMITS, true); // A fair semaphore
如果你創(chuàng)建一個(gè)公平的信號量,在多線程請求許可的情況下,信號量將保證先進(jìn)先出(FIFO)。也就是說,首先請求許可的線程將首先獲得許可。
要獲取許可證,請使用acquire()方法。
如果許可證可用,它立即返回。
它阻止如果許可證不可用。線程在等待許可證可用時(shí)可能中斷。
Semaphore類的其他方法允許您一次性獲取一個(gè)或多個(gè)許可證。要釋放許可證,請使用release()方法。
以下代碼顯示了一個(gè)Restaurant類,它使用信號量來控制對表的訪問。
import java.util.Random; import java.util.concurrent.Semaphore; class Restaurant { private Semaphore tables; public Restaurant(int tablesCount) { this.tables = new Semaphore(tablesCount); } public void getTable(int customerID) { try { System.out.println("Customer #" + customerID + " is trying to get a table."); tables.acquire(); System.out.println("Customer #" + customerID + " got a table."); } catch (InterruptedException e) { e.printStackTrace(); } } public void returnTable(int customerID) { System.out.println("Customer #" + customerID + " returned a table."); tables.release(); } } class RestaurantCustomer extends Thread { private Restaurant r; private int customerID; private static final Random random = new Random(); public RestaurantCustomer(Restaurant r, int customerID) { this.r = r; this.customerID = customerID; } public void run() { r.getTable(this.customerID); // Get a table try { int eatingTime = random.nextInt(30) + 1; System.out.println("Customer #" + this.customerID + " will eat for " + eatingTime + " seconds."); Thread.sleep(eatingTime * 1000); System.out.println("Customer #" + this.customerID + " is done eating."); } catch (InterruptedException e) { e.printStackTrace(); } finally { r.returnTable(this.customerID); } } } public class Main{ public static void main(String[] args) { Restaurant restaurant = new Restaurant(2); for (int i = 1; i <= 5; i++) { RestaurantCustomer c = new RestaurantCustomer(restaurant, i); c.start(); } } }
上面的代碼生成以下結(jié)果。
屏障使一組線在屏障點(diǎn)匯合。
來自到達(dá)屏障的組的線程等待,直到該組中的所有線程到達(dá)。
一旦組中的最后一個(gè)線程到達(dá)屏障,組中的所有線程都將被釋放。
當(dāng)你有一個(gè)可以分成子任務(wù)的任務(wù)時(shí),你可以使用一個(gè)屏障;每個(gè)子任務(wù)可以在單獨(dú)的線程中執(zhí)行,并且每個(gè)線程必須在共同點(diǎn)處相遇以組合它們的結(jié)果。
java.util.concurrent包中的CyclicBarrier類提供了屏障同步器的實(shí)現(xiàn)。
CyclicBarrier類可以通過調(diào)用其reset()方法來重用。
以下代碼顯示了如何在程序中使用循環(huán)障礙。
import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; class Worker extends Thread { private CyclicBarrier barrier; private int ID; private static Random random = new Random(); public Worker(int ID, CyclicBarrier barrier) { this.ID = ID; this.barrier = barrier; } public void run() { try { int workTime = random.nextInt(30) + 1; System.out.println("Thread #" + ID + " is going to work for " + workTime + " seconds"); Thread.sleep(workTime * 1000); System.out.println("Thread #" + ID + " is waiting at the barrier."); this.barrier.await(); System.out.println("Thread #" + ID + " passed the barrier."); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { System.out.println("Barrier is broken."); } } } public class Main { public static void main(String[] args) { Runnable barrierAction = () -> System.out.println("We are ready."); CyclicBarrier barrier = new CyclicBarrier(3, barrierAction); for (int i = 1; i <= 3; i++) { Worker t = new Worker(i, barrier); t.start(); } } }
上面的代碼生成以下結(jié)果。
Phaser提供類似于CyclicBarrier和CountDownLatch同步器的功能。它提供以下功能:
Phaser是可重復(fù)使用的。
在Phaser上同步的參與方數(shù)量可以動(dòng)態(tài)更改。在循環(huán)障礙中,當(dāng)創(chuàng)建障礙時(shí),方的數(shù)量是固定的。
移相器具有相關(guān)的相位編號,從零開始。當(dāng)所有注冊方都到達(dá)移相器時(shí),移相器進(jìn)入下一個(gè)階段,階段編號加1。相位編號的最大值為Integer.MAX_VALUE。在其最大值之后,相位編號重新從零開始。
Phaser有終止?fàn)顟B(tài)。在終止?fàn)顟B(tài)的Phaser上調(diào)用的所有同步方法立即返回,而不等待提前。
移相器有三種類型的參與者計(jì)數(shù):注冊參與者計(jì)數(shù),到達(dá)參與者計(jì)數(shù)和未參與方計(jì)數(shù)。
注冊方數(shù)量是注冊同步的方的數(shù)量。到達(dá)的當(dāng)事方數(shù)目是已經(jīng)到達(dá)移相器的當(dāng)前階段的各方的數(shù)目。
未攜帶者數(shù)量是尚未到達(dá)移動(dòng)器的當(dāng)前階段的各方的數(shù)量。
當(dāng)最后一方到達(dá)時(shí),移相器前進(jìn)到下一階段。
或者,當(dāng)所有注冊方都到達(dá)移動(dòng)器時(shí),Phaser可以執(zhí)行移相器操作。
CyclicBarrier允許您執(zhí)行屏障操作,這是一個(gè)Runnable任務(wù)。
我們通過在Phaser類的onAdvance()方法中編寫代碼來指定移相器操作。
我們需要繼承Phaser類,并覆蓋onAdvance()方法以提供Phaser動(dòng)作。
以下代碼顯示了如何表示通過在Phaser上同步啟動(dòng)的任務(wù)
import java.util.Random; import java.util.concurrent.Phaser; class StartTogetherTask extends Thread { private Phaser phaser; private String taskName; private static Random rand = new Random(); public StartTogetherTask(String taskName, Phaser phaser) { this.taskName = taskName; this.phaser = phaser; } @Override public void run() { System.out.println(taskName + ":Initializing..."); int sleepTime = rand.nextInt(5) + 1; try { Thread.sleep(sleepTime * 1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(taskName + ":Initialized..."); phaser.arriveAndAwaitAdvance(); System.out.println(taskName + ":Started..."); } } public class Main { public static void main(String[] args) { Phaser phaser = new Phaser(1); for (int i = 1; i <= 3; i++) { phaser.register(); String taskName = "Task #" + i; StartTogetherTask task = new StartTogetherTask(taskName, phaser); task.start(); } phaser.arriveAndDeregister(); } }
上面的代碼生成以下結(jié)果。
以下代碼顯示了如何向Phaser添加Phaser Action。
import java.util.concurrent.Phaser; public class Main { public static void main(String[] args) { Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int parties) { System.out.println("Inside onAdvance(): phase = " + phase + ", Registered Parties = " + parties); // Do not terminate the phaser by returning false return false; } }; // Register the self (the "main" thread) as a party phaser.register(); System.out.println("#1: isTerminated():" + phaser.isTerminated()); phaser.arriveAndDeregister(); // Trigger another phase advance phaser.register(); phaser.arriveAndDeregister(); System.out.println("#2: isTerminated():" + phaser.isTerminated()); phaser.forceTermination(); System.out.println("#3: isTerminated():" + phaser.isTerminated()); } }
上面的代碼生成以下結(jié)果。
以下代碼顯示如何使用移相器生成一些整數(shù)。
import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.Phaser; class AdderTask extends Thread { private Phaser phaser; private String taskName; private List<Integer> list; public AdderTask(String taskName, Phaser phaser, List<Integer> list) { this.taskName = taskName; this.phaser = phaser; this.list = list; } @Override public void run() { do { System.out.println(taskName + " added " + 3); list.add(3); phaser.arriveAndAwaitAdvance(); } while (!phaser.isTerminated()); } } public class Main { public static void main(String[] args) { final int PHASE_COUNT = 2; Phaser phaser = new Phaser() { public boolean onAdvance(int phase, int parties) { System.out.println("Phase:" + phase + ", Parties:" + parties + ", Arrived:" + this.getArrivedParties()); boolean terminatePhaser = false; if (phase >= PHASE_COUNT - 1 || parties == 0) { terminatePhaser = true; } return terminatePhaser; } }; List<Integer> list = Collections.synchronizedList(new ArrayList<Integer>()); int ADDER_COUNT = 3; phaser.bulkRegister(ADDER_COUNT + 1); for (int i = 1; i <= ADDER_COUNT; i++) { String taskName = "Task #" + i; AdderTask task = new AdderTask(taskName, phaser, list); task.start(); } while (!phaser.isTerminated()) { phaser.arriveAndAwaitAdvance(); } int sum = 0; for (Integer num : list) { sum = sum + num; } System.out.println("Sum = " + sum); } }
上面的代碼生成以下結(jié)果。
鎖存器使一組線程等待,直到它到達(dá)其終端狀態(tài)。
一旦鎖存器達(dá)到其終端狀態(tài),它允許所有線程通過。
與障礙不同,它是一個(gè)一次性的對象。它不能被重置和重用。
使用鎖存器,其中在一定數(shù)量的一次性活動(dòng)完成之前,多個(gè)活動(dòng)不能進(jìn)行。
例如,一個(gè)服務(wù)不應(yīng)該啟動(dòng),直到它依賴的所有服務(wù)都已啟動(dòng)。
java.util.concurrent包中的CountDownLatch類提供了一個(gè)鎖存器的實(shí)現(xiàn)。
import java.util.concurrent.CountDownLatch; class LatchHelperService extends Thread { private int ID; private CountDownLatch latch; public LatchHelperService(int ID, CountDownLatch latch) { this.ID = ID; this.latch = latch; } public void run() { try { Thread.sleep(1000); System.out.println("Service #" + ID + " has started..."); } catch (InterruptedException e) { e.printStackTrace(); } finally { this.latch.countDown(); } } } class LatchMainService extends Thread { private CountDownLatch latch; public LatchMainService(CountDownLatch latch) { this.latch = latch; } public void run() { try { System.out.println("waiting for services to start."); latch.await(); System.out.println("started."); } catch (InterruptedException e) { e.printStackTrace(); } } } public class Main { public static void main(String[] args) { // Create a countdown latch with 2 as its counter CountDownLatch latch = new CountDownLatch(2); LatchMainService ms = new LatchMainService(latch); ms.start(); for (int i = 1; i <= 2; i++) { LatchHelperService lhs = new LatchHelperService(i, latch); lhs.start(); } } }
上面的代碼生成以下結(jié)果。
交換器允許兩個(gè)線程在同步點(diǎn)處等待彼此。
當(dāng)兩個(gè)線程到達(dá)時(shí),它們交換一個(gè)對象并繼續(xù)他們的活動(dòng)。
Exchanger類提供了交換器同步器的實(shí)現(xiàn)。
以下代碼顯示將使用交換器與客戶交換數(shù)據(jù)的生產(chǎn)者線程。
import java.util.ArrayList; import java.util.concurrent.Exchanger; class ExchangerProducer extends Thread { private Exchanger<ArrayList<Integer>> exchanger; private ArrayList<Integer> buffer = new ArrayList<Integer>(); public ExchangerProducer(Exchanger<ArrayList<Integer>> exchanger) { this.exchanger = exchanger; } public void run() { while (true) { try { System.out.println("Producer."); Thread.sleep(1000); fillBuffer(); System.out.println("Producer has produced and waiting:" + buffer); buffer = exchanger.exchange(buffer); } catch (InterruptedException e) { e.printStackTrace(); } } } public void fillBuffer() { for (int i = 0; i <= 3; i++) { buffer.add(i); } } } class ExchangerConsumer extends Thread { private Exchanger<ArrayList<Integer>> exchanger; private ArrayList<Integer> buffer = new ArrayList<Integer>(); public ExchangerConsumer(Exchanger<ArrayList<Integer>> exchanger) { this.exchanger = exchanger; } public void run() { while (true) { try { System.out.println("Consumer."); buffer = exchanger.exchange(buffer); System.out.println("Consumer has received:" + buffer); Thread.sleep(1000); System.out.println("eating:"+buffer); buffer.clear(); } catch (InterruptedException e) { e.printStackTrace(); } } } } public class Main { public static void main(String[] args) { Exchanger<ArrayList<Integer>> exchanger = new Exchanger<>(); ExchangerProducer producer = new ExchangerProducer(exchanger); ExchangerConsumer consumer = new ExchangerConsumer(exchanger); producer.start(); consumer.start(); } }
上面的代碼生成以下結(jié)果。
更多建議: