延迟队列(mq延迟队列)

软件教程 2022.12.13 153

目录:

网易传媒技术团队:消息中间件实现延迟队列的应用与实践

早期需要延迟处理的业务场景,更多的是通过定时任务扫表,然后执行满足条件的记录,具有频率高、命中低、资源消耗大的缺点。随着消息中间件的普及,延迟消息可以很好的处理这种场景,本文主要介绍延迟消息的使用场景以及基于常见的消息中间件如何实现延迟队列,最后给出了一个在网易公开课使用延迟队列的实践。

1、有效期:限时活动、拼团。。。

2、超时处理:取消超时未支付订单、超时自动确认收货。。。

4、重试:网络异常重试、打车派单、依赖条件未满足重试。。。

5、定时任务:智能设备定时启动。。。

1、RabbitMQ

1)简介:基于AMQP协议,使用Erlang编写,实现了一个Broker框架

a、Broker:接收和分发消息的代理服务器

b、Virtual Host:虚拟主机之间相互隔离,可理解为一个虚拟主机对应一个消息服务

c、Exchange:交换机,消息发送到指定虚拟机的交换机上

d、Binding:交换机与队列绑定,并通过路由策略和routingKey将消息投递到一个或多个队列中

e、Queue:存放消息的队列,FIFO,可持久化

f、Channel:信道,消费者通过信道消费消息,一个TCP连接上可同时创建成百上千个信道,作为消息隔离

2)延迟队列实现:RabbitMQ的延迟队列基于消息的存活时间TTL(Time To Live)和死信交换机DLE(Dead Letter Exchanges)实现

a、TTL:RabbitMQ支持对队列和消息各自设置存活时间,取二者中较小的值,即队列无消费者连接或消息在队列中一直未被消费的过期时间

b、DLE:过期的消息通过绑定的死信交换机,路由到指定的死信队列,消费者实际上消费的是死信队列上的消息

3)缺点:

a、配置麻烦,额外增加一个死信交换机和一个死信队列的配置

b、脆弱性,配置错误或者生产者消费者连接的队列错误都有可能造成延迟失效

2、RocketMQ

1)简介:来源于阿里,目前为Apache顶级开源项目,使用Java编写,基于长轮询的拉取方式,支持事务消息,并解决了顺序消息和海量堆积的问题

a、Broker:存放Topic并根据读取Producer的提交日志,将逻辑上的一个Topic分多个Queue存储,每个Queue上存储消息在提交日志上的位置

b、Name Server:无状态的节点,维护Topic与Broker的对应关系以及Broker的主从关系

2)延迟队列实现:RocketMQ发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息都存放到同一个队列中),然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列中

3)缺点:延迟时间粒度受限制(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)

3、Kafka

1)简介:来源于Linkedin,目前为Apache顶级开源项目,使用Scala和Java编写,基于zookeeper协调的分布式、流处理的日志系统,升级版为Jafka

2)延迟队列实现:Kafka支持延时生产、延时拉取、延时删除等,其基于时间轮和JDK的DelayQueue实现

a、时间轮(TimingWheel):是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表

b、定时任务列表(TimerTaskList):是一个环形的双向链表,链表中的每一项表示的都是定时任务项

c、定时任务项(TimerTaskEntry):封装了真正的定时任务TimerTask

d、层级时间轮:当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中,类似于钟表就是一个三级时间轮

e、JDK DelayQueue:存储TimerTaskList,并根据其expiration来推进时间轮的时间,每推进一次除执行相应任务列表外,层级时间轮也会进行相应调整

3)缺点:

a、延迟精度取决于时间格设置

b、延迟任务除由超时触发还可能被外部事件触发而执行

4、ActiveMQ

1)简介:基于JMS协议,Java编写的Apache顶级开源项目,支持点对点和发布订阅两种模式。

a、点对点(point-to-point):消息发送到指定的队列,每条消息只有一个消费者能够消费,基于拉模型

b、发布订阅(publish/subscribe):消息发送到主题Topic上,每条消息会被订阅该Topic的所有消费者各自消费,基于推模型

2)延迟队列实现:需要延迟的消息会先存储在JobStore中,通过异步线程任务JobScheduler将到达投递时间的消息投递到相应队列上

a、Broker Filter:Broker中定义了一系列BrokerFilter的子类构成拦截器链,按顺序对消息进行相应处理

b、ScheduleBroker:当消息中指定了延迟相关属性,并且jobId为空时,会生成调度任务存储到JobStore中,此时消息不会进入到队列

c、JobStore:基于BTree存储,key为任务执行的时间戳,value为该时间戳下需要执行的任务列表

d、JobScheduler:取JobStore中最小的key执行(调度时间最早的),执行时间=当前时间,将该任务列表依次投递到所属的队列,对于需要重复投递和投递失败的会再次存入JobStore中。

注: 此处JobScheduler的执行时间间隔可动态变化,默认0.5s,有新任务时会立即执行(Object-notifyAll())并设置时间间隔为0.1s,没有新任务后,下次执行时间为最近任务的调度执行时间。

3)缺点:投递到队列失败,将消息重新存入JobStore,消息调度执行时间=系统当前时间+延迟时间,会导致消息被真实投递的时间可能为设置的延迟时间的整数倍

5、Redis

