Karp 的技术博客
消息队列不只是"生产-消费"那么简单。在真实的业务场景中,我们需要面对消息消费失败、延时执行、顺序保障等各种复杂问题。本文梳理六种常见的队列模式,帮你建立完整的消息队列知识体系。

一、死信队列(Dead Letter Queue)

是什么

死信队列(DLQ)是用来存放无法被正常消费的消息的特殊队列。当一条消息反复消费失败、超过最大重试次数,或者因为格式错误无法解析时,它不会被直接丢弃,而是被转移到死信队列中"等待处理"。

你可以把它理解为消息系统的"回收站"——消息没有被直接删除,而是被隔离到一个专门的地方,等人来处理。

什么时候触发

消息变成"死信"通常有以下几种原因:

  • 消费失败达到最大重试次数:比如设置了重试 3 次,3 次都失败就进入 DLQ
  • 消息 TTL 过期:消息在队列中等待超过了设定的存活时间
  • 队列满了:队列达到最大长度,新消息无法入队,老消息被挤到 DLQ
  • 消息被消费者主动拒绝(reject/nack 且不重新入队)

RabbitMQ 示例

# 声明死信交换机和队列
channel.exchange_declare(exchange='dlx_exchange', exchange_type='direct')
channel.queue_declare(queue='dead_letter_queue')
channel.queue_bind(queue='dead_letter_queue', exchange='dlx_exchange', routing_key='dlx_key')

# 声明业务队列,绑定死信策略
args = {
    'x-dead-letter-exchange': 'dlx_exchange',
    'x-dead-letter-routing-key': 'dlx_key',
    'x-max-length': 10000,          # 队列最大长度
    'x-message-ttl': 60000          # 消息 TTL 60 秒
}
channel.queue_declare(queue='business_queue', arguments=args)

Kafka 的做法

Kafka 本身没有内置 DLQ,需要在消费端自己实现:

@KafkaListener(topics = "order-topic")
public void consume(ConsumerRecord<String, String> record) {
    try {
        processOrder(record.value());
    } catch (Exception e) {
        retryCount++;
        if (retryCount >= MAX_RETRY) {
            // 发送到死信 topic
            kafkaTemplate.send("order-topic-dlq", record.key(), record.value());
            log.error("消息进入死信队列: {}", record.value());
        } else {
            throw e; // 触发重试
        }
    }
}

最佳实践

  • 一定要有监控和告警,DLQ 中消息堆积说明业务有问题
  • 提供重新投递的能力,修复 bug 后可以把死信消息重新丢回原队列
  • 记录死信原因(异常堆栈、重试次数),方便排查

二、延迟队列(Delay Queue)

是什么

延迟队列实现的效果是:消息发出后不会立即被消费,而是等待指定的时间后才能被消费者拉取到

典型场景:

  • 下单后 30 分钟未支付,自动关闭订单
  • 用户注册后 24 小时发送引导邮件
  • 会议开始前 15 分钟发送提醒通知

实现方案对比

方案优点缺点
RabbitMQ 延迟插件原生支持,精度高需安装插件
RabbitMQ TTL + DLQ不需要插件只能固定延迟时间
Redis ZSET实现简单需要轮询,不够精确
Kafka 时间轮高吞吐实现复杂
数据库轮询最简单性能差,延迟高

RabbitMQ TTL + 死信实现延迟队列

原理:消息发到一个没有消费者的队列,设置 TTL。消息过期后自动进入死信队列,真正的消费者监听死信队列。

Producer → [延迟队列 TTL=30min, 无消费者] → 过期 → [死信交换机] → [实际消费队列] → Consumer
# 延迟队列(没有消费者)
args = {
    'x-dead-letter-exchange': 'order_exchange',
    'x-dead-letter-routing-key': 'order_cancel',
    'x-message-ttl': 1800000  # 30 分钟
}
channel.queue_declare(queue='order_delay_30min', arguments=args)

# 实际消费队列
channel.queue_declare(queue='order_cancel_queue')
channel.queue_bind(queue='order_cancel_queue', exchange='order_exchange', routing_key='order_cancel')

Redis ZSET 实现

import time
import redis
import json

r = redis.Redis()

# 生产者:score 是期望执行的时间戳
def delay_publish(queue, message, delay_seconds):
    execute_at = time.time() + delay_seconds
    r.zadd(queue, {json.dumps(message): execute_at})

# 消费者:轮询取出到期的消息
def delay_consume(queue):
    while True:
        now = time.time()
        # 取出所有 score <= 当前时间的消息
        messages = r.zrangebyscore(queue, 0, now, start=0, num=10)
        for msg in messages:
            if r.zrem(queue, msg):  # 原子删除,防止重复消费
                process(json.loads(msg))
        time.sleep(0.5)  # 轮询间隔

三、遗言队列(Last Will Queue)

是什么

遗言队列源自 MQTT 协议中的 Last Will and Testament(LWT) 机制。客户端在连接 Broker 时预先设置一条"遗言消息",当客户端异常断开(非正常 disconnect)时,Broker 自动将这条遗言消息发布到指定的 Topic。

就像真正的遗嘱一样——你活着的时候写好,死后由律师(Broker)帮你发出去。

适用场景

  • IoT 设备离线检测:"设备 A 已离线"
  • 在线状态管理:用户异常掉线后通知其他用户
  • 分布式服务健康检测:节点挂了自动发出告警

MQTT 遗言消息示例

import paho.mqtt.client as mqtt

client = mqtt.Client()

# 连接时设置遗言
client.will_set(
    topic="device/sensor-01/status",
    payload='{"status": "offline", "timestamp": "2026-03-24T10:00:00Z"}',
    qos=1,
    retain=True   # retain=True 保证新订阅者也能看到最后状态
)

