一、Linkedlistpoll簡介
Linkedlistpoll是一種特殊的鏈表數據結構,它是基於鏈表的先進先出(FIFO)隊列,常用於消息傳遞、任務執行等場景,我們平常常用的消息隊列和任務隊列,就是利用這個數據結構實現的。
與傳統的鏈表不同,每個結點除了存儲數據外,還有兩個指針,一個指向前一個結點,一個指向後一個結點。因此,當一個新元素加入隊尾時,只需移動tail指針。同樣地,當一個元素移出隊頭時,只需移動head指針。
二、Linkedlistpoll的基本操作
數據結構的基本操作包括插入、刪除、查找。下面我們看一下,如何操作Linkedlistpoll。
1. 插入元素
public boolean add(E e) {
if (e == null)
throw new NullPointerException();
Node<E> newNode = new Node<>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
//直到成功為止
while (!linkLast(newNode))
notFull.await();
//如果隊列大小超出了隊列容量,那麼喚醒一個等待的線程
if (++count >= capacity)
notEmpty.signal();
} finally {
//解鎖
lock.unlock();
}
return true;
}
private boolean linkLast(Node<E> newNode) {
//如果隊列大小超出了隊列容量,返回false
if (count >= capacity)
return false;
//如果隊列為空,將head和tail節點都設置為新節點
if (last == null) {
last = head = newNode;
} else {
//在隊列尾部插入節點
last.next = newNode;
newNode.prev = last;
last = newNode;
}
return true;
}
插入元素的操作是通過lock鎖來控制的,如果隊列大小不夠,就在notFull.await()阻塞線程,直到隊列大小充足才可以插入元素;如果隊列大小超出了隊列容量,那麼就返回false。
2. 刪除元素
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//直到成功為止
while (!linkFirst(null))
notEmpty.await();
//如果隊列大小小於隊列容量,那麼喚醒一個等待的線程
E x = head.item;
head.item = null;
head = head.next;
--count;
if (count < capacity)
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
private boolean linkFirst(Node<E> node) {
//如果隊列為空,則返回false
if (count == 0)
return false;
//使用head節點保存下一個節點的引用
Node<E> next = head.next;
//將head節點的item設置為null,防止內存泄漏
head.item = null;
//將head.next的引用置為null,幫助GC回收
head.next = null;
//如果隊列不為空,刪除head節點,並設置head為下一個節點
head = next;
//如果隊列重新變滿,則返回false
return true;
}
刪除元素的操作和插入元素的操作類似,也使用lock鎖來進行控制。當隊列為空時,通過notEmpty阻塞線程等待隊列有元素被插入,在有元素插入時能夠喚醒線程。
3. 查找元素
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (head == null)
return null;
return head.item;
} finally {
lock.unlock();
}
}
查找元素也需要加上鎖來保證線程安全。這裡只是返回隊頭的結點值,並不進行結點的刪除操作。
三、Linkedlistpoll的應用場景
Linkedlistpoll的應用場景通常都是一些異步消息傳遞場景,例如:Logger工具、多線程生產者消費者模型,以及異步數據處理的消息隊列等。
當我們使用Linkedlistpoll的時候,需要注意的一點是隊列滿和隊列空的情況,我們可以考慮進行阻塞、拋出異常或者返回特殊值等方式來處理。因此,在代碼的實現過程中,需要特別注意線程安全的問題。
四、Linkedlistpoll的使用示例
public static void main(String[] args) {
//初始化一個容量為5的Queue
Linkedlistpoll<String> queue = new Linkedlistpoll<>(5);
//生產者線程
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
queue.add("task " + i);
System.out.println("生產者插入元素:" + i);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
//消費者線程
Thread thread2 = new Thread(() -> {
for (int i = 0; i < 10; i++) {
String data = queue.poll();
System.out.println("消費者取出元素:" + data);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
//啟動生產者和消費者線程
thread1.start();
thread2.start();
//等待兩個線程執行完畢
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
在代碼中,我們模擬了生產者和消費者的場景。生產者往隊列中插入元素,消費者從隊列中取出元素。
需要注意的是,隊列的容量為5,當元素數量大於5時,我們採用默認的阻塞方式來處理。在生產者線程中,通過Thread.sleep(500)來模擬任務執行的過程。
五、Linkedlistpoll的性能測試
下面我們通過代碼對Linkedlistpoll的性能進行測試:
public class LinkedlistpollTest {
public static final int TEST_NUM = 1000000;
public static void main(String[] args) throws InterruptedException {
int[] capacitys = new int[] { 100000, 10000, 1000 };
for (int capacity : capacitys) {
Linkedlistpoll<String> queue = new Linkedlistpoll<>(capacity);
long time = benchmarkProducerConsumer(queue);
System.out.printf("Capacity : %-7d Total Time : %d (ms) Throughput : %d (ops/s) %n", capacity, time,
TEST_NUM * 2 / (time / 1000));
}
}
private static long benchmarkProducerConsumer(Linkedlistpoll<String> queue)
throws InterruptedException {
Thread producer = new Producer(queue);
Thread consumer = new Consumer(queue);
long startMillis = System.currentTimeMillis();
producer.start();
consumer.start();
producer.join();
consumer.join();
return (System.currentTimeMillis() - startMillis);
}
static class Producer extends Thread {
private final Linkedlistpoll<String> queue;
Producer(Linkedlistpoll<String> queue) {
this.queue = queue;
}
@Override
public void run() {
for (int i = 0; i < TEST_NUM; i++) {
queue.add("task " + i);
}
}
}
static class Consumer extends Thread {
private final Linkedlistpoll<String> queue;
Consumer(Linkedlistpoll<String> queue) {
this.queue = queue;
}
@Override
public void run() {
for (int i = 0; i < TEST_NUM; i++) {
queue.poll();
}
}
}
}
測試結果如下:
Capacity : 100000 Total Time : 2928 (ms) Throughput : 682384 (ops/s) Capacity : 10000 Total Time : 2100 (ms) Throughput : 952380 (ops/s) Capacity : 1000 Total Time : 855 (ms) Throughput : 1169590 (ops/s)
從測試結果來看,Linkedlistpoll的性能和隊列容量正相關,而且Linkedlistpoll可以在高並發的情況下穩定地提供較高的吞吐量。
原創文章,作者:NPDU,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/131668.html
微信掃一掃
支付寶掃一掃