RabbitMQ發送消息詳解

一、RabbitMQ發送消息的方法

RabbitMQ是一個強大的分散式消息隊列系統,可以實現應用程序之間的非同步消息傳遞。RabbitMQ發送消息的方法非常簡單,只需要通過客戶端向指定的隊列發送一條消息即可。


import pika
import json

# 連接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 發送消息到隊列中
message = {'name': 'alice', 'age': 25}
channel.basic_publish(exchange='', routing_key='queue_name', body=json.dumps(message))

# 關閉連接
connection.close()

以上代碼展示了如何在Python中使用pika庫連接RabbitMQ,將消息發送到名為「queue_name」的隊列中。

二、RabbitMQ定時發送消息

有時候需要在指定的時間發送消息。這時可以使用RabbitMQ的插件 —— RabbitMQ Delayed Message Exchange。

首先需要安裝該插件,方法如下:


# 啟用rabbitmq_management插件
rabbitmq-plugins enable rabbitmq_management
# 下載並安裝Delayed Message插件
wget https://dl.bintray.com/rabbitmq/community-plugins/3.8.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-3.8.x-${VERSION}.ez
mv rabbitmq_delayed_message_exchange-3.8.x-${VERSION}.ez /usr/lib/rabbitmq/lib/rabbitmq_server-${VERSION}/plugins/

安裝完畢後,即可通過指定消息的expiration屬性來實現消息的定時發送。消息的expiration屬性表示消息過期的時間,單位為毫秒。


import pika
import json

# 連接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 發送延遲消息到隊列中
message = {'name': 'alice', 'age': 25}
channel.basic_publish(
    exchange='delayed_exchange',
    routing_key='delayed_queue_name',
    body=json.dumps(message),
    properties=pika.BasicProperties(expiration='10000')
)

# 關閉連接
connection.close()

以上代碼展示了如何在指定的隊列「delayed_queue_name」中發送一條10秒後才會被消費的消息。

三、RabbitMQ發送消息後MQ沒收到

當我們發送消息時,如果MQ沒有收到消息,我們需要檢查以下幾點:

1、隊列是否存在

發送消息前需先確認隊列是否存在,否則之後發送的消息會導致MQ無法正常接收消息。可以通過以下代碼創建隊列:


import pika

# 連接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 創建隊列
channel.queue_declare(queue='queue_name')

# 關閉連接
connection.close()

2、交換機是否綁定了隊列

在RabbitMQ中,消息通過交換機分發到不同的隊列。在發送消息之前,需要確保該交換機已經與消費者隊列進行了綁定。可以通過以下代碼進行交換機和隊列的綁定:


import pika

# 連接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 設置隊列名稱
queue_name = 'queue_name'

# 綁定交換機和隊列
channel.exchange_declare(exchange='exchange_name', exchange_type='direct')
channel.queue_declare(queue=queue_name)
channel.queue_bind(queue=queue_name, exchange='exchange_name', routing_key='routing_key')

# 關閉連接
connection.close()

四、RabbitMQ發送消息的API

RabbitMQ提供了多種消息傳遞的API,包括AMQP、STOMP、MQTT等。其中,AMQP(Advanced Message Queuing Protocol)是RabbitMQ的默認協議,具有更高的可移植性和更好的性能。

以下是用AMQP協議發送消息的示例代碼:


import pika

# 連接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 創建隊列
channel.queue_declare(queue='queue_name')

# 發送消息到隊列中
channel.basic_publish(exchange='',
                      routing_key='queue_name',
                      body='Hello World')

# 關閉連接
connection.close()

五、RabbitMQ發送消息的參數

RabbitMQ發送消息時可以設置多個參數,比如消息的優先順序,持久性等。下面是發送一個持久化的、有優先順序的消息示例:


import pika

# 連接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 創建隊列
channel.queue_declare(queue='queue_name', durable=True)

# 發送有優先順序的消息到隊列中
channel.basic_publish(exchange='',
                      routing_key='queue_name',
                      body='Hello World',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 持久化消息
                          priority=1  # 設置消息優先順序為1
                      ))

# 關閉連接
connection.close()

六、RabbitMQ發送消息設置編碼格式

RabbitMQ發送消息時,默認的編碼格式是二進位,如果要發送其他編碼格式的消息,需要在發送消息前將消息進行編碼。下面是發送一個UTF-8編碼的消息示例:


import pika

# 連接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 創建隊列
channel.queue_declare(queue='queue_name')

# 發送UTF-8編碼的消息到隊列中
text = '你好世界'
channel.basic_publish(exchange='',
                      routing_key='queue_name',
                      body=text.encode('utf-8'))

# 關閉連接
connection.close()

七、RabbitMQ發送消息的調用方法

在RabbitMQ中,消息的發送默認是同步的,即消息發送完成後,程序會一直等待MQ的確認消息。

如果我們需要非同步的方式發送消息,則我們可以通過以下方式實現:


import pika
import concurrent.futures

# 連接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 創建隊列
channel.queue_declare(queue='queue_name')

# 創建非同步Executor
executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)

# 非同步發送消息到隊列中
executor.submit(channel.basic_publish,
                exchange='',
                routing_key='queue_name',
                body='Hello World')

# 關閉連接
connection.close()

八、RabbitMQ非同步發送消息

