一、CyclicBarrier簡介
CyclicBarrier是Java並發包中的一個實用工具, 它可以用於多線程間的同步。它讓一組線程在某個節點處等待,直到所有線程都到達該節點,然後再一起繼續執行後續任務。
CyclicBarrier可以用於以下場景:
- 分階段執行任務:將任務分為若干個階段,在每個階段結束後等待所有線程到達這個節點,然後再開始後續的階段。
- 分治計算:將大規模的計算任務分解為若干小的計算任務,在每個小任務完成後等待其他小任務完成,最終再合併結果。
二、CyclicBarrier的用法
1. 創建CyclicBarrier對象
CyclicBarrier cyclicBarrier = new CyclicBarrier(int parties, Runnable action);
其中parties是需要等待的線程數,action是在所有線程都到達屏障時需要執行的任務。CyclicBarrier還有一個重載方法,不需要指定Runnable任務。
CyclicBarrier cyclicBarrier = new CyclicBarrier(int parties);
2. 等待線程到達屏障點
cyclicBarrier.await();
調用await方法的線程會在此處等待,知道全部線程到達該屏障點,才會一起繼續執行。
3. CyclicBarrier的reset方法
cyclicBarrier.reset();
reset方法可以使CyclicBarrier的狀態恢復到初始化狀態,方便在重複使用CyclicBarrier時調用。
三、完整代碼示例
1. 在主線程中啟動多個其他線程,等待所有線程完成後,主線程再繼續執行
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
private static final int THREADS = 5;
public static void main(String[] args) {
Runnable barrierAction = new Runnable() {
public void run() {
System.out.println("All threads have arrived at the barrier.");
}
};
CyclicBarrier barrier = new CyclicBarrier(THREADS, barrierAction);
for (int i = 0; i < THREADS; i++) {
Thread thread = new Thread(new Worker(barrier), "Thread " + i);
thread.start();
}
System.out.println("Main thread continues to do its work.");
}
static class Worker implements Runnable {
private final CyclicBarrier cyclicBarrier;
public Worker(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
public void run() {
try {
System.out.println("Thread " + Thread.currentThread().getName() + " is doing some work.");
Thread.sleep(1000);
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
2. 多個線程分別執行不同任務,等待所有任務執行完成後再繼續
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
private static final int THREADS = 5;
public static void main(String[] args) {
Runnable barrierAction = new Runnable() {
public void run() {
System.out.println("All threads have arrived at the barrier.");
}
};
CyclicBarrier barrier = new CyclicBarrier(THREADS, barrierAction);
for (int i = 0; i < THREADS; i++) {
Thread thread = new Thread(new Worker(barrier, i), "Thread " + i);
thread.start();
}
System.out.println("Main thread continues to do its work.");
}
static class Worker implements Runnable {
private final CyclicBarrier cyclicBarrier;
private final int workId;
public Worker(CyclicBarrier cyclicBarrier, int workId) {
this.cyclicBarrier = cyclicBarrier;
this.workId = workId;
}
public void run() {
try {
System.out.println("Thread " + Thread.currentThread().getName() + " is doing work " + workId);
Thread.sleep((workId + 1) * 1000);
cyclicBarrier.await();
System.out.println("Thread " + Thread.currentThread().getName() + " continues to do its work after barrier.");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
參考資料
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/284933.html
微信掃一掃
支付寶掃一掃