PHP
更新时间 2025-07-03 14:33:21
最近更新时间: 2025-07-03 14:33:21
编译工程生产消费
引入依赖
composer.json
{
"require": {
"php-amqplib/php-amqplib":"v3.6.2"
}
}
生产消息
<?php
require __DIR__."/../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
// Exchange and queue this producer publishes through.
$exchange = 'php-exchange';
$queue = 'php-queue';

// Plain (non-TLS) AMQP connection; replace the placeholders with the broker
// address and credentials before running.
$connection = new AMQPStreamConnection("IP", 5672, "YOUR USERNAME", "YOUR PASSWORD", "/");
$channel = $connection->channel();

// Declare the topology so the script works against a fresh broker.
$channel->queue_declare(
    $queue,
    false,  // passive
    true,   // durable
    false,  // exclusive
    false   // auto_delete
);
$channel->exchange_declare(
    $exchange,
    AMQPExchangeType::DIRECT,
    false,  // passive
    true,   // durable
    false   // auto_delete
);
$channel->queue_bind($queue, $exchange);

// The message body is every command-line argument after the script name,
// joined with single spaces.
$payload = implode(' ', array_slice($argv, 1));
$properties = [
    'content_type' => 'text/plain',
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
];
$channel->basic_publish(new AMQPMessage($payload, $properties), $exchange);

// Release the channel before the connection that owns it.
$channel->close();
$connection->close();
消费消息
<?php
require __DIR__."/../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
// Exchange, queue and consumer tag used by this consumer.
$exchange = 'php-exchange';
$queue = 'php-queue';
$consumerTag = 'php-consumer-tag';

// Plain (non-TLS) AMQP connection to the broker.
$connection = new AMQPStreamConnection("33.0.1.35", 5672, "user", "password", "/");
$channel = $connection->channel();

// Declare the topology so the consumer can start before any producer has run.
$channel->queue_declare(
    $queue,
    false,  // passive
    true,   // durable
    false,  // exclusive
    false   // auto_delete
);
$channel->exchange_declare(
    $exchange,
    AMQPExchangeType::DIRECT,
    false,  // passive
    true,   // durable
    false   // auto_delete
);
$channel->queue_bind($queue, $exchange);
/**
 * Consumer callback: prints the message body between separator lines,
 * acknowledges the message, and cancels the consumer when the body is
 * exactly "quit".
 *
 * @param mixed $message Delivered message; must expose a `body` property and
 *                       the ack(), getChannel() and getConsumerTag() methods.
 */
function process_message($message)
{
    $separator = "\n--------\n";
    echo $separator, $message->body, $separator;

    $message->ack();

    // Any body other than "quit" leaves the consumer running.
    if ($message->body !== 'quit') {
        return;
    }

    $message->getChannel()->basic_cancel($message->getConsumerTag());
}
// Subscribe to the queue. no_ack is false, so each delivery must be
// acknowledged explicitly by the callback.
$channel->basic_consume(
    $queue,
    $consumerTag,
    false,              // no_local
    false,              // no_ack
    false,              // exclusive
    false,              // nowait
    'process_message'   // callback invoked for every delivery
);
/**
 * Shutdown hook: closes the channel first, then the connection.
 *
 * @param \PhpAmqpLib\Channel\AMQPChannel $channel
 * @param \PhpAmqpLib\Connection\AbstractConnection $connection
 */
function shutdown($channel, $connection)
{
    // Order matters: the channel is closed before its connection.
    foreach ([$channel, $connection] as $closable) {
        $closable->close();
    }
}
// Have PHP call shutdown($channel, $connection) when the script terminates.
register_shutdown_function('shutdown', $channel, $connection);
// Enter the channel's consume loop, which dispatches deliveries to the
// registered callback. Presumably returns once no consumer remains (e.g. after
// basic_cancel) — confirm against the php-amqplib documentation.
$channel->consume();
SSL 生产消息
<?php
require __DIR__."/../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPSSLConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
// Absolute path of the "certs" directory next to this script, which holds the
// TLS material referenced below.
define('CERTS_PATH', realpath(__DIR__ . '/certs'));

// Exchange and queue this producer publishes through.
$exchange = 'php-exchange';
$queue = 'php-queue';

