🏁

消息队列面试指南

消息队列面试指南


一、消息队列基础

1.1 消息队列的作用

作用
说明
示例场景
解耦
生产者和消费者独立演进
订单系统与库存系统解耦
异步
非核心流程异步处理
注册后异步发送邮件
削峰
平滑处理流量高峰
秒杀请求写入队列慢慢消费
同步调用:
用户 → 订单服务 → 库存服务 → 积分服务 → 通知服务 → 响应
                    (任一服务慢或失败,整体受影响)

异步解耦:
用户 → 订单服务 → 响应
         ↓
      消息队列
    ↙    ↓    ↘
库存服务 积分服务 通知服务

1.2 消息队列的优缺点

优点: - 系统解耦,降低依赖 - 异步处理,提高响应速度 - 削峰填谷,保护下游系统 - 消息持久化,保证可靠性
缺点: - 系统复杂度增加 - 消息一致性问题 - 消息可能丢失或重复 - 运维成本增加

1.3 消息队列选型对比

特性
Kafka
RabbitMQ
RocketMQ
开发语言
Scala/Java
Erlang
Java
吞吐量
百万级
万级
十万级
延迟
毫秒级
微秒级
毫秒级
消息可靠性
事务消息
支持
支持
原生支持
延迟消息
不支持
支持(插件)
原生支持
消息顺序
Partition 内有序
队列内有序
队列内有序
消息回溯
支持
不支持
支持
适用场景
日志、大数据、流处理
业务消息、复杂路由
电商、金融、业务消息
选型建议: - 日志收集、大数据:Kafka(高吞吐) - 业务消息、复杂路由:RabbitMQ(功能丰富) - 电商、金融:RocketMQ(事务消息、延迟消息)

1.4 消息模型

点对点模型(P2P)
生产者 → Queue → 消费者
           ↘ 消费者(竞争消费)
  • 一条消息只能被一个消费者消费
  • 消费后消息从队列删除
发布订阅模型(Pub/Sub)
生产者 → Topic → 订阅者1
              → 订阅者2
              → 订阅者3
  • 一条消息可以被多个订阅者消费
  • 消息广播给所有订阅者

1.5 消息队列引入的问题

问题
说明
解决方案
可用性降低
MQ 故障导致系统不可用
集群部署、故障转移
复杂度增加
需要处理消息丢失、重复、顺序
完善的消息机制设计
一致性问题
消息处理失败导致数据不一致
事务消息、补偿机制

二、Kafka

2.1 Kafka 架构

                    Kafka Cluster
+----------------------------------------------------------+
|  +--------+    +--------+    +--------+                  |
|  | Broker |    | Broker |    | Broker |                  |
|  |   0    |    |   1    |    |   2    |                  |
|  +--------+    +--------+    +--------+                  |
|      ↑             ↑             ↑                       |
|      └─────────────┼─────────────┘                       |
||
|              +----------+                                |
|              | Zookeeper|  (或 KRaft)                    |
|              +----------+                                |
+----------------------------------------------------------+
       ↑                                    ↓
   Producer                            Consumer Group
核心概念: | 概念 | 说明 | |——|——| | Broker | Kafka 服务器节点 | | Topic | 消息主题,逻辑分类 | | Partition | 分区,Topic 的物理分片 | | Replica | 副本,保证高可用 | | Producer | 消息生产者 | | Consumer | 消息消费者 | | Consumer Group | 消费者组,组内竞争消费 | | Offset | 消息偏移量,消费位置 |

2.2 Kafka 分区策略

生产者分区策略
// 1. 指定分区
producer.send(new ProducerRecord<>("topic", 0, key, value));

// 2. 按 Key 哈希
producer.send(new ProducerRecord<>("topic", key, value));
// 分区 = hash(key) % numPartitions

// 3. 轮询(无 Key 时)
producer.send(new ProducerRecord<>("topic", value));

// 4. 自定义分区器
public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Cluster cluster) {
        // 自定义分区逻辑
        return hash(key) % cluster.partitionCountForTopic(topic);
    }
}
分区数设计: - 分区数 = max(生产者吞吐量, 消费者吞吐量) / 单分区吞吐量 - 分区数过多:文件句柄增加、选举时间增加 - 分区数过少:并行度受限