1)简介:基于Key-Value的NoSQL数据库,由于其极高的性能常被当作缓存来使用,其数据结构支持:字符串、哈希、列表、集合、有序集合

2)延迟队列实现:Redis的延迟队列基于有序集合,score为执行时间戳,value为任务实体或任务实体引用

3)缺点:

a、实现复杂,本身不支持

b、完全基于内存,延迟时间长浪费内存资源

6、消息队列对比

1、公开课延迟队列技术选型

1)业务场景:关闭超时未支付订单、限时优惠活动、拼团

2)性能要求:订单、活动、拼团 数据量可控,上述MQ均能满足要求

3)可靠性:使用ActiveMQ、RabbitMQ、RocketMQ作为延迟队列更普遍

4)可用性:ActiveMQ、RocketMQ自身支持延迟队列功能,且目前公开课业务中使用的中间件为ActiveMQ和Kafka

5)延迟时间灵活:活动的开始和结束时间比较灵活,而RocketMQ时间粒度较粗,Kafka会依赖时间格有精度缺失

结论: 最终选择ActiveMQ来作为延迟队列

2、业务场景:关闭未支付订单

1)关闭微信未支付订单

2)关闭IOS未支付订单

3、ActiveMQ使用方式

1)activemq.xml中支持调度任务

2)发送消息时,设置message的延迟属性

其中:

a、延迟处理

AMQ_SCHEDULED_DELAY:设置多长时间后,投递给消费者(毫秒)

b、重复投递

AMQ_SCHEDULED_PERIOD:重复投递时间间隔(毫秒)

AMQ_SCHEDULED_REPEAT:重复投递次数

c、指定调度计划

AMQ_SCHEDULED_CRON:corn正则表达式

4、公开课使用中进行的优化

1)可靠性:针对实际投递时间可能翻倍的问题,结合ActiveMQ的重复投递,在消费者逻辑中做幂等处理来保证延迟时间的准确性

2)可追溯性:延迟消息及消费情况做数据库冗余存储

3)易用性:业务上定义好延迟枚举类型,直接使用JmsDelayTemplate发送,无需关心数据备份和参数等细节

1、无论是基于死信队列还是基于数据先存储后投递,本质上都是将延迟待发送的消息数据与正常订阅的队列分开存储,从而降低耦合度

2、无论是检查队头消息TTL还是调度存储的延迟数据,本质上都是通过定时任务来完成的,但是定时任务的触发策略以及延迟数据的存储方式决定了不同中间件之间的性能优劣

张浩,2018年加入网易传媒,高级Java开发工程师,目前在网易公开课主要做支付财务体系、版本迭代相关的工作。

RabbitMQ最佳实践

有些应用程序需要非常高的吞吐量,而其他一些应用程序却正在发布批处理作业,这些作业可能会延迟一段时间。在设计系统时,目标应该是最大限度地将性能和可用性结合起来,这对您的特定应用程序是有意义的。错误的体系结构设计决策或客户端错误,可能会损坏中间件或影响吞吐量。

您的发布服务器可能会停止运行,或者由于内存使用过多而导致服务器崩溃。本系列文章重点关注rabbitmq的最佳实践。应做和不应做两种不同使用类别的最佳实践相混合;高可用性和高性能(高吞吐量)。我们将讨论队列大小、常见错误、延迟队列、预取值、连接和通道、HIPE和集群中的节点数。这些通常都是最佳实践规则,基于我们在使用rabbitmq时获得的经验。

队列中的许多消息会对RAM的使用造成很大的负担。为了释放RAM,rabbitmq将(页面输出)消息刷新到磁盘。此过程会降低排队速度。当有许多消息需要分页取出时,分页过程通常会花费时间并阻止队列处理消息。许多消息可能会对中间件的性能产生负面影响。

当有许多消息重启集群时,也是费时的,因为必须重建索引。重新启动后,在群集中的节点之间同步消息也需要时间。

在rabbitmq 3.6中添加了一个名为lazy queues的功能。懒惰队列是消息自动存储到磁盘上的队列。只有在需要时才将消息加载到内存中。对于懒惰的队列,消息直接进入磁盘,因此RAM的使用被最小化,但是吞吐时间将花费更长的时间。

我们已经看到,懒惰的队列以更好的可预测性的方式,创建了一个更稳定的集群。要让您的消息不出现警告,请刷新到磁盘。你不会突然被一个性能冲击问题所困扰。如果您一次发送大量消息(例如处理批处理作业),或者如果您认为您的消费者一直无法跟上发布者的速度,我们建议您启用延迟队列。

对于经常受到消息峰值冲击的应用程序,以及要求吞吐量比其他任何东西都重要的应用程序,可以推荐的另一做法是设置队列的最大长度。这样可以通过丢弃来自队列头部的消息来保持队列的简短性,从而使队列永远不会超过max-length设置。

队列在rabbitmq中是单线程的,一个队列可以处理大约50k条消息/秒。如果您有多个队列和消费者,您可以在多核系统上获得更好的吞吐量。如果在底层节点上拥有与核心一样多的队列,那么您将获得最佳吞吐量。

rabbitmq管理接口为集群中的每个队列收集和计算度量。如果您有数千个活动队列和使用者,这可能会减慢服务器的运行速度。如果队列太多,CPU和RAM的使用也可能受到负面影响。

