phpmq是一個輕量級、高性能的消息隊列系統,適用於各種場景下的消息傳遞。它提供了簡單易用、靈活可擴展的API,支持多種協議,包括:AMQP、STOMP、HTTP等。phpmq的特點是:可靠、快速、安全,同時具有優秀的性能和可擴展性。下文將從多個方面對phpmq做詳細的闡述。
一、PE和PO模型的區別
PE模型和PO模型是phpmq中的兩個重要模型,它們通過選用不同的消息協議來實現不同的消息傳遞方式。
1. PE模型
PE模型是使用AMQP協議實現的消息傳遞方式,它基於可靠性和可恢復性的傳輸保障,保證了消息的可靠傳遞。PE模型中,生產者將消息發送到Exchange,Exchange根據路由規則將消息路由到對應的隊列中,生產者和消費者之間沒有直接的聯繫。
代碼示例:
<?php
// 連接到RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 創建Exchange
$channel->exchange_declare('exchange_name', 'direct', false, true, false);
// 創建Queue
$channel->queue_declare('queue_name', false, true, false, false);
// 綁定Exchange與Queue
$channel->queue_bind('queue_name', 'exchange_name', 'routing_key');
// 發送消息到Exchange
$channel->basic_publish(new AMQPMessage($message), 'exchange_name', 'routing_key');
// 手動ACK確認消息
function process_message(AMQPMessage $message) {
echo $message->body . "\n";
$channel->basic_ack($message->delivery_info('delivery_tag'));
}
$channel->basic_consume('queue_name', '', false, false, false, false, 'process_message');
while(count($channel->callbacks)) {
$channel->wait();
}
// 關閉連接
$channel->close();
$connection->close();
2. PO模型
PO模型是使用STOMP協議實現的消息傳遞方式,它基於一種非同步、簡單、靈活的傳輸方式,支持多種消息協議,包括:AMQP、JMS等。PO模型中,生產者通過消息發送到Broker,消費者從Broker中接收消息,生產者和消費者之間存在直接的聯繫。
代碼示例:
<?php
// 連接到ActiveMQ
$stomp = new Stomp('tcp://localhost:61613');
$stomp->connect();
// 發送消息到Queue
$stomp->send('/queue/queue_name', $message);
// 接收消息
while(true) {
$message = $stomp->readFrame();
if ($message != NULL) {
echo "Received message: ".$message->body."\n";
$stomp->ack($message);
}
else {
echo "Failed to read a message\n";
}
}
// 關閉連接
$stomp->disconnect();
二、phpmq的消息確認機制
phpmq提供了兩種消息確認機制,分別是客戶端ACK和服務端ACK。
1. 客戶端ACK
客戶端ACK機制是在消費者從隊列中獲取消息後,手動確認消息已經被正確處理。在使用客戶端ACK機制時,當消費者從隊列中獲取到一個消息後,可以執行對該消息的處理,然後手動確認這個消息已經被處理完成。當處理成功後,消費者應該調用basic_ack方法來告知Broker該消息已經被處理完成,然後Broker將從隊列中移除該消息。
代碼示例:
<?php
// 手動ACK確認消息
function process_message(AMQPMessage $message) {
echo $message->body . "\n";
$channel->basic_ack($message->delivery_info('delivery_tag'));
}
$channel->basic_consume('queue_name', '', false, false, false, false, 'process_message');
2. 服務端ACK
服務端ACK機制是指Broker確認消息已經被正確處理。在使用服務端ACK機制時,當消費者從隊列中獲取到一個消息後,可以執行對該消息的處理,處理完畢後不需要手動確認消息,Broker會自動確認該消息已經被消費,將從隊列中移除該消息。
代碼示例:
<?php
// 自動ACK確認消息
$channel->basic_consume('queue_name', '', false, false, false, false, function($message) {
echo $message->body . "\n";
});
三、phpmq的消息持久化機制
phpmq提供了消息持久化機制,可以確保消息在不可避免的系統故障或計劃內維護期間也不會丟失。
1. 消息存儲
消息持久化需要配合隊列和Exchange的持久化使用,即隊列和Exchange都需要設置為持久化模式。隊列和Exchange的持久化可以在聲明時設置,如下:
代碼示例:
<?php
// 創建Exchange(設置durable為true)
$channel->exchange_declare('exchange_name', 'direct', false, true, false);
// 創建Queue(設置durable為true)
$channel->queue_declare('queue_name', false, true, false, false);
// 綁定Exchange與Queue(設置durable為true)
$channel->queue_bind('queue_name', 'exchange_name', 'routing_key');
2. 消息傳遞
在發布消息時,將消息設置為持久化即可,如下:
代碼示例:
<?php
// 發送消息到Exchange(設置delivery_mode為2)
$channel->basic_publish(new AMQPMessage($message, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]), 'exchange_name', 'routing_key');
四、phpmq的支持協議
phpmq支持多種協議,包括:AMQP、STOMP、HTTP等。
1. AMQP協議
AMQP協議是一種企業級消息隊列協議,是一個充分公開的、開放的、能力豐富的、可擴展的、可移植的,高性能的,非同步消息通信協議。
代碼示例:
<?php
// 連接到RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 創建Exchange
$channel->exchange_declare('exchange_name', 'direct', false, true, false);
// 創建Queue
$channel->queue_declare('queue_name', false, true, false, false);
// 綁定Exchange與Queue
$channel->queue_bind('queue_name', 'exchange_name', 'routing_key');
// 發送消息到Exchange
$channel->basic_publish(new AMQPMessage($message), 'exchange_name', 'routing_key');
// 消費消息
$callback = function($message) {
echo $message->body."\n";
};
$channel->basic_consume('queue_name', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
// 關閉連接
$channel->close();
$connection->close();
2. STOMP協議
STOMP協議是一種簡單、靈活和可擴展的協議,用於非同步消息傳遞。STOMP協議被廣泛用於Web應用程序和企業級消息傳遞系統。
代碼示例:
<?php
// 連接到ActiveMQ
$stomp = new Stomp('tcp://localhost:61613');
$stomp->connect();
// 發送消息到Queue
$stomp->send('/queue/queue_name', $message);
// 接收消息
while(true) {
$message = $stomp->readFrame();
if ($message != NULL) {
echo "Received message: ".$message->body."\n";
$stomp->ack($message);
}
else {
echo "Failed to read a message\n";
}
}
// 關閉連接
$stomp->disconnect();
3. HTTP協議
HTTP協議是一種基於請求、響應模式的、無狀態的應用層協議。phpmq通過HTTP協議提供了更便捷的消息發送方式。
代碼示例:
<?php
// 發送HTTP請求
$client = new GuzzleHttp\Client();
$response = $client->post('http://localhost:8000/message', [
'json' => $message
]);
// 處理HTTP響應
echo $response->getBody()->getContents();
原創文章,作者:CXGD,如若轉載,請註明出處:https://www.506064.com/zh-tw/n/144673.html