收发事务消息 发送消息 参考如下示例代码。 import time from rocketmq.client import TransactionMQProducer, Message, TransactionStatus endpoint "${ENDPOINT}" 填写分布式消息服务RocketMQ控制台Namesrv接入点 accesskey "${ACCESSKEY}" 填写AccessKey 在分布式消息服务RocketMQ控制台用户管理菜单中创建的用户ID accesssecret "${SECRETKEY}" 填写SecretKey 在分布式消息服务RocketMQ控制台用户管理菜单中创建的用户密钥 topic "${TOPIC}" 填写Topic,在管理控制台创建 producergroup "${GROUP}" 生产者组group def transactioncheckercallback(msg, userargs): return TransactionStatus.COMMIT def transactionlocalexecute(msg, userargs): return TransactionStatus.UNKNOWN producer TransactionMQProducer(producergroup, transactioncheckercallback) producer.setnameserveraddress(endpoint) producer.setsessioncredentials(accesskey, accesssecret, "") producer.start() msg Message(topic) msg.setbody("Hello RocketMQ") ret producer.sendmessageintransaction(msg, transactionlocalexecute, None) print(ret.status, ret.msgid, ret.offset) while True: time.sleep(3600) 订阅消息 参考如下示例代码。 import time from rocketmq.client import PushConsumer, ConsumeStatus endpoint "${ENDPOINT}" 填写分布式消息服务RocketMQ控制台Namesrv接入点 accesskey "${ACCESSKEY}" 填写AccessKey 在分布式消息服务RocketMQ控制台用户管理菜单中创建的用户ID accesssecret "${SECRETKEY}" 填写SecretKey 在分布式消息服务RocketMQ控制台用户管理菜单中创建的用户密钥 topic "${TOPIC}" 填写Topic,在管理控制台创建 group "${GROUP}" 填写订阅组group,在管理控制台创建 def callback(msg): print(msg.id, msg.body) return ConsumeStatus.CONSUMESUCCESS consumer PushConsumer(group) consumer.setnameserveraddress(endpoint) consumer.setsessioncredentials(accesskey, accesssecret, "") consumer.subscribe(topic, callback) consumer.start() while True: time.sleep(3600) consumer.shutdown()