收发普通消息 异步发送 消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息,发送方通过回调接口接收服务端响应,并处理响应结果。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。 参考如下示例代码 import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; public class AsyncProducerNormalExample { private static RPCHook getAclRPCHook() { return new AclClientRPCHook(new SessionCredentials("accessKey", // 分布式消息服务RocketMQ控制台用户管理菜单中创建的用户ID "accessSecret" // 分布式消息服务RocketMQ控制台用户管理菜单中创建的密钥 )); } public static void main(String[] args) throws Exception { DefaultMQProducer producer new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook()); // 填入控制台获取NAMESRV接入点地址 producer.setNamesrvAddr("XXX:xxx"); ; //如果需要开启SSL,请增加此行代码 producer.start(); int messageCount 10; final CountDownLatch countDownLatch new CountDownLatch(messageCount); for (int i 0; i < messageCount; i++) { try { Message msg new Message("YOUR TOPIC", "TagA", // 设置消息的TAG,若无可设置为空 "Hello RocketMQ".getBytes(RemotingHelper.DEFAULTCHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { countDownLatch.countDown(); System.out.println("send message success. msgId " + sendResult.getMsgId()); } @Override public void onException(Throwable e) { countDownLatch.countDown(); // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。 System.out.println("send message failed."); } }); } catch (Exception e) { e.printStackTrace(); } } countDownLatch.await(5, TimeUnit.SECONDS); producer.shutdown(); } }