一、基礎概念
RabbitMQ 是一種使用 AMQP(Advanced Message Queuing Protocol)協議來實現消息隊列(Message Queue)服務的中間件(Middleware)系統。它支持多種消息隊列模型,其中之一是 Topic。Topic 是一種支持主題(Topic)概念的消息隊列模型。在 Topic 模型中,消息可以被發送到一個這樣的主題(Topic)中,所有訂閱該主題的消費者都將收到該消息。而 RabbitMQ 通過 Exchange(交換機)來支持 Topic 模型。根據 Exchange 的不同綁定模式,可以實現以不同的主題來發送消息。
假設存在一個主題 topic.a.b,那麼所有使用該主題的訂閱者(Subscriber)都可以收到發送到該主題的消息。而 RabbitMQ 中,主題都是由一個或多個單詞組成的路由鍵(Routing Keys)來表示的。所以,一個主題可以表示為用“.”分隔的多個單詞組成的路由鍵,例如 topic.a.b.c 表示三個單詞組成的主題。同時,在訂閱主題時,也可以使用通配符(Wildcards)來表示一些特定的主題,例如 topic.# 表示一組以 topic 開頭的主題。
二、Topic 模型的使用場景
在 RabbitMQ 中,使用 Topic 模型可以解決以下問題:
1、任務分發:一個任務被分發到多個消費者進行處理。例如,需要將爬取到的數據分發給多個處理模塊進行處理。
2、關注事件:多個消費者可以訂閱關注相同事件的主題。例如,在電商網站上,不同的用戶可能對同一件商品感興趣,而我們需要及時地通知他們在該商品上的優惠信息。
3、日誌分析:將不同的日誌事件路由到不同的消息隊列中進行處理。例如,將用戶行為日誌、服務器日誌和業務邏輯日誌分別存儲到不同的消息隊列中。
三、Topic 模型的代碼實現
1、發送消息
const amqp = require('amqplib/callback_api');
// 創建連接
amqp.connect('amqp://localhost', function (err, connection) {
// 創建頻道
connection.createChannel(function (err, channel) {
// 設置交換機類型為 topic
let ex = 'topic_logs';
let msg = process.argv.slice(2).join(' ') || 'Hello World!';
let severity = process.argv.slice(2).join(' ') || 'anonymous.info';
// 發布消息
channel.assertExchange(ex, 'topic', { durable: false });
channel.publish(ex, severity, Buffer.from(msg));
console.log(" [x] Sent %s: '%s'", severity, msg);
// 關閉頻道和連接
setTimeout(function () {
channel.close();
connection.close();
}, 500);
});
});
2、接收消息
const amqp = require('amqplib/callback_api');
// 創建連接
amqp.connect('amqp://localhost', function (err, connection) {
// 創建頻道
connection.createChannel(function (err, channel) {
// 設置交換機類型為 topic
let ex = 'topic_logs';
let args = process.argv.slice(2);
// 聲明交換機和隊列
channel.assertExchange(ex, 'topic', { durable: false });
channel.assertQueue('', { exclusive: true }, function (err, q) {
// 根據傳入參數綁定相應的路由鍵
args.forEach(function (key) {
channel.bindQueue(q.queue, ex, key);
});
// 監聽消息隊列
channel.consume(q.queue, function (msg) {
console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
}, { noAck: true });
});
});
});
四、Topic 模型的優化
在 RabbitMQ 中,實現 Topic 模型的最佳實踐是合理地設計交換機和 Routing Keys。對於同一類主題的消息,應該儘可能地使用相同的 Routing Keys 策略。如果存在多個 Routing Keys,那麼建議將其扇出(Fanout)為多個隊列,避免重複消費等問題。
同時,在實現 Topic 模型時,應儘可能將消費者和生產者進行解耦。例如,可以使用 RPC(Remote Procedure Call,遠程過程調用)方式實現消費者和生產者之間的通信,從而極大地提高消息隊列的應用效率。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/283009.html
微信掃一掃
支付寶掃一掃