在消息隊列模型中,如何將消息廣播到所有的消費者,這種模式稱為“發布/訂閱”。本文主要以一個簡單的小例子,簡述通過fanout交換機,實現消息的發布與訂閱,僅供學習分享使用,如有不足之處,還請指正。
Fanout交換機模型
扇形交換機,採用廣播模式,根據綁定的交換機,路由到與之對應的所有隊列。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每檯子網內的主機都獲得了一份複製的消息。Fanout交換機轉發消息是最快的。

RabbitMQ控制台操作
新增兩個隊列
在同一個Virtual host下新增兩個隊列Q1,Q2,如下圖所示:

綁定fanout交換機
將兩個隊列綁定到系統默認的fanout交換機,如下所示:

示例效果圖
生產者,採用Fanout類型交換機發布消息,如下圖所示:

當生產者發布 一條消息時,Q1,Q2兩個隊列均會收到,如下圖所示:

當啟動消費者後,兩個消費者,均會訂閱到相關消息,如下圖所示:

核心代碼
消息發布
建立連接後,將通道聲明類型為Fanout的交換機,如下所示:

1 /// <summary>
2 /// fanout類型交換機,發送消息
3 /// </summary>
4 public class RabbitMqFanoutSendHelper : RabbitMqHelper {
5 /// <summary>
6 /// 發送消息
7 /// </summary>
8 /// <param name="msg"></param>
9 /// <returns></returns>
10 public bool SendMsg(string msg)
11 {
12 try
13 {
14 using (var conn = GetConnection("/Alan.hsiang"))
15 {
16 using (var channel = conn.CreateModel())
17 {
18 channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
19
20 var body = Encoding.UTF8.GetBytes(msg);
21
22 channel.BasicPublish(exchange: "amq.fanout",
23 routingKey: "",
24 basicProperties: null,
25 body: body);
26
27 //Console.WriteLine(" [x] Sent {0}", message);
28 };
29 };
30 return true;
31 }
32 catch (Exception ex)
33 {
34 throw ex;
35 }
36 }
37 }

消息訂閱
建立連接後,通道聲明類型為Fanout的交換機,並綁定隊列進行訂閱,如下所示:

1 /// <summary>
2 /// 扇形交換機接收消息
3 /// </summary>
4 public class RabbitMqFanoutReceiveHelper : RabbitMqHelper
5 {
6 public RabbitMqReceiveEventHandler OnReceiveEvent;
7
8 private IConnection conn;
9
10 private IModel channel;
11
12 private EventingBasicConsumer consumer;
13
14 public bool StartReceiveMsg(string queueName)
15 {
16 try
17 {
18 conn = GetConnection("/Alan.hsiang");
19
20 channel = conn.CreateModel();
21 channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
22 //此處隨機取出交換機下的隊列
23 //var queueName = channel.QueueDeclare().QueueName;
24 channel.QueueBind(queue: queueName, exchange: "amq.fanout", routingKey: "");
25 consumer = new EventingBasicConsumer(channel);
26 consumer.Received += (model, ea) =>
27 {
28 var body = ea.Body.ToArray();
29 var message = Encoding.UTF8.GetString(body);
30 //Console.WriteLine(" [x] Received {0}", message);
31 if (OnReceiveEvent != null)
32 {
33 OnReceiveEvent(queueName+"::"+message);
34 }
35 };
36 channel.BasicConsume(queue: queueName,
37 autoAck: true,
38 consumer: consumer);
39 return true;
40 }
41 catch (Exception ex)
42 {
43 throw ex;
44 }
45 }
46 }
原創文章,作者:投稿專員,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/224579.html