LinkedTransferQueue详解

LinkedTransferQueue属于JDK1.7后加入的并发队列,是ConcurrentLinkedQueue队列的增强版,它支持阻塞操作。它综合了LinkedBlockingQueue和SynchronousQueue的特性,有比它们更好的性能。LinkedTransferQueue是一个无界队列,可以无限扩展,而且支持FIFO顺序访问。

一、LinkedTransferQueue简介及使用场景

LinkedTransferQueue是一个基于链表结构实现的队列。它可以被多个线程同时访问,且线程安全。LinkedTransferQueue的好处在于它能够让生产者线程直接将数据传递给消费者线程,避免消息的阻塞。

LinkedTransferQueue适合在生产者线程和消费者线程交互较少、但是交互较为复杂的场景下使用。例如业务场景中需要对请求进行优先级处理、需要按照请求的时间顺序处理。LinkedTransferQueue还可以用于一些需要多阶段处理的场景中,例如分布式消息中间件中的分发器。

二、LinkedTransferQueue源码分析

LinkedTransferQueue最核心的方法是transfer,transfer实现了两个线程之间的数据交换。它通过AtomicReference来记录队列首尾元素,把transfer过来的元素放在队尾,等待之后来到队首的请求。

下面是transfer的代码实现:

“`Java
public void transfer(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (Thread.interrupted()) throw new InterruptedException();
if (tryTransfer(e)) return;
// 构造传递节点
Node node = new Node(e);
// 开始加入阻塞队列
transferer.transfer(node, false, 0);
// 如果当前线程在阻塞队列中,则中断它
if (Thread.interrupted()) node.tryCancel();
// 如果节点还存在,说明transfer失败了,删除节点
if (node.isCancelled()) clean(node);
}
“`

transferer实现了阻塞操作,它的具体实现有两种,分别是TransferQueue和TransferStack。

TransferQueue会先调用tryTransfer将数据存储到队列中,如果没有线程来获取数据,那么当前线程就会进入阻塞状态。TransferStack 是将节点插入Stack中作为LIFO顺序,直到有线程成功消费数据时才唤醒当前线程。

下面是transferer的代码实现:

“`Java
interface Transferer {
// 尝试直接将元素传递给一个消费者,成功返回true
boolean tryTransfer(E object);
// 使用阻塞操作将元素传递给消费者
void transfer(E e, boolean timed, long nanos) throws InterruptedException;
// 判断阻塞队列是否为空
boolean hasWaitingConsumer();
// 获取阻塞队列等待的线程数
int getWaitingConsumerCount();
}
“`

三、LinkedTransferQueue使用示例

下面是一个简单的示例,描述了如何使用LinkedTransferQueue来完成一个生产者-消费者模型。

“`Java
import java.util.concurrent.LinkedTransferQueue;

public class LinkedTransferQueueTest {
private static LinkedTransferQueue queue = new LinkedTransferQueue();

public static void main(String[] args) {
Thread t1 = new Thread(new Producer());
t1.start();
Thread t2 = new Thread(new Consumer());
t2.start();
}

static class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
queue.transfer(i);
System.out.println("生产者向队列中添加了元素:" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

static class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
try {
Integer element = queue.take();
System.out.println("消费者取得了队列中的元素:" + element);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
“`

以上代码中,Producer线程往queue中放入元素,Consumer线程从queue中取出元素。由于queue是无界的,即使Producer线程在循环中不停的放入元素,也不会造成OOM的异常。

四、LinkedTransferQueue的性能对比

下面是SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue与LinkedTransferQueue的性能对比:

“`Java
import java.util.concurrent.*;

public class ConcurrentQueueTest {

private final static int MAX_THREADS = 200;

private final static int QUEUE_SIZE = 10;

private final static int DURATION = 10000;

public static void main(String[] args) throws Exception {
ExecutorService[] executors = new ExecutorService[] {
Executors.newSingleThreadExecutor(),
Executors.newFixedThreadPool(MAX_THREADS),
Executors.newCachedThreadPool(),
new ForkJoinPool(MAX_THREADS)
};

for (ExecutorService executor : executors) {
System.out.println(“Executor used: ” + executor.getClass());
for (BlockingQueue queue : new BlockingQueue[] {
new SynchronousQueue(),
new LinkedBlockingQueue(),
new ArrayBlockingQueue(QUEUE_SIZE),
new LinkedTransferQueue
}) {
System.out.println(“Test queue used: ” + queue.getClass());
long elapsed = time(executor, queue);
System.out.println(“Test finished in ” + elapsed + “ms”);
}
System.out.println();
}

for (ExecutorService executor : executors) {
executor.shutdownNow();
executor.awaitTermination(1, TimeUnit.SECONDS);
}
}

private static long time(ExecutorService executor, final BlockingQueue queue) throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final long start = System.currentTimeMillis();
executor.execute(() -> {
try {
ProducerConsumerTest.produceConsume(queue, DURATION, latch);
} catch (Exception e) {
e.printStackTrace();
}
});
latch.await();
TimeUnit.MILLISECONDS.sleep(DURATION);
return System.currentTimeMillis() – start;
}
}

class ProducerConsumerTest {
static void produceConsume(final BlockingQueue queue, final int duration, final CountDownLatch latch) throws Exception {
final int producerThreadCount = Math.max(Runtime.getRuntime().availableProcessors(), 1);
final int consumerThreadCount = producerThreadCount + (producerThreadCount == 1 ? 1 : -1);

final CyclicBarrier barrier = new CyclicBarrier(producerThreadCount + consumerThreadCount + 1);

for (int i = 0; i {
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
while (System.currentTimeMillis() < startTime + duration) {
queue.put(1);
}
}).start();
}

for (int i = 0; i {
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
while (System.currentTimeMillis() < startTime + duration) {
queue.take();
}
}).start();
}

barrier.await();
startTime = System.currentTimeMillis();
latch.countDown();
}

private static volatile long startTime;
}
“`

