一、RabbitMQ基础
RabbitMQ是一个消息中间件,用于在分布式应用程序中传递消息,提供了一种基于AMQP协议的可靠消息传递机制。
RabbitMQ的核心概念包括:生产者(Producer)、交换机(Exchange)、队列(Queue)和消费者(Consumer)。生产者将消息发布到交换机,交换机根据绑定的规则,将消息路由到一个或多个队列中,消费者从队列中接收消息。
RabbitMQ支持多种消息传递模式,包括点对点(Point-to-Point)、发布-订阅(Publish-Subscribe)、路由(Routing)和主题(Topics)等。
二、RabbitMQ Demo示例
1. Point-to-Point示例
在Point-to-Point模式下,生产者将消息发送给一个队列,消费者从该队列中接收消息。
// 生产者 public static void send() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); String message = "Hello RabbitMQ!"; channel.basicPublish("", "hello", null, message.getBytes()); System.out.println("Sent message: " + message); channel.close(); connection.close(); } // 消费者 public static void receive() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", false, false, false, null); System.out.println("Waiting for messages..."); Consumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Received message: " + message); } }; channel.basicConsume("hello", true, consumer); channel.close(); connection.close(); }
2. Publish-Subscribe示例
在Publish-Subscribe模式下,生产者将消息发送给一个交换机,该交换机将消息路由到所有绑定了该交换机的队列中,每个消费者都从一个队列中接收消息。
// 生产者 public static void send() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs", "fanout"); String message = "Hello RabbitMQ!"; channel.basicPublish("logs", "", null, message.getBytes()); System.out.println("Sent message: " + message); channel.close(); connection.close(); } // 消费者1 public static void receive1() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs", "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "logs", ""); System.out.println("Waiting for messages..."); Consumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Received message: " + message + " from queue1"); } }; channel.basicConsume(queueName, true, consumer); channel.close(); connection.close(); } // 消费者2 public static void receive2() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs", "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "logs", ""); System.out.println("Waiting for messages..."); Consumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Received message: " + message + " from queue2"); } }; channel.basicConsume(queueName, true, consumer); channel.close(); connection.close(); }
3. Routing示例
在Routing模式下,生产者将消息发送给一个带有routing key的交换机,交换机根据队列与交换机的绑定关系,将消息路由到指定队列中,消费者从指定队列中接收消息。
// 生产者 public static void send() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("direct_logs", "direct"); String message = "Hello RabbitMQ!"; channel.basicPublish("direct_logs", "error", null, message.getBytes()); System.out.println("Sent message: " + message); channel.close(); connection.close(); } // 消费者1 public static void receive1() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("direct_logs", "direct"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "direct_logs", "error"); System.out.println("Waiting for messages..."); Consumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Received message: " + message + " from queue1"); } }; channel.basicConsume(queueName, true, consumer); channel.close(); connection.close(); } // 消费者2 public static void receive2() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("direct_logs", "direct"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "direct_logs", "warning"); System.out.println("Waiting for messages..."); Consumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Received message: " + message + " from queue2"); } }; channel.basicConsume(queueName, true, consumer); channel.close(); connection.close(); }
4. Topics示例
在Topics模式下,生产者将消息发送给一个带有routing key的交换机,交换机根据队列与交换机的绑定关系,将消息路由到指定队列中,消费者从指定队列中接收消息。Topic模式下的routing key可以使用通配符*和#,*表示匹配单个单词,#表示匹配任意个单词。
// 生产者 public static void send() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("topic_logs", "topic"); String message = "Hello RabbitMQ!"; channel.basicPublish("topic_logs", "user.info", null, message.getBytes()); System.out.println("Sent message: " + message); channel.close(); connection.close(); } // 消费者1 public static void receive1() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("topic_logs", "topic"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "topic_logs", "*.info"); System.out.println("Waiting for messages..."); Consumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Received message: " + message + " from queue1"); } }; channel.basicConsume(queueName, true, consumer); channel.close(); connection.close(); } // 消费者2 public static void receive2() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("topic_logs", "topic"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "topic_logs", "*.error"); System.out.println("Waiting for messages..."); Consumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Received message: " + message + " from queue2"); } }; channel.basicConsume(queueName, true, consumer); channel.close(); connection.close(); }
三、总结
本文介绍了RabbitMQ的基础概念及其使用示例,希望可以对大家了解RabbitMQ有所帮助。
原创文章,作者:CUJIJ,如若转载,请注明出处:https://www.506064.com/n/368415.html