client.connect("broker.example.com", 1883)

# 正常运行时定期发布在线状态
while running:
    client.publish("device/sensor-01/status", '{"status": "online"}', retain=True)
    time.sleep(30)

# 如果进程崩溃、网络断开,broker 自动发布遗言消息

触发条件

遗言消息只在异常断开时触发,以下情况不会触发:

  • 客户端正常调用 disconnect()
  • 客户端主动发送 DISCONNECT 报文

以下情况会触发:

  • 网络连接中断(TCP 断开)
  • Keep-Alive 超时无心跳
  • 进程崩溃
  • 服务器主动关闭连接

四、优先级队列(Priority Queue)

是什么

普通队列是 FIFO(先进先出),而优先级队列允许高优先级的消息插队,优先被消费者处理。

场景

  • VIP 用户的工单优先处理
  • 告警消息优先于普通日志
  • 紧急订单优先于普通订单

RabbitMQ 实现

# 声明优先级队列(最大优先级 10)
channel.queue_declare(
    queue='ticket_queue',
    arguments={'x-max-priority': 10}
)

# 发送普通工单
channel.basic_publish(
    exchange='',
    routing_key='ticket_queue',
    body='普通用户工单',
    properties=pika.BasicProperties(priority=1)
)

# 发送 VIP 工单
channel.basic_publish(
    exchange='',
    routing_key='ticket_queue',
    body='VIP 用户工单',
    properties=pika.BasicProperties(priority=9)
)

注意事项

  • 优先级队列有性能开销,Broker 需要对消息重新排序
  • x-max-priority 建议设置为 1-10,太大会增加内存消耗
  • 只有在消费者来不及消费(消息有积压)时,优先级才会生效。如果消费速度大于生产速度,所有消息都是立即消费的,优先级没有意义

五、顺序队列(Sequential / FIFO Queue)

是什么

顺序队列保证消息严格按照发送顺序被消费。听起来简单,但在分布式环境中实现起来相当棘手。

为什么难

  • 多个 Partition / Queue 之间天然无序
  • 多个消费者并行消费,谁先处理完不确定
  • 消息重试可能打乱顺序

Kafka 的方案

Kafka 只保证单个 Partition 内有序

// 方案一:相同业务 Key 的消息路由到同一个 Partition
// 同一个用户的所有订单消息一定有序
producer.send(new ProducerRecord<>(
    "order-topic",
    userId,          // key:按 userId 路由
    orderJson        // value
));

// 方案二:整个 Topic 只设置 1 个 Partition(牺牲吞吐量)

RocketMQ 的方案

RocketMQ 提供了顺序消息的原生支持:

// 发送顺序消息:同一个 orderId 的消息发到同一个 Queue
SendResult result = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Long orderId = (Long) arg;
        int index = (int) (orderId % mqs.size());
        return mqs.get(index);
    }
}, orderId);

代价

全局有序 = 单队列 + 单消费者 = 吞吐量极低。实际业务中,通常只需要局部有序(比如同一个订单的消息有序即可),不追求全局有序。


六、重试队列(Retry Queue)

是什么

重试队列是消费失败后,将消息投递到不同延迟级别的重试队列,实现梯度重试(Exponential Backoff),而不是立刻重试或者直接丢进死信。

为什么需要梯度重试

直接重试的问题:

  • 如果是下游服务宕机,立刻重试只会反复失败
  • 高频重试会对下游服务造成更大压力(雪崩效应)
  • 重试次数耗尽后进入死信,但也许多等几分钟就好了

典型的重试策略

第 1 次重试 → 延迟 5 秒
第 2 次重试 → 延迟 30 秒
第 3 次重试 → 延迟 2 分钟
第 4 次重试 → 延迟 10 分钟
第 5 次重试 → 进入死信队列

实现示例

RETRY_DELAYS = [5, 30, 120, 600]  # 秒

def on_consume_fail(message, retry_count):
    if retry_count >= len(RETRY_DELAYS):
        # 重试次数用完,进入死信队列
        publish_to_dlq(message)
        return

    delay = RETRY_DELAYS[retry_count]
    message.headers['x-retry-count'] = retry_count + 1

    # 投递到对应延迟级别的重试队列
    publish_to_delay_queue(
        queue=f'retry_queue_{delay}s',
        message=message,
        delay_seconds=delay
    )

RocketMQ 原生支持

RocketMQ 内置了 18 个重试延迟级别:

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

消费失败后返回 RECONSUME_LATER,Broker 自动按照梯度延迟重新投递,无需手动实现。


总结:六种队列模式速查表

队列类型解决的问题核心思想典型场景
死信队列消费失败的消息去哪隔离异常消息,防止阻塞反复失败的订单消息
延迟队列定时/延时执行消息延迟投递30 分钟未支付关闭订单
遗言队列客户端异常断开检测预设遗言,异常时自动发布IoT 设备离线通知
优先级队列重要消息优先处理消息带优先级,高优先插队VIP 工单插队处理
顺序队列消息顺序保障相同 Key 路由到同一分区订单状态变更顺序消费
重试队列优雅的失败重试梯度延迟,避免雪崩调用第三方 API 失败重试

这六种模式不是互斥的,实际生产中往往会组合使用。比如:重试队列 + 死信队列是标配组合,延迟队列本身就可以用死信队列来实现。理解了这些模式,面对大部分消息队列的场景设计都能游刃有余。

版权属于:karp
作品采用:本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。
更新于: 2026年03月24日 08:21
0

目录

来自 《消息队列进阶:你必须掌握的六种队列模式》