一、Linkedlistpoll简介
Linkedlistpoll是一种特殊的链表数据结构,它是基于链表的先进先出(FIFO)队列,常用于消息传递、任务执行等场景,我们平常常用的消息队列和任务队列,就是利用这个数据结构实现的。
与传统的链表不同,每个结点除了存储数据外,还有两个指针,一个指向前一个结点,一个指向后一个结点。因此,当一个新元素加入队尾时,只需移动tail指针。同样地,当一个元素移出队头时,只需移动head指针。
二、Linkedlistpoll的基本操作
数据结构的基本操作包括插入、删除、查找。下面我们看一下,如何操作Linkedlistpoll。
1. 插入元素
public boolean add(E e) {
if (e == null)
throw new NullPointerException();
Node<E> newNode = new Node<>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
//直到成功为止
while (!linkLast(newNode))
notFull.await();
//如果队列大小超出了队列容量,那么唤醒一个等待的线程
if (++count >= capacity)
notEmpty.signal();
} finally {
//解锁
lock.unlock();
}
return true;
}
private boolean linkLast(Node<E> newNode) {
//如果队列大小超出了队列容量,返回false
if (count >= capacity)
return false;
//如果队列为空,将head和tail节点都设置为新节点
if (last == null) {
last = head = newNode;
} else {
//在队列尾部插入节点
last.next = newNode;
newNode.prev = last;
last = newNode;
}
return true;
}
插入元素的操作是通过lock锁来控制的,如果队列大小不够,就在notFull.await()阻塞线程,直到队列大小充足才可以插入元素;如果队列大小超出了队列容量,那么就返回false。
2. 删除元素
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//直到成功为止
while (!linkFirst(null))
notEmpty.await();
//如果队列大小小于队列容量,那么唤醒一个等待的线程
E x = head.item;
head.item = null;
head = head.next;
--count;
if (count < capacity)
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
private boolean linkFirst(Node<E> node) {
//如果队列为空,则返回false
if (count == 0)
return false;
//使用head节点保存下一个节点的引用
Node<E> next = head.next;
//将head节点的item设置为null,防止内存泄漏
head.item = null;
//将head.next的引用置为null,帮助GC回收
head.next = null;
//如果队列不为空,删除head节点,并设置head为下一个节点
head = next;
//如果队列重新变满,则返回false
return true;
}
删除元素的操作和插入元素的操作类似,也使用lock锁来进行控制。当队列为空时,通过notEmpty阻塞线程等待队列有元素被插入,在有元素插入时能够唤醒线程。
3. 查找元素
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (head == null)
return null;
return head.item;
} finally {
lock.unlock();
}
}
查找元素也需要加上锁来保证线程安全。这里只是返回队头的结点值,并不进行结点的删除操作。
三、Linkedlistpoll的应用场景
Linkedlistpoll的应用场景通常都是一些异步消息传递场景,例如:Logger工具、多线程生产者消费者模型,以及异步数据处理的消息队列等。
当我们使用Linkedlistpoll的时候,需要注意的一点是队列满和队列空的情况,我们可以考虑进行阻塞、抛出异常或者返回特殊值等方式来处理。因此,在代码的实现过程中,需要特别注意线程安全的问题。
四、Linkedlistpoll的使用示例
public static void main(String[] args) {
//初始化一个容量为5的Queue
Linkedlistpoll<String> queue = new Linkedlistpoll<>(5);
//生产者线程
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
queue.add("task " + i);
System.out.println("生产者插入元素:" + i);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
//消费者线程
Thread thread2 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
String data = queue.poll();
System.out.println("消费者取出元素:" + data);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
//启动生产者和消费者线程
thread1.start();
thread2.start();
//等待两个线程执行完毕
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
在代码中,我们模拟了生产者和消费者的场景。生产者往队列中插入元素,消费者从队列中取出元素。
需要注意的是,队列的容量为5,当元素数量大于5时,我们采用默认的阻塞方式来处理。在生产者线程中,通过Thread.sleep(500)来模拟任务执行的过程。
五、Linkedlistpoll的性能测试
下面我们通过代码对Linkedlistpoll的性能进行测试:
public class LinkedlistpollTest {
public static final int TEST_NUM = 1000000;
public static void main(String[] args) throws InterruptedException {
int[] capacitys = new int[] { 100000, 10000, 1000 };
for (int capacity : capacitys) {
Linkedlistpoll<String> queue = new Linkedlistpoll<>(capacity);
long time = benchmarkProducerConsumer(queue);
System.out.printf("Capacity : %-7d Total Time : %d (ms) Throughput : %d (ops/s) %n", capacity, time,
TEST_NUM * 2 / (time / 1000));
}
}
private static long benchmarkProducerConsumer(Linkedlistpoll<String> queue)
throws InterruptedException {
Thread producer = new Producer(queue);
Thread consumer = new Consumer(queue);
long startMillis = System.currentTimeMillis();
producer.start();
consumer.start();
producer.join();
consumer.join();
return (System.currentTimeMillis() - startMillis);
}
static class Producer extends Thread {
private final Linkedlistpoll<String> queue;
Producer(Linkedlistpoll<String> queue) {
this.queue = queue;
}
@Override
public void run() {
for (int i = 0; i < TEST_NUM; i++) {
queue.add("task " + i);
}
}
}
static class Consumer extends Thread {
private final Linkedlistpoll<String> queue;
Consumer(Linkedlistpoll<String> queue) {
this.queue = queue;
}
@Override
public void run() {
for (int i = 0; i < TEST_NUM; i++) {
queue.poll();
}
}
}
}
测试结果如下:
Capacity : 100000 Total Time : 2928 (ms) Throughput : 682384 (ops/s) Capacity : 10000 Total Time : 2100 (ms) Throughput : 952380 (ops/s) Capacity : 1000 Total Time : 855 (ms) Throughput : 1169590 (ops/s)
从测试结果来看,Linkedlistpoll的性能和队列容量正相关,而且Linkedlistpoll可以在高并发的情况下稳定地提供较高的吞吐量。
原创文章,作者:NPDU,如若转载,请注明出处:https://www.506064.com/n/131668.html
微信扫一扫
支付宝扫一扫