一、簡介
RocketMQ是一個分佈式消息傳遞解決方案,具有高性能、高可靠、高可擴展性和分佈式特性。本篇文章將從多個方面對RocketMQ的源碼進行解析,幫助更好的理解RocketMQ的內部實現。本篇文章主要分為以下幾個部分:
- Broker啟動流程
- 消息發送流程
- 消息存儲流程
- 消息消費流程
- 高可用機制
二、Broker啟動流程
Broker是RocketMQ的核心組件,負責儲存和轉發消息。下面是Broker啟動流程的簡要過程:
- Broker啟動類是BrokerStartup,首先解析配置文件,生成BrokerConfig對象。
- BrokerConfig對象主要包括了Broker的ID、名稱等基本信息,以及Topic的配置,如消息最大長度、刷盤方式等。
- 接着,啟動Netty服務監聽Producer和Consumer的連接請求,並處理 NameServer 請求。
- Broker啟動後,先從持久化存儲中加載Topic和消息數據到內存,然後啟動Consumer拉取線程、commit線程,還有定時調度線程等。
- 最後,等待NameServer的心跳請求。
完整的Broker啟動流程代碼示例如下:
public static void main(String[] args) { try { // 解析配置文件,生成BrokerConfig對象 final BrokerConfig brokerConfig = new BrokerConfig(); // ... // 啟動Netty服務監聽Producer和Consumer的連接請求,並處理 NameServer 請求 final RemotingServer remotingServer = new NettyRemotingServer(brokerConfig.getNettyServerConfig()); final BrokerController brokerController = new BrokerController(brokerConfig, remotingServer); // 從持久化存儲中加載Topic和消息數據到內存 brokerController.initialize(); // 啟動Consumer拉取線程、commit線程,還有定時調度線程等 brokerController.start(); // 等待NameServer的心跳請求 remotingServer.start(); } catch (Throwable e) { // ... } }
三、消息發送流程
消息發送是RocketMQ的一項核心功能。下面是消息發送的流程簡介:
- Producer啟動後初始化MQClientInstance,然後創建MQProducerInner對象。
- 發送消息時,先從本地緩存中獲取TopicPublishInfo,如果沒有就從NameServer獲取。
- 選擇Topic路由,得到投遞的Queue,做負載均衡。
- 由MQClientInstance創建一個拉取Broker地址的任務PullNameServerTask。
- 高可用機制選擇一個Broker發送消息。
- 發送消息到Broker。
完整的消息發送流程的代碼示例如下:
public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("producer_group"); producer.setNamesrvAddr("localhost:9876"); producer.start(); Message msg = new Message("topic", "tag", "key", "Hello World".getBytes()); SendResult sendResult = producer.send(msg); System.out.println(sendResult); producer.shutdown(); }
四、消息存儲流程
消息存儲是RocketMQ最核心的一環。下面是消息存儲的流程簡介:
- 消息寫入前,先獲取一個內存映射文件,如果沒有就創建。
- 將消息寫入內存映射文件,寫入後根據CommitLog文件刷盤策略決定是否同步刷盤。
- 消息寫入後,將消息存入Index文件,Index相當於一個消息ID和文件偏移量的映射。
- 消息一旦從內存中刷入磁盤,就可以被Replication模塊複製到其他機器。
完整的消息存儲流程的代碼示例如下:
// 代碼來自CommitLog.java public void putMessage(MessageExtBrokerInner msg) { // ... // 先獲取一個內存映射文件,如果沒有就創建 MappedFile mappedFile = this.getMappedFile(matched); // 將消息寫入內存映射文件 mappedFile.appendMessage(msg); // 寫入後根據CommitLog文件刷盤策略決定是否同步刷盤 this.handleScheduleMessageService(mappedFile.getFileSize(), this.commitDataLeastPages); // 將消息存入Index文件 this.putMessagePositionInfo(msg, mappedFile.getFileFromOffset(), mappedFile.getFileSize()); // 消息一旦從內存中刷入磁盤,就可以被Replication模塊複製到其他機器 this.dataVersion.incrementAndGet(); }
五、消息消費流程
消息消費是RocketMQ的重要組成部分。下面是消息消費的簡要過程:
- 消費者啟動後會向NameServer請求Topic路由信息,得到所要消費的消息隊列。
- Consumer從Broker拉取消息,如果有消息就開始消費。
- 消費消息前,先從Index文件中獲取消息的文件偏移量。
- 從CommitLog中讀取消息並返回。
- 消息消費成功後,提交偏移量。
完整的消息消費流程的代碼示例如下:
// 代碼來自DefaultMQPushConsumerImpl.java @Override public void pullMessage(final PullRequest pullRequest) { // 拉取消息 PullResult pullResult = this.pullMessageService.pullMessage(pullRequest); switch (pullResult.getPullStatus()) { // 獲取到消息 case FOUND: this.processQueue.putMessage(pullResult.getMsgFoundList()); this.executePullRequestLater(pullRequest.getPullFromThisOffset() + pullResult.getNextBeginOffset()); break; // 等待下一次 case NO_NEW_MSG: this.executePullRequestLater(pullRequest.getPullFromThisOffset()); break; // 沒有變化,再次拉取消息 case NO_MATCHED_MSG: this.executePullRequestLater(pullRequest.getPullFromThisOffset()); break; // 拉取被暫停 case OFFSET_ILLEGAL: log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString()); pullRequest.getProcessQueue().setDropped(true); break; default: break; } } public void handleMessage(final MessageExt msgs, final ConsumeQueue cq, final ConsumeQueueExt.CqExtUnit cqExt) { // 處理消息前,先從Index文件中獲取消息的文件偏移量 final long offsetPy = cq.getOffsetInQueueByTime(msgs.getStoreTimestamp()); final long offsetDiff = cqExt.getOffsetPy() - offsetPy; int tagsCode = 0; // tags處理 if (msgs.getTags() != null && msgs.getTags().length() > 0) { tagsCode = MessageExtBrokerInner.tagsString2tagsCode(msgs.getTags()); } this.hook.consumeMessageBefore(msgs, this.context); // 從CommitLog中讀取消息並返回 final ConsumeReturnType returnType = this.consumeMessageService.consumeMessage( msgs, this.defaultMQPushConsumer.getMessageModel(), this.processQueue, this.consumeOrderly, tagsCode, this.properties, this.consumeMessageFilter, this.brokerSuspendMaxTimeMillis, this.consumerGroup, this.maxReconsumeTimes, this.context, offsetPy); // 消息消費成功後,提交偏移量 this.processConsumeResult(msgs, returnType, this.context, consumeRequest.getStats()); this.hook.consumeMessageAfter(msgs, this.context); }
六、高可用機制
RocketMQ通過複製和雙寫等高可用機制,實現分佈式消息存儲的高可靠性。
- Master-Slave複製:Master節點負責消息寫入和消費,Slaver節點只負責消息複製,並在Master故障時接管Master工作。
- 雙寫機制:消費者消費消息時,會將消息從Master和Slave雙寫節點同時消費,以保證消息不會丟失。
七、總結
本篇文章詳細介紹了RocketMQ的源碼解析,從Broker啟動流程、消息發送流程、消息存儲流程、消息消費流程、高可用機制等多個方面進行了詳細的闡述,對於深入理解RocketMQ的內部實現有一定的幫助。
原創文章,作者:INAXH,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/343267.html