生产端核心概念与可靠性目标
核心角色
- Producer:消息的生成源头与发送主体。作为可靠性的第一道防线,它负责将业务数据封装为消息,并通过SDK发送至Broker。
- Producer Group:具有相同业务逻辑的Producer集合。其核心作用在于实现负载均衡与故障转移,防止单点故障成为系统瓶颈。
发送模式与可靠性分级
RocketMQ提供三种发送模式,需根据业务对可靠性的敏感度进行选择:
- 同步发送:发送后阻塞等待Broker的ACK响应。可靠性最高,适用于订单创建、支付结算等核心业务链路。
- 异步发送:发送后立即返回,通过回调函数处理结果。兼顾效率与可靠性,适用于日志上报、非核心高并发场景。
- 单向发送:发送后不等待响应,也不处理结果。可靠性最低,仅适用于监控心跳等允许丢失的场景。
同步发送
try {
// 1. 发送消息,线程阻塞,等待 Broker 返回结果
SendResult result = producer.send(message);
// 2. 根据返回状态判断
if (result.getSendStatus() == SEND_OK) {
print("消息发送成功,消息ID: " + result.getMsgId());
// 继续执行后续业务逻辑
} else {
print("消息发送失败,状态: " + result.getSendStatus());
// 记录日志或进行本地重试
}
} catch (Exception e) {
print("发送过程发生异常: " + e.getMessage());
// 捕获异常,通常意味着网络不通或Broker不可用,需触发重试机制
}
异步发送
producer.send(message, new SendCallback() {
// 1. 发送动作立即返回,不阻塞主线程
@Override
public void onSuccess(SendResult result) {
// 2. 成功回调:Broker 确认收到消息
print("异步发送成功,消息ID: " + result.getMsgId());
}
@Override
public void onException(Throwable e) {
// 3. 异常回调:发送失败(如网络超时)
print("异步发送失败: " + e.getMessage());
// 在此处记录日志,或写入本地数据库进行兜底重试
}
});
print("主线程继续处理其他业务,不受发送影响...");
单向发送
try {
// 1. 发送消息,不等待结果,也不注册回调
producer.sendOneway(message);
print("消息已发出(无法确认是否成功)");
} catch (Exception e) {
// 2. 仅在本地网络发送失败时抛出异常(如找不到Broker)
print("本地发送异常: " + e.getMessage());
}
// 注意:即使这里没报错,也无法保证 Broker 真的成功持久化了消息
可靠性三大核心目标
- 无丢失:确保消息从生产端发出后,成功抵达Broker并完成持久化,杜绝因网络抖动或服务宕机导致的数据蒸发。
- 无重复:通过机制设计规避因重试或网络波动引发的重复投递,减轻下游消费端的幂等压力。
- 可追溯:全链路记录消息发送状态,确保异常发生时能快速定位、排查并进行兜底处理。
生产端可靠性最佳实践
路由与接入配置
- NameServer配置:Producer需配置多个NameServer地址(分号分隔),确保能高可用地获取Topic路由信息(Broker地址及Queue分布),防止因路由获取失败导致发送阻断。
持久化与超时设置
- 强制持久化:默认情况下消息为持久化,但需严防误操作将核心业务消息设置为临时消息(如设置
msg.setDelayTimeLevel(-1)),否则Broker重启会导致数据丢失。 - 超时时间调优:默认发送超时为3000ms。建议根据实际网络环境调整为3-5秒,既要避免因网络波动导致的误判,也要防止超时过长阻塞业务线程。
进阶兜底机制
终极兜底方案:本地消息表
单纯依赖SDK重试无法完全解决极端故障下的消息丢失问题。针对核心业务,必须实施“本地落库+定时重试+阈值告警”的组合拳:
单纯依赖SDK重试无法完全解决极端故障下的消息丢失问题。针对核心业务,必须实施“本地落库+定时重试+阈值告警”的组合拳:
- 本地落库:将消息内容同步写入本地数据库(与业务数据在同一事务中),确保消息不丢失。
- 定时重试:启动后台定时任务,扫描数据库中“未发送成功“的消息进行发送;按重试次数递增间隔发送。
- 阈值告警:设定最大重试次数(如5次)。超过阈值仍未成功,触发告警通知人工介入,防止死循环重试。
/**
* 核心业务方法:如:下订单并发送消息
*/
@Transactional(rollbackFor = Exception.class)
public void createOrder(OrderDTO orderDTO) {
// 1. 处理业务逻辑:保存订单
// ...
orderMapper.insert(order);
// 2. 保存本地消息(状态为0:待发送)
LocalMessageLog log = new LocalMessageLog();
// ...
log.setStatus(0); // 待发送
log.setRetryCount(0);
messageLogMapper.insert(log);
}
生产端最佳实践总结
-
核心业务优先使用同步发送,配置3-5次重试,设置3-5s发送超时时间,确保消息发送状态可感知。
-
所有消息必须配置为持久化,禁止使用单向发送,避免临时消息。
-
针对核心业务采用本地消息表方案,实施“本地落库+定时重试+阈值告警”的组合拳。
生产端的可靠性是消息全链路可靠的基础,只有确保消息成功、无重复地送达Broker,才能为后续的Broker存储与消费端处理奠定基础。