2.3 Kafka 副本机制

Topic: orders (3 partitions, 3 replicas)

Broker 0          Broker 1          Broker 2
+----------+      +----------+      +----------+
| P0-Leader|      | P0-Follower     | P0-Follower
| P1-Follower     | P1-Leader|      | P1-Follower
| P2-Follower     | P2-Follower     | P2-Leader|
+----------+      +----------+      +----------+
核心概念: | 概念 | 说明 | |——|——| | Leader | 处理读写请求的副本 | | Follower | 从 Leader 同步数据的副本 | | ISR | In-Sync Replicas,与 Leader 同步的副本集合 | | OSR | Out-of-Sync Replicas,落后的副本 | | HW | High Watermark,消费者可见的最高位置 | | LEO | Log End Offset,日志末尾偏移量 |
ISR 机制
Leader: [msg1, msg2, msg3, msg4, msg5]LEO=5HW=3

Follower1: [msg1, msg2, msg3]  (ISR)
Follower2: [msg1, msg2]        (落后,可能被踢出 ISR)

2.4 消费者组与 Rebalance

Topic: orders (4 partitions)

Consumer Group A:
  Consumer1 → P0, P1
  Consumer2 → P2, P3

Consumer Group B:
  Consumer1 → P0, P1, P2, P3
Rebalance 触发条件: 1. 消费者加入/离开消费者组 2. 消费者心跳超时 3. Topic 分区数变化 4. 订阅的 Topic 变化
Rebalance 策略: | 策略 | 说明 | |——|——| | Range | 按 Topic 分区范围分配 | | RoundRobin | 轮询分配所有分区 | | Sticky | 尽量保持原有分配,减少变动 | | CooperativeSticky | 增量 Rebalance,避免 Stop-The-World |

2.5 Kafka 高性能原理

技术
说明
顺序写
磁盘顺序写性能接近内存
零拷贝
sendfile 减少数据拷贝
页缓存
利用 OS Page Cache
批量发送
批量压缩发送,减少网络开销
分区并行
多 Partition 并行读写
稀疏索引
快速定位消息位置
零拷贝原理
传统方式(4次拷贝):
磁盘 → 内核缓冲区 → 用户缓冲区 → Socket 缓冲区 → 网卡

零拷贝(2次拷贝):
磁盘 → 内核缓冲区 → 网卡
      (sendfile 系统调用)

2.6 Kafka 消息存储机制

Topic: orders
├── Partition-0/
│   ├── 00000000000000000000.log    # 消息数据
│   ├── 00000000000000000000.index  # 偏移量索引
│   ├── 00000000000000000000.timeindex  # 时间索引
│   ├── 00000000000000368769.log
│   ├── 00000000000000368769.index
│   └── ...
├── Partition-1/
└── Partition-2/
Segment 文件: - 每个 Partition 分为多个 Segment - 文件名是该 Segment 第一条消息的 Offset - 默认 1GB 或 7 天滚动
索引查找过程
查找 Offset=368800 的消息:
1. 二分查找确定 Segment 文件(368769.log)
2.368769.index 中二分查找最近的索引项
3. 从索引位置顺序扫描找到目标消息

2.7 Kafka Offset 管理

Offset 存储: - 旧版本:存储在 Zookeeper - 新版本:存储在内部 Topic __consumer_offsets
提交方式
// 1. 自动提交
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");

// 2. 手动同步提交
consumer.commitSync();

// 3. 手动异步提交
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        log.error("Commit failed", exception);
    }
});

// 4. 指定 Offset 提交
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("topic", 0), new OffsetAndMetadata(100));
consumer.commitSync(offsets);

2.8 Kafka 消息丢失场景与解决

环节
丢失场景
解决方案
生产端
发送失败未重试
acks=all + retries + 幂等
Broker
未持久化就宕机
acks=all + min.insync.replicas
消费端
自动提交后处理失败
手动提交 + 处理完再提交
生产者配置
props.put("acks", "all");  // 所有 ISR 确认
props.put("retries", 3);   // 重试次数
props.put("enable.idempotence", true);  // 幂等性
Broker 配置
min.insync.replicas=2  # 最少同步副本数
unclean.leader.election.enable=false  # 禁止非 ISR 副本选举

