一、什麼是消息隊列
在軟件系統中,不同應用程序之間經常需要進行異步通信。通常情況下,這些通信實現起來非常複雜。使用消息隊列可以解決這個問題。
消息隊列就是一種分佈式的消息傳遞系統。消息發送者把消息發送給隊列,消息接收者從隊列中獲取消息來進行處理。消息傳遞過程中,隊列就像一個中間代理,並且具有消息緩存的功能。這樣,消息發送者和接收者就可以獨立進行操作,並且不會出現因為通信故障導致的消息丟失情況。
二、為什麼要使用消息隊列
使用消息隊列的主要目的是為了解耦合。異步通信可以讓系統中的應用程序之間不需要進行直接交互,從而減少了它們之間的依賴關係。消息隊列還可以減輕系統負載,針對高並發場景,消息隊列可以幫助處理大量流量並發請求,從而避免系統崩潰。
三、使用場景
下面介紹一些常見的使用場景:
1、任務隊列:MQ可以代替定時任務框架。定時任務的缺點在於無法對細微的更新進行感知,並且隨着時間的推移,任務積累得越來越多。而使用MQ則可以進行及時的任務派發。
2、處理程序解耦合:服務A想要調用服務B,但是它們的依賴關係很緊密。如果他們之間使用消息隊列,那麼兩個應用程序就沒有直接的依賴關係。因此,可以在不改變代碼的情況下,對服務A和服務B進行部署和更新。
3、流量削峰:應用程序處理流量的峰值是自然而然的,但是如果峰值超出了它的承載能力,這可能會導致應用程序崩潰。使用消息隊列,程序可以在峰值時刻暫存請求,等到峰值結束後再慢慢處理這些請求。這就是流量削峰。
四、RabbitMQ使用教程
1、同步阻塞模式
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
上述代碼演示了消費者向隊列發送消息的過程。BlockingConnection方法返回一個Connection對象,Channel對象則用於發消息。
2、接收消息
import pika
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
上述代碼演示了消費者從隊列中接收消息的過程。basic_consume()方法表示從隊列開始消費消息,當有消息到達時,就會觸發回調函數callback()。這個回調函數打印出消息內容。
3、優化
下面介紹一些RabbitMQ的優化措施:
1、消息持久化:當消息被發送到隊列後,如果在RabbitMQ關閉前都沒有被消費,那麼這條消息就會被刪除。所以,為了防止消息在RabbitMQ宕機後丟失,需要將消息持久化。
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello', durable=True)
channel.basic_publish(
exchange='',
routing_key='hello',
body='Hello World!',
properties=pika.BasicProperties(delivery_mode=2)
)
print(" [x] Sent 'Hello World!'")
connection.close()
將消息持久化的方法是將隊列的durable參數和消息的delivery_mode參數均設置為2。
2、利用Publisher Confirms實現事務控制:所有發送到RabbitMQ服務中的消息都會由RabbitMQ作為生產者發送到交換器。
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.confirm_delivery() # 開啟delivery_confirm模式
channel.basic_publish(exchange='',
routing_key='hello.pika.confirm',
body='hello world from pika confirm.',
properties=pika.BasicProperties(
delivery_mode=2, # 確認消息持久化
))
confirmed = channel.wait_for_confirms()
if confirmed:
print('消息發送成功')
else:
print('消息發送失敗')
connection.close()
在開啟delivery_confirm模式後,每次發送一條消息,都要等待Broker的回應,只有Broker出現問題或者消息寫入後收不到Broker回應才會觸發confirm回調函數。
五、應用場景
消息隊列適用於很多場景,如:
1、電商場景:解決訂單流程中庫存鎖定、創建訂單、支付等問題。MQ在這場景中主要用於訂單下發和處理,並且在處理這些訂單的時候,需要根據產品庫存、用戶餘額等信息進行處理。
2、異步處理:在大數據處理或計算過程中,結果處理時間較長,需要將任務分到不同的節點進行異步處理,這樣可以節省大量計算資源。
3、發佈訂閱模式:氣象局、證券公司等需要實時發佈信息的機構可以使用RabbitMQ實現信息的實時訂閱。
六、結束語
消息隊列已經成為了現代軟件工程中不可或缺的重要組件。本文介紹了RabbitMQ的使用和優化,以及應用場景。希望通過本文的介紹,能夠讓讀者更好地理解消息隊列的工作原理並且應用到實際場景中。
原創文章,作者:小藍,如若轉載,請註明出處:https://www.506064.com/zh-hk/n/243327.html