在大部分场景下业务系统如果只须要实现异步解耦、削峰填谷等能力,常规的普通就可以知足此类需求。除此之外,在某些分外的业务场景中,普通类型存在无法知足需求的情形。这就须要行列步队做事本身支持一些分外的类型,或者开拓者通过开拓一些定制化的代码实现目的。这里我们列举在利用行列步队过程中几种分外场景的例子:
顺序消费场景
生产者按照一定的先后顺序发布,消费者按照既定的先后顺序消费,即先发布的一定会先被客户端消费。

分布式事务场景
分布式架构下,随着系统的演进,数据库也进行了垂直拆分,如果选择利用行列步队进行高下游解耦的话,生产者和消费者须要担保数据同等性。
延时消费场景
生产者将发送到行列步队后,并不期望立马投递这条,而是推迟到某个韶光点之后将投递给消费者进行消费。
对付顺序和事务,这里就不进行详细先容了,大家有兴趣可以自行研究,本文后续内容会和大家一起详细谈论下延时更多的细节及运用处景。
延时先容延时(定时)的特点便是发送者成功发送一条后,这条并不会立时被消费者消费,而是在某个特定的韶光或者延迟一段韶光后,才被消费者可见并进行后续的消费,延时全体生命周期可以用如下示意图来表示:
发布者将一条延时发送到行列步队做事端;在估量投递韶光未到之前,对消费者不可见,消费者此时无法急速消费;投递韶光到达后,才对消费者可见,消费者此时可以消费;消费者获取此条并进行消费;消费者成功消费后,进行确认,此条将不再被消费。延时运用处景
交易场景
在生产者和消费者有韶光窗口的哀求下,我们可以考虑利用延时。如在电商交易场景下,交易中超时未支付的订单须要被关闭的场景,在订单创建时会发送一条延时。这条将会在30分钟往后投递给消费者,消费者收到此后,须要判断对应的订单是否已完成支付;如支付未完成,则关闭订单。
游戏场景
再比如在游戏社区里,游戏运营方常常会发起一些活动,玩家在活动期间内按照规则完成一系列任务,活动韶光截止后,游戏后台根据玩家完成任务的情形进行剖断,发送系统关照或者进行rank排名并派发褒奖等。
此种场景也可以采取延时来实现,上游系统发布活动公告后,同时发送一条延时,延时时间设置为活动周期的韶光。当活动截止后,下贱系统可以随即消费并进行相应的逻辑处理。
其他场景
同时延时也可以广泛运用于信息提醒等比较通用的场景。
如何实现延时先容完延时的一些观点及运用处景后,我们接下来剖析一下目前比较主流的几款开源中间件对延时的支持情形以及实现办法。
Kafka
原生Kafka默认是不支持延时的,须要开拓者自己实现一层代理做事,比如发送端将发送到延时Topic,代理做事消费延时Topic的然后转存起来,代理做事通过一定的算法,打算延时所附带的延时时间是否到达,然后将延时取出来并发送到实际的Topic里面,消费端从实际的Topic里面进行消费。
RabbitMQ
RabbitMQ实现延时有两种方案,第一种是采取rabbitmq-delayed-message-exchange 插件实现,第二种则是利用DLX(Dead Letter Exchanges)+ TTL(存活韶光)来间接实现。大致的实现思路如下:
创建一个普通行列步队delay_queue,为此行列步队设置去世信交流机 (通过x-dead-letter-exchange参数) 和 RoutingKey (通过x-dead-letter-routing-key参数),生产者将向delay_queue发送延时。创建步骤1中设置的去世信交流机,同时创建一个目标行列步队 target_queue,并利用步骤1中设置的RoutingKey将两者绑定起来。消费者将从target_queue里面消费延时。设置的存活韶光TTL,可以在步骤1中设置到行列步队级别delay_queue的存活韶光,或者在发送时动态设置级别的存活韶光。RocketMQ
开源RocketMQ支持延迟,但是不支持秒级精度。默认支持18个level的延迟,这是通过broker真个messageDelayLevel配置项确定的messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
行列步队做事在启动时,会创建一个内部topic:SCHEDULE_TOPIC_XXXX,根据延迟level的个数,创建对应数量的行列步队。生产者发送时可以设置延时等级,示例代码:
Message msg=new Message();msg.setTopic("TopicA");msg.setBody("this is a delay message".getBytes());//设置延迟level为5,对应延迟1分钟msg.setDelayTimeLevel(5);producer.send(msg);
发送的会暂存在Broker对应的内部topic中,再通过定时任务从内部topic中拉取数据,如果延迟韶光到了,就会把转发到目标topic下,消费者从目标topic消费。
阿里云行列步队RocketMQ版通过上一章节的谈论,我们可以看出目前几款主流的开源行列步队做事,在支持延时的场景下或多或少有些不完美的地方。紧张表示在以下几点:
Kafka不支持延时,须要完备开拓代理做事来实现,事情量大。RabbitMQ须要额外的插件,或者利用DLX+TTL的办法进行中转,实现不是非常直不雅观。RocketMQ支持延时,但是无法支持秒级延时。那么有没有一款行列步队做事,能够完美的支持延时(定时)。本节我们将先容阿里云行列步队RocketMQ版。
阿里云行列步队RocketMQ版基于Apache RocketMQ构建的低延迟、高并发、高可用、高可靠的分布式中间件。行列步队RocketMQ版既可为分布式运用系统供应异步解耦和削峰填谷的能力,同时也具备互联网运用所需的海量堆积、高吞吐、可靠重试等特性。同时支持丰富的类型包括普通、顺序、事务以及我们本文谈论的延时。接下来我们看下阿里云RocketMQ为延时供应的能力及上风:
支持秒级的延时(定时),同时延时时间可以最大设置为40天,基本知足所有场景。延时(定时)的投递精度可掌握在1~2秒之内。延时(定时)在某段韶光内是对消费者不可见的,从另一个维度看也属于积压的,阿里云行列步队RocketMQ版的不同实例规格可以支持亿级的积压。供应了多措辞支持,包括Java、.NET、CC++、GO、Python、PHP、Node.js等利用阿里云行列步队RocketMQ版收发延时(定时),只须要在掌握台创建Topic的时候选择定时/延时类型,既可以利用TCP或者http协议进行收发。
掌握台创建定时/延时Topic
Java措辞示例代码(TCP协议)
发送定时// 定时,单位毫秒(ms),在指定时间戳(当前韶光之后)进行投递,例如2020-03-07 16:21:00投递。如果被设置成当前韶光戳之前的某个时候,将急速投递给消费者。long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2020-03-07 16:21:00").getTime();msg.setStartDeliverTime(timeStamp);// 发送,只要不抛非常便是成功。SendResult sendResult = producer.send(msg);
发送延时
// 延时,单位毫秒(ms),在指定延迟韶光(当前韶光之后)进行投递,例如在3秒后投递。long delayTime = System.currentTimeMillis() + 3000;// 设置须要被投递的韶光。msg.setStartDeliverTime(delayTime);SendResult sendResult = producer.send(msg);
同时订阅延时的逻辑无需任何改造,完备可以按照订阅普通的办法,没有任何的代码侵入性。
结束语到此我们谈论了延时的特性、运用处景,比拟了各种行列步队对延时的支持情形,同时也向大家先容了阿里云行列步队RocketMQ版。我们在对中间件进行选型时,也会考虑到多方面的成分。除了中间件本身所能供应的能力外,也包括做事性能、稳定性、可扩展能力,以及须要结合开拓团队自身的技能栈等情形。
作者:阿里云办理方案架构师 鹿玄
本文为阿里云原创内容,未经许可不得转载。