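I. RabbitMQ Basics
RabbitMQ is a message broker that passes messages between the components of a distributed application. It implements AMQP (Advanced Message Queuing Protocol) and provides a reliable message delivery mechanism on top of it.
RabbitMQ's core concepts are the producer, the exchange, the queue, and the consumer. A producer publishes a message to an exchange; the exchange routes the message to one or more queues according to its bindings; consumers receive messages from the queues.
RabbitMQ supports several messaging patterns, including point-to-point, publish-subscribe, routing, and topics.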
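II. RabbitMQ Demo Examples
1. Point-to-Point Example
In point-to-point mode, the producer sends a message to a single queue and the consumer receives messages from that queue. In the code below the producer publishes to the default exchange (the empty string "") with the queue name "hello" as the routing key, which delivers the message straight to that queue.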
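// Producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public static void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    // try-with-resources closes the channel and connection even if publishing fails
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // Non-durable, non-exclusive, non-auto-delete queue named "hello"
        channel.queueDeclare("hello", false, false, false, null);
        String message = "Hello RabbitMQ!";
        // Default exchange (""): the routing key is the queue name
        channel.basicPublish("", "hello", null, message.getBytes("UTF-8"));
        System.out.println("Sent message: " + message);
    }
}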
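// Consumer
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public static void receive() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    // Declare the same queue as the producer so either side can start first
    channel.queueDeclare("hello", false, false, false, null);
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message);
        }
    };
    // autoAck = true: the broker treats a message as acknowledged once it is delivered
    channel.basicConsume("hello", true, consumer);
    // Deliveries arrive asynchronously, so the channel and connection stay open;
    // closing them here would cancel the consumer before any message arrives.
}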
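2. Publish-Subscribe Example
In publish-subscribe mode, the producer publishes a message to a fanout exchange, which copies the message to every queue bound to it. Each consumer receives from its own queue, so every consumer gets every message. Because the queues in this example are temporary, start the consumers before running the producer; a message published while no queue is bound is discarded.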
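To make the fan-out behaviour concrete, here is a minimal in-memory sketch in plain Java. It is only a toy model: ToyFanoutExchange and its methods are hypothetical names invented for this illustration and are not part of the RabbitMQ client API. It shows that every queue bound at publish time gets its own copy, and that a queue bound afterwards misses the message.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of a fanout exchange (hypothetical, not the RabbitMQ API).
final class ToyFanoutExchange {
    // Queue name -> messages waiting in that queue.
    private final Map<String, Queue<String>> boundQueues = new LinkedHashMap<>();

    // Bind a queue to the exchange; a fanout binding needs no key.
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    // Copy the message to every queue bound right now; the routing key is ignored.
    // Returns the number of queues that received a copy.
    int publish(String routingKey, String message) {
        for (Queue<String> queue : boundQueues.values()) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    // Take the oldest message from a queue, or null if it is empty or unknown.
    String poll(String queueName) {
        Queue<String> queue = boundQueues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        ToyFanoutExchange logs = new ToyFanoutExchange();
        logs.bind("queue1");
        logs.bind("queue2");
        logs.publish("ignored", "Hello RabbitMQ!");
        logs.bind("late"); // bound after the publish, so it receives nothing
        System.out.println("queue1: " + logs.poll("queue1")); // queue1: Hello RabbitMQ!
        System.out.println("queue2: " + logs.poll("queue2")); // queue2: Hello RabbitMQ!
        System.out.println("late: " + logs.poll("late"));     // late: null
    }
}
```

The "late" queue is the reason the consumers in this section should be running before the producer publishes.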
// Producer
public static void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    // try-with-resources closes the channel and connection even if publishing fails
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.exchangeDeclare("logs", "fanout");
        String message = "Hello RabbitMQ!";
        // A fanout exchange ignores the routing key, so it is left empty
        channel.basicPublish("logs", "", null, message.getBytes("UTF-8"));
        System.out.println("Sent message: " + message);
    }
}
// Consumer 1
public static void receive1() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("logs", "fanout");
    // Server-named, exclusive, auto-delete queue that lives only as long as this connection
    String queueName = channel.queueDeclare().getQueue();
    // A fanout exchange ignores the binding key, so it is left empty
    channel.queueBind(queueName, "logs", "");
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue1");
        }
    };
    channel.basicConsume(queueName, true, consumer);
    // Keep the channel and connection open: closing them here would cancel the
    // consumer and delete its temporary queue before any message arrives.
}
// Consumer 2
public static void receive2() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("logs", "fanout");
    // Each consumer gets its own server-named temporary queue
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "logs", "");
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue2");
        }
    };
    channel.basicConsume(queueName, true, consumer);
    // Keep the channel and connection open: closing them here would cancel the
    // consumer and delete its temporary queue before any message arrives.
}
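3. Routing Example
In routing mode, the producer publishes a message to a direct exchange together with a routing key. The exchange delivers the message only to queues whose binding key exactly matches the routing key, and each consumer receives from its own queue. In the example below the message is published with the key "error", so only consumer 1 (bound with "error") receives it; consumer 2 (bound with "warning") does not.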
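The exact-match rule of a direct exchange can be sketched in plain Java without a broker. This is a toy model under stated assumptions: ToyDirectExchange and its methods are hypothetical names made up for this illustration, not the RabbitMQ client API, and a message whose routing key matches no binding is simply dropped.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Toy in-memory model of a direct exchange (hypothetical, not the RabbitMQ API).
final class ToyDirectExchange {
    // Queue name -> messages waiting in that queue.
    private final Map<String, Queue<String>> queues = new HashMap<>();
    // Binding key -> names of the queues bound with exactly that key.
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queueName, String bindingKey) {
        queues.putIfAbsent(queueName, new ArrayDeque<>());
        bindings.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queueName);
    }

    // Deliver to every queue whose binding key equals the routing key.
    // Returns the number of queues that received the message (0 = dropped).
    int publish(String routingKey, String message) {
        Set<String> targets = bindings.getOrDefault(routingKey, Collections.emptySet());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size();
    }

    // Take the oldest message from a queue, or null if it is empty or unknown.
    String poll(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        ToyDirectExchange directLogs = new ToyDirectExchange();
        directLogs.bind("queue1", "error");
        directLogs.bind("queue2", "warning");
        directLogs.publish("error", "Hello RabbitMQ!");
        System.out.println("queue1: " + directLogs.poll("queue1")); // queue1: Hello RabbitMQ!
        System.out.println("queue2: " + directLogs.poll("queue2")); // queue2: null
    }
}
```

The main method mirrors the scenario in this section: one message keyed "error", one queue bound with "error", and one bound with "warning".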
// Producer
public static void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    // try-with-resources closes the channel and connection even if publishing fails
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.exchangeDeclare("direct_logs", "direct");
        String message = "Hello RabbitMQ!";
        // Routing key "error": only queues bound with exactly "error" receive this
        channel.basicPublish("direct_logs", "error", null, message.getBytes("UTF-8"));
        System.out.println("Sent message: " + message);
    }
}
// Consumer 1
public static void receive1() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("direct_logs", "direct");
    String queueName = channel.queueDeclare().getQueue();
    // Binding key "error" matches the producer's routing key
    channel.queueBind(queueName, "direct_logs", "error");
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue1");
        }
    };
    channel.basicConsume(queueName, true, consumer);
    // Keep the channel and connection open: closing them here would cancel the
    // consumer and delete its temporary queue before any message arrives.
}
// Consumer 2
public static void receive2() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("direct_logs", "direct");
    String queueName = channel.queueDeclare().getQueue();
    // Binding key "warning" does not match the routing key "error",
    // so this consumer receives nothing from the producer above
    channel.queueBind(queueName, "direct_logs", "warning");
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue2");
        }
    };
    channel.basicConsume(queueName, true, consumer);
    // Keep the channel and connection open: closing them here would cancel the
    // consumer and delete its temporary queue before any message arrives.
}
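4. Topics Example
In topics mode, the producer publishes a message with a routing key to a topic exchange, and the exchange routes it to every queue whose binding key matches. A routing key is a list of words separated by dots, such as user.info. Binding keys may contain wildcards: * matches exactly one word and # matches zero or more words. In the example below the routing key user.info matches the binding *.info (consumer 1) but not *.error (consumer 2).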
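The wildcard rules above can be checked with a small stand-alone matcher. This is a sketch of the matching rule only, written in plain Java; TopicMatcher is a hypothetical helper invented for this illustration, not a class from the RabbitMQ client, and it makes no attempt to reproduce how the broker implements matching internally or how it treats edge cases such as empty keys.

```java
// Hypothetical helper that applies topic-exchange wildcard rules to two keys.
final class TopicMatcher {
    // Returns true if the dot-separated binding pattern matches the routing key.
    static boolean matches(String bindingPattern, String routingKey) {
        String[] pattern = bindingPattern.split("\\.", -1);
        String[] words = routingKey.split("\\.", -1);
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length; // pattern used up: all words must be used up too
        }
        if (pattern[i].equals("#")) {
            // '#' matches zero words (move past it) or one more word (stay on it)
            return match(pattern, i + 1, words, j)
                    || (j < words.length && match(pattern, i, words, j + 1));
        }
        if (j == words.length) {
            return false; // pattern still needs a word but none are left
        }
        // '*' matches any single word; anything else must be equal
        boolean wordMatches = pattern[i].equals("*") || pattern[i].equals(words[j]);
        return wordMatches && match(pattern, i + 1, words, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("*.info", "user.info"));     // true
        System.out.println(matches("*.error", "user.info"));    // false
        System.out.println(matches("*.info", "app.user.info")); // false
        System.out.println(matches("#.info", "app.user.info")); // true
    }
}
```

The first two calls correspond to consumer 1 and consumer 2 in this section; the last two show the difference between * (one word) and # (any number of words).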
// Producer
public static void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    // try-with-resources closes the channel and connection even if publishing fails
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.exchangeDeclare("topic_logs", "topic");
        String message = "Hello RabbitMQ!";
        // Routing key "user.info" is two words: "user" and "info"
        channel.basicPublish("topic_logs", "user.info", null, message.getBytes("UTF-8"));
        System.out.println("Sent message: " + message);
    }
}
// Consumer 1
public static void receive1() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("topic_logs", "topic");
    String queueName = channel.queueDeclare().getQueue();
    // "*.info" matches any two-word key ending in "info", such as "user.info"
    channel.queueBind(queueName, "topic_logs", "*.info");
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue1");
        }
    };
    channel.basicConsume(queueName, true, consumer);
    // Keep the channel and connection open: closing them here would cancel the
    // consumer and delete its temporary queue before any message arrives.
}
// Consumer 2
public static void receive2() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare("topic_logs", "topic");
    String queueName = channel.queueDeclare().getQueue();
    // "*.error" does not match "user.info", so this consumer
    // receives nothing from the producer above
    channel.queueBind(queueName, "topic_logs", "*.error");
    System.out.println("Waiting for messages...");
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue2");
        }
    };
    channel.basicConsume(queueName, true, consumer);
    // Keep the channel and connection open: closing them here would cancel the
    // consumer and delete its temporary queue before any message arrives.
}
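III. Summary
This article introduced the basic concepts of RabbitMQ and walked through usage examples for four messaging patterns. Hopefully it helps you get started with RabbitMQ.
Original article by CUJIJ. If you repost it, please credit the source: https://www.506064.com/zh-tw/n/368415.html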