2.9 Kafka 消息重复消费与幂等

重复消费原因: 1. 消费者处理完成但提交 Offset 前宕机 2. Rebalance 导致重复消费 3. 生产者重试导致重复发送
幂等性保证
// 生产者幂等(Kafka 2.0+)
props.put("enable.idempotence", true);
// 原理:Producer ID + Sequence Number 去重

// 消费者幂等(业务层实现)
public void consume(Message msg) {
    String msgId = msg.getId();
    // 1. 数据库唯一索引
    // 2. Redis SETNX
    if (redis.setnx("consumed:" + msgId, "1", 86400)) {
        processMessage(msg);
    }
}

2.10 Kafka 消息顺序性保证

全局有序: - 只使用 1 个 Partition(牺牲并行度)
分区有序
// 相同 Key 的消息发送到同一分区
producer.send(new ProducerRecord<>("topic", orderId, message));
消费端保证
// 单线程消费
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);  // 顺序处理
    }
    consumer.commitSync();
}

2.11 Kafka 事务消息

// 初始化事务
producer.initTransactions();

try {
    producer.beginTransaction();

    // 发送消息
    producer.send(new ProducerRecord<>("topic1", "msg1"));
    producer.send(new ProducerRecord<>("topic2", "msg2"));

    // 提交消费者 Offset(消费-生产原子性)
    producer.sendOffsetsToTransaction(offsets, consumerGroupId);

    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}
事务隔离级别
// 消费者只读取已提交的消息
props.put("isolation.level", "read_committed");

2.12 Kafka Controller 与 Zookeeper

Controller 职责: - 监控 Broker 状态 - 管理分区 Leader 选举 - 同步元数据到其他 Broker
Zookeeper 存储内容
/kafka
├── brokers/
│   ├── ids/          # Broker 注册信息
│   └── topics/       # Topic 元数据
├── controller        # Controller 选举
├── consumers/        # 消费者信息(旧版本)
└── config/           # 配置信息

2.13 Kafka KRaft 模式

KRaft(Kafka Raft):移除 Zookeeper 依赖
传统模式:
Broker ←→ Zookeeper ←→ Controller

KRaft 模式:
Broker ←→ Controller(内置 Raft)
优势: - 简化部署,减少组件 - 提升元数据处理性能 - 支持更多分区(百万级)

三、RabbitMQ

3.1 RabbitMQ 架构

Producer → Exchange → Binding → Queue → Consumer
              ↓
         Routing Key
核心概念: | 概念 | 说明 | |——|——| | Producer | 消息生产者 | | Exchange | 交换机,接收消息并路由 | | Binding | 绑定关系,Exchange 与 Queue 的关联 | | Queue | 队列,存储消息 | | Consumer | 消息消费者 | | Virtual Host | 虚拟主机,逻辑隔离 | | Connection | TCP 连接 | | Channel | 信道,复用 Connection |

3.2 Exchange 类型

Direct Exchange
Exchange (direct)
    ↓ routing_key="order"
Queue: order_queue (binding_key="order")
  • 精确匹配 Routing Key
Fanout Exchange
Exchange (fanout)
    ↓ 广播
Queue1, Queue2, Queue3
  • 忽略 Routing Key,广播到所有绑定队列
Topic Exchange
Exchange (topic)
    ↓ routing_key="order.create.vip"
Queue1 (binding_key="order.#")      ✓ 匹配
Queue2 (binding_key="order.create.*") ✓ 匹配
Queue3 (binding_key="*.delete.*")    ✗ 不匹配
  • 匹配一个单词
  • # 匹配零个或多个单词
Headers Exchange: - 根据消息头属性匹配,不使用 Routing Key

3.3 消息确认机制

生产者确认
// 1. 事务模式(性能差)
channel.txSelect();
channel.basicPublish(...);
channel.txCommit();

// 2. Confirm 模式(推荐)
channel.confirmSelect();
channel.basicPublish(...);
if (channel.waitForConfirms()) {
    // 发送成功
}

