LinkedTransferQueue屬於JDK1.7後加入的並發隊列,是ConcurrentLinkedQueue隊列的增強版,它支持阻塞操作。它綜合了LinkedBlockingQueue和SynchronousQueue的特性,有比它們更好的性能。LinkedTransferQueue是一個無界隊列,可以無限擴展,而且支持FIFO順序訪問。
一、LinkedTransferQueue簡介及使用場景
LinkedTransferQueue是一個基於鏈表結構實現的隊列。它可以被多個線程同時訪問,且線程安全。LinkedTransferQueue的好處在於它能夠讓生產者線程直接將數據傳遞給消費者線程,避免消息的阻塞。
LinkedTransferQueue適合在生產者線程和消費者線程交互較少、但是交互較為複雜的場景下使用。例如業務場景中需要對請求進行優先順序處理、需要按照請求的時間順序處理。LinkedTransferQueue還可以用於一些需要多階段處理的場景中,例如分散式消息中間件中的分發器。
二、LinkedTransferQueue源碼分析
LinkedTransferQueue最核心的方法是transfer,transfer實現了兩個線程之間的數據交換。它通過AtomicReference來記錄隊列首尾元素,把transfer過來的元素放在隊尾,等待之後來到隊首的請求。
下面是transfer的代碼實現:
“`Java
public void transfer(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (Thread.interrupted()) throw new InterruptedException();
if (tryTransfer(e)) return;
// 構造傳遞節點
Node node = new Node(e);
// 開始加入阻塞隊列
transferer.transfer(node, false, 0);
// 如果當前線程在阻塞隊列中,則中斷它
if (Thread.interrupted()) node.tryCancel();
// 如果節點還存在,說明transfer失敗了,刪除節點
if (node.isCancelled()) clean(node);
}
“`
transferer實現了阻塞操作,它的具體實現有兩種,分別是TransferQueue和TransferStack。
TransferQueue會先調用tryTransfer將數據存儲到隊列中,如果沒有線程來獲取數據,那麼當前線程就會進入阻塞狀態。TransferStack 是將節點插入Stack中作為LIFO順序,直到有線程成功消費數據時才喚醒當前線程。
下面是transferer的代碼實現:
“`Java
interface Transferer {
// 嘗試直接將元素傳遞給一個消費者,成功返回true
boolean tryTransfer(E object);
// 使用阻塞操作將元素傳遞給消費者
void transfer(E e, boolean timed, long nanos) throws InterruptedException;
// 判斷阻塞隊列是否為空
boolean hasWaitingConsumer();
// 獲取阻塞隊列等待的線程數
int getWaitingConsumerCount();
}
“`
三、LinkedTransferQueue使用示例
下面是一個簡單的示例,描述了如何使用LinkedTransferQueue來完成一個生產者-消費者模型。
“`Java
import java.util.concurrent.LinkedTransferQueue;
public class LinkedTransferQueueTest {
private static LinkedTransferQueue queue = new LinkedTransferQueue();
public static void main(String[] args) {
Thread t1 = new Thread(new Producer());
t1.start();
Thread t2 = new Thread(new Consumer());
t2.start();
}
static class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
queue.transfer(i);
System.out.println("生產者向隊列中添加了元素:" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Integer element = queue.take();
System.out.println("消費者取得了隊列中的元素:" + element);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
“`
以上代碼中,Producer線程往queue中放入元素,Consumer線程從queue中取出元素。由於queue是無界的,即使Producer線程在循環中不停的放入元素,也不會造成OOM的異常。
四、LinkedTransferQueue的性能對比
下面是SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue與LinkedTransferQueue的性能對比:
“`Java
import java.util.concurrent.*;
public class ConcurrentQueueTest {
private final static int MAX_THREADS = 200;
private final static int QUEUE_SIZE = 10;
private final static int DURATION = 10000;
public static void main(String[] args) throws Exception {
ExecutorService[] executors = new ExecutorService[] {
Executors.newSingleThreadExecutor(),
Executors.newFixedThreadPool(MAX_THREADS),
Executors.newCachedThreadPool(),
new ForkJoinPool(MAX_THREADS)
};
for (ExecutorService executor : executors) {
System.out.println(“Executor used: ” + executor.getClass());
for (BlockingQueue queue : new BlockingQueue[] {
new SynchronousQueue(),
new LinkedBlockingQueue(),
new ArrayBlockingQueue(QUEUE_SIZE),
new LinkedTransferQueue
}) {
System.out.println(“Test queue used: ” + queue.getClass());
long elapsed = time(executor, queue);
System.out.println(“Test finished in ” + elapsed + “ms”);
}
System.out.println();
}
for (ExecutorService executor : executors) {
executor.shutdownNow();
executor.awaitTermination(1, TimeUnit.SECONDS);
}
}
private static long time(ExecutorService executor, final BlockingQueue queue) throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final long start = System.currentTimeMillis();
executor.execute(() -> {
try {
ProducerConsumerTest.produceConsume(queue, DURATION, latch);
} catch (Exception e) {
e.printStackTrace();
}
});
latch.await();
TimeUnit.MILLISECONDS.sleep(DURATION);
return System.currentTimeMillis() – start;
}
}
class ProducerConsumerTest {
static void produceConsume(final BlockingQueue queue, final int duration, final CountDownLatch latch) throws Exception {
final int producerThreadCount = Math.max(Runtime.getRuntime().availableProcessors(), 1);
final int consumerThreadCount = producerThreadCount + (producerThreadCount == 1 ? 1 : -1);
final CyclicBarrier barrier = new CyclicBarrier(producerThreadCount + consumerThreadCount + 1);
for (int i = 0; i {
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
while (System.currentTimeMillis() < startTime + duration) {
queue.put(1);
}
}).start();
}
for (int i = 0; i {
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
while (System.currentTimeMillis() < startTime + duration) {
queue.take();
}
}).start();
}
barrier.await();
startTime = System.currentTimeMillis();
latch.countDown();
}
private static volatile long startTime;
}
“`
在多線程環境下,LinkedTransferQueue的性能是要比其它幾個Queue高的。下面是每種Queue的輸出結果:
“`Java
Executor used: java.util.concurrent.Executors$FinalizableDelegatedExecutorService
Test queue used: java.util.concurrent.SynchronousQueue
Test finished in 10036ms
Test queue used: java.util.concurrent.LinkedBlockingQueue
Test finished in 10044ms
Test queue used: java.util.concurrent.ArrayBlockingQueue
Test finished in 10032ms
Test queue used: java.util.concurrent.LinkedTransferQueue
Test finished in 9923ms
Executor used: java.util.concurrent.ThreadPoolExecutor
Test queue used: java.util.concurrent.SynchronousQueue
Test finished in 10412ms
Test queue used: java.util.concurrent.LinkedBlockingQueue
Test finished in 11192ms
Test queue used: java.util.concurrent.ArrayBlockingQueue
Test finished in 10179ms
Test queue used: java.util.concurrent.LinkedTransferQueue
Test finished in 7277ms
Executor used: java.util.concurrent.ThreadPoolExecutor
Test queue used: java.util.concurrent.SynchronousQueue
Test finished in 10096ms
Test queue used: java.util.concurrent.LinkedBlockingQueue
Test finished in 17317ms
Test queue used: java.util.concurrent.ArrayBlockingQueue
Test finished in 10891ms
Test queue used: java.util.concurrent.LinkedTransferQueue
Test finished in 8497ms
Executor used: java.util.concurrent.ForkJoinPool
Test queue used: java.util.concurrent.SynchronousQueue
Test finished in 15006ms
Test queue used: java.util.concurrent.LinkedBlockingQueue
Test finished in 27710ms
Test queue used: java.util.concurrent.ArrayBlockingQueue
Test finished in 14326ms
Test queue used: java.util.concurrent.LinkedTransferQueue
Test finished in 11928ms
“`
五、小結
本文主要介紹了LinkedTransferQueue的使用方法及原理。它可以作為一個高性能的無界隊列,在多線程環境中發揮它的優勢,且支持阻塞操作,並且適用於多階段業務處理和按照順序處理的場景中。LinkedTransferQueue在多線程的場景中具有很好的性能表現,推薦在相關場景中使用。
原創文章,作者:BHQJ,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/143477.html
微信掃一掃
支付寶掃一掃