RocketMQ PHP SDK 说明 分布式消息服务RocketMQ兼容了社区版 HTTP SDK,您可以使用社区版 HTTP SDK接入分布式消息服务RocketMQ。 前提条件: 1. 在PHP安装目录下的composer.json文件中加入社区PHP SDK 依赖。 2. 使用Composer安装依赖。 composer install 发送普通消息 client new MQClient( // 填写分布式消息服务RocketMQ控制台HTTP接入点 "${HTTPENDPOINT}", // 填写AccessKey,在管理控制台创建 "${ACCESSKEY}", // 填写SecretKey 在管理控制台创建 "${SECRETKEY}" ); // 所属的 Topic $topic "${TOPIC}"; // Topic所属实例ID,默认实例为空NULL $instanceId "${INSTANCEID}"; $this>producer $this>client>getProducer($instanceId, $topic); } public function run() { try { for ($i 1; $i putProperty("a", $i); // 设置消息KEY $publishMessage>setMessageKey("MessageKey"); $result $this>producer>publishMessage($publishMessage); print "Send mq message success. msgId is:" . $result>getMessageId() . ", bodyMD5 is:" . $result>getMessageBodyMD5() . "n"; } } catch (Exception $e) { printr($e>getMessage() . "n"); } }}$instance new NormalProducerExample();$instance>run();?> 消费普通消息 client new MQClient( // 填写分布式消息服务RocketMQ控制台HTTP接入点 "${HTTPENDPOINT}", // 填写AccessKey,在管理控制台创建 "${ACCESSKEY}", // 填写SecretKey 在管理控制台创建 "${SECRETKEY}" ); // 所属的 Topic $topic "${TOPIC}"; // 您在控制台创建的 Consumer ID(Group ID) $groupId "${GROUPID}"; // Topic所属实例ID,默认实例为空NULL $instanceId "${INSTANCEID}"; $this>consumer $this>client>getConsumer($instanceId, $topic, $groupId, "TagA"); } public function run() { // 在当前线程循环消费消息,建议是多开个几个线程并发消费消息 while (True) { try { // 长轮询消费消息 // 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回 $messages $this>consumer>consumeMessage( 3, // 一次最多消费3条(最多可设置为16条) 3 // 长轮询时间3秒(最多可设置为30秒) ); } catch (MQExceptionMessageResolveException $e) { // 当出现消息Body存在不合法字符,无法解析的时候,会抛出此异常。 // 可以正常解析的消息列表。 $messages $e>getPartialResult()>getMessages(); // 无法正常解析的消息列表。 $failMessages $e>getPartialResult()>getFailResolveMessages(); $receiptHandles array(); foreach ($messages as $message) { // 处理业务逻辑。 $receiptHandles[] $message>getReceiptHandle(); printf("MsgID %sn", $message>getMessageId()); } foreach ($failMessages as $failMessage) { // 处理存在不合法字符,无法解析的消息。 $receiptHandles[] $failMessage>getReceiptHandle(); printf("Fail To Resolve Message. MsgID %sn", $failMessage>getMessageId()); } $this>ackMessages($receiptHandles); continue; } catch (Exception $e) { if ($e instanceof MQExceptionMessageNotExistException) { // 没有消息可以消费,接着轮询 printf("No message, contine long polling!RequestId:%sn", $e>getRequestId()); continue; } printr($e>getMessage() . "n"); sleep(3); continue; } print "consume finish, messages:n"; // 处理业务逻辑 $receiptHandles array(); foreach ($messages as $message) { $receiptHandles[] $message>getReceiptHandle(); printf("MessageID:%s TAG:%s BODY:%s nPublishTime:%d, FirstConsumeTime:%d, nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%sn", $message>getMessageId(), $message>getMessageTag(), $message>getMessageBody(), $message>getPublishTime(), $message>getFirstConsumeTime(), $message>getConsumedTimes(), $message>getNextConsumeTime(), $message>getMessageKey()); printr($message>getProperties()); } // $message>getNextConsumeTime()前若不确认消息消费成功,则消息会重复消费 // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样 printr($receiptHandles); try { $this>ackMessages($receiptHandles); } catch (Exception $e) { if ($e instanceof MQExceptionAckMessageException) { // 某些消息的句柄可能超时了会导致确认不成功 printf("Ack Error, RequestId:%sn", $e>getRequestId()); foreach ($e>getAckMessageErrorItems() as $errorItem) { printf("tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%sn", $errorItem>getReceiptHandle(), $errorItem>getErrorCode(), $errorItem>getErrorCode()); } } } print "ack finishn"; } } public function ackMessages($receiptHandles) { try { $this>consumer>ackMessage($receiptHandles); } catch (Exception $e) { if ($e instanceof MQExceptionAckMessageException) { // 某些消息的句柄可能超时,会导致消费确认失败。 printf("Ack Error, RequestId:%sn", $e>getRequestId()); foreach ($e>getAckMessageErrorItems() as $errorItem) { printf("tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%sn", $errorItem>getReceiptHandle(), $errorItem>getErrorCode(), $errorItem>getErrorCode()); } } } }}$instance new ConsumerExample();$instance>run();?>