什么是消息队列
我们可以把消息队列看作是一个存放消息的容器,当我们需要使用消息的时候,直接从容器中取出消息供自己使用即可。由于队列Queue是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。
参与消息传递的双方称为生产者和消费者,生产者负责发送消息,消费者负责处理消息。我们知道操作系统中的进程通信的一种很重要的方式就是消息队列。我们这里提到的消息队列稍微有点区别,更多指的是各个服务以及系统内部各个组件/模块之前的通信,属于一种中间件。
维基百科是这样介绍中间件的:
中间件(英语:Middleware),又译中间件、中介层,是一类提供系统软件和应用软件之间连接、便于软件各部件之间的沟通的软件,应用软件可以借助中间件在不同的技术架构之间共享信息与资源。中间件位于客户机服务器的操作系统之上,管理着计算资源和网络通信。
简单来说:中间件就是一类为应用软件服务的软件,应用软件是为用户服务的,用户不会接触或者使用到中间件。
除了消息队列之外,常见的中间件还有RPC框架、分布式组件、HTTP服务器、任务调度框架、配置中心、数据库层的分库分表工具和数据迁移工具等等。
关于中间件比较详细的介绍可以参考阿里巴巴淘系技术的一篇回答:https://www.zhihu.com/question/19730582/answer/1663627873。
随着分布式和微服务系统的发展,消息队列在系统设计中有了更大的发挥空间,使用消息队列可以降低系统耦合性、实现任务异步、有效地进行流量削峰,是分布式和微服务系统中重要的组件之一。
消息队列有什么用
通常来说,使用消息队列能为我们的系统带来下面三点好处:
- 通过异步处理提高系统性能(减少响应所需时间)
- 削峰/限流
- 降低系统耦合性
通过异步处理提高系统性能(减少响应所需时间)
将用户的请求数据存储到消息队列之后就立即返回结果。随后,系统再对消息进行消费。因为用户请求数据写入消息队列之后就立即返回给用户了,但是请求数据在后续的业务校验、写数据库等操作中可能失败。因此,使用消息队列进行异步处理之后,需要适当修改业务流程进行配合,比如用户在提交订单之后,订单数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,甚至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。这就类似我们平时手机订火车票和电影票。
削峰/限流
先将短时间高并发产生的事务消息存储在消息队列中,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免直接把后端服务打垮掉。举例:在电子商务一些秒杀、促销活动中,合理使用消息队列可以有效抵御促销活动刚开始大量订单涌入对系统的冲击。如下图所示:
降低系统耦合性
使用消息队列还可以降低系统耦合性。我们知道如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性无疑更好一些。
生产者(客户端)发送消息到消息队列中去,接受者(服务端)处理消息,需要消费的系统直接去消息队列取消息进行消费即可而不需要和其他系统有耦合,这显然也提高了系统的扩展性。消息队列使用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息。从上图可以看到消息发送者(生产者)和消息接受者(消费者)之间没有直接耦合,消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接受者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计。消息接受者对消息进行过滤、处理、包装后,构造成一个新的消息类型,将消息继续发送出去,等待其他消息接受者订阅该消息。因此基于事件(消息对象)驱动的业务架构可以是一系列流程。另外,为了避免消息队列服务器宕机造成消息丢失,会将成功发送到消息队列的消息存储在消息生产者服务器上,等消息真正被消费者服务器处理后才删除消息。在消息队列服务器宕机后,生产者服务器会选择分布式消息队列服务器集群中的其他服务器发布消息。
备注:不要认为消息队列只能利用发布-订阅模式工作,只不过在解耦这个特定业务环境下是使用发布-订阅模式的。除了发布-订阅模式,还有点对点订阅模式(一个消息只有一个消费者),我们比较常用的是发布-订阅模式。另外,这两种消息模型是JMS提供的,AMQP协议还提供了另外5种消息模型。
实现分布式事务
我们知道分布式事务的解决方案之一就是MQ事务。
RocketMQ、Kafka、Pulsar、QMQ都提供了事务相关的功能。事务允许事件流应用将消费,处理,生产消息整个过程定义为一个原子操作。
使用消息队列会带来哪些问题
- 系统可用性降低:系统可用性在某种程度上降低,为什么这样说呢?在加入MQ之前,你不用考虑消息丢失或者说MQ挂掉等等的情况,但是,引入MQ之后你就需要去考虑了!
- 系统复杂性提高:加入MQ之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题!
- 一致性问题:我上面讲了消息队列可以实现异步,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了!
JMS和AMQP
JMS是什么
JMS(JAVA Message Service,java消息服务)是Java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。JMS(JAVA Message Service,Java消息服务)API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
JMS定义了五种不同的消息正文格式以及调用的消息类型,允许你发送并接收以一些不同形式的数据:
- StreamMessage:Java原始值的数据流
- MapMessage:一套名称-值对
- TextMessage:一个字符串对象
- ObjectMessage:一个序列化的Java对象
- BytesMessage:一个字节的数据流
ActiveMQ(已被淘汰)就是基于JMS规范实现的。
JMS两种消息模型
点到点(P2P)模型
使用队列(Queue)作为消息通信载体;满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。比如:我们生产者发送100条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费)
发布/订阅(Pub/Sub)模型
发布订阅模型(Pub/Sub)使用主题(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的。
AMQP是什么
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容JMS。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。RabbitMQ就是基于AMQP协议实现的。
JMS vs AMQP
对比方向 | JMS | AMQP |
---|---|---|
定义 | Java API | 协议 |
跨语言 | 否 | 是 |
跨平台 | 否 | 是 |
支持消息类型 | 提供两种消息模型:①Peer-2-Peer;②Pub/sub | 提供了五种消息模型:①direct exchange;②fanout exchange;③topic change;④headers exchange;⑤system exchange。本质来讲,后四种和JMS的pub/sub模型没有太大差别,仅是在路由机制上做了更详细的划分; |
支持消息类型 | 支持多种消息类型 ,我们在上面提到过 | byte[](二进制) |
总结:
- AMQP为消息定义了线路层(wire-levelprotocol)的协议,而JMS所定义的是API规范。在Java体系中,多个client均可以通过JMS进行交互,不需要应用修改代码,但是其对跨平台的支持较差。而AMQP天然具有跨平台、跨语言特性。
- JMS支持
TextMessage
、MapMessage
等复杂的消息类型;而AMQP仅支持byte[]
消息类型(复杂的类型可序列化后发送)。 - 由于Exchange提供的路由算法,AMQP可以提供多样化的路由方式来传递消息到消息队列,而JMS仅支持队列和主题/订阅方式两种。
RPC和消息队列的区别
RPC和消息队列都是分布式微服务系统中重要的组件之一,下面我们来简单对比一下两者:
- 从用途来看:RPC主要用来解决两个服务的远程通信问题,不需要了解底层网络的通信机制。通过RPC可以帮助我们调用远程计算机上某个服务的方法,这个过程就像调用本地方法一样简单。消息队列主要用来降低系统耦合性、实现任务异步、有效地进行流量削峰。
- 从通信方式来看:RPC是双向直接网络通讯,消息队列是单向引入中间载体的网络通讯。
- 从架构上来看:消息队列需要把消息存储起来,RPC则没有这个要求,因为前面也说了RPC是双向直接网络通讯。
- 从请求处理的时效性来看:通过RPC发出的调用一般会立即被处理,存放在消息队列中的消息并不一定会立即被处理。
RPC和消息队列本质上是网络通讯的两种不同的实现机制,两者的用途不同,万不可将两者混为一谈。
经典面试题
为什么使用消息队列
解耦
传统模式下系统间的耦合性太强。怎么说呢,举个例子:系统A通过接口调用发送数据到B、C、D三个系统,如果将来E系统接入或者B系统不需要接入了,那么系统A还需要修改代码,非常麻烦。如果系统A产生了一条比较关键的数据,那么它就要时时刻刻考虑B、C、D、E四个系统如果挂了该咋办?这条数据它们是否都收到了?显然,系统A跟其它系统严重耦合。而如果我们将数据(消息)写入消息队列,需要消息的系统直接自己从消息队列中消费。这样下来,系统A就不需要去考虑要给谁发送数据,不需要去维护这个代码,也不需要考虑其他系统是否调用成功、失败超时等情况,反正我只负责生产,别的我不管。
异步
先来看传统同步的情况,举个例子:系统A接收一个用户请求,需要进行写库操作,还需要同样的在B、C、D三个系统中进行写库操作。如果A自己本地写库只要1ms,而B、C、D三个系统写库分别要100ms、200ms、300ms,最终请求总延时是1 + 100 + 200 + 300 = 601ms,用户体验大打折扣。如果使用消息队列,那么系统A就只需要发送3条消息到消息队列中就行了,假如耗时5ms,A系统从接受一个请求到返回响应给用户,总时长是1 + 5 = 6ms,对于用户而言,体验好感度直接拉满。
削峰
如果没有使用缓存或者消息队列,那么系统就是直接基于数据库MySQL的,如果有那么一个高峰期,产生了大量的请求涌入MySQL,毫无疑问,系统将会直接崩溃。那如果我们使用消息队列,假设MySQL每秒钟最多处理1k条数据,而高峰期瞬间涌入了5k条数据,不过,这5k条数据涌入了消息队列。这样,我们的系统就可以从消息队列中根据数据库的能力慢慢的来拉取请求,不要超过自己每秒能处理的最大请求数量就行。也就是说消息队列每秒钟5k个请求进来,1k个请求出去,假设高峰期1个小时,那么这段时间就可能有几十万甚至几百万的请求积压在消息队列中。不过这个短暂的高峰期积压是完全可以的,因为高峰期过了之后,每秒钟就没有那么多的请求进入消息队列了,但是数据库依然会按照每秒1k个请求的速度处理。所以只要高峰期一过,系统就会快速的将积压的消息给处理掉。
MQ怎么保证消息可靠性?MQ怎么保证消息100%消费?如何保证消息不会丢失?
我们需要保障MQ消息的可靠性,需要从三个层面/维度解决:生产者100%投递、MQ持久化、消费者100%消费,这里的100%消费指的是消息不少消费,也不多消费。
消息生产阶段:从消息被生产出来,然后提交给MQ的过程中,只要能正常收到MQ Broker的ack确认响应,就表示发送成功,所以只要处理好返回值和异常,这个阶段是不会出现消息丢失的。
消息存储阶段:这个阶段一般会直接交给MQ消息中间件来保证,但是你要了解它的原理,比如Broker会做副本,保证一条消息至少同步两个节点再返回ack。
消息消费阶段:消费端从Broker上拉取消息,只要消费端在收到消息后,不立即发送消费确认给Broker,而是等到执行完业务逻辑后,再发送消费确认,也能保证消息的不丢失。
以kafka为例
一、生产者端
- Kafka消息发送端有个ACK机制。设置ack参数:ack=0,表示不重试,Kafka不需要返回ack,极有可能各种原因造成丢失;ack=1,表示Leader写入成功就返回ack了,Follower不一定同步成功;ack=all或ack=-1,表示ISR列表中的所有Follower同步完成再返回ack。只要能正常收到MQ Broker的ack确认响应,就表示发送成功,所以只要处理好返回值和异常,这个阶段是不会出现消息丢失的。
- 设置参数unclean.leader.election.enable:false,禁止选举ISR以外的Follower为Leader,只能从ISR列表中的节点中选举Leader;可能会牺牲Kafka的可用性,但是能够提高消息的可靠性。
- 重试机制,设置tries>1,表示消息重发次数。
- 设置最小同步副本数min.insync.replicas>1,没满足该值前,Kafka不提供读写服务,写操作会异常。
总结:生产消息时通过设置最小同步副本数和ACK机制,可以让MQ在性能与可靠性上达到平衡。
二、消费者端
手工提交offset(偏移量)
以RocketMQ为例
需要确保生产者、broker、消费者三者都不丢失数据。
- 生产者不丢失消息
方案1:confirm消息确认机制(同步发送消息) + 失败重试;
方案2:事务消息机制; - broker不丢失消息,开启同步刷盘策略 + 主从架构同步机制。
只要让一个master Broker收到消息之后同步写入磁盘,同时同步复制给其他slave Broker,再返回成功响应给生产者,此时就可以保证MQ自己不会弄丢消息 - 消费者不丢失消息,采用RocketMQ的消费者天然就可以保证你处理完消息之后,才会提交消息的offset到broker去,不过别采用多线程异步处理消息的方式。
如何保证mq有序且只消费一次?怎么解决消息被重复消费的问题?
最简单的实现方案,就是在数据库(redis或者关系库)中建一张消息日志表,这个表有两个字段:消息ID和消息执行状态
消息积压
增加消费端数量
MQ高可用的相关文章
消息队列消息丢失和消息重复发送的处理策略 | 消息队列如何保证消息不丢失,且只被消费一次,这篇就教会你 | 面试官:如何保障消息100%投递成功、消息幂等性? |
---|---|---|
如何保证MQ消息是有序的? | 消息被重复消费,怎么避免?有什么好的解决方案? | 如何保证消息队列里的数据顺序执行? |
面试基操:MQ怎么保障消息可靠性? | 使用MQ的时候,怎么确保消息100%不丢失 |
other
什么是MQ? | 消息队列原理和选型:Kafka、RocketMQ、RabbitMQ和ActiveMQ | 一篇文章把RabbitMQ、RocketMQ、Kafka三元归一 |
---|---|---|
这个队列的思路真的好,现在它是我简历上的亮点了。 | 面试官必问的3道MQ面试题,还有谁不会?? |