RocketMQ C++ SDK 消费顺序消息 include include include "rocketmq/DefaultMQPushConsumer.h" using namespace rocketmq; class OrderlyMessageListener : public MessageListenerOrderly { public: ConsumeStatus consumeMessage(const std::vector &msgs) { for (auto item msgs.begin(); item ! msgs.end(); item++) { std::cout getTopic() getMsgId() setNamesrvAddr("your access point"); consumer>setSessionCredentials("ak", "sk", "VOLC"); OrderlyMessageListener messageListener new OrderlyMessageListener(); consumer>subscribe("topicname", "tag"); consumer>registerMessageListener(messageListener); consumer>start(); std::thisthread::sleepfor(std::chrono::seconds(60)); consumer>shutdown(); return 0; } 使用C++客户端收发事务消息 简介 业务侧通过 sendMessageInTransaction 发送消息到 RocketMQ 服务端。 业务侧通过 executeLocalTransaction 执行本地事务。 实现业务查询事务执行是否成功的接口 checkLocalTransaction。 使用C++客户端发送事务消息 include include include include "rocketmq/TransactionMQProducer.h" include "rocketmq/MQClientException.h" include "rocketmq/TransactionListener.h" using namespace std; using namespace rocketmq; class DefineTransactionListener : public TransactionListener { public: LocalTransactionState executeLocalTransaction(const MQMessage &msg, void arg) { / 执行本地事务 1. 成功返回COMMITMESSAGE 2. 失败返回ROLLBACKMESSAGE 3. 不确定返回UNKNOWN。服务端会触发定时任务回查状态 / std::cout << "Execute Local Transaction,Received Message Topic:" << msg.getTopic() << ", transactionId:" << msg.getTransactionId() << std::endl; return UNKNOWN; } LocalTransactionState checkLocalTransaction(const MQMessageExt &msg) { / 回查本地事务执行情况 1. 成功返回COMMITMESSAGE 2. 失败返回ROLLBACKMESSAGE 3. 不确定返回UNKNOWN。则等待下次定时任务回查。 / std::cout << "Check Local Transaction,Received Message Topic:" << msg.getTopic() << ", MsgId:" << msg.getMsgId() << std::endl; return COMMITMESSAGE; } }; int main(){ TransactionMQProducer producer("producergroupname"); producer.setNamesrvAddr("accesspoint"); producer.setSessionCredentials("ak", "sk", "channel"); DefineTransactionListener exampleTransactionListener new DefineTransactionListener(); producer.setTransactionListener(exampleTransactionListener); producer.start(); int count 3; for (int i 0; i < count; ++i) { MQMessage msg("TRANSACTION TOPIC", "TAG", "Transaction content"); try { SendResult sendResult producer.sendMessageInTransaction(msg, &i); std::cout << "SendResult:" << sendResult.getSendStatus() << ", Message ID: " << sendResult.getMsgId() << std::endl; thisthread::sleepfor(chrono::seconds(1)); } catch (MQException e) { std::cout << "ErrorCode: " << e.GetError() << " Exception:" << e.what() << std::endl; } } std::cout << "Send " << count << " messages OK " << endl; std::cout << "Wait for local transaction check..... " << std::endl; for (int i 0; i < 6; ++i) { thisthread::sleepfor(chrono::seconds(10)); std::cout << "Running " << i 10 + 10 << " Seconds......" << std::endl; } producer.shutdown(); return 0; }