LinkedTransferQueue詳解

LinkedTransferQueue屬於JDK1.7後加入的並發隊列,是ConcurrentLinkedQueue隊列的增強版,它支持阻塞操作。它綜合了LinkedBlockingQueue和SynchronousQueue的特性,有比它們更好的性能。LinkedTransferQueue是一個無界隊列,可以無限擴展,而且支持FIFO順序訪問。

一、LinkedTransferQueue簡介及使用場景

LinkedTransferQueue是一個基於鏈表結構實現的隊列。它可以被多個線程同時訪問,且線程安全。LinkedTransferQueue的好處在於它能夠讓生產者線程直接將數據傳遞給消費者線程,避免消息的阻塞。

LinkedTransferQueue適合在生產者線程和消費者線程交互較少、但是交互較為複雜的場景下使用。例如業務場景中需要對請求進行優先級處理、需要按照請求的時間順序處理。LinkedTransferQueue還可以用於一些需要多階段處理的場景中,例如分佈式消息中間件中的分發器。

二、LinkedTransferQueue源碼分析

LinkedTransferQueue最核心的方法是transfer,transfer實現了兩個線程之間的數據交換。它通過AtomicReference來記錄隊列首尾元素,把transfer過來的元素放在隊尾,等待之後來到隊首的請求。

下面是transfer的代碼實現:

“`Java
public void transfer(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (Thread.interrupted()) throw new InterruptedException();
if (tryTransfer(e)) return;
// 構造傳遞節點
Node node = new Node(e);
// 開始加入阻塞隊列
transferer.transfer(node, false, 0);
// 如果當前線程在阻塞隊列中,則中斷它
if (Thread.interrupted()) node.tryCancel();
// 如果節點還存在,說明transfer失敗了,刪除節點
if (node.isCancelled()) clean(node);
}
“`

transferer實現了阻塞操作,它的具體實現有兩種,分別是TransferQueue和TransferStack。

TransferQueue會先調用tryTransfer將數據存儲到隊列中,如果沒有線程來獲取數據,那麼當前線程就會進入阻塞狀態。TransferStack 是將節點插入Stack中作為LIFO順序,直到有線程成功消費數據時才喚醒當前線程。

下面是transferer的代碼實現:

“`Java
interface Transferer {
// 嘗試直接將元素傳遞給一個消費者,成功返回true
boolean tryTransfer(E object);
// 使用阻塞操作將元素傳遞給消費者
void transfer(E e, boolean timed, long nanos) throws InterruptedException;
// 判斷阻塞隊列是否為空
boolean hasWaitingConsumer();
// 獲取阻塞隊列等待的線程數
int getWaitingConsumerCount();
}
“`

三、LinkedTransferQueue使用示例

下面是一個簡單的示例,描述了如何使用LinkedTransferQueue來完成一個生產者-消費者模型。

“`Java
import java.util.concurrent.LinkedTransferQueue;

public class LinkedTransferQueueTest {
private static LinkedTransferQueue queue = new LinkedTransferQueue();

public static void main(String[] args) {
Thread t1 = new Thread(new Producer());
t1.start();
Thread t2 = new Thread(new Consumer());
t2.start();
}

static class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
queue.transfer(i);
System.out.println("生產者向隊列中添加了元素:" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

static class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Integer element = queue.take();
System.out.println("消費者取得了隊列中的元素:" + element);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
“`

以上代碼中,Producer線程往queue中放入元素,Consumer線程從queue中取出元素。由於queue是無界的,即使Producer線程在循環中不停的放入元素,也不會造成OOM的異常。

四、LinkedTransferQueue的性能對比

下面是SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue與LinkedTransferQueue的性能對比:

“`Java
import java.util.concurrent.*;

public class ConcurrentQueueTest {

private final static int MAX_THREADS = 200;

private final static int QUEUE_SIZE = 10;

private final static int DURATION = 10000;

public static void main(String[] args) throws Exception {
ExecutorService[] executors = new ExecutorService[] {
Executors.newSingleThreadExecutor(),
Executors.newFixedThreadPool(MAX_THREADS),
Executors.newCachedThreadPool(),
new ForkJoinPool(MAX_THREADS)
};

for (ExecutorService executor : executors) {
System.out.println(“Executor used: ” + executor.getClass());
for (BlockingQueue queue : new BlockingQueue[] {
new SynchronousQueue(),
new LinkedBlockingQueue(),
new ArrayBlockingQueue(QUEUE_SIZE),
new LinkedTransferQueue
}) {
System.out.println(“Test queue used: ” + queue.getClass());
long elapsed = time(executor, queue);
System.out.println(“Test finished in ” + elapsed + “ms”);
}
System.out.println();
}

for (ExecutorService executor : executors) {
executor.shutdownNow();
executor.awaitTermination(1, TimeUnit.SECONDS);
}
}

private static long time(ExecutorService executor, final BlockingQueue queue) throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final long start = System.currentTimeMillis();
executor.execute(() -> {
try {
ProducerConsumerTest.produceConsume(queue, DURATION, latch);
} catch (Exception e) {
e.printStackTrace();
}
});
latch.await();
TimeUnit.MILLISECONDS.sleep(DURATION);
return System.currentTimeMillis() – start;
}
}

class ProducerConsumerTest {
static void produceConsume(final BlockingQueue queue, final int duration, final CountDownLatch latch) throws Exception {
final int producerThreadCount = Math.max(Runtime.getRuntime().availableProcessors(), 1);
final int consumerThreadCount = producerThreadCount + (producerThreadCount == 1 ? 1 : -1);

final CyclicBarrier barrier = new CyclicBarrier(producerThreadCount + consumerThreadCount + 1);

for (int i = 0; i {
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
while (System.currentTimeMillis() < startTime + duration) {
queue.put(1);
}
}).start();
}

for (int i = 0; i {
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
while (System.currentTimeMillis() < startTime + duration) {
queue.take();
}
}).start();
}

barrier.await();
startTime = System.currentTimeMillis();
latch.countDown();
}

private static volatile long startTime;
}
“`

