RabbitMQ

同步异步

同步:就是实时接收,比如视频聊天

异步:就是延迟接收,比如微信打字

为什么要选择MQ

1、便于拓展

主业务只需要将消息发送到消息队列中就行,其他业务按照自己的业务消费,比如订单支付成功后,发送消息到消息队列,其他业务:更新订单状态、发送短信、增加积分等等都可以解耦

2、提高性能

耗时:主业务+所有附加业务(更新订单状态、发送短信、增加积分等等)-> 主业务+发送消息

3、业务安全

如果主业务因为附加业务失败而失败,并不合理,比如用户下单成功,因为短信通知失败而给用户退款吗,有点意思。

利用MQ主业务不会因为受到附加业务的影响,比如短信未发送成功,不会影响用户已下单的状态

各类MQ对比

特性/产品 RabbitMQ Kafka RocketMQ ActiveMQ Pulsar
开发语言 Erlang Scala/Java Java Java Java/C++
主要协议 AMQP 自定义协议 自定义协议 JMS, AMQP, STOMP Pulsar API
吞吐量 万级/秒 十万~百万级/秒 十万级/秒 万级/秒 百万级/秒
延迟 微秒~毫秒级 毫秒级 毫秒级 毫秒级 5ms左右
可靠性 高(持久化+确认) 高(副本机制) 高(同步刷盘) 中高 极高(分层存储)
消息顺序 队列内有序 分区内有序 严格有序 队列内有序 分区内有序
消息堆积 一般(内存+磁盘) 极强(磁盘存储) 强(磁盘存储) 一般 极强(分层存储)
集群模式 镜像队列 分布式集群 主从+Dledger 主从 分层架构(Broker+BookKeeper)
部署复杂度 简单 中等 中等 简单 复杂
管理界面 优秀(内置) 一般(第三方) 良好(控制台) 一般 良好
事务支持 有限支持 0.11+支持 完善支持 支持 支持

RabbitMQ

结构

image.png

  • publisher:生产者,也就是发送消息的一方
  • consumer:消费者,也就是消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

WorkQueues

基本概念

  • 定义:工作队列(Work Queue)模式也称为任务队列(Task Queue),用于将耗时任务分发给多个工作者(消费者)处理
  • 核心特点:
    • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
    • 消息在队列中按FIFO(先进先出)原则排队
    • 适用于任务解耦和负载均衡

消息分发机制

  • 轮询分发(Round-Robin)
    • 默认分发策略,不管消费者处理能力,平均分配消息
    • 问题:当消费者处理能力不同时,可能导致部分消费者过载
  • 公平分发(Fair Dispatch)
    • 通过basicQos(prefetchCount)设置预取数量
    • Broker只会在消费者空闲时推送新消息
    • 保证处理能力强的消费者获得更多消息

交换机类型

交换机类型示意图

基本概念

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机
  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。
  • Consumer:消费者,与以前一样,订阅队列,没有变化

重要特性:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机类型概览

类型 路由方式 适用场景 特点
Fanout 广播到所有绑定队列 事件通知、广播消息 忽略routing key
Direct 精确匹配routing key 点对点通信、任务路由 严格匹配
Topic 模式匹配routing key 多维度消息分类 支持通配符
Headers 匹配消息headers属性 复杂路由条件 不依赖routing key

1、Fanout(扇出交换机)

工作原理

  • 广播模式,将消息发送到所有与当前交换机绑定的队列
  • 完全忽略routing key,即使指定了也不生效
  • 适合”发布/订阅”模型

2、Direct(直连交换机)

工作原理

  • 交换机与队列绑定时,必须指定RoutingKey
  • 消息发送时指定RoutingKey,只有完全匹配的队列才能收到消息
  • 允许多个队列使用相同的RoutingKey(类似Fanout效果)

3、Topic(主题交换机)

工作原理

  • 与Direct类似,但支持RoutingKey通配符匹配
  • BindingKey和RoutingKey都由单词组成,以.分隔

通配符规则

  • #:匹配0个或多个单词(多级匹配)
  • *:匹配恰好1个单词(单级匹配)

匹配示例

Binding Key Routing Key: item.spu.insert Routing Key: item.spu Routing Key: log.error.service
item.# ✓ 匹配 ✓ 匹配 ✗ 不匹配
item.* ✗ 不匹配 ✓ 匹配 ✗ 不匹配
#.error.# ✗ 不匹配 ✗ 不匹配 ✓ 匹配
*.*.insert ✓ 匹配 ✗ 不匹配 ✗ 不匹配

4、Headers(头交换机)

工作原理

  • 不依赖RoutingKey,而是根据消息headers属性匹配
  • 队列绑定时设置header匹配条件(键值对)
  • 支持两种匹配模式:
    • x-match=all:所有header必须匹配(默认)
    • x-match=any:任意一个header匹配即可