// 3. 异步 Confirm
channel.addConfirmListener((deliveryTag, multiple) -> {
    // ACK 回调
}, (deliveryTag, multiple) -> {
    // NACK 回调,需要重发
});
消费者确认
// 手动 ACK
channel.basicConsume(queue, false, (tag, delivery) -> {
    try {
        processMessage(delivery.getBody());
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    } catch (Exception e) {
        // 拒绝并重新入队
        channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
    }
});

3.4 RabbitMQ 持久化

三个层面持久化
// 1. Exchange 持久化
channel.exchangeDeclare("exchange", "direct", true);

// 2. Queue 持久化
channel.queueDeclare("queue", true, false, false, null);

// 3. Message 持久化
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .deliveryMode(2)  // 持久化
    .build();
channel.basicPublish("exchange", "key", props, message);

3.5 死信队列(DLX)

死信产生条件: 1. 消息被拒绝(basic.reject/basic.nack)且 requeue=false 2. 消息 TTL 过期 3. 队列达到最大长度
// 声明死信交换机和队列
channel.exchangeDeclare("dlx.exchange", "direct");
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "dlx.key");

// 业务队列绑定死信交换机
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
args.put("x-dead-letter-routing-key", "dlx.key");
channel.queueDeclare("business.queue", true, false, false, args);

3.6 延迟队列实现

方案一:TTL + 死信队列
Producer → Exchange → Queue (TTL=30s) → 过期 → DLXDLQ → Consumer
// 队列级别 TTL
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 30000);  // 30秒
args.put("x-dead-letter-exchange", "dlx.exchange");
channel.queueDeclare("delay.queue", true, false, false, args);

// 消息级别 TTL
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .expiration("30000")  // 30秒
    .build();
方案二:延迟插件(rabbitmq_delayed_message_exchange)
// 声明延迟交换机
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("delayed.exchange", "x-delayed-message", true, false, args);

// 发送延迟消息
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .headers(Map.of("x-delay", 30000))  // 延迟30秒
    .build();
channel.basicPublish("delayed.exchange", "key", props, message);

3.7 RabbitMQ 高可用

镜像队列(Classic)
# 设置镜像策略
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
仲裁队列(Quorum Queue,推荐)
// 声明仲裁队列
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
channel.queueDeclare("quorum.queue", true, false, false, args);
特性
镜像队列
仲裁队列
一致性
最终一致
强一致(Raft)
性能
较高
略低
可靠性
可能丢消息
不丢消息
推荐
旧版本
新版本(3.8+)

3.8 RabbitMQ 消息可靠性保证

生产者 → Confirm → Exchange → Mandatory → Queue → 持久化 → Consumer → ACK
           ↓              ↓
        失败重试      Return 回调
完整可靠性配置
// 1. 开启 Confirm
channel.confirmSelect();

// 2. 开启 Return(消息无法路由时回调)
channel.addReturnListener((replyCode, replyText, exchange, routingKey, props, body) -> {
    // 消息无法路由,记录日志或重发
});

// 3. 发送时设置 mandatory=true
channel.basicPublish(exchange, routingKey, true, props, message);

// 4. 消费者手动 ACK
channel.basicConsume(queue, false, consumer);

3.9 RabbitMQ 流控机制

内存告警
# 内存使用超过 40% 触发流控
rabbitmqctl set_vm_memory_high_watermark 0.4
磁盘告警
# 磁盘剩余空间低于 50MB 触发流控
rabbitmqctl set_disk_free_limit 50MB
连接流控: - 生产者发送过快时,RabbitMQ 会阻塞 TCP 连接 - 通过 Credit 机制控制消费者预取
// 设置预取数量
channel.basicQos(100);  // 每次最多预取 100 条

四、RocketMQ

4.1 RocketMQ 架构

                    NameServer Cluster
                   +------------------+
                   | NameServer | NS  |
                   +------------------+
                          ↑ 注册
    +---------------------+---------------------+
    ↓                     ↓                     ↓
+--------+           +--------+           +--------+
| Broker |           | Broker |           | Broker |
| Master |←─同步─→   | Slave  |           | Master |
+--------+           +--------+           +--------+
    ↑                                         ↑