队列性能受限于一个CPU核心。因此,如果将队列拆分到不同的核心,您将获得更好的性能;如果您拥有rabbitmq集群,您也可以将他们拆分到不同的节点。

rabbitmq队列绑定到最初声明它们的节点。即使您创建了一个rabbitmq中间件集群,所有路由到特定队列的消息都将转到该队列所在的节点。您可以在节点之间平均地手动拆分队列,但缺点是您需要记住队列的位置。

如果您有多个节点或具有多个核心的单节点集群,我们建议使用两个插件来帮助您:

当您想要在生产者和消费者之间共享队列时,为队列命名是很重要的,但是如果您使用临时队列,则不重要。相反,您应该让服务器使用一个随机的队列名称,而不是你自己命名一个——或者修改rabbitmq策略。

客户机连接可能会失败,并可能留下未使用的资源(队列),留下许多队列可能会影响性能。自动删除队列有三种方法:

在 Erlang VM 的内部队列每个队列均使用用了一个优先级别,他们耗费了一些资源。在大多数情况下,不超过5个优先级就足够了。

一个常见的问题是如何处理发送到rabbitmq的消息的palyload(消息大小)。当然,您不应该在消息中发送非常大的文件信息,但是每秒的消息数是一个比它本身的消息大小更大的瓶颈。发送多个小消息可能是一个坏的选择。一个更好的办法是将它们捆绑成一个更大的消息,让消费者将其拆分。但是,如果捆绑多条消息,则需要记住这可能会影响处理时间。如果其中一条捆绑消息失败,是否需要重新处理所有这些消息?如何设置这个取决于带宽和体系结构。

每个连接使用大约100kb的RAM(如果使用TLS,甚至更多)。数千个连接可能是rabbitmq服务器的沉重负担。在最坏的情况下,服务器可能由于内存不足而崩溃。AMQP协议有一种称为“多路复用”的机制,它“复用”单个TCP连接。它建议每个进程只创建一个TCP连接,并在这个唯一一个连接的基础上为不同的线程使用多个通道。连接也应该是长连接的。AMQP连接的握手过程非常复杂,至少需要7个TCP数据包(如果使用了TLS,则需要更多)。

相反,如果需要,可以更频繁地打开和关闭通道。如果可能的话,甚至通道也应该是长寿命的,例如,在每个发布信息线程中复用相同的通道。每次发布信息时不用打开频道。最佳实践是复用连接,使用各通道在一个连接的基础上实现多路复用。理想情况下,每个进程只能有一个连接,然后在应用程序中为每个线程使用一个通道,而每个channel 复用同一个连接即可。

您还应该确保不在线程之间共享通道,因为大多数客户机不保证通道是线程安全的(因为这样会对性能产生严重的负面影响)。

确保不要在线程之间共享通道,因为大多数客户机不会使通道线程安全(因为这样会对性能产生严重的负面影响)。

为发布者和消费者区分连接以获得高吞吐量。当发布服务器向服务器发送太多要处理的消息时,rabbitmq可以对TCP连接施加反向压力。如果消费者使用相同的TCP连接,服务器可能不会从客户机接收消息确认。因此,消费性能也会受到影响。而随着消费速度的降低,服务器将不堪重负。

具有大量连接和通道的另一个影响为rabbitmq管理接口的性能。对于每个连接和通道性能,指标必须收集、分析和显示度量。

在连接失败的情况下,传输中的消息可能会丢失,并且可能需要重新传输此类消息。Acknowledgements 让服务器和客户机知道何时重新传输消息。客户机可以在收到消息时对其进行确认,也可以在客户机完全处理完消息后对其进行确认。Acknowledgement 具有性能影响,因此为了实现最快的吞吐量,应该禁用手动确认。

接收重要消息的消费应用程序在完成需要对其进行的任何操作之前不应确认消息,这样未处理的消息(工作进程崩溃、异常等)就不会丢失。

发布确认,是相同的事情,但用于发布。服务器收到来自发布服务器的消息时会进行确认。发布确认也会影响性能。但是,应该记住,如果发布者至少需要处理一次消息,就需要这样做。

所有未确认的消息必须驻留在服务器上的RAM中。如果您有太多未确认的消息,您将耗尽内存。限制未确认消息的一个有效方法是客户端预取的消息数做出相关设置。在预取部分了解有关预取的更多信息。

如果您不能承受丢失任何消息的代价,请确保您的队列声明为“持久”,并且您的消息以传递模式“持久”发送。

为了避免在中间件中丢失消息,需要为中间件重新启动、中间件硬件故障或中间件崩溃时做好准备。为了确保消息和中间件定义在重新启动后仍然存在,我们需要确保它们在磁盘上。在中间件重新启动期间,不持久的消息、交换和队列将会被丢失。

持久性消息更重,因为它们必须写入磁盘。请记住,即使您发送的是临时消息,懒惰的队列也会对性能产生相同的影响。对于高性能-请使用瞬态消息。

您可以通过amqps连接到rabbitmq,这是用tls包装的amqp协议。由于所有流量都必须加密和解密,因此TLS会影响性能。为了获得最大的性能,我们建议使用vpc对等,那么流量是私有的,并且是独立的,不涉及AMQP客户机/服务器。

在cloudamqp中,我们将rabbitmq服务器配置为只接受快速但安全的加密密码并确定其优先级。

预取值用于指定多少条消息将同时被发送给消费者。它被用来从你的消费者那里得到尽可能多的东西(饱和工作)。

