phpmq是一个轻量级、高性能的消息队列系统,适用于各种场景下的消息传递。它提供了简单易用、灵活可扩展的API,支持多种协议,包括:AMQP、STOMP、HTTP等。phpmq的特点是:可靠、快速、安全,同时具有优秀的性能和可扩展性。下文将从多个方面对phpmq做详细的阐述。
一、PE和PO模型的区别
PE模型和PO模型是phpmq中的两个重要模型,它们通过选用不同的消息协议来实现不同的消息传递方式。
1. PE模型
PE模型是使用AMQP协议实现的消息传递方式,它基于可靠性和可恢复性的传输保障,保证了消息的可靠传递。PE模型中,生产者将消息发送到Exchange,Exchange根据路由规则将消息路由到对应的队列中,生产者和消费者之间没有直接的联系。
代码示例:
<?php
// 连接到RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 创建Exchange
$channel->exchange_declare('exchange_name', 'direct', false, true, false);
// 创建Queue
$channel->queue_declare('queue_name', false, true, false, false);
// 绑定Exchange与Queue
$channel->queue_bind('queue_name', 'exchange_name', 'routing_key');
// 发送消息到Exchange
$channel->basic_publish(new AMQPMessage($message), 'exchange_name', 'routing_key');
// 手动ACK确认消息
function process_message(AMQPMessage $message) {
echo $message->body . "\n";
$channel->basic_ack($message->delivery_info('delivery_tag'));
}
$channel->basic_consume('queue_name', '', false, false, false, false, 'process_message');
while(count($channel->callbacks)) {
$channel->wait();
}
// 关闭连接
$channel->close();
$connection->close();
2. PO模型
PO模型是使用STOMP协议实现的消息传递方式,它基于一种异步、简单、灵活的传输方式,支持多种消息协议,包括:AMQP、JMS等。PO模型中,生产者通过消息发送到Broker,消费者从Broker中接收消息,生产者和消费者之间存在直接的联系。
代码示例:
<?php
// 连接到ActiveMQ
$stomp = new Stomp('tcp://localhost:61613');
$stomp->connect();
// 发送消息到Queue
$stomp->send('/queue/queue_name', $message);
// 接收消息
while(true) {
$message = $stomp->readFrame();
if ($message != NULL) {
echo "Received message: ".$message->body."\n";
$stomp->ack($message);
}
else {
echo "Failed to read a message\n";
}
}
// 关闭连接
$stomp->disconnect();
二、phpmq的消息确认机制
phpmq提供了两种消息确认机制,分别是客户端ACK和服务端ACK。
1. 客户端ACK
客户端ACK机制是在消费者从队列中获取消息后,手动确认消息已经被正确处理。在使用客户端ACK机制时,当消费者从队列中获取到一个消息后,可以执行对该消息的处理,然后手动确认这个消息已经被处理完成。当处理成功后,消费者应该调用basic_ack方法来告知Broker该消息已经被处理完成,然后Broker将从队列中移除该消息。
代码示例:
<?php
// 手动ACK确认消息
function process_message(AMQPMessage $message) {
echo $message->body . "\n";
$channel->basic_ack($message->delivery_info('delivery_tag'));
}
$channel->basic_consume('queue_name', '', false, false, false, false, 'process_message');
2. 服务端ACK
服务端ACK机制是指Broker确认消息已经被正确处理。在使用服务端ACK机制时,当消费者从队列中获取到一个消息后,可以执行对该消息的处理,处理完毕后不需要手动确认消息,Broker会自动确认该消息已经被消费,将从队列中移除该消息。
代码示例:
<?php
// 自动ACK确认消息
$channel->basic_consume('queue_name', '', false, false, false, false, function($message) {
echo $message->body . "\n";
});
三、phpmq的消息持久化机制
phpmq提供了消息持久化机制,可以确保消息在不可避免的系统故障或计划内维护期间也不会丢失。
1. 消息存储
消息持久化需要配合队列和Exchange的持久化使用,即队列和Exchange都需要设置为持久化模式。队列和Exchange的持久化可以在声明时设置,如下:
代码示例:
<?php
// 创建Exchange(设置durable为true)
$channel->exchange_declare('exchange_name', 'direct', false, true, false);
// 创建Queue(设置durable为true)
$channel->queue_declare('queue_name', false, true, false, false);
// 绑定Exchange与Queue(设置durable为true)
$channel->queue_bind('queue_name', 'exchange_name', 'routing_key');
2. 消息传递
在发布消息时,将消息设置为持久化即可,如下:
代码示例:
<?php
// 发送消息到Exchange(设置delivery_mode为2)
$channel->basic_publish(new AMQPMessage($message, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]), 'exchange_name', 'routing_key');
四、phpmq的支持协议
phpmq支持多种协议,包括:AMQP、STOMP、HTTP等。
1. AMQP协议
AMQP协议是一种企业级消息队列协议,是一个充分公开的、开放的、能力丰富的、可扩展的、可移植的,高性能的,异步消息通信协议。
代码示例:
<?php
// 连接到RabbitMQ
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// 创建Exchange
$channel->exchange_declare('exchange_name', 'direct', false, true, false);
// 创建Queue
$channel->queue_declare('queue_name', false, true, false, false);
// 绑定Exchange与Queue
$channel->queue_bind('queue_name', 'exchange_name', 'routing_key');
// 发送消息到Exchange
$channel->basic_publish(new AMQPMessage($message), 'exchange_name', 'routing_key');
// 消费消息
$callback = function($message) {
echo $message->body."\n";
};
$channel->basic_consume('queue_name', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
// 关闭连接
$channel->close();
$connection->close();
2. STOMP协议
STOMP协议是一种简单、灵活和可扩展的协议,用于异步消息传递。STOMP协议被广泛用于Web应用程序和企业级消息传递系统。
代码示例:
<?php
// 连接到ActiveMQ
$stomp = new Stomp('tcp://localhost:61613');
$stomp->connect();
// 发送消息到Queue
$stomp->send('/queue/queue_name', $message);
// 接收消息
while(true) {
$message = $stomp->readFrame();
if ($message != NULL) {
echo "Received message: ".$message->body."\n";
$stomp->ack($message);
}
else {
echo "Failed to read a message\n";
}
}
// 关闭连接
$stomp->disconnect();
3. HTTP协议
HTTP协议是一种基于请求、响应模式的、无状态的应用层协议。phpmq通过HTTP协议提供了更便捷的消息发送方式。
代码示例:
<?php
// 发送HTTP请求
$client = new GuzzleHttp\Client();
$response = $client->post('http://localhost:8000/message', [
'json' => $message
]);
// 处理HTTP响应
echo $response->getBody()->getContents();
原创文章,作者:CXGD,如若转载,请注明出处:https://www.506064.com/n/144673.html