Producer                                  Consumer
核心组件: | 组件 | 说明 | |——|——| | NameServer | 轻量级注册中心,无状态 | | Broker | 消息存储和转发 | | Producer | 消息生产者 | | Consumer | 消息消费者 | | Topic | 消息主题 | | MessageQueue | 消息队列,类似 Kafka Partition | | ConsumerGroup | 消费者组 |

4.2 RocketMQ 消息类型

类型
说明
适用场景
普通消息
无特殊要求
一般业务
顺序消息
保证消费顺序
订单状态流转
延迟消息
延迟投递
订单超时取消
事务消息
分布式事务
跨服务数据一致性
批量消息
批量发送
日志收集

4.3 RocketMQ 事务消息原理

1. 发送半消息(Half Message)
Producer ───────────────────────→ Broker
         ←── 半消息发送成功 ──

2. 执行本地事务
Producer: executeLocalTransaction()
         ├── COMMIT_MESSAGE  → 提交消息
         ├── ROLLBACK_MESSAGE → 回滚消息
         └── UNKNOW → 等待回查

3. 提交/回滚
Producer ─── Commit/Rollback ──→ Broker

4. 事务回查(本地事务状态未知时)
Broker ─── 回查本地事务状态 ──→ Producer
       ←── COMMIT/ROLLBACK ──
代码示例
TransactionMQProducer producer = new TransactionMQProducer("group");
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 执行本地事务
            orderService.createOrder(order);
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 事务回查
        String orderId = msg.getKeys();
        if (orderService.exists(orderId)) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
});

// 发送事务消息
producer.sendMessageInTransaction(msg, null);

4.4 RocketMQ 延迟消息

延迟级别(固定级别):
// 18 个延迟级别
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Message msg = new Message("topic", "body".getBytes());
msg.setDelayTimeLevel(3);  // 延迟 10s
producer.send(msg);
RocketMQ 5.0 任意延迟
Message msg = new Message("topic", "body".getBytes());
msg.setDeliverTimeMs(System.currentTimeMillis() + 60000);  // 延迟 60s
producer.send(msg);

4.5 RocketMQ 消息存储

Broker 存储结构:
store/
├── commitlog/           # 消息数据(顺序写)
│   ├── 00000000000000000000
│   └── 00000000001073741824
├── consumequeue/        # 消费队列(索引)
│   └── TopicA/
│       ├── 0/
│       └── 1/
└── index/               # 消息索引(按 Key 查询)
CommitLog: - 所有消息顺序写入同一文件 - 单个文件默认 1GB - 顺序写性能高
ConsumeQueue: - 每个 Topic 的每个 Queue 一个文件 - 存储消息在 CommitLog 中的位置 - 定长记录,查询效率高

4.6 RocketMQ 主从同步

模式
说明
可靠性
性能
异步复制
Master 写入后立即返回
可能丢消息
同步双写
Master 和 Slave 都写入后返回
不丢消息
较低
# Broker 配置
brokerRole=SYNC_MASTER  # 同步双写
# brokerRole=ASYNC_MASTER  # 异步复制

4.7 RocketMQ 消息过滤

Tag 过滤
// 生产者
Message msg = new Message("topic", "TagA", "body".getBytes());

// 消费者
consumer.subscribe("topic", "TagA || TagB");
SQL 过滤
// 生产者
Message msg = new Message("topic", "body".getBytes());
msg.putUserProperty("age", "18");

// 消费者
consumer.subscribe("topic", MessageSelector.bySql("age > 16"));

4.8 RocketMQ 消费模式

