一、MQ概述
消息隊列(Message Queuing,簡稱MQ)是一種進程間通信或系統間通信的方法。它通過將消息進行緩存,來實現異步通信,從而解耦發送方和接收方的耦合關係。PHP MQ是一款流行的MQ實現。
二、PHP MQ的使用
PHP MQ主要基於AMQP協議,用戶可以在PHP中使用AMQP擴展來操作MQ。下面,我們將簡單介紹如何使用PHP MQ。
//創建連接
$connection = new AMQPConnection([
'host' => 'localhost',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
]);
//建立通道
$channel = $connection->channel();
//定義隊列
$channel->queue_declare('hello', false, false, false, false);
//定義消息
$msg = new AMQPMessage('Hello World!');
//發送消息
$channel->basic_publish($msg, '', 'hello');
//關閉通道和連接
$channel->close();
$connection->close();
以上代碼創建了一個名為”hello”的隊列,並向隊列中發布了一條消息”Hello World!”。
三、PHP MQ的高級特性
1、持久化
如果我們希望MQ在發生故障或斷電時,仍能保證消息不丟失,可以使用持久化。代碼如下:
//創建連接
$connection = new AMQPConnection([
'host' => 'localhost',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
]);
//建立通道
$channel = $connection->channel();
//定義消息
$msg = new AMQPMessage('Hello World!', ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
//發送消息
$channel->basic_publish($msg, '', 'hello');
//關閉通道和連接
$channel->close();
$connection->close();
以上代碼添加了參數’delivery_mode’ => AMQPMessage::DELIVERY_MODE_PERSISTENT,該參數讓消息保持持久性。
2、事務模式
通過事務模式,我們可以將一組操作放到一個事務塊中,只有當所有操作都成功執行時,事務才會被提交。如果其中任何一個操作失敗,整組操作都將回滾。示例如下:
//創建連接
$connection = new AMQPConnection([
'host' => 'localhost',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
]);
//建立通道
$channel = $connection->channel();
//開啟事務
$channel->tx_select();
//定義消息
$msg = new AMQPMessage('Hello World!', ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
try {
//發送消息
$channel->basic_publish($msg, '', 'hello');
//提交事務
$channel->tx_commit();
} catch (Exception $e) {
//回滾事務
$channel->tx_rollback();
}
//關閉通道和連接
$channel->close();
$connection->close();
四、PHP MQ的優化
1、連接池
在高並發情況下,頻繁地創建和銷毀連接會導致很大的性能消耗。我們可以使用連接池重複利用已有的連接,提高性能。示例如下:
//創建連接池
$pool = new AMQPConnectionPool([
[
'host' => 'localhost',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
],
], 3, 10);
//獲取連接
$connection = $pool->get();
//建立通道
$channel = $connection->channel();
//定義消息
$msg = new AMQPMessage('Hello World!', ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
//發送消息
$channel->basic_publish($msg, '', 'hello');
//釋放連接
$pool->free($connection);
//關閉通道和連接
$channel->close();
$connection->close();
以上代碼創建了一個大小為3,最大為10的連接池,我們可以反覆利用已有的連接來發送和接收消息。
2、並發消費
在高並發情況下,一個進程來處理所有的消息是不現實的。我們可以用多個進程同時消費隊列,以提高消費能力。示例如下:
//創建連接
$connection = new AMQPConnection([
'host' => 'localhost',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
]);
//建立通道
$channel = $connection->channel();
//消費消息
$channel->basic_consume('hello', '', false, true, false, false, function ($msg) {
echo ' [x] Received ', $msg->body, "\n";
});
//處理消息
while (count($channel->callbacks)) {
$channel->wait();
}
//關閉通道和連接
$channel->close();
$connection->close();
以上代碼解析了”hello”隊列並消費它的所有消息,處理完消息後,關閉了通道和連接。
五、總結
本文詳細介紹了PHP MQ的基礎使用和高級特性,以及優化方式,希望對讀者有所幫助。
原創文章,作者:NONBX,如若轉載,請註明出處:https://www.506064.com/zh-hant/n/333094.html
微信掃一掃
支付寶掃一掃