消息队列不只是"生产-消费"那么简单。在真实的业务场景中,我们需要面对消息消费失败、延时执行、顺序保障等各种复杂问题。本文梳理六种常见的队列模式,帮你建立完整的消息队列知识体系。
一、死信队列(Dead Letter Queue)
是什么
死信队列(DLQ)是用来存放无法被正常消费的消息的特殊队列。当一条消息反复消费失败、超过最大重试次数,或者因为格式错误无法解析时,它不会被直接丢弃,而是被转移到死信队列中"等待处理"。
你可以把它理解为消息系统的"回收站"——消息没有被直接删除,而是被隔离到一个专门的地方,等人来处理。
什么时候触发
消息变成"死信"通常有以下几种原因:
- 消费失败达到最大重试次数:比如设置了重试 3 次,3 次都失败就进入 DLQ
- 消息 TTL 过期:消息在队列中等待超过了设定的存活时间
- 队列满了:队列达到最大长度,新消息无法入队,老消息被挤到 DLQ
- 消息被消费者主动拒绝(reject/nack 且不重新入队)
RabbitMQ 示例
# 声明死信交换机和队列
channel.exchange_declare(exchange='dlx_exchange', exchange_type='direct')
channel.queue_declare(queue='dead_letter_queue')
channel.queue_bind(queue='dead_letter_queue', exchange='dlx_exchange', routing_key='dlx_key')
# 声明业务队列,绑定死信策略
args = {
'x-dead-letter-exchange': 'dlx_exchange',
'x-dead-letter-routing-key': 'dlx_key',
'x-max-length': 10000, # 队列最大长度
'x-message-ttl': 60000 # 消息 TTL 60 秒
}
channel.queue_declare(queue='business_queue', arguments=args)Kafka 的做法
Kafka 本身没有内置 DLQ,需要在消费端自己实现:
@KafkaListener(topics = "order-topic")
public void consume(ConsumerRecord<String, String> record) {
try {
processOrder(record.value());
} catch (Exception e) {
retryCount++;
if (retryCount >= MAX_RETRY) {
// 发送到死信 topic
kafkaTemplate.send("order-topic-dlq", record.key(), record.value());
log.error("消息进入死信队列: {}", record.value());
} else {
throw e; // 触发重试
}
}
}最佳实践
- 一定要有监控和告警,DLQ 中消息堆积说明业务有问题
- 提供重新投递的能力,修复 bug 后可以把死信消息重新丢回原队列
- 记录死信原因(异常堆栈、重试次数),方便排查
二、延迟队列(Delay Queue)
是什么
延迟队列实现的效果是:消息发出后不会立即被消费,而是等待指定的时间后才能被消费者拉取到。
典型场景:
- 下单后 30 分钟未支付,自动关闭订单
- 用户注册后 24 小时发送引导邮件
- 会议开始前 15 分钟发送提醒通知
实现方案对比
| 方案 | 优点 | 缺点 |
|---|---|---|
| RabbitMQ 延迟插件 | 原生支持,精度高 | 需安装插件 |
| RabbitMQ TTL + DLQ | 不需要插件 | 只能固定延迟时间 |
| Redis ZSET | 实现简单 | 需要轮询,不够精确 |
| Kafka 时间轮 | 高吞吐 | 实现复杂 |
| 数据库轮询 | 最简单 | 性能差,延迟高 |
RabbitMQ TTL + 死信实现延迟队列
原理:消息发到一个没有消费者的队列,设置 TTL。消息过期后自动进入死信队列,真正的消费者监听死信队列。
Producer → [延迟队列 TTL=30min, 无消费者] → 过期 → [死信交换机] → [实际消费队列] → Consumer# 延迟队列(没有消费者)
args = {
'x-dead-letter-exchange': 'order_exchange',
'x-dead-letter-routing-key': 'order_cancel',
'x-message-ttl': 1800000 # 30 分钟
}
channel.queue_declare(queue='order_delay_30min', arguments=args)
# 实际消费队列
channel.queue_declare(queue='order_cancel_queue')
channel.queue_bind(queue='order_cancel_queue', exchange='order_exchange', routing_key='order_cancel')Redis ZSET 实现
import time
import redis
import json
r = redis.Redis()
# 生产者:score 是期望执行的时间戳
def delay_publish(queue, message, delay_seconds):
execute_at = time.time() + delay_seconds
r.zadd(queue, {json.dumps(message): execute_at})
# 消费者:轮询取出到期的消息
def delay_consume(queue):
while True:
now = time.time()
# 取出所有 score <= 当前时间的消息
messages = r.zrangebyscore(queue, 0, now, start=0, num=10)
for msg in messages:
if r.zrem(queue, msg): # 原子删除,防止重复消费
process(json.loads(msg))
time.sleep(0.5) # 轮询间隔三、遗言队列(Last Will Queue)
是什么
遗言队列源自 MQTT 协议中的 Last Will and Testament(LWT) 机制。客户端在连接 Broker 时预先设置一条"遗言消息",当客户端异常断开(非正常 disconnect)时,Broker 自动将这条遗言消息发布到指定的 Topic。
就像真正的遗嘱一样——你活着的时候写好,死后由律师(Broker)帮你发出去。
适用场景
- IoT 设备离线检测:"设备 A 已离线"
- 在线状态管理:用户异常掉线后通知其他用户
- 分布式服务健康检测:节点挂了自动发出告警
MQTT 遗言消息示例
import paho.mqtt.client as mqtt
client = mqtt.Client()
# 连接时设置遗言
client.will_set(
topic="device/sensor-01/status",
payload='{"status": "offline", "timestamp": "2026-03-24T10:00:00Z"}',
qos=1,
retain=True # retain=True 保证新订阅者也能看到最后状态
)
client.connect("broker.example.com", 1883)
# 正常运行时定期发布在线状态
while running:
client.publish("device/sensor-01/status", '{"status": "online"}', retain=True)
time.sleep(30)
# 如果进程崩溃、网络断开,broker 自动发布遗言消息触发条件
遗言消息只在异常断开时触发,以下情况不会触发:
- 客户端正常调用
disconnect() - 客户端主动发送 DISCONNECT 报文
以下情况会触发:
- 网络连接中断(TCP 断开)
- Keep-Alive 超时无心跳
- 进程崩溃
- 服务器主动关闭连接
四、优先级队列(Priority Queue)
是什么
普通队列是 FIFO(先进先出),而优先级队列允许高优先级的消息插队,优先被消费者处理。
场景
- VIP 用户的工单优先处理
- 告警消息优先于普通日志
- 紧急订单优先于普通订单
RabbitMQ 实现
# 声明优先级队列(最大优先级 10)
channel.queue_declare(
queue='ticket_queue',
arguments={'x-max-priority': 10}
)
# 发送普通工单
channel.basic_publish(
exchange='',
routing_key='ticket_queue',
body='普通用户工单',
properties=pika.BasicProperties(priority=1)
)
# 发送 VIP 工单
channel.basic_publish(
exchange='',
routing_key='ticket_queue',
body='VIP 用户工单',
properties=pika.BasicProperties(priority=9)
)注意事项
- 优先级队列有性能开销,Broker 需要对消息重新排序
x-max-priority建议设置为 1-10,太大会增加内存消耗- 只有在消费者来不及消费(消息有积压)时,优先级才会生效。如果消费速度大于生产速度,所有消息都是立即消费的,优先级没有意义
五、顺序队列(Sequential / FIFO Queue)
是什么
顺序队列保证消息严格按照发送顺序被消费。听起来简单,但在分布式环境中实现起来相当棘手。
为什么难
- 多个 Partition / Queue 之间天然无序
- 多个消费者并行消费,谁先处理完不确定
- 消息重试可能打乱顺序
Kafka 的方案
Kafka 只保证单个 Partition 内有序:
// 方案一:相同业务 Key 的消息路由到同一个 Partition
// 同一个用户的所有订单消息一定有序
producer.send(new ProducerRecord<>(
"order-topic",
userId, // key:按 userId 路由
orderJson // value
));
// 方案二:整个 Topic 只设置 1 个 Partition(牺牲吞吐量)RocketMQ 的方案
RocketMQ 提供了顺序消息的原生支持:
// 发送顺序消息:同一个 orderId 的消息发到同一个 Queue
SendResult result = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long orderId = (Long) arg;
int index = (int) (orderId % mqs.size());
return mqs.get(index);
}
}, orderId);代价
全局有序 = 单队列 + 单消费者 = 吞吐量极低。实际业务中,通常只需要局部有序(比如同一个订单的消息有序即可),不追求全局有序。
六、重试队列(Retry Queue)
是什么
重试队列是消费失败后,将消息投递到不同延迟级别的重试队列,实现梯度重试(Exponential Backoff),而不是立刻重试或者直接丢进死信。
为什么需要梯度重试
直接重试的问题:
- 如果是下游服务宕机,立刻重试只会反复失败
- 高频重试会对下游服务造成更大压力(雪崩效应)
- 重试次数耗尽后进入死信,但也许多等几分钟就好了
典型的重试策略
第 1 次重试 → 延迟 5 秒
第 2 次重试 → 延迟 30 秒
第 3 次重试 → 延迟 2 分钟
第 4 次重试 → 延迟 10 分钟
第 5 次重试 → 进入死信队列实现示例
RETRY_DELAYS = [5, 30, 120, 600] # 秒
def on_consume_fail(message, retry_count):
if retry_count >= len(RETRY_DELAYS):
# 重试次数用完,进入死信队列
publish_to_dlq(message)
return
delay = RETRY_DELAYS[retry_count]
message.headers['x-retry-count'] = retry_count + 1
# 投递到对应延迟级别的重试队列
publish_to_delay_queue(
queue=f'retry_queue_{delay}s',
message=message,
delay_seconds=delay
)RocketMQ 原生支持
RocketMQ 内置了 18 个重试延迟级别:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h消费失败后返回 RECONSUME_LATER,Broker 自动按照梯度延迟重新投递,无需手动实现。
总结:六种队列模式速查表
| 队列类型 | 解决的问题 | 核心思想 | 典型场景 |
|---|---|---|---|
| 死信队列 | 消费失败的消息去哪 | 隔离异常消息,防止阻塞 | 反复失败的订单消息 |
| 延迟队列 | 定时/延时执行 | 消息延迟投递 | 30 分钟未支付关闭订单 |
| 遗言队列 | 客户端异常断开检测 | 预设遗言,异常时自动发布 | IoT 设备离线通知 |
| 优先级队列 | 重要消息优先处理 | 消息带优先级,高优先插队 | VIP 工单插队处理 |
| 顺序队列 | 消息顺序保障 | 相同 Key 路由到同一分区 | 订单状态变更顺序消费 |
| 重试队列 | 优雅的失败重试 | 梯度延迟,避免雪崩 | 调用第三方 API 失败重试 |
这六种模式不是互斥的,实际生产中往往会组合使用。比如:重试队列 + 死信队列是标配组合,延迟队列本身就可以用死信队列来实现。理解了这些模式,面对大部分消息队列的场景设计都能游刃有余。