RabbitMQ Demo

一、RabbitMQ基础

RabbitMQ是一个消息中间件,用于在分布式应用程序中传递消息,提供了一种基于AMQP协议的可靠消息传递机制。

RabbitMQ的核心概念包括:生产者(Producer)、交换机(Exchange)、队列(Queue)和消费者(Consumer)。生产者将消息发布到交换机,交换机根据绑定的规则,将消息路由到一个或多个队列中,消费者从队列中接收消息。

RabbitMQ支持多种消息传递模式,包括点对点(Point-to-Point)、发布-订阅(Publish-Subscribe)、路由(Routing)和主题(Topics)等。

二、RabbitMQ Demo示例

1. Point-to-Point示例

在Point-to-Point模式下,生产者将消息发送给一个队列,消费者从该队列中接收消息。

// 生产者
public static void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare("hello", false, false, false, null);
    String message = "Hello RabbitMQ!";
    channel.basicPublish("", "hello", null, message.getBytes());

    System.out.println("Sent message: " + message);

    channel.close();
    connection.close();
}

// 消费者
public static void receive() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare("hello", false, false, false, null);
    System.out.println("Waiting for messages...");

    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message);
        }
    };
    channel.basicConsume("hello", true, consumer);

    channel.close();
    connection.close();
}

2. Publish-Subscribe示例

在Publish-Subscribe模式下,生产者将消息发送给一个交换机,该交换机将消息路由到所有绑定了该交换机的队列中,每个消费者都从一个队列中接收消息。

// 生产者
public static void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("logs", "fanout");
    String message = "Hello RabbitMQ!";
    channel.basicPublish("logs", "", null, message.getBytes());

    System.out.println("Sent message: " + message);

    channel.close();
    connection.close();
}

// 消费者1
public static void receive1() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("logs", "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "logs", "");

    System.out.println("Waiting for messages...");

    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue1");
        }
    };
    channel.basicConsume(queueName, true, consumer);

    channel.close();
    connection.close();
}

// 消费者2
public static void receive2() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("logs", "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "logs", "");

    System.out.println("Waiting for messages...");

    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue2");
        }
    };
    channel.basicConsume(queueName, true, consumer);

    channel.close();
    connection.close();
}

3. Routing示例

在Routing模式下,生产者将消息发送给一个带有routing key的交换机,交换机根据队列与交换机的绑定关系,将消息路由到指定队列中,消费者从指定队列中接收消息。

// 生产者
public static void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("direct_logs", "direct");
    String message = "Hello RabbitMQ!";
    channel.basicPublish("direct_logs", "error", null, message.getBytes());

    System.out.println("Sent message: " + message);

    channel.close();
    connection.close();
}

// 消费者1
public static void receive1() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("direct_logs", "direct");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "direct_logs", "error");

    System.out.println("Waiting for messages...");

    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue1");
        }
    };
    channel.basicConsume(queueName, true, consumer);

    channel.close();
    connection.close();
}

// 消费者2
public static void receive2() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("direct_logs", "direct");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "direct_logs", "warning");

    System.out.println("Waiting for messages...");

    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue2");
        }
    };
    channel.basicConsume(queueName, true, consumer);

    channel.close();
    connection.close();
}

4. Topics示例

在Topics模式下,生产者将消息发送给一个带有routing key的交换机,交换机根据队列与交换机的绑定关系,将消息路由到指定队列中,消费者从指定队列中接收消息。Topic模式下的routing key可以使用通配符*和#,*表示匹配单个单词,#表示匹配任意个单词。

// 生产者
public static void send() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("topic_logs", "topic");
    String message = "Hello RabbitMQ!";
    channel.basicPublish("topic_logs", "user.info", null, message.getBytes());

    System.out.println("Sent message: " + message);

    channel.close();
    connection.close();
}

// 消费者1
public static void receive1() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("topic_logs", "topic");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "topic_logs", "*.info");

    System.out.println("Waiting for messages...");

    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue1");
        }
    };
    channel.basicConsume(queueName, true, consumer);

    channel.close();
    connection.close();
}

// 消费者2
public static void receive2() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare("topic_logs", "topic");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, "topic_logs", "*.error");

    System.out.println("Waiting for messages...");

    Consumer consumer = new DefaultConsumer(channel) {
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message + " from queue2");
        }
    };
    channel.basicConsume(queueName, true, consumer);

    channel.close();
    connection.close();
}

三、总结

本文介绍了RabbitMQ的基础概念及其使用示例,希望可以对大家了解RabbitMQ有所帮助。

原创文章,作者:CUJIJ,如若转载,请注明出处:https://www.506064.com/n/368415.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
CUJIJCUJIJ
上一篇 2025-04-12 01:13
下一篇 2025-04-12 01:13

相关推荐

  • RabbitMQ和Yii2的消息队列应用

    本文将探讨RabbitMQ和Yii2之间的消息队列应用。从概念、安装和配置、使用实例等多个方面详细讲解,帮助读者了解和掌握RabbitMQ和Yii2的消息队列应用。 一、Rabbi…

    编程 2025-04-29
  • RabbitMQ Server 3.8.0使用指南

    RabbitMQ Server 3.8.0是一个开源的消息队列软件,官方网站为https://www.rabbitmq.com,本文将为你讲解如何使用RabbitMQ Server…

    编程 2025-04-27
  • RabbitMQ如何解决重复消费

    RabbitMQ是一个消息队列中间件,经常在分布式系统中起到至关重要的作用。但是消息的重复消费也是一个大家经常会遇到的问题。这篇文章将针对RabbitMQ如何解决重复消费做出详细的…

    编程 2025-04-27
  • RabbitMQ安装教程 for Linux

    一、检测依赖性 在安装RabbitMQ之前,需要安装Erlang,因为RabbitMQ是用Erlang编写的。可以在终端命令中输入以下命令来检查Erlang是否已在系统中安装: e…

    编程 2025-04-23
  • Linux安装RabbitMQ指南

    一、前言 RabbitMQ是一个被广泛使用的开源消息代理软件,它采用Erlang语言开发,因其稳定性和可靠性而备受欢迎。在本文中,我们将学习如何在Linux操作系统上安装和配置Ra…

    编程 2025-04-23
  • RabbitMQ默认端口号详解

    一、端口号与RabbitMQ的关系 在网络通信中,端口号用于标识一台计算机中运行的不同进程。在RabbitMQ中,端口号则主要用于标识不同的RabbitMQ实例以及RabbitMQ…

    编程 2025-04-12
  • RabbitMQ延迟队列详解

    一、RabbitMQ的延迟队列概述 RabbitMQ是一个开源的消息队列中间件,被广泛应用于长连接数据推送、数据异步处理、系统解耦等场景中。它使用 Erlang 语言编写,具有快速…

    编程 2025-02-05
  • 深入Qt Demo

    一、实用工具类 Qt Demo中包含了许多实用工具类,可以方便开发人员进行开发,提高生产力。以下是几个常用的实用工具类: 1. QRegularExpression 正则表达式是一…

    编程 2025-02-05
  • three.js demo详解

    一、three.js是什么 在介绍three.js demo之前,我们先来了解一下three.js是什么。three.js是一个用于开发WebGL的JavaScript库,它可以让…

    编程 2025-01-27
  • RabbitMQ交换机详解

    一、交换机 1、交换机是什么? 在 RabbitMQ 中,消息的发送者和接收者都是需要根据特定的规则进行处理,而这个规则的核心就是交换机。交换机决定了消息在 RabbitMQ 中的…

    编程 2025-01-24

发表回复

登录后才能评论