消息队列的基本概念

消息队列概念梳理

什么是消息队列

本文提到的消息队列(Message Queue,也称为 Message Broker),是一种广泛使用的分布式消息中间件。可以把消息队列看作是一种特殊场景下使用的数据库,只支持有限的数据存取操作(入队列和出队列),本质上,消息队列和数据库、Redis、ES 等数据系统没有区别,只是侧重点和使用场景不同而已。

那么消息队列和传统的数据库系统相比,有什么区别呢?

  1. 数据存取方式不同。数据库系统支持完整的 SQL 语言操作数据,可以实现复杂的结构化查询。而消息队列只使用队列来组织数据,存取操作也仅仅支持简单的入队和出队操作;
  2. 存放的数据性质不同。正如“消息队列”这个名称的含义,消息队列存放的数据是消息,具有“阅后即焚”的特点,虽然消息队列也有持久化的概念,但侧重点不一样,消息队列的持久化是为了保证消息不丢失,系统不需要像数据库一样花费大量的精力去处理数据的持久化逻辑;
  3. 吞吐量不同。可以理解消息队列是一个功能受限的数据库,正因为消息队列精简了很多不必要的逻辑,所以它可以获得更好的性能表现,体现在吞吐量上,消息队列的吞吐量要超过传统数据库一个量级(在常见的服务器环境下,MySQL 最大 QPS 大概是 8000,RabbitMQ 的吞吐量大概在 6w/s,Kafka 最大吞吐量高达 17w/s);
  4. 功能侧重点不同。数据库侧重于查询,MQ 则侧重于交换消息。数据库的作用类似于仓库,MQ 的作用类似于管道。

为什么引入消息队列

消息队列的三个作用:异步、解耦、削峰

异步

异步的作用是提高响应时间,增加系统吞吐量。在一个接口中有许多业务逻辑,如果某个业务不需要当前立即完成且耗时较长,就可以考虑使用异步方式实现。

举个例子,在 ESOP 系统中有许多业务需要给用户发送邮件。例如授予生效功能,当授予生效完成后,需要给员工发送一份通知邮件,如果使用同步方式处理,调用逻辑大概是这样的:

同步方式

接口的响应时间是 授予生效时间 + 发送通知 的时间,发送通知是个网络请求,会严重拉低接口整体的响应时间。并且邮件通知也不是必须在授予生效后就立即需要完成的操作,因此可以考虑异步调用邮件通知:

异步方式

改为异步调用后,授予生效的整体业务功能不受影响,并且响应时间也有了很大的提升。

解耦

解耦的作用是减少服务之间的相互依赖,防止一个服务改动影响了其他关联的服务。设计出低耦合的服务是所有架构师必须考虑的目标,因为耦合度高的系统维护起来非常困难。

回到 ESOP 的例子,授予生效关联了许多下游业务,例如发送邮件通知、生成成熟日志、更新资产信息等。如果这些操作都由授予生效的接口发起,那么各模块间的耦合度就非常高,一旦改动就牵一发而动全身。我们用伪代码说明:

public void effectGrant(GrantModel grant) {
    // 处理授予生效的业务逻辑
    // ......
    // 开始通知下游业务
    // 1. 发送邮件
    asyncSendEmail(grant);
    // 2. 生成成熟日志
    generateMatureLog(grant);
    // 3. 更新资产信息
    updateAssets(grant);
    // ......
    return;
}

可以看到,授予生效的接口中包含了大量不属于本业务的代码。并且如果有新的下游服务需要依赖生效接口,这个接口还需要进行修改,这种处理方式是不推荐的。

如果我们引入 MQ 进行业务解耦,则对应的接口伪代码就变成:

public void effectGrant(GrantModel grant) {
    // 处理授予生效的业务逻辑
    // ......
    // 通知下游业务
    sendMsg2MQ(GrantEvent.EFFECTED, grant);
    return;
}

可以看到,代码整洁了许多。并且即使有新的下游业务需要接入,这里也不需要修改任何代码。