交换机属性

所有交换机类型都支持以下属性配置:

1
2
3
4
5
6
7
channel.exchangeDeclare(
"exchange_name", // 交换机名称
BuiltinExchangeType.DIRECT, // 交换机类型
true, // durable: 持久化(重启后保留)
false, // autoDelete: 自动删除(无绑定时自动删除)
null // arguments: 额外参数
);
属性 说明 建议值
durable 交换机是否持久化 生产环境设为true
autoDelete 无绑定时是否自动删除 临时交换机设为true
internal 是否仅内部使用(客户端无法发布) 通常false
arguments 扩展参数(如HA策略) 按需设置

选择指南

  1. 需要广播消息 → 选择 Fanout
  2. 简单精确路由 → 选择 Direct
  3. 需要通配符或多维度路由 → 选择 Topic
  4. 基于消息头的复杂匹配 → 考虑 Headers(优先评估Topic替代方案)
  5. 不确定时 → 从 Direct 开始,根据需求演进

80%的场景使用Direct和Topic交换机即可满足需求,Fanout用于广播场景,Headers仅在特殊需求时使用。

可靠性保证

在分布式系统中,消息从生产者→MQ→消费者的过程中,每个环节都可能出现故障。RabbitMQ提供了多种机制确保消息可靠性传输,下面详细分析三个关键环节的保障方案。

1、生产者可靠

1.1 重试机制

  • 触发条件:网络闪断、连接超时、MQ节点不可用

  • 配置示例(Spring Boot):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    spring:
    rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
    retry:
    enabled: true
    max-attempts: 3 # 最大重试次数
    initial-interval: 1000ms # 初始重试间隔
    multiplier: 2.0 # 间隔倍增系数
  • 最佳实践

    • 仅对幂等操作启用重试
    • 重试间隔采用指数退避策略
    • 重试失败后记录日志并告警

1.2 确认机制

Publisher Confirm

  • 原理:MQ收到消息后异步返回ACK/NACK

  • 三种模式

    模式 说明 适用场景
    NONE 无确认 测试环境/非关键业务
    SIMPLE 同步等待确认 低吞吐量关键业务
    CORRELATED 异步回调确认 高吞吐量生产环境
  • 工作流程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 开启发布确认
    channel.confirmSelect();

    // 发布消息
    channel.basicPublish(exchange, routingKey, props, message.getBytes());

    // 等待确认(同步)
    if (channel.waitForConfirms(5000)) {
    System.out.println("Message confirmed");
    }

Publisher Return

  • 触发条件:消息成功送达MQ但路由失败

  • 处理方式

    1
    2
    3
    4
    5
    // 设置Return回调
    rabbitTemplate.setReturnsCallback(returned -> {
    log.error("Message returned: {}", returned);
    // 保存到数据库或发送告警
    });

持久化消息确认时机

消息类型 确认时机 可靠性
临时消息 入队成功 ★★☆
持久消息 入队+刷盘完成 ★★★

Confirm和Return是互补机制,应同时启用。Confirm保证MQ收到消息,Return保证消息正确路由。


2、MQ可靠

2.1 持久化三要素

组件 持久化配置 重要性
交换机 durable=true 必须,否则重启后路由规则丢失
队列 durable=true 必须,否则重启后队列消失
消息 deliveryMode=2 关键业务必须设置
  • 声明示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // 交换机持久化
    channel.exchangeDeclare("exchange_name", "direct", true);

    // 队列持久化
    channel.queueDeclare("queue_name", true, false, false, null);

    // 消息持久化
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .deliveryMode(2) // 2=持久化
    .build();
    channel.basicPublish("exchange", "routing.key", props, message.getBytes());

2.2 Lazy Queue

  • 工作原理

    • 消息直接写入磁盘,内存仅保留索引
    • 消费时按需加载到内存(预读机制)
    • 支持亿级消息堆积
  • 启用方式

    1
    2
    3
    4
    // 声明惰性队列
    Map<String, Object> args = new HashMap<>();
    args.put("x-queue-mode", "lazy");
    channel.queueDeclare("lazy_queue", true, false, false, args);
  • 性能对比

    指标 普通队列 Lazy Queue
    内存占用 高(100万条消息≈1GB) 低(仅索引)
    写入速度 稍慢(磁盘IO)
    读取速度 首次慢(需加载)
    重启恢复 慢(需重放) 快(直接读磁盘)
  • 适用场景

    • 长时间离线消费者
    • 预期消息大量积压
    • 内存资源受限的环境

Lazy Queue不替代消费者异常处理!应配合监控告警及时发现积压问题。


3、消费者可靠

3.1 消费者确认模式

