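I. RabbitMQ Basics
RabbitMQ is a message broker used to pass messages between distributed applications. It provides reliable message delivery based on the AMQP protocol.
RabbitMQ's core concepts are the producer, the exchange, the queue, and the consumer. A producer publishes a message to an exchange; the exchange uses its bindings to route the message to one or more queues; consumers receive messages from those queues.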
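The flow described above (producer, exchange, bindings, queues, consumer) can be pictured with a toy in-memory model. This is only a sketch under stated assumptions: `MiniExchange`, `MiniExchangeDemo`, and the key `order.created` are hypothetical names chosen for illustration, the matching rule is simple equality between binding key and routing key, and none of this is the RabbitMQ client API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of the publish path; NOT the RabbitMQ client API.
class MiniExchange {
    // binding key -> queues bound to this exchange with that key
    private final Map<String, List<Queue<String>>> bindings = new HashMap<>();

    // Bind a queue to this exchange under a binding key.
    void bind(Queue<String> queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Copy the message into every queue whose binding key equals the routing key.
    // Returns how many queues received a copy.
    int publish(String routingKey, String message) {
        List<Queue<String>> targets =
                bindings.getOrDefault(routingKey, Collections.emptyList());
        for (Queue<String> queue : targets) {
            queue.add(message);
        }
        return targets.size();
    }
}

class MiniExchangeDemo {
    public static void main(String[] args) {
        MiniExchange exchange = new MiniExchange();
        Queue<String> orders = new ArrayDeque<>();
        Queue<String> audit = new ArrayDeque<>();
        // Hypothetical binding key, used only for this illustration.
        exchange.bind(orders, "order.created");
        exchange.bind(audit, "order.created");

        // Producer side: one publish, two queues receive a copy.
        exchange.publish("order.created", "order #1");
        // No binding matches this key, so the sketch delivers it nowhere.
        exchange.publish("order.deleted", "order #2");

        // Consumer side: each consumer reads from its own queue.
        System.out.println("orders queue got: " + orders.poll());
        System.out.println("audit queue got: " + audit.poll());
    }
}
```

The point of the model is that the producer never names a queue. It only names an exchange and a routing key, and the bindings decide which queues get a copy.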
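RabbitMQ supports several messaging patterns, including point-to-point, publish-subscribe, routing, and topics.
II. RabbitMQ Demo Examples
1. Point-to-Point Example
In point-to-point mode, the producer sends a message to a single queue and the consumer receives messages from that queue. In the code below, the producer publishes to the default exchange (the empty string), which routes the message to the queue whose name equals the routing key.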
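import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

// The methods below are meant to be placed inside a class of your choice.

// Producer
public static void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    // Queue "hello": not durable, not exclusive, not auto-deleted, no extra arguments
    channel.queueDeclare("hello", false, false, false, null);
    String message = "Hello RabbitMQ!";
    // Default exchange (""): the routing key is the target queue name
    channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
    System.out.println("Sent message: " + message);
    channel.close();
    connection.close();
}

// Consumer
public static void receive() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("hello", false, false, false, null);
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, StandardCharsets.UTF_8);
            System.out.println("Received message: " + message);
        }
    };
    // autoAck = true: a message counts as acknowledged once it is delivered
    channel.basicConsume("hello", true, consumer);
    // The channel and connection are left open on purpose: closing them here
    // would cancel the consumer before any message could be delivered.
}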
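2. Publish-Subscribe Example
In publish-subscribe mode, the producer sends a message to a fanout exchange, which routes a copy to every queue bound to it, regardless of the routing key. Each consumer receives messages from its own queue.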
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

// Producer
public static void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("logs", "fanout");
    String message = "Hello RabbitMQ!";
    // A fanout exchange ignores the routing key, so it is left empty
    channel.basicPublish("logs", "", null, message.getBytes(StandardCharsets.UTF_8));
    System.out.println("Sent message: " + message);
    channel.close();
    connection.close();
}

// Consumer 1
public static void receive1() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("logs", "fanout");
    // Server-named temporary queue, private to this consumer
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "logs", "");
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, StandardCharsets.UTF_8);
            System.out.println("Received message: " + message + " from queue1");
        }
    };
    channel.basicConsume(queueName, true, consumer);
    // Keep the channel and connection open so deliveries can arrive.
}

// Consumer 2
public static void receive2() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("logs", "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "logs", "");
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, StandardCharsets.UTF_8);
            System.out.println("Received message: " + message + " from queue2");
        }
    };
    channel.basicConsume(queueName, true, consumer);
    // Keep the channel and connection open so deliveries can arrive.
}
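3. Routing Example
In routing mode, the producer publishes a message with a routing key to a direct exchange. The exchange delivers the message only to queues whose binding key exactly matches that routing key, and consumers receive from those queues. In the example, the message is published with the key "error", so it reaches consumer 1 (bound with "error") but not consumer 2 (bound with "warning").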
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

// Producer
public static void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("direct_logs", "direct");
    String message = "Hello RabbitMQ!";
    // Routing key "error": only queues bound with "error" receive the message
    channel.basicPublish("direct_logs", "error", null, message.getBytes(StandardCharsets.UTF_8));
    System.out.println("Sent message: " + message);
    channel.close();
    connection.close();
}

// Consumer 1 (bound with "error")
public static void receive1() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("direct_logs", "direct");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "direct_logs", "error");
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, StandardCharsets.UTF_8);
            System.out.println("Received message: " + message + " from queue1");
        }
    };
    channel.basicConsume(queueName, true, consumer);
    // Keep the channel and connection open so deliveries can arrive.
}

// Consumer 2 (bound with "warning")
public static void receive2() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("direct_logs", "direct");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "direct_logs", "warning");
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, StandardCharsets.UTF_8);
            System.out.println("Received message: " + message + " from queue2");
        }
    };
    channel.basicConsume(queueName, true, consumer);
    // Keep the channel and connection open so deliveries can arrive.
}
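4. Topics Example
In topics mode, the producer publishes a message with a routing key to a topic exchange, and the exchange routes it to the queues whose binding key matches; consumers receive from those queues. A routing key is a list of words separated by dots. The binding key (not the message's routing key) may contain the wildcards * and #: * matches exactly one word, and # matches zero or more words. In the example, the routing key "user.info" matches the binding "*.info" of consumer 1 but not the binding "*.error" of consumer 2.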
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

// Producer
public static void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("topic_logs", "topic");
    String message = "Hello RabbitMQ!";
    // Routing key "user.info" is matched against each queue's binding pattern
    channel.basicPublish("topic_logs", "user.info", null, message.getBytes(StandardCharsets.UTF_8));
    System.out.println("Sent message: " + message);
    channel.close();
    connection.close();
}

// Consumer 1 (bound with "*.info")
public static void receive1() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("topic_logs", "topic");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "topic_logs", "*.info");
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, StandardCharsets.UTF_8);
            System.out.println("Received message: " + message + " from queue1");
        }
    };
    channel.basicConsume(queueName, true, consumer);
    // Keep the channel and connection open so deliveries can arrive.
}

// Consumer 2 (bound with "*.error")
public static void receive2() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("topic_logs", "topic");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "topic_logs", "*.error");
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, StandardCharsets.UTF_8);
            System.out.println("Received message: " + message + " from queue2");
        }
    };
    channel.basicConsume(queueName, true, consumer);
    // Keep the channel and connection open so deliveries can arrive.
}
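The wildcard rules for topic bindings can be made concrete with a small standalone matcher. This is only an illustrative sketch: `TopicMatcher` and its methods are hypothetical names, the code is not how the broker implements matching, and it assumes well-formed keys (words separated by single dots, no empty words).

```java
// Illustrative matcher for topic binding patterns; NOT part of the RabbitMQ client.
class TopicMatcher {

    // Returns true if routingKey matches bindingKey, where in bindingKey
    // "*" stands for exactly one word and "#" for zero or more words.
    static boolean matches(String bindingKey, String routingKey) {
        return match(words(bindingKey), 0, words(routingKey), 0);
    }

    // Split a key into dot-separated words; an empty key has no words.
    private static String[] words(String key) {
        return key.isEmpty() ? new String[0] : key.split("\\.");
    }

    // Match pattern[i..] against key[j..] recursively.
    private static boolean match(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            return j == key.length; // pattern used up: key must be used up too
        }
        if (pattern[i].equals("#")) {
            // "#" may absorb zero or more words: try every possible length
            for (int k = j; k <= key.length; k++) {
                if (match(pattern, i + 1, key, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            return false; // pattern still expects a word, but none is left
        }
        if (pattern[i].equals("*") || pattern[i].equals(key[j])) {
            return match(pattern, i + 1, key, j + 1);
        }
        return false;
    }
}

class TopicMatcherDemo {
    public static void main(String[] args) {
        // The bindings and routing key from the example above
        System.out.println(TopicMatcher.matches("*.info", "user.info"));  // prints true
        System.out.println(TopicMatcher.matches("*.error", "user.info")); // prints false
        // "*" covers exactly one word, so a three-word key does not match
        System.out.println(TopicMatcher.matches("*.info", "app.user.info")); // prints false
        // "#" covers any number of words, including none
        System.out.println(TopicMatcher.matches("#.info", "app.user.info")); // prints true
        System.out.println(TopicMatcher.matches("user.#", "user"));          // prints true
    }
}
```

The first two calls reproduce the outcome of the example: consumer 1 receives the "user.info" message and consumer 2 does not.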
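III. Summary
This article introduced the basic concepts of RabbitMQ and showed usage examples for four messaging patterns: point-to-point, publish-subscribe, routing, and topics. Hopefully it helps you get to know RabbitMQ.
Original article, author: CUJIJ. If you repost it, please credit the source: https://www.506064.com/zh-hant/n/368415.html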