// SSL context options handed to the connection: the CA used to verify the
// broker, plus the client certificate and private key.
$sslOptions = [
    'cafile' => CERTS_PATH . '/ca_certificate.pem',
    'local_cert' => CERTS_PATH . '/client_rabbitmq_certificate.pem',
    'local_pk' => CERTS_PATH . '/client_rabbitmq_key.pem',
    'verify_peer' => true,
    'verify_peer_name' => false,
];
// Open a TLS connection to the broker; replace the placeholders with real
// credentials before running.
$connection = new AMQPSSLConnection("10.10.33.196", 5671, "YOUR USERNAME", "YOUR PASSWORD", "/" , $sslOptions);
$channel = $connection->channel();

// Declare the topology exactly once: a durable queue, a durable direct
// exchange, and the binding between them. Repeating these calls only adds
// broker round-trips.
$channel->queue_declare($queue, false, true, false, false);
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_bind($queue, $exchange);

// The message body is every command-line argument after the script name,
// joined with single spaces; the message is published as persistent text.
$messageBody = implode(' ', array_slice($argv, 1));
$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
$channel->basic_publish($message, $exchange);

// Release the channel before the connection that owns it.
$channel->close();
$connection->close();
SSL 消费消息
<?php
require __DIR__."/../vendor/autoload.php";
use PhpAmqpLib\Connection\AMQPSSLConnection;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
// Absolute path of the "certs" directory next to this script, which holds the
// TLS material referenced below.
define('CERTS_PATH', realpath(__DIR__ . '/certs'));

// Exchange, queue and consumer tag used by this consumer.
$exchange = 'php-exchange';
$queue = 'php-queue';
$consumerTag = 'php-consumer-tag';

// SSL context options handed to the connection.
$sslOptions = array(
    'cafile' => CERTS_PATH . '/ca_certificate.pem',
    'local_cert' => CERTS_PATH . '/client_rabbitmq_certificate.pem',
    'local_pk' => CERTS_PATH . '/client_rabbitmq_key.pem',
    // Verify the broker's certificate against the CA file above; with this
    // disabled the TLS session would accept any server certificate.
    'verify_peer' => true,
    // NOTE(review): host-name verification stays off, presumably because the
    // broker is addressed by IP — confirm for your deployment.
    'verify_peer_name' => false,
);
// Open a TLS connection to the broker; replace the placeholders with real
// credentials before running.
$connection = new AMQPSSLConnection("10.10.33.196", 5671, "YOUR USERNAME", "YOUR PASSWORD", "/", $sslOptions);
$channel = $connection->channel();

// Declare the topology so the consumer can start before any producer has run.
$channel->queue_declare(
    $queue,
    false,  // passive
    true,   // durable
    false,  // exclusive
    false   // auto_delete
);
$channel->exchange_declare(
    $exchange,
    AMQPExchangeType::DIRECT,
    false,  // passive
    true,   // durable
    false   // auto_delete
);
$channel->queue_bind($queue, $exchange);
/**
 * Consumer callback: prints the message body between separator lines,
 * acknowledges the message, and cancels the consumer when the body is
 * exactly "quit".
 *
 * @param mixed $message Delivered message; must expose a `body` property and
 *                       the ack(), getChannel() and getConsumerTag() methods.
 */
function process_message($message)
{
    $separator = "\n--------\n";
    echo $separator, $message->body, $separator;

    $message->ack();

    // Any body other than "quit" leaves the consumer running.
    if ($message->body !== 'quit') {
        return;
    }

    $message->getChannel()->basic_cancel($message->getConsumerTag());
}
// Subscribe to the queue. no_ack is false, so each delivery must be
// acknowledged explicitly by the callback.
$channel->basic_consume(
    $queue,
    $consumerTag,
    false,              // no_local
    false,              // no_ack
    false,              // exclusive
    false,              // nowait
    'process_message'   // callback invoked for every delivery
);
/**
 * Shutdown hook: closes the channel first, then the connection.
 *
 * @param mixed $channel    Object exposing close(); closed first.
 * @param mixed $connection Object exposing close(); closed second.
 */
function shutdown($channel, $connection)
{
    // Order matters: the channel is closed before its connection.
    foreach ([$channel, $connection] as $closable) {
        $closable->close();
    }
}
// Have PHP call shutdown($channel, $connection) when the script terminates.
register_shutdown_function('shutdown', $channel, $connection);
// Enter the channel's consume loop, which dispatches deliveries to the
// registered callback. Presumably returns once no consumer remains (e.g. after
// basic_cancel) — confirm against the php-amqplib documentation.
$channel->consume();