From RabbitMQ.com: “The goal is to keep the consumers saturated with work, but to minimise the client's buffer size so that more messages stay in Rabbit's queue and are thus available for new consumers or to just be sent out to consumers as they become free.”

来自rabbitmq.com:“我们的目标是让消费者饱和工作,但要最大限度地减小客户机的缓冲区大小,因此更多的消息被留在Rabbit的队列中,从而对新的消费者可用,或者发送给那些变得空闲的消者。”

rabbitmq的默认预取设置为客户端提供了一个不受限制的缓冲区,这意味着rabbitmq在默认情况下会将尽可能多的消息发送给任何看起来准备接受它们的客户机。发送的消息由rabbitmq客户端库(在使用者中)缓存,直到对其进行处理。预取限制了在确认消息之前客户端可以接收的消息数。所有预取的消息都将从队列中删除,并且对其他使用者不可见。

A too small prefetch count may hurt performance since RabbitMQ is most of the time waiting to get permission to send more messages. The image below is illustrating long idling time. In the example, we have a QoS prefetch setting of 1. This means that RabbitMQ won't send out the next message until after the round trip completes (deliver, process, acknowledge). Round time in this picture is in total 125ms with a processing time of only 5ms.

预取数太小可能会影响性能,因为rabbitmq大多数时间都在等待获得发送更多消息的许可。下图显示的是长时间的空转时间。在本例中,QoS预取设置为1。这意味着rabbitmq在往返完成(传递、处理、确认)之前不会发送下一条消息。图片中的整个周期时间总共为125ms,处理时间仅为5ms。

另一方面,大量的预取数可以接收队列中的大量消息并将其传递给同一个消费者,但是其他使用者却处于空闲状态。

如果您有一个或几个消费者快速处理消息,我们建议您一次预取多个消息。尽量让你的客户端繁忙。如果您一直有大约相同的处理时间,并且网络行为保持不变-您只需在客户机上为每个消息计算总的往返时间/处理时间,即可获得估计的预取值。

如果您有许多消费者,并且处理时间很短,我们建议预取值设置的应该比单个或少数使用者要低一些。太低的值会让消费者空转很多,因为他们需要等待消息到达。过高的值可能会使一个消费者忙碌,而其他消费者则处于空闲状态。

如果您有许多使用者和/或处理时间较长,我们建议您将预取计数设置为1,以便消息在所有消费者中均匀分布。

请注意,如果客户端自动确认消息,则预取值将不起作用。

一个典型的错误是有一个无限的预取,其中一个客户机接收所有的消息,耗尽内存并崩溃,然后所有的消息都被重新传递。

有关rabbitmq预取的信息,请参阅推荐的rabbitmq文档。

HIPE将以增加启动时间为代价增加服务器吞吐量。启用HIPE时,将在启动时编译rabbitmq。根据我们的基准测试,吞吐量增加了20-80%。HIPE的缺点是启动时间也增加了很多,大约1-3分钟。在rabbitmq的文档中,hipe仍然被标记为实验性的。

如果您需要高可用性,请不要启用HIPE。

当您用一个节点创建一个cloudamqp实例时,您将得到一个具有高性能的单个节点。一个节点将为您提供 最高的性能 ,因为消息不需要在多个节点之间进行镜像。

当您使用两个节点创建一个CloudAMQP实例时,与单个节点的相比,您将获得一半的性能。节点位于不同的可用性区域,队列在可用性区域之间自动镜像。两个节点将为您提供 高可用性 ,因为一个节点可能崩溃或被标记为受损,但另一个节点仍将启动并运行,准备接收消息。

当您使用三个节点创建一个CloudAMQP实例时,与单个节点的相同计划大小相比,您将获得1/4的性能。节点位于不同的可用性区域,队列在可用性区域之间自动镜像。您也可以暂停少数组件-与允许每个节点响应相比,通过关闭少数组件,您减少了重复传递。暂停少数组件是三节点集群中的一种分区处理策略,它可以防止由于网络拆分而导致数据不一致。

我们在cloudamqp集群中注意到的一个常见错误是,用户创建了一个新的vhost,但忘记为新的vhost启用一个ha策略。如果没有HA策略,则不会在节点之间同步消息。

直接交换是最快速。如果有许多bindings ,rabbitmq必须计算将消息发送到何处。

有些插件可能非常好用,但它们可能会消耗大量的CPU或RAM。因此,不建议将它们用于生产服务器。确保禁用不使用的插件。您可以通过CloudAmqp中的控制面板启用许多不同的插件。

将rabbitmq管理统计速率模式设置为detailed会严重影响性能,不应在生产中使用。

确保您使用的是最新推荐的客户端库版本

保持最新稳定版本的rabbitmq和erlang。在为客户发布新的主要版本之前,我们通常会在很大程度上对其进行测试。请注意,在为新集群选择版本的下拉列表中,我们始终使用最推荐的版本作为所选选项(默认)。

Dead lettering和TTL是rabbitmq中的两个流行功能,应该谨慎使用。TTL和Dead lettering可以产生您没有预料到的性能影响。

使用x-dead-letter-exchange属性声明的队列将向指定的dead-letter-exchange 发送被拒绝、非确认或过期(带有ttl)的消息。如果您指定了x-dead-letter-routing-key,则消息的路由键将在dead lettered时更改。