Push 模式(推荐):
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
consumer.subscribe("topic", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        process(msg);
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
Pull 模式
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("group");
consumer.subscribe("topic", "*");
consumer.start();

while (true) {
    List<MessageExt> msgs = consumer.poll();
    for (MessageExt msg : msgs) {
        process(msg);
    }
    consumer.commitSync();
}

4.9 RocketMQ 消息回溯

// 按时间回溯
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer.setConsumeTimestamp("20260109120000");  // 从指定时间开始消费

五、消息可靠性

5.1 如何保证消息不丢失

生产端                  Broker                  消费端
   |                      |                       |
   |-- 同步发送 + 重试 -->|                       |
   |<-- ACK --------------|                       |
   |                      |-- 持久化 + 副本 ----->|
   |                      |                       |-- 处理消息
   |                      |<-- 手动 ACK ----------|
三个环节保证
环节
Kafka
RabbitMQ
RocketMQ
生产端
acks=all + retries
Confirm 模式
同步发送 + 重试
Broker
副本 + min.insync.replicas
持久化 + 镜像队列
同步双写
消费端
手动提交 Offset
手动 ACK
手动 ACK

5.2 如何保证消息不重复消费

幂等性方案
方案
实现
适用场景
唯一 ID + 去重表
数据库唯一索引
通用
Redis SETNX
消费前检查
高并发
状态机
状态只能单向流转
订单状态
乐观锁
版本号控制
更新操作
def consume(message):
    msg_id = message.id

    # Redis 去重
    if not redis.setnx(f"consumed:{msg_id}", "1", ex=86400):
        return  # 已消费

    try:
        # 业务处理
        process(message)
    except Exception as e:
        # 处理失败,删除标记,允许重试
        redis.delete(f"consumed:{msg_id}")
        raise e

5.3 如何保证消息顺序性

Kafka
// 相同 Key 发送到同一 Partition
producer.send(new ProducerRecord<>("topic", orderId, message));

// 单线程消费
consumer.poll().forEach(record -> process(record));
RocketMQ
// 顺序消息
SendResult result = producer.send(msg, (mqs, msg1, arg) -> {
    int index = arg.hashCode() % mqs.size();
    return mqs.get(index);  // 相同 orderId 发送到同一队列
}, orderId);

// 顺序消费
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
    // 单线程顺序消费
    return ConsumeOrderlyStatus.SUCCESS;
});

5.4 消息确认机制对比

机制
Kafka
RabbitMQ
RocketMQ
生产确认
acks
Confirm
SendResult
消费确认
Offset 提交
ACK/NACK
ACK
自动确认
enable.auto.commit
autoAck
默认自动
手动确认
commitSync/Async
basicAck
返回 SUCCESS

5.5 At Least Once vs At Most Once vs Exactly Once

语义
说明
实现方式
At Most Once
最多一次,可能丢失
发送后不确认
At Least Once
至少一次,可能重复
确认后再删除
Exactly Once
精确一次,不丢不重
幂等 + 事务
Kafka Exactly Once
props.put("enable.idempotence", true);  // 幂等生产者
props.put("transactional.id", "tx-1");  // 事务

producer.initTransactions();
producer.beginTransaction();
producer.send(record);
producer.commitTransaction();

六、消息积压

6.1 消息积压原因分析

原因
说明
排查方法
消费速度慢
消费者处理逻辑耗时
监控消费耗时
消费者故障
消费者宕机或异常
检查消费者状态
消费者数量不足
并行度不够
检查消费者数量
流量突增
生产速度远超消费速度
监控生产速率
下游服务慢
依赖的服务响应慢
检查下游服务

6.2 消息积压处理方案

紧急处理
1. 快速扩容消费者
   - Kafka:增加消费者(不超过分区数)
   - RabbitMQ:增加消费者实例

2. 临时跳过非关键消息
   - 记录日志,后续补偿

3. 消息转移
   - 将积压消息转移到临时队列
   - 批量消费者处理
Kafka 扩容消费者
// 增加分区(需要重新分配)
bin/kafka-topics.sh --alter --topic test --partitions 10

// 增加消费者实例(自动 Rebalance)
RocketMQ 临时处理
// 临时增加消费线程
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);

6.3 消费者扩容方案

MQ
扩容方式
注意事项
Kafka
增加消费者(≤分区数)
分区数是并行度上限
RabbitMQ
增加消费者实例
无上限,但要控制预取
RocketMQ
增加消费者(≤队列数)
队列数是并行度上限

6.4 消息积压监控与告警

监控指标: | 指标 | 说明 | 告警阈值 | |——|——|———-| | Lag | 消费延迟(条数) | > 10000 | | ConsumerOffset | 消费位置 | 增长缓慢 | | ProducerRate | 生产速率 | 突增 | | ConsumeRate | 消费速率 | 下降 |
Kafka 监控
# 查看消费延迟
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-group