削峰

削峰的作用是为了缓冲突发流量,防止下游服务被爆增的流量打垮。

假如上游服务 A 每秒可以处理 100 个请求,下游服务 B 每秒只能处理 50 个请求,在平时流量不大的情况下,两个服务相安无事,但如果某一时间段的突发流量暴增,超过了下游服务的处理能力,下游服务就可能直接崩溃:

上游请求超过下游处理能力导致下游服务崩溃

如果上下游服务中间接入消息队列,消息队列就像一个蓄水池,缓冲了上游的请求,让下游服务可以按照自身能力处理业务:

MQ 起到流量缓冲的作用

消息队列给系统带来了明显的好处,但同时也使得系统变得复杂,引入新的中间件,增加了系统不稳定的概率,也增加了维护成本。

消息队列的使用场景

消息队列主要的使用场景有三种:点对点模式、发布订阅模式和请求响应模式。

注意这里隐含的条件:一个队列中的消息只能被消费一次。(RabbitMQ 是这样的设计理念,但 Kafka 并没有这种要求)

点对点

点对点模式指的是消息从生产者出发,最终被某一个消费者消费。这是最常见的通信方式,通常用于任务队列。生产者将一些耗时大的任务发布到消息队列,消费者集群中的某个实例获取到消息进行任务处理。

点对点模式,特点是只有一个队列

任务队列是 MQ 重要的应用场景,这里有需要需要注意的细节,例如:

等等。这些问题都需要在设计系统之初就考虑清楚。这种模式下 MQ 通常使用 Pull 方式投递消息,由消费者决定接收消息的时机。

发布/订阅

这种模式的特点是消息会被多个消费者接收。通常用于事件驱动的系统。下游业务订阅感兴趣的上游业务事件,然后做后续的处理。

发布/订阅模式,特点是一个消息被多个消费者接收

发布订阅模式在消息队列中也被称为主题(Topic)模式,含义是类似的。MQ 不关心有多少消费者需要接收当前消息,只需要把消息发送给订阅了主题的消费者即可。这种模式下 MQ 通常是使用 Push 方式推送消息。

请求/响应

类似于通过 MQ 实现 RPC 的功能。生产者发送消息后,会等待消费者返回的响应结果。整个过程是异步的,可以理解为消费者通过回调通知生产者。

请求/响应模式,特点是两个服务互为生产者和消费者

这种模式的应用场景在功能上和 RPC 框架类似,但 RPC 框架是直接将两个服务连接起来,而 MQ 的方式中间有消息队列作为缓冲,因此服务间可以完全解耦,生产者发送请求的时候,不必要求消费者必须在线,等到消费者上线完成消费后再给生产者响应。

请求/响应模式可以结合上下游实现本地消息表,解决分布式事务的一致性问题

消息队列还有其他一些复杂的应用场景,但本质上还是属于这三种模式的范畴。

消息队列关注的功能

上面提到的异步、解耦和削峰三个作用是站在系统层面的抽象描述,当我们真正需要设计一个消息队列时,需要考虑哪些功能?

首先我们看一下消息队列的基本工作原理:

MQ 的基本工作原理

生产者和消费者是系统中单独的微服务,从图中我们可以看到在 MQ 工作过程中,需要关注的三个节点:

  1. 生产者到 MQ 的链路;
  2. MQ 本身的工作状态;
  3. MQ 到消费者的链路;

下面列出消息队列在实际应用中常见的一些场景和问题,分别从这三个方面分析如何实现。

1. 如何确保消息不丢失

消息不丢失的含义是:生产端发送的消息,最终一定会被消费端成功消费。消息丢失有时候不是必须处理的问题,这取决于业务场景。

在 MQTT 协议中,定义了三种传递消息的规范,这三种规范从低到高为:

code name 说明
Qos 0 At most once delivery 最多交付一次。也就是说允许丢失消息,但不会有重复消息。一般用在对消息可靠性不高的场景,比如监控数据、日志信息等,允许少量数据丢失
Qos 1 At least once delivery 最少交付一次。也就是说不允许丢失消息,但会有重复消息
Qos 2 Exactly once delivery 恰好交付一次。不允许丢失消息,也不允许重复消息。这是最高的级别。在分布式系统中,这是不可能实现 的。业务上可以配合幂等操作模拟 Exactly once 的语义

大多数消息队列都实现了 Qos 1 级别的语义,也就是说可以实现消息不丢失,但会有消息重复的情况。

发送端如何保证

发送端有三种发送消息的策略:

发送策略 说明
同步 生产者向 MQ 发起发送消息的 RPC 请求,线程阻塞等待 MQ 的响应。只有当 MQ 返回成功的响应时,生产者才认为消息发送成功。这种方式适用于生产者需要 实时 得知消息发送的结果,属于实时性比较强的消息处理方式
异步 生产者向 MQ 发送消息,当前流程就完成了。消息是否发送成功,后续由其他线程判断处理。通常可以采用轮询消息发送结果、MQ 发起回调通知等方式获取消息的发送状态,这种方式适用于较为重要,但不需要当时关心发送结果的消息
Oneway 生产者发送消息后就不再关注后续结果。适用于不太重要的消息,丢失个别消息也不影响处理的场景,例如监控指标、INFO 日志等

上面提到的三种发送策略,首先 Oneway 是无法保证消息在发送过程中不丢失的,同步和异步两种方式生产者都可以得知消息的发送状态,如果 MQ 超时未返回或者返回错误的结果,生产者就可以通过重试继续发送消息,直到确认消息发送成功。

MQ 端如何保证

MQ 把接收到的消息首先存放在内存中。内存中消息随着服务断电就会丢失,如果要保证可靠的消息传递,MQ 需要将接收到的消息持久化,然后再向生产者返回确认信息。

消费端如何保证

消费端需要确保消息正常处理后,再发送 ack 给 MQ,通知 MQ 可以移除该消息。如果消费端的响应因为网络问题等原因没有返回给 MQ,MQ 就不能移除该消息,需要尝试再次投递。这种处理方式可能导致重复消费问题,需要下游业务做幂等处理(通过数据库唯一主键,或者在 Redis 中保存消费过的消息等方式)。

重试是一种业务上的冗余保证,但这种方式也会带来一些问题:

  1. 正如上面提到的,重试会有重复消费的问题;
  2. 有些情况重试没有意义,只会消耗资源,例如业务逻辑异常或者下游服务宕机等;
  3. 如果下游服务压力太大导致响应变慢,这时候重试只会加剧下游的压力;

2. 如何保证顺序消费

顺序消费的含义是:消费端严格按照生产端发送消息的顺序进行消费,上一个消息没有消费完成之前,不允许消费下一个消息。因为在许多场景下,消息对应的业务都有严格的先后顺序,比如 下单 -> 支付 -> 扣减库存,如果顺序搞反,会出现不可预估的错误。

我们先看一下,MQ 在什么场景下会出现消息顺序错误的情况。首先进入到队列中的消息是有序的,假设队列中存在三条消息:C -> B -> A,需要严格按照 ABC 的顺序执行。有两个消费端(C1 & C2)订阅该队列的消息。假设 C1 首先获取到消息 A 进行消费,C2 获取到消息 B 进行消费,此时 C1 因为故障或网络问题导致消费失败,但 C2 消费 B 消息成功,此时 C2 再次获取下一个消息,就会获取到未消费的 A 消息,导致消息的消费顺序错误。

消费顺序错误的根本原因是有多个消费端同时消费同一个队列的消息。因为不同消费端的消费速度不同,甚至是消费结果都无法事先预估,消费失败后进行重试,就会出现顺序不一致的情况。因此限制消费顺序需要从根本上避免多个消费端争抢消息的局面。常规的处理方式就是一个队列对应一个消费端,发送端将相同业务发送到固定的队列中,消费端在消费成功后返回 ack,防止出现消息丢失,也严格限制了消费顺序。