在多线程环境下,LinkedTransferQueue的性能是要比其它几个Queue高的。下面是每种Queue的输出结果:

“`Java
Executor used: java.util.concurrent.Executors$FinalizableDelegatedExecutorService
Test queue used: java.util.concurrent.SynchronousQueue
Test finished in 10036ms
Test queue used: java.util.concurrent.LinkedBlockingQueue
Test finished in 10044ms
Test queue used: java.util.concurrent.ArrayBlockingQueue
Test finished in 10032ms
Test queue used: java.util.concurrent.LinkedTransferQueue
Test finished in 9923ms

Executor used: java.util.concurrent.ThreadPoolExecutor
Test queue used: java.util.concurrent.SynchronousQueue
Test finished in 10412ms
Test queue used: java.util.concurrent.LinkedBlockingQueue
Test finished in 11192ms
Test queue used: java.util.concurrent.ArrayBlockingQueue
Test finished in 10179ms
Test queue used: java.util.concurrent.LinkedTransferQueue
Test finished in 7277ms

Executor used: java.util.concurrent.ThreadPoolExecutor
Test queue used: java.util.concurrent.SynchronousQueue
Test finished in 10096ms
Test queue used: java.util.concurrent.LinkedBlockingQueue
Test finished in 17317ms
Test queue used: java.util.concurrent.ArrayBlockingQueue
Test finished in 10891ms
Test queue used: java.util.concurrent.LinkedTransferQueue
Test finished in 8497ms

Executor used: java.util.concurrent.ForkJoinPool
Test queue used: java.util.concurrent.SynchronousQueue
Test finished in 15006ms
Test queue used: java.util.concurrent.LinkedBlockingQueue
Test finished in 27710ms
Test queue used: java.util.concurrent.ArrayBlockingQueue
Test finished in 14326ms
Test queue used: java.util.concurrent.LinkedTransferQueue
Test finished in 11928ms
“`

五、小结

本文主要介绍了LinkedTransferQueue的使用方法及原理。它可以作为一个高性能的无界队列,在多线程环境中发挥它的优势,且支持阻塞操作,并且适用于多阶段业务处理和按照顺序处理的场景中。LinkedTransferQueue在多线程的场景中具有很好的性能表现,推荐在相关场景中使用。

原创文章,作者:BHQJ,如若转载,请注明出处:https://www.506064.com/n/143477.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
BHQJ的头像BHQJ
上一篇 2024-10-20 20:42
下一篇 2024-10-22 23:33

相关推荐

  • 神经网络代码详解

    神经网络作为一种人工智能技术,被广泛应用于语音识别、图像识别、自然语言处理等领域。而神经网络的模型编写,离不开代码。本文将从多个方面详细阐述神经网络模型编写的代码技术。 一、神经网…

    编程 2025-04-25
  • Linux sync详解

    一、sync概述 sync是Linux中一个非常重要的命令,它可以将文件系统缓存中的内容,强制写入磁盘中。在执行sync之前,所有的文件系统更新将不会立即写入磁盘,而是先缓存在内存…

    编程 2025-04-25
  • 详解eclipse设置

    一、安装与基础设置 1、下载eclipse并进行安装。 2、打开eclipse,选择对应的工作空间路径。 File -> Switch Workspace -> [选择…

    编程 2025-04-25
  • Java BigDecimal 精度详解

    一、基础概念 Java BigDecimal 是一个用于高精度计算的类。普通的 double 或 float 类型只能精确表示有限的数字,而对于需要高精度计算的场景,BigDeci…

    编程 2025-04-25
  • Python安装OS库详解

    一、OS简介 OS库是Python标准库的一部分,它提供了跨平台的操作系统功能,使得Python可以进行文件操作、进程管理、环境变量读取等系统级操作。 OS库中包含了大量的文件和目…

    编程 2025-04-25
  • nginx与apache应用开发详解

    一、概述 nginx和apache都是常见的web服务器。nginx是一个高性能的反向代理web服务器,将负载均衡和缓存集成在了一起,可以动静分离。apache是一个可扩展的web…

    编程 2025-04-25
  • Python输入输出详解

    一、文件读写 Python中文件的读写操作是必不可少的基本技能之一。读写文件分别使用open()函数中的’r’和’w’参数,读取文件…

    编程 2025-04-25
  • MPU6050工作原理详解

    一、什么是MPU6050 MPU6050是一种六轴惯性传感器,能够同时测量加速度和角速度。它由三个传感器组成:一个三轴加速度计和一个三轴陀螺仪。这个组合提供了非常精细的姿态解算,其…

    编程 2025-04-25
  • Linux修改文件名命令详解

    在Linux系统中,修改文件名是一个很常见的操作。Linux提供了多种方式来修改文件名,这篇文章将介绍Linux修改文件名的详细操作。 一、mv命令 mv命令是Linux下的常用命…

    编程 2025-04-25
  • git config user.name的详解

    一、为什么要使用git config user.name? git是一个非常流行的分布式版本控制系统,很多程序员都会用到它。在使用git commit提交代码时,需要记录commi…

    编程 2025-04-25

发表回复

登录后才能评论