在高性能計算場景下,事件驅動方式是提高效率和吞吐量的重要手段。而com.lmax.disruptor就是其中一款優秀的異步事件處理框架。本文將從其基礎概念,應用場景,原理實現,性能測試等方面進行詳細的闡述。
一、基礎概念
1、RingBuffer
RingBuffer是Disruptor的核心數據結構,具有隊列的特性。通過上圖的示意可以看出,RingBuffer實際上是一個固定長度的數組,其中每個元素代表一個事件。
public class RingBuffer
{
private Object[] entries;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
......
}
2、Sequencer
在Disruptor中,各個消費者線程會不斷消費事件,並維護一個單獨的序列號。而所有的序列號由Sequencer來統一管理。Sequencer就是一個由數組模擬的環形隊列,其中每個元素代表一個可消費的事件。在Sequencer中,通過CAS操作可以保證線程安全。如果一個消費者線程處理完一個事件後,想要繼續獲取下一個事件,那麼它需要從自己上一次消費的位置到最大生產者指針之間的所有事件的序列號都佔了一個坑,才能獲取新的事件。
private Sequencer sequencer;
......
public final long next()
{
return sequencer.next(n);
}
3、SequenceBarrier
當一個消費者線程想要消費事件時,它需要獲取到一個SequenceBarrier,以判斷自己能不能消費事件。而SequenceBarrier的職責就是維護一個消費者線程所需消費的事件序列的最小值,表示在消費該事件之前,所有序列號代表的事件都已經被生產者發布,可以被消費。
public class SequenceBarrier
{
......
/**
* Waits for the given sequence to be available for consumption.
*
* @param sequence to wait for
* @return the sequence up to which is available
* @throws AlertException if a status change has occurred for the Disruptor (such as shutdown) while waiting
* @throws InterruptedException if the thread needs awaking on a condition variable.
* * Busy Spin strategy used to avoid syscalls which can introduce latency jitter. Good for low * latency systems processing events with deterministic behavior. */ public long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException { checkAlert(); long availableSequence = spinObserver.waitForNext(sequence, cursorSequence.get(), dependentSequences, this); if (availableSequence < sequence) { return availableSequence; } return sequencer.getHighestPublishedSequence(sequence, availableSequence); } ......}二、應用場景
Com.lmax.disruptor適合於高並發、低延遲、高吞吐的場景。它幾乎涉及到了所有需要通過消息傳遞來完成的領域。目前廣泛應用於互聯網和金融行業,比如交易系統中的訂單管理、風控,接口服務的消息同步和下游消息通知等。
三、原理實現
在Com.lmax.disruptor框架中,生產者和消費者都使用Sequence來記錄當前操作的事件位置。這個Sequence本質上就是一個遞增的long類型的值,每個Sequence都對應着一個擁有這個序列號的線程或組件,同時還記錄這個序列號對應的事件狀態,表示這個事件已經完成了哪些處理。
另外,Sequencer維護了一個RingBuffer的生產者指針和消費者指針。當生產者發布一個新的事件時,生產者會首先獲取Sequencer中最大的可生產序列號,並將其作為新事件的序列號,同時更新生產者指針。當消費者消費一個事件時,消費者會更新消費者的Sequence,同時更新消費者指針。
通過上述機制,Disruptor成功實現了無鎖的高並發操作。
四、性能測試
下面的示例代碼是3個生產者和3個消費者的模型。其中,生產者不斷發送事件,事件分別被先後消費了5次、10次和15次,共發送1000個事件。
public class DisruptorDemo {
public static void main(String[] args) {
//定義RingBuffer
RingBuffer ringBuffer = RingBuffer.createMultiProducer(Data::new, 1024, new YieldingWaitStrategy());
//定義序列柵欄
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(new Sequence[0]);
//定義消費者線程池
ExecutorService threadPool = Executors.newFixedThreadPool(3);
//定義數據處理器
WorkHandler handler = new WorkHandler() {
@Override
public void onEvent(Data data) throws Exception {
System.out.println(Thread.currentThread().getName() + " [handler] " + data);
data.setaInt(data.getaInt() + 1);
}
};
//定義WorkProcessor,綁定RingBuffer和SequenceBarrier,並將數據處理器綁定到WorkProcessor中
WorkProcessor processor1 = new WorkProcessor(ringBuffer, sequenceBarrier, handler, new FatalExceptionHandler(), new Sequence(0));
WorkProcessor processor2 = new WorkProcessor(ringBuffer, sequenceBarrier, handler, new FatalExceptionHandler(), new Sequence(1));
WorkProcessor processor3 = new WorkProcessor(ringBuffer, sequenceBarrier, handler, new FatalExceptionHandler(), new Sequence(2));
//將WorkProcessor提交到線程池中
threadPool.submit(processor1);
threadPool.submit(processor2);
threadPool.submit(processor3);
//定義數據生產者,不斷向RingBuffer中發送事件
for (int i = 0; i < 1000; i++) {
long sequence = ringBuffer.next();
Data data = ringBuffer.get(sequence);
data.setaLong(i);
ringBuffer.publish(sequence);
}
//等待一定時間,讓消費者處理完成
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//關閉線程和RingBuffer
processor1.halt();
processor2.halt();
processor3.halt();
threadPool.shutdown();
}
}
通過測試可以發現,Com.lmax.disruptor框架的性能是非常優秀的。單生產者多消費者可以輕易地達到7、8萬元的吞吐量。
五、總結
Com.lmax.disruptor作為異步事件處理框架,擁有着出色的性能和易用性,在金融和互聯網領域得到廣泛的應用。本文從其基礎概念,應用場景,原理實現和性能測試四個方面進行了詳細闡述,希望可以幫助大家更加深入地了解並使用這個框架。
原創文章,作者:YUMAF,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/332459.html
微信掃一掃
支付寶掃一掃