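# PHP 编译工程生产消费

## 引入依赖

composer.json

```plaintext
{
    "require": {
        "php-amqplib/php-amqplib": "v3.6.2"
    }
}
```

## 生产消息

```php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;

// NOTE(review): the setup lines before `channel()` were lost when this page was
// extracted. The values below are placeholders reconstructed from the
// php-amqplib demo scripts; replace them with your own instance settings.
$host = 'localhost';
$port = 5672;
$user = 'guest';
$pass = 'guest';
$vhost = '/';
$exchange = 'router';
$queue = 'msgs';

$connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);
$channel = $connection->channel();

$channel->queue_declare($queue, false, true, false, false);
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_bind($queue, $exchange);

$messageBody = implode(' ', array_slice($argv, 1));
$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
$channel->basic_publish($message, $exchange);

$channel->close();
$connection->close();
```

## 消费消息

```php
<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;

// NOTE(review): the setup lines before `channel()` were lost when this page was
// extracted. The values below are placeholders reconstructed from the
// php-amqplib demo scripts; replace them with your own instance settings.
$host = 'localhost';
$port = 5672;
$user = 'guest';
$pass = 'guest';
$vhost = '/';
$exchange = 'router';
$queue = 'msgs';
$consumerTag = 'consumer';

$connection = new AMQPStreamConnection($host, $port, $user, $pass, $vhost);
$channel = $connection->channel();

$channel->queue_declare($queue, false, true, false, false);
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_bind($queue, $exchange);

function process_message($message)
{
    echo "\n--------\n";
    echo $message->body;
    echo "\n--------\n";

    $message->ack();

    // Send a message with the string "quit" to cancel the consumer.
    if ($message->body === 'quit') {
        $message->getChannel()->basic_cancel($message->getConsumerTag());
    }
}

$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');

/**
 * @param \PhpAmqpLib\Channel\AMQPChannel $channel
 * @param \PhpAmqpLib\Connection\AbstractConnection $connection
 */
function shutdown($channel, $connection)
{
    $channel->close();
    $connection->close();
}

register_shutdown_function('shutdown', $channel, $connection);

$channel->consume();
```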