七、高级特性

7.1 延迟消息实现方案对比

方案
优点
缺点
RabbitMQ TTL + DLX
简单
只能队列级别 TTL
RabbitMQ 延迟插件
任意延迟
需要安装插件
RocketMQ 延迟级别
原生支持
固定级别
RocketMQ 5.0
任意延迟
版本要求
Redis ZSet
灵活
需要轮询
时间轮
高效
实现复杂
Redis ZSet 实现
def delay_message(msg, delay_seconds):
    execute_time = time.time() + delay_seconds
    redis.zadd("delay_queue", {json.dumps(msg): execute_time})

def consume_delay_messages():
    while True:
        now = time.time()
        messages = redis.zrangebyscore("delay_queue", 0, now)
        for msg in messages:
            process(json.loads(msg))
            redis.zrem("delay_queue", msg)
        time.sleep(0.1)

7.2 事务消息实现原理

RocketMQ 事务消息
1. 发送半消息(对消费者不可见)
2. 执行本地事务
3. 根据本地事务结果提交或回滚
4. 定时回查未确认的事务
Kafka 事务
1. 初始化事务
2. 开始事务
3. 发送消息(可跨多个 Topic/Partition)
4. 提交消费者 Offset(可选)
5. 提交或回滚事务

7.3 死信队列应用场景

场景
说明
消息处理失败
多次重试后进入死信队列
消息过期
TTL 到期未消费
队列满
队列达到最大长度
异常消息
格式错误、业务异常
处理死信
// 监听死信队列
@RabbitListener(queues = "dlx.queue")
public void handleDeadLetter(Message message) {
    // 1. 记录日志
    log.error("Dead letter: {}", message);

    // 2. 人工处理或告警
    alertService.notify(message);

    // 3. 存储到数据库
    deadLetterRepository.save(message);
}

7.4 消息回溯与重放

Kafka
// 按时间回溯
Map<TopicPartition, Long> timestamps = new HashMap<>();
timestamps.put(new TopicPartition("topic", 0), targetTimestamp);
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);

// 重置 Offset
consumer.seek(partition, offset);
RocketMQ
// 按时间回溯
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer.setConsumeTimestamp("20260109120000");

7.5 消息轨迹追踪

RocketMQ 消息轨迹
// 开启消息轨迹
DefaultMQProducer producer = new DefaultMQProducer("group", true);

// 查询轨迹
// 通过 RocketMQ Console 或 API 查询
自定义追踪
// 消息中添加 TraceId
Message msg = new Message("topic", body);
msg.putUserProperty("traceId", TraceContext.getTraceId());
msg.putUserProperty("spanId", TraceContext.getSpanId());

八、生产实践

8.1 生产者设计最佳实践

实践
说明
同步发送 + 重试
重要消息使用同步发送
批量发送
提高吞吐量
消息压缩
减少网络传输
合理分区
根据业务选择分区策略
幂等生产
开启幂等性
// Kafka 生产者配置
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 5);
props.put("compression.type", "lz4");
props.put("enable.idempotence", true);

8.2 消费者设计最佳实践

实践
说明
手动提交
处理完成后再提交
批量消费
提高吞吐量
幂等消费
保证重复消费不影响
异常处理
合理的重试和死信策略
消费限速
保护下游服务
// Kafka 消费者配置
props.put("enable.auto.commit", false);
props.put("max.poll.records", 500);
props.put("max.poll.interval.ms", 300000);

8.3 消息队列容量规划

日消息量:1000 万条
单条消息:1 KB
日存储量:1000 万 × 1 KB = 10 GB

保留 7 天:10 GB × 7 = 70 GB
3 副本:70 GB × 3 = 210 GB

峰值 QPS = 日均 QPS × 峰值系数
        = (1000/ 86400) × 101157 × 10 = 11570 QPS

8.4 消息队列监控指标

指标
说明
告警阈值
消息积压
未消费消息数
> 10000
消费延迟
消费时间差
> 5 分钟
生产速率
每秒生产消息数
突增 50%
消费速率
每秒消费消息数
下降 50%
Broker 磁盘
磁盘使用率
> 80%
Broker 内存
内存使用率
> 80%