通过使用x-message-ttl属性声明队列,如果消息在指定的时间内未被使用,则将从队列中丢弃消息。

延迟任务的几种高效解决方案

我们把需要延迟执行的任务叫做延迟任务。也就是说当发生某个事件之后或者之前的某个特定的时间点执行的一系列动作。

延迟任务的使用场景有以下这些:

延迟任务的特点有以下这些:

Redis实现延时任务,是通过其数据结构ZSET来实现的。ZSET会储存一个score和一个value,可以将value按照score进行排序。

延时任务的实现分为以下几步来实现:

(1) 将任务的执行时间作为score,要执行的任务数据作为value,jobId+topicName+groupId+delayTime作为key,通过zadd命令将数据存放在zset中;

(2) 用一个进程定时查询zset的score分数最小的元素,可以用ZRANGEBYSCORE key -inf +inf limit 0 1 withscores命令来实现;

(3) 如果最小的分数小于等于当前时间戳,就将该任务取出来执行并使用zrem原子命令删除数据,否则休眠一段时间后再查询。

redis的ZSET是通过跳跃表来实现的,复杂度为O(logN),N是存放在ZSET中元素的个数。用redis来实现可以依赖于redis自身的持久化来实现持久化,redis的集群来支持高并发和高可用。因此开发成本很小,可以做到很实时。

优点:

1、Redis zset支持高性能的 score 排序。

2、Redis可以动态扩缩容,当消息很多时候,我们可以用集群来提高消息处理的速度,满足容量和性能上的可扩展性。

3、Redis具有持久化机制,当出现故障的时候,可以通过AOF和RDB方式来对数据进行恢复,保证了数据的可靠性。

4、简单实用,快速落地。

缺点:

1、为了避免了当一个 KEY 在存储了较多的延时消息后,入队操作以及查询操作速度变慢的问题(两个操作的时间复杂度均为O(logN)),改进的办法是,将延迟的消息任务通过 hash 算法路由至不同的 Redis Key 上,再开启多个消费线程进行消费,提供吞吐量。

2、没有ack机制,消息存在丢失的可能性。

3、因为是通过定时轮询的方式拉取redis zset中的数据,所以存在一定的时间差,可以通过缩短轮询时间来较少时间差,但是频繁的轮询会造成CPU的浪费,可以通过wait/notify的方式解决该问题。

4、需要实现发送失败自动重试机制。

参考链接:

1、有赞开源实现:

2、美图开源实现:

RabbitMQ 本身并不直接提供对延迟队列的支持,我们依靠 RabbitMQ 的TTL以及死信队列功能,来实现延迟队列的效果。

死信队列实际上是一种 RabbitMQ 的消息处理机制,当 RabbmitMQ 在生产和消费消息的时候,消息遇到如下的情况,就会变成“死信”:

消息生存时间 TTL

TTL(Time-To-Live)是 RabbitMQ 的一种高级特性,表示了一条消息的最大生存时间,单位为毫秒。如果一条消息在 TTL 设置的时间内没有被消费,那么它就会变成一条死信,进入我们上面所说的死信队列。

有两种不同的方式可以设置消息的 TTL 属性,一种方式是直接在创建队列的时候设置整个队列的 TTL 过期时间,所有进入队列的消息,都被设置成了统一的过期时间,一旦消息过期,马上就会被丢弃,进入死信队列;另一种方式是针对单条消息设置,不过需要注意的是,使用这种方式设置的 TTL,消息可能不会按时死亡,因为 RabbitMQ 只会检查第一个消息是否过期。比如这种情况,第一个消息设置了 20s 的 TTL,第二个消息设置了 10s 的 TTL,那么 RabbitMQ 会等到第一个消息过期之后,才会让第二个消息过期。在RabbitMQ的3.5.8版本以后,我们就可以使用官方推荐的 rabbitmq delayed message exchange 插件很方便地实现延迟消息的功能。

优点:

1、息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。

2、通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延迟队列不可用或者消息丢失。

缺点:

1、需要自己搭建和运维集群。

rocketmq在发送延时消息时,是先把消息按照延迟时间段发送到指定的队列中(把延时时间段相同的消息放到同一个队列中,保证了消息处理的顺序性,可以让同一个队列中消息延时时间是相同的,整个RocketMQ中延时消息时按照递增顺序排序,保证信息处理的先后顺序性。)。之后,通过一个定时器来轮询处理这些队列里的信息,判断是否到期。对于到期的消息会发送到相应的处理队列中,进行处理。

注意 :目前RocketMQ只支持特定的延时时间段,1s,5s,10s,...2h,不能支持任意时间段的延时设置。

优点:

1、分布式、高吞吐量、高性能、高可靠。

缺点:

1、需要自己搭建和运维集群。

2、只支持特定的延时时间段。

ActiveMQ在5.4及以上版本开始支持持久化的延迟消息功能,甚至支持Cron表达式。默认是该功能是不开启的,如果开启需要修改配置文件activemq.xml,在broker节点上把schedulerSupport属性设置为true。

优点:

1、支持cron表达式,更灵活。

缺点:

1、需要自己搭建和运维集群。

数据量少的话可以尝试quartz、delayQueue、TimeWheel (时间轮)等方案,但是为了保证数据不丢失,需要借助第三方持久化存储系统,例如rocksDB等。

