一、基礎概念
RabbitMQ 是一種使用 AMQP(Advanced Message Queuing Protocol)協議來實現消息隊列(Message Queue)服務的中間件(Middleware)系統。它支持多種消息隊列模型,其中之一是 Topic。Topic 是一種支持主題(Topic)概念的消息隊列模型。在 Topic 模型中,消息可以被發送到一個這樣的主題(Topic)中,所有訂閱該主題的消費者都將收到該消息。而 RabbitMQ 通過 Exchange(交換機)來支持 Topic 模型。根據 Exchange 的不同綁定模式,可以實現以不同的主題來發送消息。
假設存在一個主題 topic.a.b,那麼所有使用該主題的訂閱者(Subscriber)都可以收到發送到該主題的消息。而 RabbitMQ 中,主題都是由一個或多個單片語成的路由鍵(Routing Keys)來表示的。所以,一個主題可以表示為用「.」分隔的多個單片語成的路由鍵,例如 topic.a.b.c 表示三個單片語成的主題。同時,在訂閱主題時,也可以使用通配符(Wildcards)來表示一些特定的主題,例如 topic.# 表示一組以 topic 開頭的主題。
二、Topic 模型的使用場景
在 RabbitMQ 中,使用 Topic 模型可以解決以下問題:
1、任務分發:一個任務被分發到多個消費者進行處理。例如,需要將爬取到的數據分發給多個處理模塊進行處理。
2、關注事件:多個消費者可以訂閱關注相同事件的主題。例如,在電商網站上,不同的用戶可能對同一件商品感興趣,而我們需要及時地通知他們在該商品上的優惠信息。
3、日誌分析:將不同的日誌事件路由到不同的消息隊列中進行處理。例如,將用戶行為日誌、伺服器日誌和業務邏輯日誌分別存儲到不同的消息隊列中。
三、Topic 模型的代碼實現
1、發送消息
const amqp = require('amqplib/callback_api'); // 創建連接 amqp.connect('amqp://localhost', function (err, connection) { // 創建頻道 connection.createChannel(function (err, channel) { // 設置交換機類型為 topic let ex = 'topic_logs'; let msg = process.argv.slice(2).join(' ') || 'Hello World!'; let severity = process.argv.slice(2).join(' ') || 'anonymous.info'; // 發布消息 channel.assertExchange(ex, 'topic', { durable: false }); channel.publish(ex, severity, Buffer.from(msg)); console.log(" [x] Sent %s: '%s'", severity, msg); // 關閉頻道和連接 setTimeout(function () { channel.close(); connection.close(); }, 500); }); });
2、接收消息
const amqp = require('amqplib/callback_api'); // 創建連接 amqp.connect('amqp://localhost', function (err, connection) { // 創建頻道 connection.createChannel(function (err, channel) { // 設置交換機類型為 topic let ex = 'topic_logs'; let args = process.argv.slice(2); // 聲明交換機和隊列 channel.assertExchange(ex, 'topic', { durable: false }); channel.assertQueue('', { exclusive: true }, function (err, q) { // 根據傳入參數綁定相應的路由鍵 args.forEach(function (key) { channel.bindQueue(q.queue, ex, key); }); // 監聽消息隊列 channel.consume(q.queue, function (msg) { console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString()); }, { noAck: true }); }); }); });
四、Topic 模型的優化
在 RabbitMQ 中,實現 Topic 模型的最佳實踐是合理地設計交換機和 Routing Keys。對於同一類主題的消息,應該儘可能地使用相同的 Routing Keys 策略。如果存在多個 Routing Keys,那麼建議將其扇出(Fanout)為多個隊列,避免重複消費等問題。
同時,在實現 Topic 模型時,應儘可能將消費者和生產者進行解耦。例如,可以使用 RPC(Remote Procedure Call,遠程過程調用)方式實現消費者和生產者之間的通信,從而極大地提高消息隊列的應用效率。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/283009.html