收发顺序消息 订阅顺序消息 参考如下示例代码 import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.remoting.RPCHook; public class ConsumerFifoExample { private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials( "accessKey", // 分布式消息服务RocketMQ控制台用户管理菜单中创建的用户ID "accessSecret" // 分布式消息服务RocketMQ控制台用户管理菜单中创建的密钥 )); } public static void main(String[] args) throws Exception { / 创建Consumer,如果想开启消息轨迹,可以按照如下方式创建: DefaultMQPushConsumer consumer new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null); / DefaultMQPushConsumer consumer new DefaultMQPushConsumer("YOUR GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely()); // 填入控制台NAMESRV接入点地址 consumer.setNamesrvAddr("XXX:xxx"); // consumer.setUseTLS(true); // 如果需要开启SSL,请增加此行代码 / 如果想要消费指定TAG的消息,可以按照如下方式订阅: 为订阅所有的TAG pushConsumer.subscribe(TOPICNAME, "Tag1"); / consumer.subscribe("TopicTest", ""); consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) > { System.out.printf("Receive New Messages: %s %n", msgs); return ConsumeOrderlyStatus.SUCCESS; }); consumer.start(); System.out.println("Consumer Started."); } }