Kafka延时队列

TimingWheel是kafka时间轮的实现,内部包含了⼀个TimerTaskList数组,每个数组包含了⼀些链表组成的TimerTaskEntry事件,每个TimerTaskList表示时间轮的某⼀格,这⼀格的时间跨度为tickMs,同⼀个TimerTaskList中的事件都是相差在⼀个tickMs跨度内的,整个时间轮的时间跨度为interval = tickMs * wheelSize,该时间轮能处理的时间范围在cuurentTime到currentTime + interval之间的事件。

当添加⼀个时间他的超时时间⼤于整个时间轮的跨度时, expiration = currentTime + interval,则会将该事件向上级传递,上级的tickMs是下级的interval,传递直到某⼀个时间轮满足expiration currentTime + interval,然后计算对应位于哪⼀格,然后将事件放进去,重新设置超时时间,然后放进jdk延迟队列。

SystemTimer会取出queue中的TimerTaskList,根据expiration将currentTime往前推进,然后把⾥⾯所有的事件重新放进时间轮中,因为ct推进了,所以有些事件会在第0格,表示到期了,直接返回。

else if (expiration currentTime + tickMs) {

然后将任务提交到java线程池中处理。

服务端在处理客户端的请求,针对不同的请求,可能不会⽴即返回响应结果给客户端。在处理这类请求时,服务端会为这类请求创建延迟操作对象放⼊延迟缓存队列中。

延迟缓存的数据结构类似MAP,延迟操作对象从延迟缓存队列中完成并移除有两种⽅式:

1,延迟操作对应的外部事件发⽣时,外部事件会尝试完成延迟缓存中的延迟操作 。

2,如果外部事件仍然没有完成延迟操作,超时时间达到后,会强制完成延迟的操作。

DelayedOperation接口表示延迟的操作对象。此接口的实现类包括延迟加⼊,延迟心跳,延迟生产,延迟拉取。

延迟接口相关的方法:

tryComplete:尝试完成,外部事件发⽣时会尝试完成延迟的操作。该⽅法返回值为true,表示可以完成延迟操作,会调⽤强制完成的方法(forceComplete)。返回值为false,表示不可以完成延迟操作。

onComplete:完成的回调方法。

onExpiration:超时的回调方法。

外部事件触发完成和超时完成都会调⽤forceComplete(),并调⽤onComplete()。forceComplete和onComplete只会调⽤⼀次。多线程下⽤原⼦变量来控制只有⼀个线程会调⽤onComplete和forceComplete。

延迟⽣产和延迟拉取完成时的回调⽅法,尝试完成的延迟操作副本管理器在创建延迟操作时,会把回调⽅法传给延迟操作对象。当延迟操作完成时,在onComplete⽅法中会调⽤回调⽅法,返回响应结果给客户端。

创建延迟操作对象需要提供请求对应的元数据。延迟⽣产元数据是分区的⽣产结果;延迟拉取元数据是分区的拉取信息。

创建延迟的⽣产对象之前,将消息集写⼊分区的主副本中,每个分区的⽣产结果会作为延迟⽣产的元数据。创建

延迟的拉取对象之前,从分区的主副本中读取消息集,但并不会使⽤分区的拉取结果作为延迟拉取的元数据,因为延迟⽣产返回给客户端的响应结果可以直接从分区的⽣产结果中获取,⽽延迟的拉取返回给客户端的响应结果不能直接从分区的拉取结果中获取。

元数据包含返回结果的条件是:从创建延迟操作对象到完成延迟操作对象,元数据的含义不变。对于延迟的⽣产,服务端写⼊消息集到主副本返回的结果是确定的。是因为ISR中的备份副本还没有全部发送应答给主副本,才会需要创建延迟的⽣产。服务端在处理备份副本的拉取请求时,不会改变分区的⽣产结果。最后在完成延迟⽣产的操作对象时,服务端就可以把 “创建延迟操作对象” 时传递给它的分区⽣产结果直接返回给⽣产者 。对应延迟的拉取,读取了主副本的本地⽇志,但是因为消息数量不够,才会需要创建延迟的拉取,⽽不⽤分区的拉取结果⽽是⽤分区的拉取信息作为延迟拉取的元数据,是因为在尝试完成延迟拉取操作对象时,会再次读取主副本的本地⽇志,这次的读取有可能会让消息数量达到⾜够或者超时,从⽽完成延迟拉取操作对象。这样创建前和完成时延迟拉取操作对象的返回结果是不同的。但是拉取信息不管读取多少次都是⼀样的。

延迟的⽣产的外部事件是:ISR的所有备份副本发送了拉取请求;备份副本的延迟拉取的外部事件是:追加消息集到主副本;消费者的延迟拉取的外部事件是:增加主副本的最⾼⽔位。

服务端处理⽣产者客户端的⽣产请求,将消息集追加到对应主副本的本地⽇志后,会等待ISR中所有的备份刚本都向主副本发送应答 。⽣产请求包括多个分区的消息集,每个分区都有对应的ISR集合。当所有分区的ISR副本都向对应分区的主副本发送了应答,⽣产请求才能算完成。⽣产请求中虽然有多个分区,但是延迟的⽣产操作对象只会创建⼀个。

判断分区的ISR副本是否都已经向主副本发送了应答,需要检查ISR中所有备份副本的偏移量是否到了延迟⽣产元数据的指定偏移量(延迟⽣产的元数据是分区的⽣产结果中包含有追加消息集到本地⽇志返回下⼀个偏移量)。所以ISR所有副本的偏移量只要等于元数据的偏移量,就表示备份副本向主副本发送了应答。由于当备份副本向主副本发送拉取请求,服务端读取⽇志后,会更新对应备份副本的偏移量数据。所以在具体的实现上,备份副本并不需要真正发送应答给主副本,因为主副本所在消息代理节点的分区对象已经记录了所有副本的信息,所以尝试完成延迟的⽣产时,根据副本的偏移量就可以判断备份副本是否发送了应答。进⽽检查分区是否有⾜够的副本赶上指定偏移量,只需要判断主副本的最⾼⽔位是否等于指定偏移量(最⾼⽔位的值会选择ISR中所有备份副本中最⼩的偏移量来设置,最⼩的值都等于了指定偏移量,那么就代表所有的ISR都发送了应答)。

总结:服务端创建的延迟⽣产操作对象,在尝试完成时根据主副本的最⾼⽔位是否等于延迟⽣产操作对象中元数据的指定偏移量来判断。 具体步骤:

1,服务端处理⽣产者的⽣产请求,写⼊消息集到Leader副本的本地⽇志。

2,服务端返回追加消息集的下⼀个偏移量,并且创建⼀个延迟⽣产操作对象。元数据为分区的⽣产结果(其中就

包含下⼀个偏移量的值)。

3,服务端处理备份副本的拉取请求,⾸先读取主副本的本地⽇志。

4,服务端返回给备份副本读取消息集,并更新备份副本的偏移量。

5,选择ISR备份副本中最⼩的偏移量更新主副本的最⾼⽔位。

6,如果主副本的最⾼⽔位等于指定的下⼀个偏移量的值,就完成延迟的⽣产。

服务端处理消费者或备份副本的拉取请求,如果创建了延迟的拉取操作对象,⼀般都是客户端的消费进度能够⼀直赶上主副本。⽐如备份副本同步主副本的数据,备份副本如果⼀直能赶上主副本,那么主副本有新消息写⼊,备份副本就会⻢上同步。但是针对备份副本已经消费到主副本的最新位置,⽽主副本并没有新消息写⼊时:服务端没有⽴即返回空的拉取结果给备份副本,这时会创建⼀个延迟的拉取操作对象,如果有新的消息写⼊,服务端会等到收集⾜够的消息集后,才返回拉取结果给备份副本,有新的消息写⼊,但是还没有收集到⾜够的消息集,等到延迟操作对象超时后,服务端会读取新写⼊主副本的消息后,返回拉取结果给备份副本(完成延迟的拉取时,服务端还会再读取⼀次主副本的本地⽇志,返回新读取出来的消息集)。

客户端的拉取请求包含多个分区,服务端判断拉取的消息⼤⼩时,会收集拉取请求涉及的所有分区。只要消息的总⼤⼩超过拉取请求设置的最少字节数,就会调⽤forceComplete()⽅法完成延迟的拉取。

外部事件尝试完成延迟的⽣产和拉取操作时的判断条件:

拉取偏移量是指拉取到消息⼤⼩。对于备份副本的延迟拉取,主副本的结束偏移量是它的最新偏移量(LEO)。对于消费者的拉取延迟,主副本的结束偏移量是它的最⾼⽔位(HW)。备份副本要时刻与主副本同步,消费者只能消费到主副本的最⾼⽔位。

客户端的⼀个请求包括多个分区,服务端为每个请求都会创建⼀个延迟操作对象。⽽不是为每个分区创建⼀个延迟操作对象。服务端的“延迟操作缓存”管理了所有的“延迟操作对象”,缓存的键是每⼀个分区,缓存的值是分区对应的延迟操作列表。

⼀个客户端请求对应⼀个延迟操作,⼀个延迟操作对应多个分区。在延迟缓存中,⼀个分区对应多个延迟操作。

延迟缓存中保存了分区到延迟操作的映射关系。

根据分区尝试完成延迟的操作,因为⽣产者和消费者是以分区为最⼩单位来追加消息和消费消息。虽然延迟操作的创建是针对⼀个请求,但是⼀个请求中会有多个分区,在⽣产者追加消息时,⼀个⽣产请求总的不同分区包含的消息是不⼀样的。这样追加到分区对应的主副本的本地⽇志中,有的分区就可以去完成延迟的拉取,但是有的分区有可能还达不到完成延迟拉取操作的条件。同样完成延迟的⽣产也⼀样。所以在延迟缓存中要以分区为键来存储各个延迟操作。

由于⼀个请求创建⼀个延迟操作,⼀个请求⼜会包含多个分区,所以不同的延迟操作可能会有相同的分区。在加⼊到延迟缓存时,每个分区都对应相同的延迟操作。外部事件发⽣时,服务端会以分区为粒度,尝试完成这个分区中的所有延迟操作 。 如果指定分区对应的某个延迟操作可以被完成,那么延迟操作会从这个分区的延迟操作列表中移除。但这个延迟操作还有其他分区,其他分区中已经被完成的延迟操作也需要从延迟缓存中删除。但是不会⽴即被删除,因为分区作为延迟缓存的键,在服务端的数量会很多。只要分区对应的延迟操作完成了⼀个,就要⽴即检查所有分区,对服务端的性能影响⽐较⼤。所以采⽤⼀个清理器,会负责定时地清理所有分区中已经完成的延迟操作。

副本管理器针对⽣产请求和拉取请求都分别有⼀个全局的延迟缓存。⽣产请求对应延迟缓存中存储了延迟的⽣产。拉取请求对应延迟缓存中存储了延迟的拉取。

延迟缓存提供了两个⽅法:

tryCompleteElseWatch():尝试完成延迟的操作,如果不能完成,将延迟操作加⼊延迟缓存中。⼀旦将延迟操作加⼊延迟缓存的监控,延迟操作的每个分区都会监视该延迟操作。换句话说就是每个分区发⽣了外部事件后,都会去尝试完成延迟操作。

checkAndComplete():参数是延迟缓存的键,外部事件调⽤该⽅法,根据指定的键尝试完成延迟缓存中的延迟操作。

延迟缓存在调⽤tryCompleteElseWatch⽅法将延迟操作加⼊延迟缓存之前,会先尝试⼀次完成延迟的操作,如果不能完成,会调⽤⽅法将延迟操作加⼊到分区对应的监视器,之后还会尝试完成⼀次延迟操作,如果还不能完成,会将延迟操作加⼊定时器。如果前⾯的加⼊过程中,可以完成延迟操作后,那么就可以不⽤加⼊到其他分区的延迟缓存了。

延迟操作不仅存在于延迟缓存中,还会被定时器监控。定时器的⽬的是在延迟操作超时后,服务端可以强制完成延迟操作返回结果给客户端。延迟缓存的⽬的是让外部事件去尝试完成延迟操作。

延迟缓存的每个键都有⼀个监视器(类似每个分区有⼀个监视器),以链表结构来管理延迟操作。当外部事件发⽣时,会根据给定的键,调⽤这个键的对应监视器的tryCompleteWatch()⽅法,尝试完成监视器中所有的延迟操作。

监视器尝试完成所有延迟操作的过程中,会调⽤每个延迟操作的tryComplete()⽅法,判断能否完成延迟的操作。如果能够完成,就从链表中删除对应的延迟操作。

清理线程的作⽤是清理所有监视器中已经完成的延迟操作。

服务端创建的延迟操作会作为⼀个定时任务,加⼊定时器的延迟队列中。当延迟操作超时后,定时器会将延迟操作从延迟队列中弹出,并调⽤延迟操作的运⾏⽅法,强制完成延迟的操作。

定时器使⽤延迟队列管理服务端创建的所有延迟操作,延迟队列的每个元素是定时任务列表,⼀个定时任务列表可以存放多个定时任务条⽬。服务端创建的延迟操作对象,会先包装成定时任务条⽬,然后加⼊延迟队列指定的⼀个定时任务列表。延迟队列是定时器中保存定时任务列表的全局数据结构,服务端创建的延迟操作不是直接加⼊定时任务列表,⽽是加⼊时间轮。

时间轮和延迟队列的关系:

1,定时器拥有⼀个全局的延迟队列和时间轮,所有时间轮公⽤⼀个计数器。

2,时间轮持有延迟队列的引⽤。

3,定时任务条⽬添加到时间轮对应的时间格(槽)(槽中是定时任务列表)中,并且把该槽表也会加⼊到延迟队列中。

4,⼀个线程会将超时的定时任务列表会从延迟队列的poll⽅法弹出。定时任务列表超时并不⼀定代表定时任务超时,将定时任务重新加⼊时间轮,如果加⼊失败,说明定时任务确实超时,提交给线程池执⾏。

5,延迟队列的poll⽅法只会弹出超时的定时任务列表,队列中的每个元素(定时任务列表)按照超时时间排序,如果第⼀个定时任务列表都没有过期,那么其他定时任务列表也⼀定不会超时。

延迟操作本身的失效时间是客户端请求设置的,延迟队列的元素(每个定时任务列表)也有失效时间,当定时任务列表中的getDelay()⽅法返回值⼩于等于0,就表示定时任务列表已经过期,需要⽴即执⾏。

如果当前的时间轮放不下加⼊的时间时,就会创建⼀个更⾼层的时间轮。定时器只持有第⼀层的时间轮的引⽤,并不会持有更⾼层的时间轮。因为第⼀层的时间轮会持有第⼆层的时间轮的引⽤,第⼆层会持有第三层的时间轮的引⽤。定时器将定时任务加⼊到当前时间轮,要判断定时任务的失效时间⾸是否在当前时间轮的范围内,如果不在当前时间轮的范围内,则要将定时任务上升到更⾼⼀层的时间轮中。时间轮包含了定时器全局的延迟队列。

时间轮中的变量:tickMs=1:表示⼀格的⻓度是1毫秒;wheelSize=20表示⼀共20格,时间轮的范围就是20毫秒,定时任务的失效时间⼩于等于20毫秒的都会加⼊到这⼀层的时间轮中;interval=tickMs*wheelSize=20,如果需要创建更⾼⼀层的时间轮,那么低⼀层的时间轮的interval的值作为⾼⼀层数据轮的tickMs值;currentTime当前时间轮的当前时间,往前移动时间轮,主要就是更新当前时间轮的当前时间,更新后重新加⼊定时任务条⽬。

本文转载自互联网,如有侵权,联系删除

相关推荐