一、RabbitMQ的基本概念
在學習RabbitMQ之前,必須理解RabbitMQ的基本概念及其作用。RabbitMQ是一個開源的消息隊列系統,用於快速、可靠地處理存儲和轉發消息。它基於AMQP(高級消息隊列協議)開發,提供了許多高級特性,如消息持久性、發布/訂閱模式和自動健康監測等。
// RabbitMQ基本操作示例代碼 using RabbitMQ.Client; using System.Text; // 生產者,發布消息 IModel channel = connection.CreateModel(); channel.BasicPublish(exchange: "", routingKey: "hello", body: Encoding.UTF8.GetBytes("Hello World!")); // 消費者,接收消息 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { byte[] messageBodyBytes = ea.Body.ToArray(); string message = Encoding.UTF8.GetString(messageBodyBytes); Console.WriteLine("Received {0}", message); }; channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
上面的代碼示例展示了如何在生產者端發送消息,並在消費者端接收消息。其中,生產者通過BasicPublish()方法來發布消息,消費者則通過BasicConsume()方法來訂閱消息。另外,RabbitMQ還支持交換機(exchange)機制,用於在隊列之間進行消息路由。
二、交換機的類型
RabbitMQ支持四種類型的交換機:direct、fanout、topic和headers。它們分別對應著不同的消息路由策略。其中,direct模式是最簡單的模式,它會將消息路由到與routing key完全匹配的隊列中。而fanout模式則是廣播模式,它會將消息路由到所有綁定到該交換機上的隊列中。topic模式則是按照正則表達式匹配規則來路由消息,而headers則是通過消息頭中的鍵值對來路由消息。
// Direct模式示例代碼 // 生產者,發布消息 channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct); var severity = (args.Length > 0) ? args[0] : "info"; var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "direct_logs", routingKey: severity, basicProperties: null, body: body); // 消費者,接收消息 channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct); var queueName = channel.QueueDeclare().QueueName; foreach (var severity in args) { channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: severity); } var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
上面的代碼示例演示了如何使用direct模式來發送和接收消息。其中,生產者通過ExchangeDeclare()方法來聲明交換機,通過BasicPublish()方法來發布消息。消費者則通過QueueDeclare()方法來創建隊列,通過QueueBind()方法來綁定隊列和交換機,然後通過BasicConsume()方法來訂閱消息。
三、消息的持久化
當RabbitMQ伺服器關閉或者崩潰時,所有未被處理的消息都會丟失,這是非常危險的。為了避免這種情況的發生,我們可以將消息設置為持久化。當消息被設置為持久化時,RabbitMQ會將消息寫入到硬碟中,這樣即使伺服器崩潰,消息也可以被恢復。
// 持久化示例代碼 // 生產者,發布持久化消息 channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout); var properties = channel.CreateBasicProperties(); properties.Persistent = true; var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: properties, body: body); // 消費者,接收持久化消息 channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); };
上面的代碼示例演示了如何將消息設置為持久化。在生產者端,我們通過設置properties的Persistent屬性來將消息設置為持久化消息。而在消費者端,則需要在QueueDeclare()方法中設置durable為true來聲明隊列為持久化隊列。此外,在處理完一條消息後,需要使用BasicAck()方法來確認消息已經被消費。
四、RabbitMQ的高可用性
RabbitMQ支持多個節點之間的集群,從而提高了消息隊列的高可用性。在RabbitMQ集群中,每個節點都具有相同的角色,並且它們之間可以共享相同的隊列和交換機。當其中一個節點崩潰時,其他節點會自動接管它的任務,從而保證了消息隊列的可靠性。
// RabbitMQ集群示例代碼 // 創建RabbitMQ連接 var factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest", Port = 5672, VirtualHost = "/" }; var connection = factory.CreateConnection(); // 創建Model var channel = connection.CreateModel(); // 聲明Exchange,Queue,並綁定。其中,Queue需要設置為exclusive=false,以支持多個消費者。 channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: "logs", routingKey: ""); // 創建消費者,用於消費消息 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
上面的代碼示例展示了如何在RabbitMQ集群中創建Exchange、Queue並進行綁定。其中,生產者和消費者都應該連接到集群中的某一個節點進行消息傳遞。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/283625.html