8.5 消息队列故障排查

消息丢失排查
1. 检查生产者日志,确认发送成功
2. 检查 Broker 日志,确认接收和存储
3. 检查消费者日志,确认消费情况
4. 检查 Offset 提交情况
消息积压排查
1. 检查消费者状态(是否存活)
2. 检查消费速率(是否下降)
3. 检查下游服务(是否响应慢)
4. 检查消费者日志(是否有异常)

8.6 消息队列性能调优

Kafka 调优
# Broker
num.io.threads=8
num.network.threads=3
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400

# Producer
batch.size=32768
linger.ms=5
buffer.memory=33554432
compression.type=lz4

# Consumer
fetch.min.bytes=1
fetch.max.wait.ms=500
max.partition.fetch.bytes=1048576
RocketMQ 调优
# Broker
sendMessageThreadPoolNums=16
pullMessageThreadPoolNums=16
flushDiskType=ASYNC_FLUSH

# Producer
sendMsgTimeout=3000
compressMsgBodyOverHowmuch=4096

# Consumer
consumeThreadMin=20
consumeThreadMax=64
pullBatchSize=32

九、面试高频问题

9.1 为什么使用消息队列?

回答模板
在我们的项目中使用消息队列主要解决三个问题:

1. 解耦:订单系统创建订单后,需要通知库存、积分、通知等多个系统。
   使用 MQ 后,订单系统只需发送消息,不需要知道有哪些下游系统。

2. 异步:用户注册后需要发送邮件和短信,这些操作比较耗时。
   使用 MQ 异步处理,用户无需等待,提升响应速度。

3. 削峰:秒杀场景下,瞬间流量可能达到平时的 100 倍。
   使用 MQ 缓冲请求,后端按照自己的处理能力慢慢消费。

9.2 如何保证消息不丢失?

回答模板
消息丢失可能发生在三个环节,需要分别保证:

1. 生产端:
   - Kafka:设置 acks=all,开启重试
   - RabbitMQ:开启 Confirm 模式
   - RocketMQ:使用同步发送,检查返回结果

2. Broker 端:
   - Kafka:设置 min.insync.replicas >= 2
   - RabbitMQ:开启持久化和镜像队列
   - RocketMQ:使用同步双写模式

3. 消费端:
   - 关闭自动确认,处理完成后手动确认
   - 处理失败时不确认,让消息重新投递

9.3 如何保证消息顺序性?

回答模板
消息顺序性需要从生产和消费两端保证:

1. 生产端:
   - 将需要保证顺序的消息发送到同一分区/队列
   - Kafka:使用相同的 Key
   - RocketMQ:使用 MessageQueueSelector

2. 消费端:
   - 单线程消费,或者使用顺序消费模式
   - RocketMQ:使用 MessageListenerOrderly
   - Kafka:一个分区只被一个消费者消费

注意:全局有序性能很差,通常只需要保证业务相关消息的局部有序。

9.4 消息积压如何处理?

回答模板
消息积压的处理分为紧急处理和根本解决:

紧急处理:
1. 快速扩容消费者(注意分区数限制)
2. 临时跳过非关键消息,记录日志后续补偿
3. 如果是消费者 bug,修复后重新部署

根本解决:
1. 优化消费者处理逻辑,提高消费速度
2. 增加分区/队列数,提高并行度
3. 批量消费,减少网络开销
4. 异步处理,不阻塞消费线程
5. 建立监控告警,提前发现问题

9.5 Kafka、RabbitMQ、RocketMQ 如何选择?

回答模板
根据业务场景选择:

1. Kafka:
   - 适合日志收集、大数据、流处理
   - 高吞吐量,百万级 TPS
   - 消息回溯能力强

2. RabbitMQ:
   - 适合业务消息、复杂路由
   - 延迟低,微秒级
   - 功能丰富,多种 Exchange 类型

3. RocketMQ:
   - 适合电商、金融等业务场景
   - 原生支持事务消息、延迟消息
   - 阿里开源,中文文档丰富

我们项目选择 [具体 MQ] 是因为 [具体原因]...

最后更新:2026-01-09
你觉得这篇文章怎么样?
YYDS
比心
加油
菜狗
views

Loading Comments...