消息队列

mindmap
  消息生态系统全景图
    消息队列
      Kafka
      ActiveMQ
      RabbitMQ
      RocketMQ
      Pulsar
    标准、协议
      JMS
      AMQP
      MQTT
      OpenMessaging
    应用场景
      日志
      监控
      微服务
      流计算
      ETL
      IoT
    编程语言
      Java
      Scala
      Erlang
    实现技术
      网络通信
      序列化与反序列化
      一致性协议
      分布式事务
      异步编程模型
      数据压缩
      内存管理
      文件与高性能IO
      高可用

生产者与消费者之间的连接:

使用场景

消息模型

点对点

只能被一个消费者消费一次

stateDiagram-v2
  direction LR
  生产者1 --> 消息队列: 消息1
  生产者2 --> 消息队列: 消息2
  消息队列 --> 消费者1: 消息1
  消息队列 --> 消费者2: 消息2

RabbitMQ默认就是这种点对点模型,只有在中间加了一层Exchange才能实现发布订阅模型

发布订阅

消息生产者向频道发送一个消息之后,多个消费者可以从该频道订阅到这条消息并消费

stateDiagram-v2
  direction LR
  生产者 --> 主题: 消息
  主题 --> 消费者1: 消息
  主题 --> 消费者2: 消息

消息队列功能

可靠性

MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准:

At least once + 幂等消费 = Exactly once

发送端的可靠性

通过本地消息表实现

接收端的可靠性

保证消费幂等性

保证消息具有唯一编号

消息堆积

生产端

如果代码发送消息的性能上不去,需要优先检查一下,是不是发消息之前的业务逻辑耗时太多导致的

同时需要注意的另一点就是单线程同步发送消息性能都会有一个上限,如果1ms发送一条 那么每秒也才只能发送1000条,不仅需要引入多线程处理,同时也可以选择批量发送

消费端

设计系统的时候,一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行

如果消费端出现问题,可能就会造成队列的消息堆积

此时,处理方案只需要恢复消费端的处理能力即可

但是如果消息队列即将被写满,就必须将快要满的这个队列的消息分发到其他消息队列,临时加派消费者加快处理这些消息,像Kafka一个分区可以由一个消费者来消费,临时应急可以把消费者加到跟分区数一样多的数量

但最重要的还是要排查解决这个异常情况,究竟是因为生产端异常发送了大量消息,还是因为消费端出了异常消费缓慢或者消费被阻塞了

一种错误的解决消费慢的方式(节点宕机部分数据就没了):

stateDiagram-v2
  MQ --> 消费节点
  消费节点 --> 内存队列
  内存队列 --> 业务线程1
  内存队列 --> 业务线程2
  内存队列 --> 业务线程3

另外一种方式就是背压机制,当消费能力跟不上时,可以限制生产者的生产来避免积压大量消息

积压监控

定义一种特殊的消息,启动一个监控程序将这个消息定时地循环写入到消息队列中,消息的内容可以是生成消息的时间戳并且也会作为队列的消费者消费数据。业务处理程序消费到这个消息时直接丢弃掉,而监控程序在消费到这个消息时就可以和这个消息的生成时间做比较来判断消息的积压程度

另外一种方式就是利用消息中间件本身提供的API,比如Kafka的lag

单队列并行消费

前有 10 条消息,对应的编号是 0-9,当前的消费位置是 5。同时来了三个消费者来拉消息,把编号为 5、6、7 的消息分别给三个消费者,每人一条。过了一段时间,三个消费成功的响应都回来了,这时候就可以把消费位置更新为 8 了,这样就实现并行消费,如果迟迟收不到某条消息的确认响应,则可以将该消息放到死信队列稍后重试,避免当前队列被某些消息卡主

这种方式只适合对消息的先后顺序没有要求的场景

消息失效

消息失效导致的大量消息丢失,只能写程序慢慢将丢失的那些消息补回来

消息中间件带来的好处

中间件是一种独立的系统软件或服务程序,分布式应用软件借助这种软件在不同的技术之间共享资源。 中间件位于客户机/ 服务器的操作系统之上,管理计算机资源和网络通讯。

消息中间件是指一种在需要进行网络通信的系统进行通道的建立,数据或文件发送的中间件。消息中间件的一个重要作用是可以跨平台操作,为不同操作系统上的应用软件集成提供便利。

协议

JMS

Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。 Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

JMS编码接口之间的关系

stateDiagram-v2
  ConnectionFactory --> Connection: create
  Connection --> Session: create
  Session --> MessageConsumer: create
  Session --> MessageProducer: create
  MessageProducer --> Destination: send to
  MessageConsumer --> Destination: receive from
  Session --> Message: create

AMQP

高级消息队列协议即Advanced Message Queuing Protocol(AMQP)是一个用于统一面向消息中间件实现的一套标准协议,其设计目标是对于消息的排序、路由(包括点对点和订阅-发布)、保持可靠性、保证安全性。

MQTT

消息队列遥测传输(英语:Message Queuing Telemetry Transport),专门为物联网设备设计的一套标准的通信协议

物联网场景的设备一般是海量的,这需要MQTT消息集群支持前置负载均衡,后备集群节点支持水平扩容来支撑,一般都是用云厂商的商业服务

对比

