一、CyclicBarrier简介
CyclicBarrier是Java并发包中的一个实用工具, 它可以用于多线程间的同步。它让一组线程在某个节点处等待,直到所有线程都到达该节点,然后再一起继续执行后续任务。
CyclicBarrier可以用于以下场景:
- 分阶段执行任务:将任务分为若干个阶段,在每个阶段结束后等待所有线程到达这个节点,然后再开始后续的阶段。
- 分治计算:将大规模的计算任务分解为若干小的计算任务,在每个小任务完成后等待其他小任务完成,最终再合并结果。
二、CyclicBarrier的用法
1. 创建CyclicBarrier对象
CyclicBarrier cyclicBarrier = new CyclicBarrier(int parties, Runnable action);
其中parties是需要等待的线程数,action是在所有线程都到达屏障时需要执行的任务。CyclicBarrier还有一个重载方法,不需要指定Runnable任务。
CyclicBarrier cyclicBarrier = new CyclicBarrier(int parties);
2. 等待线程到达屏障点
cyclicBarrier.await();
调用await方法的线程会在此处等待,知道全部线程到达该屏障点,才会一起继续执行。
3. CyclicBarrier的reset方法
cyclicBarrier.reset();
reset方法可以使CyclicBarrier的状态恢复到初始化状态,方便在重复使用CyclicBarrier时调用。
三、完整代码示例
1. 在主线程中启动多个其他线程,等待所有线程完成后,主线程再继续执行
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
private static final int THREADS = 5;
public static void main(String[] args) {
Runnable barrierAction = new Runnable() {
public void run() {
System.out.println("All threads have arrived at the barrier.");
}
};
CyclicBarrier barrier = new CyclicBarrier(THREADS, barrierAction);
for (int i = 0; i < THREADS; i++) {
Thread thread = new Thread(new Worker(barrier), "Thread " + i);
thread.start();
}
System.out.println("Main thread continues to do its work.");
}
static class Worker implements Runnable {
private final CyclicBarrier cyclicBarrier;
public Worker(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
public void run() {
try {
System.out.println("Thread " + Thread.currentThread().getName() + " is doing some work.");
Thread.sleep(1000);
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
2. 多个线程分别执行不同任务,等待所有任务执行完成后再继续
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
private static final int THREADS = 5;
public static void main(String[] args) {
Runnable barrierAction = new Runnable() {
public void run() {
System.out.println("All threads have arrived at the barrier.");
}
};
CyclicBarrier barrier = new CyclicBarrier(THREADS, barrierAction);
for (int i = 0; i < THREADS; i++) {
Thread thread = new Thread(new Worker(barrier, i), "Thread " + i);
thread.start();
}
System.out.println("Main thread continues to do its work.");
}
static class Worker implements Runnable {
private final CyclicBarrier cyclicBarrier;
private final int workId;
public Worker(CyclicBarrier cyclicBarrier, int workId) {
this.cyclicBarrier = cyclicBarrier;
this.workId = workId;
}
public void run() {
try {
System.out.println("Thread " + Thread.currentThread().getName() + " is doing work " + workId);
Thread.sleep((workId + 1) * 1000);
cyclicBarrier.await();
System.out.println("Thread " + Thread.currentThread().getName() + " continues to do its work after barrier.");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
参考资料
原创文章,作者:小蓝,如若转载,请注明出处:https://www.506064.com/n/284933.html
微信扫一扫
支付宝扫一扫