虽然基于MQ这个办法走不通了,但是这个项目中利用到Redis,以是我就想是否能够利用Redis来代替MQ实现延迟行列步队的功能,于是我就查了一下有没有现成可用的方案,别说,还真给我查到了两种方案,并且我还仔细研究比拟了这两个方案,创造要想很好的实现延迟行列步队,并不大略。
监听过期key基于监听过期key的办法来实现延迟行列步队是我查到的第一个方案,为了弄懂这个方案实现的细节,我还特地去扒了扒官网,还真有所收成
一谈到发布订阅模式,实在一想到的便是MQ,只不过Redis也实现了一套,并且跟MQ贼像,如图:

图中的channel的观点跟MQ中的topic的观点差不多,你可以把channel理解成MQ中的topic。
生产者在发送时须要到指定发送到哪个channel上,消费者订阅这个channel就能获取到。
2、keyspace notifications在Redis中,有很多默认的channel,只不过向这些channel发送的生产者不是我们写的代码,而是Redis本身。当消费者监听这些channel时,就可以感知到Redis中数据的变革。
这个功能Redis官方称为keyspace notifications,字面意思便是键空间关照。
这些默认的channel被分为两类:
以__keyspace@<db>__:为前缀,后面跟的是key的名称,表示监听跟这个key有关的事宜。举个例子,现在有个消费者监听了__keyspace@0__:sanyou这个channel,sanyou便是Redis中的一个普通key,那么当sanyou这个key被删除或者发生了其它事宜,那么消费者就会收到sanyou这个key删除或者其它事宜的以__keyevent@<db>__:为前缀,后面跟的是事宜类型,表示监听某个事宜同样举个例子,现在有个消费者监听了__keyevent@0__:expired这个channel,代表了监听key的过期事宜。那么当某个Redis的key过期了(expired),那么消费者就能收到这个key过期的。如果把expired换成del,那么监听的便是删除事宜。详细支持哪些事宜,可从官网查。上述db是指详细的数据库,Redis不是默认分为16个库么,序号从0-15,以是db便是0到15的数字,示例中的0便是指0对应的数据库。
3、延迟行列步队实现事理
通过对上面的两个观点理解之后,该当就对监听过期key的实现事理一览无余了,实在便是当这个key过期之后,Redis会发布一个key过期的事宜到__keyevent@<db>__:expired这个channel,只要我们的做事监听这个channel,那么就能知道过期的Key,从而就算实现了延迟行列步队功能。
以是这种办法实现延迟行列步队就只须要两步:
发送延迟任务,key是延迟本身,过期韶光便是延迟韶光监听__keyevent@<db>__:expired这个channel,处理延迟任务4、demo好了,基本观点和核心事理都说完了之后,又到了show me the code环节。
好巧不巧,Spring已经实现了监听__keyevent@__:expired这个channel这个功能,__keyevent@__:expired中的代表通配符的意思,监听所有的数据库。
以是demo写起来就很大略了,只需3步即可
引入pom
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.2.5.RELEASE</version></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.2.5.RELEASE</version></dependency>
配置类
@Configurationpublic class RedisConfiguration { @Bean public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(connectionFactory); return redisMessageListenerContainer; } @Bean public KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer) { return new KeyExpirationEventMessageListener(redisMessageListenerContainer); }}
KeyExpirationEventMessageListener实现了对__keyevent@__:expiredchannel的监听
当KeyExpirationEventMessageListener收到Redis发布的过期Key的的时候,会发布RedisKeyExpiredEvent事宜
以是我们只须要监听RedisKeyExpiredEvent事宜就可以拿到过期的Key,也便是延迟。
对RedisKeyExpiredEvent事宜的监听实现MyRedisKeyExpiredEventListener
@Componentpublic class MyRedisKeyExpiredEventListener implements ApplicationListener<RedisKeyExpiredEvent> { @Override public void onApplicationEvent(RedisKeyExpiredEvent event) { byte[] body = event.getSource(); System.out.println("获取到延迟:" + new String(body)); }}
全体工程目录也大略
代码写好,启动运用
之后我直接通过Redis命令设置,就没通过代码发送了,的key为sanyou,值为task,值不主要,过期韶光为5s
set sanyou task expire sanyou 5
如果上面都理论都精确,不出意外的话,5s后MyRedisKeyExpiredEventListener该当可以监听到sanyou这个key过期的,也就相称于拿到了延迟任务,掌握台会打印出获取到延迟:sanyou。
于是我满怀希望,悄悄地等待了5s。。
5、4、3、2、1,韶光一到,我查看掌握台,但是掌握台并没有按照预期打印出上面那句话。
为什么会没打印出?难道是代码写错了?正当我准备检讨代码的时候,官网的一段话道出了真实缘故原由。
我给大家翻译一下上面这段话讲的内容。
上面这段话紧张谈论的是key过期事宜的时效性问题,首先提到了Redis过期key的两种打消策略,便是口试八股文常背的两种:
惰性打消。当这个key过期之后,访问时,这个Key才会被打消定时打消。后台会定期检讨一部分key,如果有key过期了,就会被打消再后面那段话是核心,意思是说,key的过期事宜发布机遇并不是当这个key的过期韶光到了之后就发布,而是这个key在Redis中被清理之后,也便是真正被删除之后才会发布。
到这我终于明白了,上面的例子中纵然我设置了5s的过期韶光,但是当5s过去之后,只要两种打消策略都不知足,没人访问sanyou这个key,后台的定时清理的任务也没扫描到sanyou这个key,那么就不会发布key过期的事宜,自然而然也就监听不到了。
至于后台的定时清理的任务什么时候能扫到,这个没有固定时间,可能一到过期韶光就被扫到,也可能等一定韶光才会被扫到,这就可能会造成了客户端从发布到监听到的韶光差会大于即是过期韶光,从而造成一定韶光的延迟,这就其实有点坑了。。
5、坑除了上面测试demo的时候碰着的坑之外,在我深入研究之后,还创造了一些更离谱的坑。
丢太频繁
Redis的丢跟MQ不一样,由于MQ都会有的持久化机制,可能只有当机器宕机了,才会丢点,但是Redis丢就很离谱,比如说你的做事在重启的时候就会丢。
Redis实现的发布订阅模式,是没有持久化机制,当发布到某个channel之后,如果没有客户端订阅这个channel,那么这个就丢了,并不会像MQ一样进行持久化,等有消费者订阅的时候再给消费者消费。
以是说,假设做事重启期间,某个生产者或者是Redis本身发布了一条到某个channel,由于做事重启,没有监听这个channel,那么这个自然就丢了。
消费只有广播模式
Redis的发布订阅模式消费只有广播模式一种。
所谓的广播模式便是多个消费者订阅同一个channel,那么每个消费者都能消费到发布到这个channel的所有。
如图,生产者发布了一条,内容为sanyou,那么两个消费者都可以同时收到sanyou这条。
以是,如果通过监听channel来获取延迟任务,那么一旦做事实例有多个的话,还得担保不能重复处理,额外地增加了代码开拓量。
吸收到所有key的某个事宜
这个不属于Redis发布订阅模式的问题,而是Redis本身事宜关照的问题。
当消费者监听了以__keyevent@<db>__:开头的,那么会导致所有的key发生了事宜都会被关照给消费者。
举个例子,某个消费者监听了__keyevent@__:expired这个channel,那么只要key过期了,不管这个key是张三还会李四,消费者都能收到。
以是如果你只想消费某一类的key,那么还得自行加一些标记,比如的key加个前缀,消费的时候判断一下带前缀的key便是须要消费的任务。
以是,综上能够得出一个非常主要的结论,那便是监听Redis过期Key这种办法实现延迟行列步队,不稳定,坑贼多!
那有没有比较靠谱的延迟行列步队的实现方案呢?这就不得不提到我研究的第二种方案了。
Redisson实现延迟行列步队Redisson他是Redis的儿子(Redis son),基于Redis实现了非常多的功能,个中最常利用的便是Redis分布式锁的实现,但是除了实现Redis分布式锁之外,它还实现了延迟行列步队的功能。
先来个demo,后面再来说说这种实现的事理。
1、demo引入pom
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.13.1</version></dependency>
封装了一个RedissonDelayQueue类
@Component@Slf4jpublic class RedissonDelayQueue { private RedissonClient redissonClient; private RDelayedQueue<String> delayQueue; private RBlockingQueue<String> blockingQueue; @PostConstruct public void init() { initDelayQueue(); startDelayQueueConsumer(); } private void initDelayQueue() { Config config = new Config(); SingleServerConfig serverConfig = config.useSingleServer(); serverConfig.setAddress("redis://localhost:6379"); redissonClient = Redisson.create(config); blockingQueue = redissonClient.getBlockingQueue("SANYOU"); delayQueue = redissonClient.getDelayedQueue(blockingQueue); } private void startDelayQueueConsumer() { new Thread(() -> { while (true) { try { String task = blockingQueue.take(); log.info("吸收到延迟任务:{}", task); } catch (Exception e) { e.printStackTrace(); } } }, "SANYOU-Consumer").start(); } public void offerTask(String task, long seconds) { log.info("添加延迟任务:{} 延迟韶光:{}s", task, seconds); delayQueue.offer(task, seconds, TimeUnit.SECONDS); }}
这个类在创建的时候会去初始化延迟行列步队,创建一个RedissonClient工具,之后通过RedissonClient工具获取到RDelayedQueue和RBlockingQueue工具,传入的行列步队名字叫SANYOU,这个名字无所谓。
当延迟行列步队创建之后,会开启一个延迟任务的消费线程,这个线程会一贯从RBlockingQueue中通过take方法壅塞获取延迟任务。
添加任务的时候是通过RDelayedQueue的offer方法添加的。
controller类,通过接口添加任务,延迟韶光为5s
@RestControllerpublic class RedissonDelayQueueController { @Resource private RedissonDelayQueue redissonDelayQueue; @GetMapping("/add") public void addTask(@RequestParam("task") String task) { redissonDelayQueue.offerTask(task, 5); }}
启动项目,在浏览器输入如下连接,添加任务
http://localhost:8080/add?task=sanyou
悄悄等待5s,成功获取到任务。
2、实现事理
如下图便是上面demo中,一个延迟行列步队会在Redis内部利用到的channel和数据类型
SANYOU前面的前缀都是固定的,Redisson创建的时候会拼上前缀。
redisson_delay_queue_timeout:SANYOU,sorted set数据类型,存放所有延迟任务,按照延迟任务的到期韶光戳(提交任务时的韶光戳 + 延迟韶光)来排序的,以是列表的最前面的第一个元素便是全体延迟行列步队中最早要被实行的任务,这个观点很主要redisson_delay_queue:SANYOU,list数据类型,也是存放所有的任务,但是研究下来创造彷佛没什么用。。SANYOU,list数据类型,被称为目标行列步队,这个里面存放的任务都是已经到了延迟韶光的,可以被消费者获取的任务,以是上面demo中的RBlockingQueue的take方法是从这个目标行列步队中获取到任务的redisson_delay_queue_channel:SANYOU,是一个channel,用来关照客户端开启一个延迟任务有了这些观点之后,再来看看整体的运行事理图
生产者在提交任务的时候将任务放到redisson_delay_queue_timeout:SANYOU中,分数便是提交任务的韶光戳+延迟韶光,便是延迟任务的到期韶光戳客户端会有一个延迟任务,为了区分,后面我都说是客户端延迟任务。这个延迟任务会向Redis Server发送一段lua脚本,Redis实行lua脚本中的命令,并且是原子性的
这段lua脚本紧张干了两件事:
将到了延迟韶光的任务从redisson_delay_queue_timeout:SANYOU中移除,存到SANYOU这个目标行列步队获取到redisson_delay_queue_timeout:SANYOU中目前最早到过期韶光的延迟任务的到期韶光戳,然后发布到redisson_delay_queue_channel:SANYOU这个channel中当客户端监听到redisson_delay_queue_channel:SANYOU这个channel的时,会再次提交一个客户端延迟任务,延迟韶光便是(最早到过期韶光的延迟任务的到期韶光戳)- 当前韶光戳,这个韶光实在也便是redisson_delay_queue_channel:SANYOU中最早到过期韶光的任务还剩余的延迟韶光。
此处可以等待10s,好好想想。。
这样,一旦韶光来到了上面说的最早到过期韶光任务的到期韶光戳,redisson_delay_queue_timeout:SANYOU中上面说的最早到过期韶光的任务已经到期了,客户真个延迟任务也同时到期,于是开始实行lua脚本操作,及时将到了延迟韶光的任务放到目标行列步队中。然后再次发布剩余的延迟任务中最早到期的任务到期韶光戳到channe中,如此循环往来来往,一贯运行下去,担保redisson_delay_queue_timeout:SANYOU中到期的数据能及时放到目标行列步队中。
以是,上陈说了一大堆的紧张的浸染便是担保到了延迟韶光的任务能够及时被放到目标行列步队。
这里再补充两个分外情形,图中没有画出:
第一个便是如果redisson_delay_queue_timeout:SANYOU是新添加的任务(行列步队之前有或者没有任务)是行列步队中最早须要被实行的,也会发布到channel,之后就按时上面说的流程走了。
添加任务代码如下,也是通过lua脚本来的
第二种分外情形便是项目启动的时候会实行一次客户端延迟任务。项目在重启时,由于没有客户端延迟任务的实行,可能会涌现redisson_delay_queue_timeout:SANYOU行列步队中有到期但是没有被放到目标行列步队的可能,重启就实行一次便是为了担保到期的数据能被及时放到目标行列步队中。
3、与第一种方案比较现在来比较一下第一种方案和Redisson的这种方案,看看有没有第一种方案的那些坑。
第一个任务延迟的问题,Redisson方案理论上是没有延迟的,但是当数量增加,消费者消费缓慢这个情形下可能会导致延迟任务消费的延迟。
第二个丢的问题,Redisson方案很大程度上减轻了丢的可能性,由于所有的任务都是存在list和sorted set两种数据类型中,Redis有持久化机制,就算Redis宕机了,也就可能会丢一点点数据。
第三个广播消费任务的问题,这个是不会涌现的,由于每个客户端都是从同一个目标行列步队中获取任务的。
第四个问题是Redis内部channel发布事宜的问题,跟这种方案不沾边,就更不可能存在了。
以是,通过上面的比拟可以看出,Redisson这种实现方案就显得更加的靠谱了。