一、RabbitMQ的基本概念
在学习RabbitMQ之前,必须理解RabbitMQ的基本概念及其作用。RabbitMQ是一个开源的消息队列系统,用于快速、可靠地处理存储和转发消息。它基于AMQP(高级消息队列协议)开发,提供了许多高级特性,如消息持久性、发布/订阅模式和自动健康监测等。
// RabbitMQ基本操作示例代码
using RabbitMQ.Client;
using System.Text;
// 生产者,发布消息
IModel channel = connection.CreateModel();
channel.BasicPublish(exchange: "", routingKey: "hello", body: Encoding.UTF8.GetBytes("Hello World!"));
// 消费者,接收消息
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
byte[] messageBodyBytes = ea.Body.ToArray();
string message = Encoding.UTF8.GetString(messageBodyBytes);
Console.WriteLine("Received {0}", message);
};
channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
上面的代码示例展示了如何在生产者端发送消息,并在消费者端接收消息。其中,生产者通过BasicPublish()方法来发布消息,消费者则通过BasicConsume()方法来订阅消息。另外,RabbitMQ还支持交换机(exchange)机制,用于在队列之间进行消息路由。
二、交换机的类型
RabbitMQ支持四种类型的交换机:direct、fanout、topic和headers。它们分别对应着不同的消息路由策略。其中,direct模式是最简单的模式,它会将消息路由到与routing key完全匹配的队列中。而fanout模式则是广播模式,它会将消息路由到所有绑定到该交换机上的队列中。topic模式则是按照正则表达式匹配规则来路由消息,而headers则是通过消息头中的键值对来路由消息。
// Direct模式示例代码
// 生产者,发布消息
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
var severity = (args.Length > 0) ? args[0] : "info";
var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs", routingKey: severity, basicProperties: null, body: body);
// 消费者,接收消息
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
var queueName = channel.QueueDeclare().QueueName;
foreach (var severity in args)
{
channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: severity);
}
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
上面的代码示例演示了如何使用direct模式来发送和接收消息。其中,生产者通过ExchangeDeclare()方法来声明交换机,通过BasicPublish()方法来发布消息。消费者则通过QueueDeclare()方法来创建队列,通过QueueBind()方法来绑定队列和交换机,然后通过BasicConsume()方法来订阅消息。
三、消息的持久化
当RabbitMQ服务器关闭或者崩溃时,所有未被处理的消息都会丢失,这是非常危险的。为了避免这种情况的发生,我们可以将消息设置为持久化。当消息被设置为持久化时,RabbitMQ会将消息写入到硬盘中,这样即使服务器崩溃,消息也可以被恢复。
// 持久化示例代码
// 生产者,发布持久化消息
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: properties, body: body);
// 消费者,接收持久化消息
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" [x] Done");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
上面的代码示例演示了如何将消息设置为持久化。在生产者端,我们通过设置properties的Persistent属性来将消息设置为持久化消息。而在消费者端,则需要在QueueDeclare()方法中设置durable为true来声明队列为持久化队列。此外,在处理完一条消息后,需要使用BasicAck()方法来确认消息已经被消费。
四、RabbitMQ的高可用性
RabbitMQ支持多个节点之间的集群,从而提高了消息队列的高可用性。在RabbitMQ集群中,每个节点都具有相同的角色,并且它们之间可以共享相同的队列和交换机。当其中一个节点崩溃时,其他节点会自动接管它的任务,从而保证了消息队列的可靠性。
// RabbitMQ集群示例代码
// 创建RabbitMQ连接
var factory = new ConnectionFactory() { HostName = "localhost", UserName = "guest", Password = "guest", Port = 5672, VirtualHost = "/" };
var connection = factory.CreateConnection();
// 创建Model
var channel = connection.CreateModel();
// 声明Exchange,Queue,并绑定。其中,Queue需要设置为exclusive=false,以支持多个消费者。
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
// 创建消费者,用于消费消息
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
上面的代码示例展示了如何在RabbitMQ集群中创建Exchange、Queue并进行绑定。其中,生产者和消费者都应该连接到集群中的某一个节点进行消息传递。
原创文章,作者:小蓝,如若转载,请注明出处:https://www.506064.com/n/283625.html
微信扫一扫
支付宝扫一扫