对比项 JMS AMQP MQTT
定义 Java api Wire Protocol 自定义协议
跨语言
Model 点对点、发布订阅 direct exchange 、fanout exchange、topic change、headers exchange、system exchange 后四种和JMS的发布订阅大差别,仅是在路由机制上做了更详细的划分 发布订阅,通过 Topic 表示主题
消息类型 TextMessage、MapMessage、BytesMessage、StreamMessage、ObjectMessage、Message(只有消息头和属性) 二进制数据 二进制数据

消息队列设计

从几个方面考虑:

性能与可靠性

  1. 网络带宽与延迟:消息队列本质上还是一个I/O密集型系统,内部没有太多复杂的计算逻辑,因此网络无论对生产者、服务端还是消费者来说都比较重要,网络一抖动,全链路的吞吐量可能就会受影响。
  2. 生产者的发送模式:选择 发送即忘/异步/同步 不同的发送模式,会直接影响Producer的性能和可靠性。
  3. 服务端的物理硬件:特别是磁盘和内存,会直接关系到Broker的存储和消费性能。
  4. 消费者的Rebalance:在再平衡期间,整个消费会暂停,因此如何最大程度降低再平衡的影响,对消费者端来说比较重要
  5. 数据一致性协议:等级越强的一致性吞吐量越低,延迟越高

存储

元数据的存储:使用第三方组件或通过分布式算法实现,二者是运维复杂度与开发复杂度的平衡

消息数据的存储:

写性能:

写可靠性:

读性能:

客户端

客户端基础功能:

生产者

生产相关功能:

集群管控操作:集群管控操作一般是用来完成资源的创建、查询、修改、删除等集群管理动作

生产事务:

sequenceDiagram
  title 2PC方案
  生产者 ->> 事务协调者: 初始化事务
  生产者 ->> broker1: 发送数据
  生产者 ->> broker2: 发送数据
  生产者 ->> broker3: 发送数据
  生产者 ->> 事务协调者: 提交事务
  事务协调者 -->> broker1: 通知数据可见
  事务协调者 -->> broker2: 通知数据可见
  事务协调者 -->> broker3: 通知数据可见

消费者

消费模型:

分区消费模式:

消费分组:用来组织消费者、分区、消费进度关系的逻辑概念,需要有消费分组来记录消息消费到哪了

协调者:消费者启动后需要向协调者请求,由协调者来分配消费者与分区的关系

消费分区分配策略:

消费确认:确认后删除数据或确认后保存消费进度

消费失败处理:

可观测性

关键指标

消息可靠投递方案

sequenceDiagram
  生产者 ->> 数据库: 双写业务库+消息库
  生产者 ->> MQ: 发送消息
  MQ ->> 生产者: 消息确认
  生产者 ->> 数据库: 更新消息库,确认已投递
  loop
    定时任务 ->> 数据库: 消息库查找未投递成功的消息
    定时任务 ->> MQ: 重新发送数据
  end

检测消息丢失

对每条消息使用连续的递增序列化,在消费者端如果发现序号缺失,就代表消息丢失了

需要考虑像Kafka这种多分区架构,序号至在分区内有序,同时如果有多个生产者并发生产消息,序号也只能在单个生产者内保持有序

生产者弄丢数据

可能由于网络原因,数据没有到MQ,就在半路没了

对于异步发送可以使用confirm机制,confirm机制当MQ接收到消息后,会给生产者回传一个ack,如果MQ没能处理这个消息,会回传nack

而同步发送只需要在发送代码里做好异常捕获,当发送失败后就能很好在本地重试

MQ弄丢数据

只要开启数据持久化,消息丢失的可能性很小

同时需要深入理解MQ的配置参数,配好参数能降低丢数据的风险

消费端弄丢数据

使用消息确认机制,处理完消息手动ack

选型

发展趋势

消息处理(低延迟) -> 流处理(高吞吐) -> 消息流一体

单体 -> 分布式 -> 云原生

云原生

存算分离

存算分离架构是目前实现弹性消息队列集群的主要技术方案

对于消息队列存算分离的场景,分布式存储服务是存储层比较合适的选择。它具备分布式存储能力和较高的读写性能,一旦引入存储层,消息队列本身就不用有副本的概念了,数据可靠性由存储层来保证,计算层要做的就是接收请求,根据请求转发到存储层去读写数据,由于是无状态的,所以计算层可以不断扩展,以提升整个集群的提升对外服务能力

分层存储

为了降低存储成本,消息队列可以采取冷热数据分离的方式,将一部分数据存储到远端诸如对象存储的服务上。业界主流消息队列的分层思路,主要有生产时实时写入到远端和异步攒一批数据上传到远端两种方式。从性能的角度,生产主要关注的是实时写入远程还是异步上传文件到远程,消费需要关注的是从远程读取数据的方式,以及预读算法的设计。同时也要关注资源的隔离,避免远端存取导致的资源占用性能问题以及远端不可用时的读写功能降级

Serverless

消息队列可以作为 Serverless 的数据源头及目的地

事件总线架构

数据集成

将数据从数据源搬到数据目标的过程,消息队列连接器是一种具体的实现方式

stateDiagram-v2
  direction LR
  数据源 --> 源数据连接器
  源数据连接器 --> 分布式任务调度平台
  分布式任务调度平台 --> 目标数据连接器
  目标数据连接器 --> 数据目标

典型的数据集成适合数据源和数据目标是一对一的场景,消息队列连接器适合数据源和数据目标是一对多的场景

统一消息服务