3. 如何处理重复消息

在消息不丢失的问题中已经阐述过,MQ 是无法避免重复消息的。通常的做法是在消费端做好幂等处理,即使接收到重复消息也可以减小影响。

通过给消息附加业务唯一性标识 key,消费端保存消费过的消息唯一 key,判断该消息是否已经消费过。

参考:如何处理消费过程中的重复消息

但幂等处理有局限性,除非是没有副作用的消息,比如内存中计算、落库等操作,可以通过一定手段达到幂等。但如果消息有副作用,例如发送邮件,这个保证幂等就比较困难了,需要做很多额外工作,最好的方式是将副作用从消息中拆分出来单独处理。

4. 如何处理消息积压

消息积压的含义是:下游消费端的消费能力小于上游服务的生产能力,导致 MQ 内积压了大量消息来不及消费。这种情况如果不及时处理,就可能导致 MQ 服务崩溃,引起更大的故障。

发送端如何处理

发送端对于消息积压,能做的不多。最多的就是梳理业务逻辑,减少不必要的消息。如果发送端由于突发流量导致消息激增,可以根据业务场景决定是否采取服务降级处理,减少下游的压力。

MQ 如何处理

MQ 事先预留好足够的容量存放消息。如果遇到上游的突发流量,超过了 MQ 本身的负载能力,需要紧急扩容。MQ 的扩容有两种思路:

如果 MQ 积压的消息已经触发了磁盘存储的阈值,这种情况有两种处理方式:

  1. 采取 背压 backpressure 的策略,对发送端返回错误消息,拒绝发送端后续的消息,直到 MQ 本身的压力得到缓解。这种方式适用于不太重要的业务和消息;
  2. 如果是重要数据,这个时候就没有其他办法了,只能先丢弃一部分数据,事后再进行补偿。

消费端如何处理

消费端的消费能力赶不上发送端的发送速度,这是消息积压的根本原因。因此如果 MQ 已经发生消息积压,就需要紧急提高消费端的处理能力。通常的处理方式是临时扩容:

  1. 首先如果是 consumer 出现 bug 导致消费失败,需要尽快修复或回滚代码,恢复 consumer 的消费能力;
  2. 然后临时在 MQ 上创建原先 10 倍或者 20 倍的队列,写一个临时分发消息的程序,将积压的消息分发到扩容后的队列上;
  3. 部署原先 10 倍的 consumer 实例,用来消费新队列的消息;
  4. 当 MQ 中的消息被消费完之后,视情况恢复原先的架构;

如果消息设置了过期时间(RabbitMQ 可以设置消息的 TTL),超过过期时间的数据会被 MQ 丢弃,这个时候就不是消息积压,而是丢失数据了。这种情况临时扩容已经来不及,只能事后补偿。在流量低谷时间段,手动查询丢失的数据,把消息重新发送到 MQ 中再次消费。

5. 如何支持事务消息

什么是事务消息?不同的 MQ 对此的定义不同。

RocketMQ 的定义是 绑定在发送端本地事务上的消息,事务提交和消息成功发送是一组原子性操作,只能同时成功或失败,这个定义接近于我们通常理解的事务。 Kafka 的定义是 发送多个跨 partition 的消息,保证这些消息同时成功或同时失败,没有中间状态,这是 Kafka 在流式数据中保证 Exactly once delivery 的机制。

我们这里讨论 RocketMQ 语义的事务消息。先看一下通过传统消息如何支持事务消息的语义:

ProducerMessage Queue本地事务已提交send msgresponseProducerMessage Queue
本地事务提交后再向消息队列发送消息

为什么不在事务内最后发送消息?如果网络延迟高的话,事务就会长时间不结束,事务内锁定的资源也就无法释放,会影响系统的性能。因此最好避免大事务

这种方式可以实现大部分事务消息的功能,但也有一些意外情况需要处理,后面我们统一描述。

