一、簡介
RocketMQ是一個分散式消息傳遞解決方案,具有高性能、高可靠、高可擴展性和分散式特性。本篇文章將從多個方面對RocketMQ的源碼進行解析,幫助更好的理解RocketMQ的內部實現。本篇文章主要分為以下幾個部分:
- Broker啟動流程
- 消息發送流程
- 消息存儲流程
- 消息消費流程
- 高可用機制
二、Broker啟動流程
Broker是RocketMQ的核心組件,負責儲存和轉發消息。下面是Broker啟動流程的簡要過程:
- Broker啟動類是BrokerStartup,首先解析配置文件,生成BrokerConfig對象。
- BrokerConfig對象主要包括了Broker的ID、名稱等基本信息,以及Topic的配置,如消息最大長度、刷盤方式等。
- 接著,啟動Netty服務監聽Producer和Consumer的連接請求,並處理 NameServer 請求。
- Broker啟動後,先從持久化存儲中載入Topic和消息數據到內存,然後啟動Consumer拉取線程、commit線程,還有定時調度線程等。
- 最後,等待NameServer的心跳請求。
完整的Broker啟動流程代碼示例如下:
public static void main(String[] args) {
try {
// 解析配置文件,生成BrokerConfig對象
final BrokerConfig brokerConfig = new BrokerConfig();
// ...
// 啟動Netty服務監聽Producer和Consumer的連接請求,並處理 NameServer 請求
final RemotingServer remotingServer = new NettyRemotingServer(brokerConfig.getNettyServerConfig());
final BrokerController brokerController = new BrokerController(brokerConfig, remotingServer);
// 從持久化存儲中載入Topic和消息數據到內存
brokerController.initialize();
// 啟動Consumer拉取線程、commit線程,還有定時調度線程等
brokerController.start();
// 等待NameServer的心跳請求
remotingServer.start();
} catch (Throwable e) {
// ...
}
}
三、消息發送流程
消息發送是RocketMQ的一項核心功能。下面是消息發送的流程簡介:
- Producer啟動後初始化MQClientInstance,然後創建MQProducerInner對象。
- 發送消息時,先從本地緩存中獲取TopicPublishInfo,如果沒有就從NameServer獲取。
- 選擇Topic路由,得到投遞的Queue,做負載均衡。
- 由MQClientInstance創建一個拉取Broker地址的任務PullNameServerTask。
- 高可用機制選擇一個Broker發送消息。
- 發送消息到Broker。
完整的消息發送流程的代碼示例如下:
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("topic", "tag", "key", "Hello World".getBytes());
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
producer.shutdown();
}
四、消息存儲流程
消息存儲是RocketMQ最核心的一環。下面是消息存儲的流程簡介:
- 消息寫入前,先獲取一個內存映射文件,如果沒有就創建。
- 將消息寫入內存映射文件,寫入後根據CommitLog文件刷盤策略決定是否同步刷盤。
- 消息寫入後,將消息存入Index文件,Index相當於一個消息ID和文件偏移量的映射。
- 消息一旦從內存中刷入磁碟,就可以被Replication模塊複製到其他機器。
完整的消息存儲流程的代碼示例如下:
// 代碼來自CommitLog.java
public void putMessage(MessageExtBrokerInner msg) {
// ...
// 先獲取一個內存映射文件,如果沒有就創建
MappedFile mappedFile = this.getMappedFile(matched);
// 將消息寫入內存映射文件
mappedFile.appendMessage(msg);
// 寫入後根據CommitLog文件刷盤策略決定是否同步刷盤
this.handleScheduleMessageService(mappedFile.getFileSize(), this.commitDataLeastPages);
// 將消息存入Index文件
this.putMessagePositionInfo(msg, mappedFile.getFileFromOffset(), mappedFile.getFileSize());
// 消息一旦從內存中刷入磁碟,就可以被Replication模塊複製到其他機器
this.dataVersion.incrementAndGet();
}
五、消息消費流程
消息消費是RocketMQ的重要組成部分。下面是消息消費的簡要過程:
- 消費者啟動後會向NameServer請求Topic路由信息,得到所要消費的消息隊列。
- Consumer從Broker拉取消息,如果有消息就開始消費。
- 消費消息前,先從Index文件中獲取消息的文件偏移量。
- 從CommitLog中讀取消息並返回。
- 消息消費成功後,提交偏移量。
完整的消息消費流程的代碼示例如下:
// 代碼來自DefaultMQPushConsumerImpl.java
@Override
public void pullMessage(final PullRequest pullRequest) {
// 拉取消息
PullResult pullResult = this.pullMessageService.pullMessage(pullRequest);
switch (pullResult.getPullStatus()) {
// 獲取到消息
case FOUND:
this.processQueue.putMessage(pullResult.getMsgFoundList());
this.executePullRequestLater(pullRequest.getPullFromThisOffset() + pullResult.getNextBeginOffset());
break;
// 等待下一次
case NO_NEW_MSG:
this.executePullRequestLater(pullRequest.getPullFromThisOffset());
break;
// 沒有變化,再次拉取消息
case NO_MATCHED_MSG:
this.executePullRequestLater(pullRequest.getPullFromThisOffset());
break;
// 拉取被暫停
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString());
pullRequest.getProcessQueue().setDropped(true);
break;
default:
break;
}
}
public void handleMessage(final MessageExt msgs,
final ConsumeQueue cq,
final ConsumeQueueExt.CqExtUnit cqExt) {
// 處理消息前,先從Index文件中獲取消息的文件偏移量
final long offsetPy = cq.getOffsetInQueueByTime(msgs.getStoreTimestamp());
final long offsetDiff = cqExt.getOffsetPy() - offsetPy;
int tagsCode = 0;
// tags處理
if (msgs.getTags() != null && msgs.getTags().length() > 0) {
tagsCode = MessageExtBrokerInner.tagsString2tagsCode(msgs.getTags());
}
this.hook.consumeMessageBefore(msgs, this.context);
// 從CommitLog中讀取消息並返回
final ConsumeReturnType returnType = this.consumeMessageService.consumeMessage(
msgs, this.defaultMQPushConsumer.getMessageModel(), this.processQueue, this.consumeOrderly,
tagsCode, this.properties, this.consumeMessageFilter, this.brokerSuspendMaxTimeMillis,
this.consumerGroup, this.maxReconsumeTimes, this.context, offsetPy);
// 消息消費成功後,提交偏移量
this.processConsumeResult(msgs, returnType, this.context, consumeRequest.getStats());
this.hook.consumeMessageAfter(msgs, this.context);
}
六、高可用機制
RocketMQ通過複製和雙寫等高可用機制,實現分散式消息存儲的高可靠性。
- Master-Slave複製:Master節點負責消息寫入和消費,Slaver節點只負責消息複製,並在Master故障時接管Master工作。
- 雙寫機制:消費者消費消息時,會將消息從Master和Slave雙寫節點同時消費,以保證消息不會丟失。
七、總結
本篇文章詳細介紹了RocketMQ的源碼解析,從Broker啟動流程、消息發送流程、消息存儲流程、消息消費流程、高可用機制等多個方面進行了詳細的闡述,對於深入理解RocketMQ的內部實現有一定的幫助。
原創文章,作者:INAXH,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/343267.html
微信掃一掃
支付寶掃一掃