模式 处理方式 适用场景 风险
none 立即ACK 非关键业务/调试 消息丢失风险高
manual 业务代码手动ACK 关键业务/复杂处理 开发复杂度高
auto 异常时NACK/REJECT 大多数业务场景 需正确处理异常
  • Spring Boot配置

    1
    2
    3
    4
    5
    spring:
    rabbitmq:
    listener:
    simple:
    acknowledge-mode: manual # 或auto/none

3.2 失败重试策略

  • 本地重试(推荐):

    1
    2
    3
    4
    5
    6
    7
    8
    spring:
    rabbitmq:
    listener:
    simple:
    retry:
    enabled: true
    max-attempts: 3
    initial-interval: 1000ms
    • 优点:避免无效队列积压,减少网络开销
    • 缺点:重试期间线程阻塞
  • 重试耗尽处理

    策略 行为 适用场景
    RejectAndDontRequeueRecoverer 丢弃消息 可容忍丢失的日志类消息
    ImmediateRequeueMessageRecoverer 重新入队 临时性故障(如DB连接超时)
    RepublishMessageRecoverer 转发到死信队列 关键业务,需人工干预

3.3 幂等性保障

唯一ID方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 1. 生产者生成唯一ID
String messageId = UUID.randomUUID().toString();
MessageProperties props = new MessageProperties();
props.setMessageId(messageId);

// 2. 消费者处理
@Transactional
public void handleMessage(Message message) {
String id = message.getMessageProperties().getMessageId();
if (messageIdRepository.existsById(id)) {
log.info("Duplicate message: {}", id);
return;
}
// 业务处理
processBusiness(message);
// 保存ID
messageIdRepository.save(new MessageId(id));
}

业务状态校验

1
2
3
4
5
6
// 更新订单状态时校验
int affected = orderMapper.updateStatusIfNotPaid(orderId, "PAID");
if (affected == 0) {
log.warn("Order {} already paid", orderId);
return;
}

3.4 兜底方案

  • 定时对账

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Scheduled(fixedDelay = 20000)
    public void reconciliation() {
    List<Order> pendingOrders = orderService.findPendingMoreThan(30min);
    for (Order order : pendingOrders) {
    if (paymentService.checkPaid(order.getId())) {
    orderService.confirmPayment(order.getId());
    }
    }
    }
  • 人工干预通道

    • 死信队列监控仪表盘
    • 消息重发管理界面
    • 失败消息分类统计

延迟消息

1. TTL+死信队列实现

架构

1
2
3
生产者 → 普通交换机 → TTL队列(无消费者) 
↓ (过期/拒绝)
死信交换机 → 死信队列 → 消费者

配置步骤

1
2
3
4
5
6
7
8
9
10
11
12
13
// 1. 声明死信交换机
channel.exchangeDeclare("dlx.exchange", "direct");

// 2. 声明TTL队列(绑定死信参数)
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
args.put("x-dead-letter-routing-key", "delay.process");
args.put("x-message-ttl", 5000); // 5秒过期
channel.queueDeclare("ttl.queue", true, false, false, args);

// 3. 绑定死信队列
channel.queueDeclare("dlq.queue", true, false, false, null);
channel.queueBind("dlq.queue", "dlx.exchange", "delay.process");

2. RabbitMQ延迟插件

  • 优势:原生支持延迟消息,无需死信队列

  • 安装

    1
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 声明延迟交换机
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    channel.exchangeDeclare("delay.exchange", "x-delayed-message", true, false, args);

    // 发布延迟消息
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .headers(Collections.singletonMap("x-delay", 5000)) // 5秒延迟
    .build();
    channel.basicPublish("delay.exchange", "routing.key", props, message.getBytes());

3. 方案对比

方案 精确度 吞吐量 复杂度 适用场景
TTL+DLX 低(队列级TTL) 简单延迟需求
延迟插件 高(消息级延迟) 精确延迟场景
外部调度器 极高 极高 超长延迟(小时级+)

最佳实践:30分钟内的延迟用TTL+DLX,精确到秒级的用延迟插件,超长延迟考虑时间轮算法+数据库存储。


全链路可靠性保障方案

1
2
3
4
5
graph LR
A[生产者] -->|1. 重试+Confirm+Return| B(RabbitMQ)
B -->|2. 持久化+Lazy Queue| C[消费者]
C -->|3. 手动ACK+本地重试+幂等| D[业务系统]
D -->|4. 定时对账| A

核心原则

  1. 生产端:确保消息必达MQ
  2. MQ端:确保消息不丢失
  3. 消费端:确保消息必处理且只处理一次
  4. 兜底:建立最终一致性补偿机制

没有100%可靠的系统,只有可接受的失败概率。根据业务容忍度设计合理的可靠性方案,避免过度设计。