接下来我们看一下 事务消息 是如何处理的:

ProducerMessage Queue半消息是消费端无法获取的消息send half msgresponse开始执行本地事务send commit msg投递半消息responsesend rollback msg删除半消息responsealt[ 本地事务提交 ][ 本地事务回滚 ]send commit/rollback msg failedMQ 主动检查事务状态check transaction statusopt[ 如果确认消息发送发送失败 ]ProducerMessage Queue
RocketMQ 的事务消息处理流程

参考RocketMQ 的事务消息

现在对比一下传统消息和事务消息在一些异常情况下对事务的支持:

问题 传统消息 事务消息
正常的分布式事务 支持 支持
生产者提交本地事务后意外宕机 需要额外的补偿机制 MQ 主动查询事务状态,如果是生产者是集群则不影响下游业务,否则会主动回滚半消息,业务上再进行额外补偿
生产者提交本地事务后发送消息失败 本地重试,多次重试不成功就额外补救 如果是网络抖动,MQ 的主动轮询机制能自动恢复事务,不影响下游业务
生产者无法确认消息是否发送 本地重试,可能导致重复消息,需要下游做幂等处理 MQ 可以过滤重复消息,不影响下游

可以发现,事务消息对待异常情况,有更多的容错机制,可以更好地支持分布式事务。

6. 如何支持延时消息

延时消息的定义是:生产端发送消息,消息需要等待一段时间后才能被消费端接收。

延时消息常见的使用场景包括:下单成功后发送检查支付的延时 30 min 的消息,下游服务最终消费消息时判断订单是否支付,如果没有支付就取消该订单。

目前市面上开源的消息队列中,RocketMQ 是原生支持延时消息的(但开源版本和商业版本支持力度不同);RabbitMQ 本身不支持,可以通过插件支持,也可以通过死信队列支持;Kafka 本身也不支持延时消息。

参考:如何在 MQ 中实现支持任意延迟的消息?

个人感觉延时消息最好的实现方式是在生产端。通过定时任务处理,只有在需要的时间点才生成消息发送给 MQ,这样 MQ 也无需消耗额外的资源去处理这些暂时无法消费的消息。

7. 如何支持优先级消息

优先级消息的含义是:让 MQ 按照消息优先级投递,高优先级的消息可以“插队”进行优先处理,优先级消息可以保证在下游消费端消费能力有限,MQ 开始积压消息时,优先处理高优先级的消息。

优先级消息适用于相同业务但处理对象有等级区分的场景,比如同样是下单操作,服务会给 VIP 会员提供更大的可用性保证。

RabbitMQ 在 3.5.0 版本开始支持优先级队列,需要事先配置好队列优先级,不可以动态调整。RocketMQ 和 Kafka 不支持优先级队列。

8. 如何实现 MQ 高可用

MQ 作为微服务架构下重要的中间件,如果出现故障,会同时影响上下游的业务,因此保证 MQ 自身的高可用十分重要。

RabbitMQ 支持镜像集群模式,集群中所有节点存放相同的数据,如果某个节点宕机,其他节点也可以提供服务,实现高可用方案。但这种方案的伸缩性不强。

Kafka 本身的架构也支持实现高可用。Kafka 的高可用方案也提供了很好的伸缩性。

9. 如何处理消费不成功的消息

如果消息始终消费不成功,在手动 ack 的场景下,该消息就无法移除出队列,就会阻塞其他消息的正常消费。

这种情况下 RabbitMQ 提供了“死信队列”的功能,对于多次消费失败的消息,MQ 会将其移动到死信队列中等待后续处理,防止阻塞原队列中其他消息的消费。

Kafka 不支持私信队列,但可以在消费端处理。如果某条消息多次消费不成功,消费端可以将其记录到日志或数据库中,然后丢弃该消息,等待后续处理。

总结

本章节梳理了消息队列的一些通用概念和思想,具体的消息队列产品放到后续整理。