RabbitMQ Demo

一、RabbitMQ基礎

RabbitMQ是一個消息中間件,用於在分布式應用程序中傳遞消息,提供了一種基於AMQP協議的可靠消息傳遞機制。

RabbitMQ的核心概念包括:生產者(Producer)、交換機(Exchange)、隊列(Queue)和消費者(Consumer)。生產者將消息發布到交換機,交換機根據綁定的規則,將消息路由到一個或多個隊列中,消費者從隊列中接收消息。

RabbitMQ支持多種消息傳遞模式,包括點對點(Point-to-Point)、發布-訂閱(Publish-Subscribe)、路由(Routing)和主題(Topics)等。

二、RabbitMQ Demo示例

1. Point-to-Point示例

在Point-to-Point模式下,生產者將消息發送給一個隊列,消費者從該隊列中接收消息。

// 生產者
public static void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare("hello", false, false, false, null);
    String message = "Hello RabbitMQ!";
    channel.basicPublish("", "hello", null, message.getBytes());

    System.out.println("Sent message: " + message);

    channel.close();
    connection.close();
}

// 消費者
public static void receive() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare("hello", false, false, false, null);
    System.out.println("Waiting for messages...");

    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message);
        }
    };
    channel.basicConsume("hello", true, consumer);

    channel.close();
    connection.close();
}

2. Publish-Subscribe示例

在Publish-Subscribe模式下,生產者將消息發送給一個交換機,該交換機將消息路由到所有綁定了該交換機的隊列中,每個消費者都從一個隊列中接收消息。

// 生產者
public static void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("logs", "fanout");
    String message = "Hello RabbitMQ!";
    channel.basicPublish("logs", "", null, message.getBytes());

    System.out.println("Sent message: " + message);

    channel.close();
    connection.close();
}

// 消費者1
public static void receive1() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("logs", "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "logs", "");

    System.out.println("Waiting for messages...");

    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue1");
        }
    };
    channel.basicConsume(queueName, true, consumer);

    channel.close();
    connection.close();
}

// 消費者2
public static void receive2() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("logs", "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "logs", "");

    System.out.println("Waiting for messages...");

    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue2");
        }
    };
    channel.basicConsume(queueName, true, consumer);

    channel.close();
    connection.close();
}

3. Routing示例

在Routing模式下,生產者將消息發送給一個帶有routing key的交換機,交換機根據隊列與交換機的綁定關係,將消息路由到指定隊列中,消費者從指定隊列中接收消息。

// 生產者
public static void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("direct_logs", "direct");
    String message = "Hello RabbitMQ!";
    channel.basicPublish("direct_logs", "error", null, message.getBytes());

    System.out.println("Sent message: " + message);

    channel.close();
    connection.close();
}

// 消費者1
public static void receive1() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("direct_logs", "direct");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "direct_logs", "error");

    System.out.println("Waiting for messages...");

    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue1");
        }
    };
    channel.basicConsume(queueName, true, consumer);

    channel.close();
    connection.close();
}

// 消費者2
public static void receive2() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("direct_logs", "direct");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "direct_logs", "warning");

    System.out.println("Waiting for messages...");

    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue2");
        }
    };
    channel.basicConsume(queueName, true, consumer);

    channel.close();
    connection.close();
}

4. Topics示例

在Topics模式下,生產者將消息發送給一個帶有routing key的交換機,交換機根據隊列與交換機的綁定關係,將消息路由到指定隊列中,消費者從指定隊列中接收消息。Topic模式下的routing key可以使用通配符*和#,*表示匹配單個單詞,#表示匹配任意個單詞。

// 生產者
public static void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("topic_logs", "topic");
    String message = "Hello RabbitMQ!";
    channel.basicPublish("topic_logs", "user.info", null, message.getBytes());

    System.out.println("Sent message: " + message);

    channel.close();
    connection.close();
}

// 消費者1
public static void receive1() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("topic_logs", "topic");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "topic_logs", "*.info");

    System.out.println("Waiting for messages...");

    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue1");
        }
    };
    channel.basicConsume(queueName, true, consumer);

    channel.close();
    connection.close();
}

// 消費者2
public static void receive2() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("topic_logs", "topic");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "topic_logs", "*.error");

    System.out.println("Waiting for messages...");

    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue2");
        }
    };
    channel.basicConsume(queueName, true, consumer);

    channel.close();
    connection.close();
}

三、總結

本文介紹了RabbitMQ的基礎概念及其使用示例,希望可以對大家了解RabbitMQ有所幫助。

原創文章,作者:CUJIJ,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/368415.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
CUJIJ的頭像CUJIJ
上一篇 2025-04-12 01:13
下一篇 2025-04-12 01:13

相關推薦

  • RabbitMQ和Yii2的消息隊列應用

    本文將探討RabbitMQ和Yii2之間的消息隊列應用。從概念、安裝和配置、使用實例等多個方面詳細講解,幫助讀者了解和掌握RabbitMQ和Yii2的消息隊列應用。 一、Rabbi…

    編程 2025-04-29
  • RabbitMQ Server 3.8.0使用指南

    RabbitMQ Server 3.8.0是一個開源的消息隊列軟件,官方網站為https://www.rabbitmq.com,本文將為你講解如何使用RabbitMQ Server…

    編程 2025-04-27
  • RabbitMQ如何解決重複消費

    RabbitMQ是一個消息隊列中間件,經常在分布式系統中起到至關重要的作用。但是消息的重複消費也是一個大家經常會遇到的問題。這篇文章將針對RabbitMQ如何解決重複消費做出詳細的…

    編程 2025-04-27
  • RabbitMQ安裝教程 for Linux

    一、檢測依賴性 在安裝RabbitMQ之前,需要安裝Erlang,因為RabbitMQ是用Erlang編寫的。可以在終端命令中輸入以下命令來檢查Erlang是否已在系統中安裝: e…

    編程 2025-04-23
  • Linux安裝RabbitMQ指南

    一、前言 RabbitMQ是一個被廣泛使用的開源消息代理軟件,它採用Erlang語言開發,因其穩定性和可靠性而備受歡迎。在本文中,我們將學習如何在Linux操作系統上安裝和配置Ra…

    編程 2025-04-23
  • RabbitMQ默認端口號詳解

    一、端口號與RabbitMQ的關係 在網絡通信中,端口號用於標識一台計算機中運行的不同進程。在RabbitMQ中,端口號則主要用於標識不同的RabbitMQ實例以及RabbitMQ…

    編程 2025-04-12
  • RabbitMQ延遲隊列詳解

    一、RabbitMQ的延遲隊列概述 RabbitMQ是一個開源的消息隊列中間件,被廣泛應用於長連接數據推送、數據異步處理、系統解耦等場景中。它使用 Erlang 語言編寫,具有快速…

    編程 2025-02-05
  • 深入Qt Demo

    一、實用工具類 Qt Demo中包含了許多實用工具類,可以方便開發人員進行開發,提高生產力。以下是幾個常用的實用工具類: 1. QRegularExpression 正則表達式是一…

    編程 2025-02-05
  • three.js demo詳解

    一、three.js是什麼 在介紹three.js demo之前,我們先來了解一下three.js是什麼。three.js是一個用於開發WebGL的JavaScript庫,它可以讓…

    編程 2025-01-27
  • RabbitMQ交換機詳解

    一、交換機 1、交換機是什麼? 在 RabbitMQ 中,消息的發送者和接收者都是需要根據特定的規則進行處理,而這個規則的核心就是交換機。交換機決定了消息在 RabbitMQ 中的…

    編程 2025-01-24

發表回復

登錄後才能評論