在分布式系统中,消息发送的“成功”是一个具有欺骗性的概念。很多开发者认为调用了 basicPublish 没报错就是发送成功了,但事实并非如此。本文将带你拆解 RabbitMQ 生产者的三种发送模式,理清“发送”与“投递”的区别。
核心预备知识:TCP 缓冲区的“黑洞”
在进入模式讲解前,必须理解:默认情况下,basicPublish 只要将数据从 JVM 内存拷贝到**操作系统 TCP 发送缓冲区(Send Buffer)**就会立即返回。
-
这意味着: 即使此时网线被拔掉,或者 Broker 宕机,只要缓冲区没满,你的代码依然会认为“发送成功”
模式一:默认模式(发后即忘 / Fire-and-Forget)
这是 RabbitMQ 最原始的发送方式。
1. 特点与原理
-
非确认制:生产者只负责把数据推给 Socket,不关心 Broker 是否收到。
-
物理反馈:唯一能让它报错的情况是底层 Socket 彻底断开(经过 TCP 重传超时)或缓冲区溢出导致的阻塞。
2. 优缺点
-
优点:吞吐量极高,延迟极低,不占用生产者内存。
-
缺点:极不可靠。存在“消息黑洞”,网络抖动、Exchange 配置错误、Broker 故障都会导致消息无感知丢失。
3. 代码示例
// 默认模式:没有任何确认机制
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
System.out.println("消息已写入本地 TCP 缓冲区");
模式二:同步确认模式(Standard Confirm)
为了解决“黑洞”问题,RabbitMQ 引入了发布确认(Publisher Confirms)。同步确认是最直接的实现。
1. 特点与原理
-
串行等待:每发送一条消息,生产者线程都会阻塞,等待 Broker 回传一个
Ack信号。 -
应用层闭环:只有收到 Broker 的确认,
waitForConfirms()才会返回 true。
2. 优缺点
-
优点:确保消息物理上到达了 Broker,逻辑简单,易于编写。
-
缺点:性能极差。每一条消息都需要一次网络往返(RTT),这会让原本几万的 TPS 降至几百。
3. 代码示例
channel.confirmSelect(); // 开启确认模式
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
if (channel.waitForConfirms()) {
System.out.println("消息已成功到达 Broker");
} else {
System.err.println("Broker 拒绝了消息(可能磁盘满或内部错误)");
}
模式三:异步监听模式(Asynchronous Confirm)
这是生产环境中最常用的高性能可靠方案。
1. 特点与原理
-
回调机制:生产者发送消息后不等待,而是注册一个
ConfirmListener。当 Broker 处理完消息后,会通过回调函数通知生产者。 -
全双工通讯:发送和确认在不同的线程/时机进行,互不干扰。
2. 优缺点
-
优点:高性能与高可靠的完美平衡。不会阻塞发送线程,且能精准捕捉每一条消息的成败。
-
问题:编程复杂度高。需要开发者在本地维护一个“待确认消息清单”,并处理重发逻辑。
3. 代码示例
// 1. 开启确认模式
channel.confirmSelect();
// 2. 维护一个本地序号与消息内容的映射(用于失败重发)
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
// 3. 注册异步监听器
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) {
// multiple 代表是否为批量确认
if (multiple) {
outstandingConfirms.headMap(deliveryTag, true).clear();
} else {
outstandingConfirms.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) {
String body = outstandingConfirms.get(deliveryTag);
System.err.println("消息投递失败,Tag: " + deliveryTag + ",准备重试内容: " + body);
// 执行重发逻辑...
}
});
// 4. 发送并记录
long nextSeqNo = channel.getNextPublishSeqNo();
outstandingConfirms.put(nextSeqNo, "Hello RabbitMQ");
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, null, "Hello RabbitMQ".getBytes());
总结:如何判断发送“成功”?
在不同的模式下,“成功”的定义完全不同:
| 维度 | 默认模式 | 同步模式 | 异步模式 |
| 成功的标准 | 写入本地 TCP 缓冲区成功 | 收到 Broker 回传的 Ack 信号 | 触发 handleAck 回调 |
| 感知网络中断 | 极其迟钝(靠 TCP 超时) | 敏感(等待 Ack 超时) | 敏感(连接断开触发异常) |
| 感知路由失败 | 无法感知 | 无法感知(除非设 mandatory) | 配合 ReturnListener 可感知 |