如果需要非同步的方式發送消息,可以使用RabbitMQ提供的非同步API —— aio-pika。

以下是使用aio-pika發送非同步消息的示例代碼:


import aio_pika
import asyncio

async def main(loop):
    # 連接rabbitmq
    connection = await aio_pika.connect_robust(
        host='localhost',
        loop=loop
    )
    channel = await connection.channel()

    # 創建隊列
    queue_name = 'queue_name'
    queue = await channel.declare_queue(queue_name)

    # 發送非同步消息到隊列中
    message = aio_pika.Message(body=b"Hello World")
    await channel.default_exchange.publish(message, routing_key=queue_name)

    # 關閉連接
    await connection.close()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))

九、RabbitMQ接收消息方式

在RabbitMQ中,我們可以使用常規方法接收消息,也可以通過消費者進行接收。

常規方法接收消息示例代碼:


import pika

# 連接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 創建隊列
channel.queue_declare(queue='queue_name')

# 定義回調函數
def callback(ch, method, properties, body):
    print("Received %r" % body)

# 接收消息
channel.basic_consume(queue='queue_name',
                      on_message_callback=callback,
                      auto_ack=True)

# 開始消費
channel.start_consuming()

# 關閉連接
connection.close()

我們還可以通過消費者的方式進行消息的接收,以下是使用消費者的方法接收消息的示例代碼:


import pika
import json

# 連接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 創建隊列
channel.queue_declare(queue='queue_name')

# 定義消費者
def consumer(ch, method, properties, body):
    message = json.loads(body)
    print(f"Received message: {message}")

# 開始消費
channel.basic_consume(queue='queue_name', on_message_callback=consumer)

# 關閉連接
connection.close()

以上就是RabbitMQ發送消息的詳細操作指南,包括了RabbitMQ發送消息的方法、RabbitMQ定時發送消息、RabbitMQ消息發送後MQ未收到、RabbitMQ發送消息的API、RabbitMQ發送消息的參數、RabbitMQ發送消息設置編碼格式、RabbitMQ發送消息的調用方法、RabbitMQ非同步發送消息、RabbitMQ接收消息方式等主題。通過這篇文章的學習,相信大家已經可以輕鬆地實現RabbitMQ消息的發送和接收了。

原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/300606.html

(0)
打賞 微信掃一掃 微信掃一掃 支付寶掃一掃 支付寶掃一掃
小藍的頭像小藍
上一篇 2024-12-29 12:52
下一篇 2024-12-29 12:52

相關推薦

  • RabbitMQ和Yii2的消息隊列應用

    本文將探討RabbitMQ和Yii2之間的消息隊列應用。從概念、安裝和配置、使用實例等多個方面詳細講解,幫助讀者了解和掌握RabbitMQ和Yii2的消息隊列應用。 一、Rabbi…

    編程 2025-04-29
  • Java定時發送消息,毫秒級實現

    本文介紹如何使用Java定時發送消息,實現毫秒級定時,適合需要高精度的定時任務場景。 一、使用Timer和TimerTask實現定時任務 在Java中,我們可以使用Timer和Ti…

    編程 2025-04-28
  • RabbitMQ Server 3.8.0使用指南

    RabbitMQ Server 3.8.0是一個開源的消息隊列軟體,官方網站為https://www.rabbitmq.com,本文將為你講解如何使用RabbitMQ Server…

    編程 2025-04-27
  • RabbitMQ如何解決重複消費

    RabbitMQ是一個消息隊列中間件,經常在分散式系統中起到至關重要的作用。但是消息的重複消費也是一個大家經常會遇到的問題。這篇文章將針對RabbitMQ如何解決重複消費做出詳細的…

    編程 2025-04-27
  • 神經網路代碼詳解

    神經網路作為一種人工智慧技術,被廣泛應用於語音識別、圖像識別、自然語言處理等領域。而神經網路的模型編寫,離不開代碼。本文將從多個方面詳細闡述神經網路模型編寫的代碼技術。 一、神經網…

    編程 2025-04-25
  • Linux sync詳解

    一、sync概述 sync是Linux中一個非常重要的命令,它可以將文件系統緩存中的內容,強制寫入磁碟中。在執行sync之前,所有的文件系統更新將不會立即寫入磁碟,而是先緩存在內存…

    編程 2025-04-25
  • 詳解eclipse設置

    一、安裝與基礎設置 1、下載eclipse並進行安裝。 2、打開eclipse,選擇對應的工作空間路徑。 File -> Switch Workspace -> [選擇…

    編程 2025-04-25
  • Python輸入輸出詳解

    一、文件讀寫 Python中文件的讀寫操作是必不可少的基本技能之一。讀寫文件分別使用open()函數中的’r’和’w’參數,讀取文件…

    編程 2025-04-25
  • Linux修改文件名命令詳解

    在Linux系統中,修改文件名是一個很常見的操作。Linux提供了多種方式來修改文件名,這篇文章將介紹Linux修改文件名的詳細操作。 一、mv命令 mv命令是Linux下的常用命…

    編程 2025-04-25
  • git config user.name的詳解

    一、為什麼要使用git config user.name? git是一個非常流行的分散式版本控制系統,很多程序員都會用到它。在使用git commit提交代碼時,需要記錄commi…

    編程 2025-04-25

發表回復

登錄後才能評論