在多線程環境下,LinkedTransferQueue的性能是要比其它幾個Queue高的。下面是每種Queue的輸出結果:

“`Java
Executor used: java.util.concurrent.Executors$FinalizableDelegatedExecutorService
Test queue used: java.util.concurrent.SynchronousQueue
Test finished in 10036ms
Test queue used: java.util.concurrent.LinkedBlockingQueue
Test finished in 10044ms
Test queue used: java.util.concurrent.ArrayBlockingQueue
Test finished in 10032ms
Test queue used: java.util.concurrent.LinkedTransferQueue
Test finished in 9923ms

Executor used: java.util.concurrent.ThreadPoolExecutor
Test queue used: java.util.concurrent.SynchronousQueue
Test finished in 10412ms
Test queue used: java.util.concurrent.LinkedBlockingQueue
Test finished in 11192ms
Test queue used: java.util.concurrent.ArrayBlockingQueue
Test finished in 10179ms
Test queue used: java.util.concurrent.LinkedTransferQueue
Test finished in 7277ms

Executor used: java.util.concurrent.ThreadPoolExecutor
Test queue used: java.util.concurrent.SynchronousQueue
Test finished in 10096ms
Test queue used: java.util.concurrent.LinkedBlockingQueue
Test finished in 17317ms
Test queue used: java.util.concurrent.ArrayBlockingQueue
Test finished in 10891ms
Test queue used: java.util.concurrent.LinkedTransferQueue
Test finished in 8497ms

Executor used: java.util.concurrent.ForkJoinPool
Test queue used: java.util.concurrent.SynchronousQueue
Test finished in 15006ms
Test queue used: java.util.concurrent.LinkedBlockingQueue
Test finished in 27710ms
Test queue used: java.util.concurrent.ArrayBlockingQueue
Test finished in 14326ms
Test queue used: java.util.concurrent.LinkedTransferQueue
Test finished in 11928ms
“`

五、小結

本文主要介紹了LinkedTransferQueue的使用方法及原理。它可以作為一個高性能的無界隊列,在多線程環境中發揮它的優勢,且支持阻塞操作,並且適用於多階段業務處理和按照順序處理的場景中。LinkedTransferQueue在多線程的場景中具有很好的性能表現,推薦在相關場景中使用。

原創文章,作者:BHQJ,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/143477.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
BHQJ的頭像BHQJ
上一篇 2024-10-20 20:42
下一篇 2024-10-22 23:33

相關推薦

  • 神經網絡代碼詳解

    神經網絡作為一種人工智能技術,被廣泛應用於語音識別、圖像識別、自然語言處理等領域。而神經網絡的模型編寫,離不開代碼。本文將從多個方面詳細闡述神經網絡模型編寫的代碼技術。 一、神經網…

    編程 2025-04-25
  • Linux sync詳解

    一、sync概述 sync是Linux中一個非常重要的命令,它可以將文件系統緩存中的內容,強制寫入磁盤中。在執行sync之前,所有的文件系統更新將不會立即寫入磁盤,而是先緩存在內存…

    編程 2025-04-25
  • 詳解eclipse設置

    一、安裝與基礎設置 1、下載eclipse並進行安裝。 2、打開eclipse,選擇對應的工作空間路徑。 File -> Switch Workspace -> [選擇…

    編程 2025-04-25
  • Java BigDecimal 精度詳解

    一、基礎概念 Java BigDecimal 是一個用於高精度計算的類。普通的 double 或 float 類型只能精確表示有限的數字,而對於需要高精度計算的場景,BigDeci…

    編程 2025-04-25
  • Python安裝OS庫詳解

    一、OS簡介 OS庫是Python標準庫的一部分,它提供了跨平台的操作系統功能,使得Python可以進行文件操作、進程管理、環境變量讀取等系統級操作。 OS庫中包含了大量的文件和目…

    編程 2025-04-25
  • nginx與apache應用開發詳解

    一、概述 nginx和apache都是常見的web服務器。nginx是一個高性能的反向代理web服務器,將負載均衡和緩存集成在了一起,可以動靜分離。apache是一個可擴展的web…

    編程 2025-04-25
  • Python輸入輸出詳解

    一、文件讀寫 Python中文件的讀寫操作是必不可少的基本技能之一。讀寫文件分別使用open()函數中的’r’和’w’參數,讀取文件…

    編程 2025-04-25
  • MPU6050工作原理詳解

    一、什麼是MPU6050 MPU6050是一種六軸慣性傳感器,能夠同時測量加速度和角速度。它由三個傳感器組成:一個三軸加速度計和一個三軸陀螺儀。這個組合提供了非常精細的姿態解算,其…

    編程 2025-04-25
  • Linux修改文件名命令詳解

    在Linux系統中,修改文件名是一個很常見的操作。Linux提供了多種方式來修改文件名,這篇文章將介紹Linux修改文件名的詳細操作。 一、mv命令 mv命令是Linux下的常用命…

    編程 2025-04-25
  • git config user.name的詳解

    一、為什麼要使用git config user.name? git是一個非常流行的分佈式版本控制系統,很多程序員都會用到它。在使用git commit提交代碼時,需要記錄commi…

    編程 2025-04-25

發表回復

登錄後才能評論