消息队列面试指南
一、消息队列基础
1.1 消息队列的作用
作用 | 说明 | 示例场景 |
解耦 | 生产者和消费者独立演进 | 订单系统与库存系统解耦 |
异步 | 非核心流程异步处理 | 注册后异步发送邮件 |
削峰 | 平滑处理流量高峰 | 秒杀请求写入队列慢慢消费 |
同步调用:
用户 → 订单服务 → 库存服务 → 积分服务 → 通知服务 → 响应
(任一服务慢或失败,整体受影响)
异步解耦:
用户 → 订单服务 → 响应
↓
消息队列
↙ ↓ ↘
库存服务 积分服务 通知服务1.2 消息队列的优缺点
优点:
- 系统解耦,降低依赖
- 异步处理,提高响应速度
- 削峰填谷,保护下游系统
- 消息持久化,保证可靠性
缺点:
- 系统复杂度增加
- 消息一致性问题
- 消息可能丢失或重复
- 运维成本增加
1.3 消息队列选型对比
特性 | Kafka | RabbitMQ | RocketMQ |
开发语言 | Scala/Java | Erlang | Java |
吞吐量 | 百万级 | 万级 | 十万级 |
延迟 | 毫秒级 | 微秒级 | 毫秒级 |
消息可靠性 | 高 | 高 | 高 |
事务消息 | 支持 | 支持 | 原生支持 |
延迟消息 | 不支持 | 支持(插件) | 原生支持 |
消息顺序 | Partition 内有序 | 队列内有序 | 队列内有序 |
消息回溯 | 支持 | 不支持 | 支持 |
适用场景 | 日志、大数据、流处理 | 业务消息、复杂路由 | 电商、金融、业务消息 |
选型建议:
- 日志收集、大数据:Kafka(高吞吐)
- 业务消息、复杂路由:RabbitMQ(功能丰富)
- 电商、金融:RocketMQ(事务消息、延迟消息)
1.4 消息模型
点对点模型(P2P):
生产者 → Queue → 消费者
↘ 消费者(竞争消费)- 一条消息只能被一个消费者消费
- 消费后消息从队列删除
发布订阅模型(Pub/Sub):
生产者 → Topic → 订阅者1
→ 订阅者2
→ 订阅者3- 一条消息可以被多个订阅者消费
- 消息广播给所有订阅者
1.5 消息队列引入的问题
问题 | 说明 | 解决方案 |
可用性降低 | MQ 故障导致系统不可用 | 集群部署、故障转移 |
复杂度增加 | 需要处理消息丢失、重复、顺序 | 完善的消息机制设计 |
一致性问题 | 消息处理失败导致数据不一致 | 事务消息、补偿机制 |
二、Kafka
2.1 Kafka 架构
Kafka Cluster
+----------------------------------------------------------+
| +--------+ +--------+ +--------+ |
| | Broker | | Broker | | Broker | |
| | 0 | | 1 | | 2 | |
| +--------+ +--------+ +--------+ |
| ↑ ↑ ↑ |
| └─────────────┼─────────────┘ |
| ↓ |
| +----------+ |
| | Zookeeper| (或 KRaft) |
| +----------+ |
+----------------------------------------------------------+
↑ ↓
Producer Consumer Group核心概念:
| 概念 | 说明 |
|——|——|
| Broker | Kafka 服务器节点 |
| Topic | 消息主题,逻辑分类 |
| Partition | 分区,Topic 的物理分片 |
| Replica | 副本,保证高可用 |
| Producer | 消息生产者 |
| Consumer | 消息消费者 |
| Consumer Group | 消费者组,组内竞争消费 |
| Offset | 消息偏移量,消费位置 |
2.2 Kafka 分区策略
生产者分区策略:
// 1. 指定分区
producer.send(new ProducerRecord<>("topic", 0, key, value));
// 2. 按 Key 哈希
producer.send(new ProducerRecord<>("topic", key, value));
// 分区 = hash(key) % numPartitions
// 3. 轮询(无 Key 时)
producer.send(new ProducerRecord<>("topic", value));
// 4. 自定义分区器
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// 自定义分区逻辑
return hash(key) % cluster.partitionCountForTopic(topic);
}
}分区数设计:
- 分区数 = max(生产者吞吐量, 消费者吞吐量) / 单分区吞吐量
- 分区数过多:文件句柄增加、选举时间增加
- 分区数过少:并行度受限
2.3 Kafka 副本机制
Topic: orders (3 partitions, 3 replicas)
Broker 0 Broker 1 Broker 2
+----------+ +----------+ +----------+
| P0-Leader| | P0-Follower | P0-Follower
| P1-Follower | P1-Leader| | P1-Follower
| P2-Follower | P2-Follower | P2-Leader|
+----------+ +----------+ +----------+核心概念:
| 概念 | 说明 |
|——|——|
| Leader | 处理读写请求的副本 |
| Follower | 从 Leader 同步数据的副本 |
| ISR | In-Sync Replicas,与 Leader 同步的副本集合 |
| OSR | Out-of-Sync Replicas,落后的副本 |
| HW | High Watermark,消费者可见的最高位置 |
| LEO | Log End Offset,日志末尾偏移量 |
ISR 机制:
Leader: [msg1, msg2, msg3, msg4, msg5]
↑ LEO=5
↑ HW=3
Follower1: [msg1, msg2, msg3] (在 ISR 中)
Follower2: [msg1, msg2] (落后,可能被踢出 ISR)2.4 消费者组与 Rebalance
Topic: orders (4 partitions)
Consumer Group A:
Consumer1 → P0, P1
Consumer2 → P2, P3
Consumer Group B:
Consumer1 → P0, P1, P2, P3Rebalance 触发条件:
1. 消费者加入/离开消费者组
2. 消费者心跳超时
3. Topic 分区数变化
4. 订阅的 Topic 变化
Rebalance 策略:
| 策略 | 说明 |
|——|——|
| Range | 按 Topic 分区范围分配 |
| RoundRobin | 轮询分配所有分区 |
| Sticky | 尽量保持原有分配,减少变动 |
| CooperativeSticky | 增量 Rebalance,避免 Stop-The-World |
2.5 Kafka 高性能原理
技术 | 说明 |
顺序写 | 磁盘顺序写性能接近内存 |
零拷贝 | sendfile 减少数据拷贝 |
页缓存 | 利用 OS Page Cache |
批量发送 | 批量压缩发送,减少网络开销 |
分区并行 | 多 Partition 并行读写 |
稀疏索引 | 快速定位消息位置 |
零拷贝原理:
传统方式(4次拷贝):
磁盘 → 内核缓冲区 → 用户缓冲区 → Socket 缓冲区 → 网卡
零拷贝(2次拷贝):
磁盘 → 内核缓冲区 → 网卡
(sendfile 系统调用)2.6 Kafka 消息存储机制
Topic: orders
├── Partition-0/
│ ├── 00000000000000000000.log # 消息数据
│ ├── 00000000000000000000.index # 偏移量索引
│ ├── 00000000000000000000.timeindex # 时间索引
│ ├── 00000000000000368769.log
│ ├── 00000000000000368769.index
│ └── ...
├── Partition-1/
└── Partition-2/Segment 文件:
- 每个 Partition 分为多个 Segment
- 文件名是该 Segment 第一条消息的 Offset
- 默认 1GB 或 7 天滚动
索引查找过程:
查找 Offset=368800 的消息:
1. 二分查找确定 Segment 文件(368769.log)
2. 在 368769.index 中二分查找最近的索引项
3. 从索引位置顺序扫描找到目标消息2.7 Kafka Offset 管理
Offset 存储:
- 旧版本:存储在 Zookeeper
- 新版本:存储在内部 Topic
__consumer_offsets提交方式:
// 1. 自动提交
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
// 2. 手动同步提交
consumer.commitSync();
// 3. 手动异步提交
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.error("Commit failed", exception);
}
});
// 4. 指定 Offset 提交
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("topic", 0), new OffsetAndMetadata(100));
consumer.commitSync(offsets);2.8 Kafka 消息丢失场景与解决
环节 | 丢失场景 | 解决方案 |
生产端 | 发送失败未重试 | acks=all + retries + 幂等 |
Broker | 未持久化就宕机 | acks=all + min.insync.replicas |
消费端 | 自动提交后处理失败 | 手动提交 + 处理完再提交 |
生产者配置:
props.put("acks", "all"); // 所有 ISR 确认
props.put("retries", 3); // 重试次数
props.put("enable.idempotence", true); // 幂等性Broker 配置:
min.insync.replicas=2 # 最少同步副本数
unclean.leader.election.enable=false # 禁止非 ISR 副本选举2.9 Kafka 消息重复消费与幂等
重复消费原因:
1. 消费者处理完成但提交 Offset 前宕机
2. Rebalance 导致重复消费
3. 生产者重试导致重复发送
幂等性保证:
// 生产者幂等(Kafka 2.0+)
props.put("enable.idempotence", true);
// 原理:Producer ID + Sequence Number 去重
// 消费者幂等(业务层实现)
public void consume(Message msg) {
String msgId = msg.getId();
// 1. 数据库唯一索引
// 2. Redis SETNX
if (redis.setnx("consumed:" + msgId, "1", 86400)) {
processMessage(msg);
}
}2.10 Kafka 消息顺序性保证
全局有序:
- 只使用 1 个 Partition(牺牲并行度)
分区有序:
// 相同 Key 的消息发送到同一分区
producer.send(new ProducerRecord<>("topic", orderId, message));消费端保证:
// 单线程消费
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record); // 顺序处理
}
consumer.commitSync();
}2.11 Kafka 事务消息
// 初始化事务
producer.initTransactions();
try {
producer.beginTransaction();
// 发送消息
producer.send(new ProducerRecord<>("topic1", "msg1"));
producer.send(new ProducerRecord<>("topic2", "msg2"));
// 提交消费者 Offset(消费-生产原子性)
producer.sendOffsetsToTransaction(offsets, consumerGroupId);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}事务隔离级别:
// 消费者只读取已提交的消息
props.put("isolation.level", "read_committed");2.12 Kafka Controller 与 Zookeeper
Controller 职责:
- 监控 Broker 状态
- 管理分区 Leader 选举
- 同步元数据到其他 Broker
Zookeeper 存储内容:
/kafka
├── brokers/
│ ├── ids/ # Broker 注册信息
│ └── topics/ # Topic 元数据
├── controller # Controller 选举
├── consumers/ # 消费者信息(旧版本)
└── config/ # 配置信息2.13 Kafka KRaft 模式
KRaft(Kafka Raft):移除 Zookeeper 依赖
传统模式:
Broker ←→ Zookeeper ←→ Controller
KRaft 模式:
Broker ←→ Controller(内置 Raft)优势:
- 简化部署,减少组件
- 提升元数据处理性能
- 支持更多分区(百万级)
三、RabbitMQ
3.1 RabbitMQ 架构
Producer → Exchange → Binding → Queue → Consumer
↓
Routing Key核心概念:
| 概念 | 说明 |
|——|——|
| Producer | 消息生产者 |
| Exchange | 交换机,接收消息并路由 |
| Binding | 绑定关系,Exchange 与 Queue 的关联 |
| Queue | 队列,存储消息 |
| Consumer | 消息消费者 |
| Virtual Host | 虚拟主机,逻辑隔离 |
| Connection | TCP 连接 |
| Channel | 信道,复用 Connection |
3.2 Exchange 类型
Direct Exchange:
Exchange (direct)
↓ routing_key="order"
Queue: order_queue (binding_key="order")- 精确匹配 Routing Key
Fanout Exchange:
Exchange (fanout)
↓ 广播
Queue1, Queue2, Queue3- 忽略 Routing Key,广播到所有绑定队列
Topic Exchange:
Exchange (topic)
↓ routing_key="order.create.vip"
Queue1 (binding_key="order.#") ✓ 匹配
Queue2 (binding_key="order.create.*") ✓ 匹配
Queue3 (binding_key="*.delete.*") ✗ 不匹配- 匹配一个单词
#匹配零个或多个单词
Headers Exchange:
- 根据消息头属性匹配,不使用 Routing Key
3.3 消息确认机制
生产者确认:
// 1. 事务模式(性能差)
channel.txSelect();
channel.basicPublish(...);
channel.txCommit();
// 2. Confirm 模式(推荐)
channel.confirmSelect();
channel.basicPublish(...);
if (channel.waitForConfirms()) {
// 发送成功
}
// 3. 异步 Confirm
channel.addConfirmListener((deliveryTag, multiple) -> {
// ACK 回调
}, (deliveryTag, multiple) -> {
// NACK 回调,需要重发
});消费者确认:
// 手动 ACK
channel.basicConsume(queue, false, (tag, delivery) -> {
try {
processMessage(delivery.getBody());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
// 拒绝并重新入队
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
});3.4 RabbitMQ 持久化
三个层面持久化:
// 1. Exchange 持久化
channel.exchangeDeclare("exchange", "direct", true);
// 2. Queue 持久化
channel.queueDeclare("queue", true, false, false, null);
// 3. Message 持久化
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 持久化
.build();
channel.basicPublish("exchange", "key", props, message);3.5 死信队列(DLX)
死信产生条件:
1. 消息被拒绝(basic.reject/basic.nack)且 requeue=false
2. 消息 TTL 过期
3. 队列达到最大长度
// 声明死信交换机和队列
channel.exchangeDeclare("dlx.exchange", "direct");
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "dlx.key");
// 业务队列绑定死信交换机
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
args.put("x-dead-letter-routing-key", "dlx.key");
channel.queueDeclare("business.queue", true, false, false, args);3.6 延迟队列实现
方案一:TTL + 死信队列:
Producer → Exchange → Queue (TTL=30s) → 过期 → DLX → DLQ → Consumer// 队列级别 TTL
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 30000); // 30秒
args.put("x-dead-letter-exchange", "dlx.exchange");
channel.queueDeclare("delay.queue", true, false, false, args);
// 消息级别 TTL
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.expiration("30000") // 30秒
.build();方案二:延迟插件(rabbitmq_delayed_message_exchange):
// 声明延迟交换机
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("delayed.exchange", "x-delayed-message", true, false, args);
// 发送延迟消息
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.headers(Map.of("x-delay", 30000)) // 延迟30秒
.build();
channel.basicPublish("delayed.exchange", "key", props, message);3.7 RabbitMQ 高可用
镜像队列(Classic):
# 设置镜像策略
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'仲裁队列(Quorum Queue,推荐):
// 声明仲裁队列
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
channel.queueDeclare("quorum.queue", true, false, false, args);特性 | 镜像队列 | 仲裁队列 |
一致性 | 最终一致 | 强一致(Raft) |
性能 | 较高 | 略低 |
可靠性 | 可能丢消息 | 不丢消息 |
推荐 | 旧版本 | 新版本(3.8+) |
3.8 RabbitMQ 消息可靠性保证
生产者 → Confirm → Exchange → Mandatory → Queue → 持久化 → Consumer → ACK
↓ ↓
失败重试 Return 回调完整可靠性配置:
// 1. 开启 Confirm
channel.confirmSelect();
// 2. 开启 Return(消息无法路由时回调)
channel.addReturnListener((replyCode, replyText, exchange, routingKey, props, body) -> {
// 消息无法路由,记录日志或重发
});
// 3. 发送时设置 mandatory=true
channel.basicPublish(exchange, routingKey, true, props, message);
// 4. 消费者手动 ACK
channel.basicConsume(queue, false, consumer);3.9 RabbitMQ 流控机制
内存告警:
# 内存使用超过 40% 触发流控
rabbitmqctl set_vm_memory_high_watermark 0.4磁盘告警:
# 磁盘剩余空间低于 50MB 触发流控
rabbitmqctl set_disk_free_limit 50MB连接流控:
- 生产者发送过快时,RabbitMQ 会阻塞 TCP 连接
- 通过 Credit 机制控制消费者预取
// 设置预取数量
channel.basicQos(100); // 每次最多预取 100 条四、RocketMQ
4.1 RocketMQ 架构
NameServer Cluster
+------------------+
| NameServer | NS |
+------------------+
↑ 注册
+---------------------+---------------------+
↓ ↓ ↓
+--------+ +--------+ +--------+
| Broker | | Broker | | Broker |
| Master |←─同步─→ | Slave | | Master |
+--------+ +--------+ +--------+
↑ ↑
Producer Consumer核心组件:
| 组件 | 说明 |
|——|——|
| NameServer | 轻量级注册中心,无状态 |
| Broker | 消息存储和转发 |
| Producer | 消息生产者 |
| Consumer | 消息消费者 |
| Topic | 消息主题 |
| MessageQueue | 消息队列,类似 Kafka Partition |
| ConsumerGroup | 消费者组 |
4.2 RocketMQ 消息类型
类型 | 说明 | 适用场景 |
普通消息 | 无特殊要求 | 一般业务 |
顺序消息 | 保证消费顺序 | 订单状态流转 |
延迟消息 | 延迟投递 | 订单超时取消 |
事务消息 | 分布式事务 | 跨服务数据一致性 |
批量消息 | 批量发送 | 日志收集 |
4.3 RocketMQ 事务消息原理
1. 发送半消息(Half Message)
Producer ───────────────────────→ Broker
←── 半消息发送成功 ──
2. 执行本地事务
Producer: executeLocalTransaction()
├── COMMIT_MESSAGE → 提交消息
├── ROLLBACK_MESSAGE → 回滚消息
└── UNKNOW → 等待回查
3. 提交/回滚
Producer ─── Commit/Rollback ──→ Broker
4. 事务回查(本地事务状态未知时)
Broker ─── 回查本地事务状态 ──→ Producer
←── COMMIT/ROLLBACK ──代码示例:
TransactionMQProducer producer = new TransactionMQProducer("group");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地事务
orderService.createOrder(order);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 事务回查
String orderId = msg.getKeys();
if (orderService.exists(orderId)) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
});
// 发送事务消息
producer.sendMessageInTransaction(msg, null);4.4 RocketMQ 延迟消息
延迟级别(固定级别):
// 18 个延迟级别
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Message msg = new Message("topic", "body".getBytes());
msg.setDelayTimeLevel(3); // 延迟 10s
producer.send(msg);RocketMQ 5.0 任意延迟:
Message msg = new Message("topic", "body".getBytes());
msg.setDeliverTimeMs(System.currentTimeMillis() + 60000); // 延迟 60s
producer.send(msg);4.5 RocketMQ 消息存储
Broker 存储结构:
store/
├── commitlog/ # 消息数据(顺序写)
│ ├── 00000000000000000000
│ └── 00000000001073741824
├── consumequeue/ # 消费队列(索引)
│ └── TopicA/
│ ├── 0/
│ └── 1/
└── index/ # 消息索引(按 Key 查询)CommitLog:
- 所有消息顺序写入同一文件
- 单个文件默认 1GB
- 顺序写性能高
ConsumeQueue:
- 每个 Topic 的每个 Queue 一个文件
- 存储消息在 CommitLog 中的位置
- 定长记录,查询效率高
4.6 RocketMQ 主从同步
模式 | 说明 | 可靠性 | 性能 |
异步复制 | Master 写入后立即返回 | 可能丢消息 | 高 |
同步双写 | Master 和 Slave 都写入后返回 | 不丢消息 | 较低 |
# Broker 配置
brokerRole=SYNC_MASTER # 同步双写
# brokerRole=ASYNC_MASTER # 异步复制4.7 RocketMQ 消息过滤
Tag 过滤:
// 生产者
Message msg = new Message("topic", "TagA", "body".getBytes());
// 消费者
consumer.subscribe("topic", "TagA || TagB");SQL 过滤:
// 生产者
Message msg = new Message("topic", "body".getBytes());
msg.putUserProperty("age", "18");
// 消费者
consumer.subscribe("topic", MessageSelector.bySql("age > 16"));4.8 RocketMQ 消费模式
Push 模式(推荐):
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
consumer.subscribe("topic", "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
process(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();Pull 模式:
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("group");
consumer.subscribe("topic", "*");
consumer.start();
while (true) {
List<MessageExt> msgs = consumer.poll();
for (MessageExt msg : msgs) {
process(msg);
}
consumer.commitSync();
}4.9 RocketMQ 消息回溯
// 按时间回溯
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer.setConsumeTimestamp("20260109120000"); // 从指定时间开始消费五、消息可靠性
5.1 如何保证消息不丢失
生产端 Broker 消费端
| | |
|-- 同步发送 + 重试 -->| |
|<-- ACK --------------| |
| |-- 持久化 + 副本 ----->|
| | |-- 处理消息
| |<-- 手动 ACK ----------|三个环节保证:
环节 | Kafka | RabbitMQ | RocketMQ |
生产端 | acks=all + retries | Confirm 模式 | 同步发送 + 重试 |
Broker | 副本 + min.insync.replicas | 持久化 + 镜像队列 | 同步双写 |
消费端 | 手动提交 Offset | 手动 ACK | 手动 ACK |
5.2 如何保证消息不重复消费
幂等性方案:
方案 | 实现 | 适用场景 |
唯一 ID + 去重表 | 数据库唯一索引 | 通用 |
Redis SETNX | 消费前检查 | 高并发 |
状态机 | 状态只能单向流转 | 订单状态 |
乐观锁 | 版本号控制 | 更新操作 |
def consume(message):
msg_id = message.id
# Redis 去重
if not redis.setnx(f"consumed:{msg_id}", "1", ex=86400):
return # 已消费
try:
# 业务处理
process(message)
except Exception as e:
# 处理失败,删除标记,允许重试
redis.delete(f"consumed:{msg_id}")
raise e5.3 如何保证消息顺序性
Kafka:
// 相同 Key 发送到同一 Partition
producer.send(new ProducerRecord<>("topic", orderId, message));
// 单线程消费
consumer.poll().forEach(record -> process(record));RocketMQ:
// 顺序消息
SendResult result = producer.send(msg, (mqs, msg1, arg) -> {
int index = arg.hashCode() % mqs.size();
return mqs.get(index); // 相同 orderId 发送到同一队列
}, orderId);
// 顺序消费
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
// 单线程顺序消费
return ConsumeOrderlyStatus.SUCCESS;
});5.4 消息确认机制对比
机制 | Kafka | RabbitMQ | RocketMQ |
生产确认 | acks | Confirm | SendResult |
消费确认 | Offset 提交 | ACK/NACK | ACK |
自动确认 | enable.auto.commit | autoAck | 默认自动 |
手动确认 | commitSync/Async | basicAck | 返回 SUCCESS |
5.5 At Least Once vs At Most Once vs Exactly Once
语义 | 说明 | 实现方式 |
At Most Once | 最多一次,可能丢失 | 发送后不确认 |
At Least Once | 至少一次,可能重复 | 确认后再删除 |
Exactly Once | 精确一次,不丢不重 | 幂等 + 事务 |
Kafka Exactly Once:
props.put("enable.idempotence", true); // 幂等生产者
props.put("transactional.id", "tx-1"); // 事务
producer.initTransactions();
producer.beginTransaction();
producer.send(record);
producer.commitTransaction();六、消息积压
6.1 消息积压原因分析
原因 | 说明 | 排查方法 |
消费速度慢 | 消费者处理逻辑耗时 | 监控消费耗时 |
消费者故障 | 消费者宕机或异常 | 检查消费者状态 |
消费者数量不足 | 并行度不够 | 检查消费者数量 |
流量突增 | 生产速度远超消费速度 | 监控生产速率 |
下游服务慢 | 依赖的服务响应慢 | 检查下游服务 |
6.2 消息积压处理方案
紧急处理:
1. 快速扩容消费者
- Kafka:增加消费者(不超过分区数)
- RabbitMQ:增加消费者实例
2. 临时跳过非关键消息
- 记录日志,后续补偿
3. 消息转移
- 将积压消息转移到临时队列
- 批量消费者处理Kafka 扩容消费者:
// 增加分区(需要重新分配)
bin/kafka-topics.sh --alter --topic test --partitions 10
// 增加消费者实例(自动 Rebalance)RocketMQ 临时处理:
// 临时增加消费线程
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);6.3 消费者扩容方案
MQ | 扩容方式 | 注意事项 |
Kafka | 增加消费者(≤分区数) | 分区数是并行度上限 |
RabbitMQ | 增加消费者实例 | 无上限,但要控制预取 |
RocketMQ | 增加消费者(≤队列数) | 队列数是并行度上限 |
6.4 消息积压监控与告警
监控指标:
| 指标 | 说明 | 告警阈值 |
|——|——|———-|
| Lag | 消费延迟(条数) | > 10000 |
| ConsumerOffset | 消费位置 | 增长缓慢 |
| ProducerRate | 生产速率 | 突增 |
| ConsumeRate | 消费速率 | 下降 |
Kafka 监控:
# 查看消费延迟
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group my-group七、高级特性
7.1 延迟消息实现方案对比
方案 | 优点 | 缺点 |
RabbitMQ TTL + DLX | 简单 | 只能队列级别 TTL |
RabbitMQ 延迟插件 | 任意延迟 | 需要安装插件 |
RocketMQ 延迟级别 | 原生支持 | 固定级别 |
RocketMQ 5.0 | 任意延迟 | 版本要求 |
Redis ZSet | 灵活 | 需要轮询 |
时间轮 | 高效 | 实现复杂 |
Redis ZSet 实现:
def delay_message(msg, delay_seconds):
execute_time = time.time() + delay_seconds
redis.zadd("delay_queue", {json.dumps(msg): execute_time})
def consume_delay_messages():
while True:
now = time.time()
messages = redis.zrangebyscore("delay_queue", 0, now)
for msg in messages:
process(json.loads(msg))
redis.zrem("delay_queue", msg)
time.sleep(0.1)7.2 事务消息实现原理
RocketMQ 事务消息:
1. 发送半消息(对消费者不可见)
2. 执行本地事务
3. 根据本地事务结果提交或回滚
4. 定时回查未确认的事务Kafka 事务:
1. 初始化事务
2. 开始事务
3. 发送消息(可跨多个 Topic/Partition)
4. 提交消费者 Offset(可选)
5. 提交或回滚事务7.3 死信队列应用场景
场景 | 说明 |
消息处理失败 | 多次重试后进入死信队列 |
消息过期 | TTL 到期未消费 |
队列满 | 队列达到最大长度 |
异常消息 | 格式错误、业务异常 |
处理死信:
// 监听死信队列
@RabbitListener(queues = "dlx.queue")
public void handleDeadLetter(Message message) {
// 1. 记录日志
log.error("Dead letter: {}", message);
// 2. 人工处理或告警
alertService.notify(message);
// 3. 存储到数据库
deadLetterRepository.save(message);
}7.4 消息回溯与重放
Kafka:
// 按时间回溯
Map<TopicPartition, Long> timestamps = new HashMap<>();
timestamps.put(new TopicPartition("topic", 0), targetTimestamp);
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
// 重置 Offset
consumer.seek(partition, offset);RocketMQ:
// 按时间回溯
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer.setConsumeTimestamp("20260109120000");7.5 消息轨迹追踪
RocketMQ 消息轨迹:
// 开启消息轨迹
DefaultMQProducer producer = new DefaultMQProducer("group", true);
// 查询轨迹
// 通过 RocketMQ Console 或 API 查询自定义追踪:
// 消息中添加 TraceId
Message msg = new Message("topic", body);
msg.putUserProperty("traceId", TraceContext.getTraceId());
msg.putUserProperty("spanId", TraceContext.getSpanId());八、生产实践
8.1 生产者设计最佳实践
实践 | 说明 |
同步发送 + 重试 | 重要消息使用同步发送 |
批量发送 | 提高吞吐量 |
消息压缩 | 减少网络传输 |
合理分区 | 根据业务选择分区策略 |
幂等生产 | 开启幂等性 |
// Kafka 生产者配置
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 5);
props.put("compression.type", "lz4");
props.put("enable.idempotence", true);8.2 消费者设计最佳实践
实践 | 说明 |
手动提交 | 处理完成后再提交 |
批量消费 | 提高吞吐量 |
幂等消费 | 保证重复消费不影响 |
异常处理 | 合理的重试和死信策略 |
消费限速 | 保护下游服务 |
// Kafka 消费者配置
props.put("enable.auto.commit", false);
props.put("max.poll.records", 500);
props.put("max.poll.interval.ms", 300000);8.3 消息队列容量规划
日消息量:1000 万条
单条消息:1 KB
日存储量:1000 万 × 1 KB = 10 GB
保留 7 天:10 GB × 7 = 70 GB
3 副本:70 GB × 3 = 210 GB
峰值 QPS = 日均 QPS × 峰值系数
= (1000 万 / 86400) × 10
≈ 1157 × 10 = 11570 QPS8.4 消息队列监控指标
指标 | 说明 | 告警阈值 |
消息积压 | 未消费消息数 | > 10000 |
消费延迟 | 消费时间差 | > 5 分钟 |
生产速率 | 每秒生产消息数 | 突增 50% |
消费速率 | 每秒消费消息数 | 下降 50% |
Broker 磁盘 | 磁盘使用率 | > 80% |
Broker 内存 | 内存使用率 | > 80% |
8.5 消息队列故障排查
消息丢失排查:
1. 检查生产者日志,确认发送成功
2. 检查 Broker 日志,确认接收和存储
3. 检查消费者日志,确认消费情况
4. 检查 Offset 提交情况消息积压排查:
1. 检查消费者状态(是否存活)
2. 检查消费速率(是否下降)
3. 检查下游服务(是否响应慢)
4. 检查消费者日志(是否有异常)8.6 消息队列性能调优
Kafka 调优:
# Broker
num.io.threads=8
num.network.threads=3
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
# Producer
batch.size=32768
linger.ms=5
buffer.memory=33554432
compression.type=lz4
# Consumer
fetch.min.bytes=1
fetch.max.wait.ms=500
max.partition.fetch.bytes=1048576RocketMQ 调优:
# Broker
sendMessageThreadPoolNums=16
pullMessageThreadPoolNums=16
flushDiskType=ASYNC_FLUSH
# Producer
sendMsgTimeout=3000
compressMsgBodyOverHowmuch=4096
# Consumer
consumeThreadMin=20
consumeThreadMax=64
pullBatchSize=32九、面试高频问题
9.1 为什么使用消息队列?
回答模板:
在我们的项目中使用消息队列主要解决三个问题:
1. 解耦:订单系统创建订单后,需要通知库存、积分、通知等多个系统。
使用 MQ 后,订单系统只需发送消息,不需要知道有哪些下游系统。
2. 异步:用户注册后需要发送邮件和短信,这些操作比较耗时。
使用 MQ 异步处理,用户无需等待,提升响应速度。
3. 削峰:秒杀场景下,瞬间流量可能达到平时的 100 倍。
使用 MQ 缓冲请求,后端按照自己的处理能力慢慢消费。9.2 如何保证消息不丢失?
回答模板:
消息丢失可能发生在三个环节,需要分别保证:
1. 生产端:
- Kafka:设置 acks=all,开启重试
- RabbitMQ:开启 Confirm 模式
- RocketMQ:使用同步发送,检查返回结果
2. Broker 端:
- Kafka:设置 min.insync.replicas >= 2
- RabbitMQ:开启持久化和镜像队列
- RocketMQ:使用同步双写模式
3. 消费端:
- 关闭自动确认,处理完成后手动确认
- 处理失败时不确认,让消息重新投递9.3 如何保证消息顺序性?
回答模板:
消息顺序性需要从生产和消费两端保证:
1. 生产端:
- 将需要保证顺序的消息发送到同一分区/队列
- Kafka:使用相同的 Key
- RocketMQ:使用 MessageQueueSelector
2. 消费端:
- 单线程消费,或者使用顺序消费模式
- RocketMQ:使用 MessageListenerOrderly
- Kafka:一个分区只被一个消费者消费
注意:全局有序性能很差,通常只需要保证业务相关消息的局部有序。9.4 消息积压如何处理?
回答模板:
消息积压的处理分为紧急处理和根本解决:
紧急处理:
1. 快速扩容消费者(注意分区数限制)
2. 临时跳过非关键消息,记录日志后续补偿
3. 如果是消费者 bug,修复后重新部署
根本解决:
1. 优化消费者处理逻辑,提高消费速度
2. 增加分区/队列数,提高并行度
3. 批量消费,减少网络开销
4. 异步处理,不阻塞消费线程
5. 建立监控告警,提前发现问题9.5 Kafka、RabbitMQ、RocketMQ 如何选择?
回答模板:
根据业务场景选择:
1. Kafka:
- 适合日志收集、大数据、流处理
- 高吞吐量,百万级 TPS
- 消息回溯能力强
2. RabbitMQ:
- 适合业务消息、复杂路由
- 延迟低,微秒级
- 功能丰富,多种 Exchange 类型
3. RocketMQ:
- 适合电商、金融等业务场景
- 原生支持事务消息、延迟消息
- 阿里开源,中文文档丰富
我们项目选择 [具体 MQ] 是因为 [具体原因]...最后更新:2026-01-09






Loading Comments...