一、RabbitMQ發送消息的方法
RabbitMQ是一個強大的分散式消息隊列系統,可以實現應用程序之間的非同步消息傳遞。RabbitMQ發送消息的方法非常簡單,只需要通過客戶端向指定的隊列發送一條消息即可。
import pika
import json
# 連接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 發送消息到隊列中
message = {'name': 'alice', 'age': 25}
channel.basic_publish(exchange='', routing_key='queue_name', body=json.dumps(message))
# 關閉連接
connection.close()
以上代碼展示了如何在Python中使用pika庫連接RabbitMQ,將消息發送到名為「queue_name」的隊列中。
二、RabbitMQ定時發送消息
有時候需要在指定的時間發送消息。這時可以使用RabbitMQ的插件 —— RabbitMQ Delayed Message Exchange。
首先需要安裝該插件,方法如下:
# 啟用rabbitmq_management插件
rabbitmq-plugins enable rabbitmq_management
# 下載並安裝Delayed Message插件
wget https://dl.bintray.com/rabbitmq/community-plugins/3.8.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-3.8.x-${VERSION}.ez
mv rabbitmq_delayed_message_exchange-3.8.x-${VERSION}.ez /usr/lib/rabbitmq/lib/rabbitmq_server-${VERSION}/plugins/
安裝完畢後,即可通過指定消息的expiration屬性來實現消息的定時發送。消息的expiration屬性表示消息過期的時間,單位為毫秒。
import pika
import json
# 連接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 發送延遲消息到隊列中
message = {'name': 'alice', 'age': 25}
channel.basic_publish(
exchange='delayed_exchange',
routing_key='delayed_queue_name',
body=json.dumps(message),
properties=pika.BasicProperties(expiration='10000')
)
# 關閉連接
connection.close()
以上代碼展示了如何在指定的隊列「delayed_queue_name」中發送一條10秒後才會被消費的消息。
三、RabbitMQ發送消息後MQ沒收到
當我們發送消息時,如果MQ沒有收到消息,我們需要檢查以下幾點:
1、隊列是否存在
發送消息前需先確認隊列是否存在,否則之後發送的消息會導致MQ無法正常接收消息。可以通過以下代碼創建隊列:
import pika
# 連接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 創建隊列
channel.queue_declare(queue='queue_name')
# 關閉連接
connection.close()
2、交換機是否綁定了隊列
在RabbitMQ中,消息通過交換機分發到不同的隊列。在發送消息之前,需要確保該交換機已經與消費者隊列進行了綁定。可以通過以下代碼進行交換機和隊列的綁定:
import pika
# 連接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 設置隊列名稱
queue_name = 'queue_name'
# 綁定交換機和隊列
channel.exchange_declare(exchange='exchange_name', exchange_type='direct')
channel.queue_declare(queue=queue_name)
channel.queue_bind(queue=queue_name, exchange='exchange_name', routing_key='routing_key')
# 關閉連接
connection.close()
四、RabbitMQ發送消息的API
RabbitMQ提供了多種消息傳遞的API,包括AMQP、STOMP、MQTT等。其中,AMQP(Advanced Message Queuing Protocol)是RabbitMQ的默認協議,具有更高的可移植性和更好的性能。
以下是用AMQP協議發送消息的示例代碼:
import pika
# 連接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 創建隊列
channel.queue_declare(queue='queue_name')
# 發送消息到隊列中
channel.basic_publish(exchange='',
routing_key='queue_name',
body='Hello World')
# 關閉連接
connection.close()
五、RabbitMQ發送消息的參數
RabbitMQ發送消息時可以設置多個參數,比如消息的優先順序,持久性等。下面是發送一個持久化的、有優先順序的消息示例:
import pika
# 連接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 創建隊列
channel.queue_declare(queue='queue_name', durable=True)
# 發送有優先順序的消息到隊列中
channel.basic_publish(exchange='',
routing_key='queue_name',
body='Hello World',
properties=pika.BasicProperties(
delivery_mode=2, # 持久化消息
priority=1 # 設置消息優先順序為1
))
# 關閉連接
connection.close()
六、RabbitMQ發送消息設置編碼格式
RabbitMQ發送消息時,默認的編碼格式是二進位,如果要發送其他編碼格式的消息,需要在發送消息前將消息進行編碼。下面是發送一個UTF-8編碼的消息示例:
import pika
# 連接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 創建隊列
channel.queue_declare(queue='queue_name')
# 發送UTF-8編碼的消息到隊列中
text = '你好世界'
channel.basic_publish(exchange='',
routing_key='queue_name',
body=text.encode('utf-8'))
# 關閉連接
connection.close()
七、RabbitMQ發送消息的調用方法
在RabbitMQ中,消息的發送默認是同步的,即消息發送完成後,程序會一直等待MQ的確認消息。
如果我們需要非同步的方式發送消息,則我們可以通過以下方式實現:
import pika
import concurrent.futures
# 連接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 創建隊列
channel.queue_declare(queue='queue_name')
# 創建非同步Executor
executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
# 非同步發送消息到隊列中
executor.submit(channel.basic_publish,
exchange='',
routing_key='queue_name',
body='Hello World')
# 關閉連接
connection.close()
八、RabbitMQ非同步發送消息
如果需要非同步的方式發送消息,可以使用RabbitMQ提供的非同步API —— aio-pika。
以下是使用aio-pika發送非同步消息的示例代碼:
import aio_pika
import asyncio
async def main(loop):
# 連接rabbitmq
connection = await aio_pika.connect_robust(
host='localhost',
loop=loop
)
channel = await connection.channel()
# 創建隊列
queue_name = 'queue_name'
queue = await channel.declare_queue(queue_name)
# 發送非同步消息到隊列中
message = aio_pika.Message(body=b"Hello World")
await channel.default_exchange.publish(message, routing_key=queue_name)
# 關閉連接
await connection.close()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
九、RabbitMQ接收消息方式
在RabbitMQ中,我們可以使用常規方法接收消息,也可以通過消費者進行接收。
常規方法接收消息示例代碼:
import pika
# 連接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 創建隊列
channel.queue_declare(queue='queue_name')
# 定義回調函數
def callback(ch, method, properties, body):
print("Received %r" % body)
# 接收消息
channel.basic_consume(queue='queue_name',
on_message_callback=callback,
auto_ack=True)
# 開始消費
channel.start_consuming()
# 關閉連接
connection.close()
我們還可以通過消費者的方式進行消息的接收,以下是使用消費者的方法接收消息的示例代碼:
import pika
import json
# 連接rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 創建隊列
channel.queue_declare(queue='queue_name')
# 定義消費者
def consumer(ch, method, properties, body):
message = json.loads(body)
print(f"Received message: {message}")
# 開始消費
channel.basic_consume(queue='queue_name', on_message_callback=consumer)
# 關閉連接
connection.close()
以上就是RabbitMQ發送消息的詳細操作指南,包括了RabbitMQ發送消息的方法、RabbitMQ定時發送消息、RabbitMQ消息發送後MQ未收到、RabbitMQ發送消息的API、RabbitMQ發送消息的參數、RabbitMQ發送消息設置編碼格式、RabbitMQ發送消息的調用方法、RabbitMQ非同步發送消息、RabbitMQ接收消息方式等主題。通過這篇文章的學習,相信大家已經可以輕鬆地實現RabbitMQ消息的發送和接收了。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/300606.html