大概逻辑:当首次关照、或关照失落败时,设置(重新设置)在 Redis 对应的 Key 的过期韶光,Redis 会监听过期事宜,发生事宜时关照订阅者,订阅者吸收到事宜,做逻辑处理。下面看详细的实现。
首先,修正 Redis 端配置打开功能。由于该功能会花费一些 CPU 性能,以是在配置文件中是 默认关闭 的。Ex表示打开 键过期事宜关照,每当有过期键被删除时发送,订阅者能收到 吸收到被实行事宜的键的名字
notify-keyspace-events Ex
其次,想要在 SpringBoot 中,订阅到 Redis 的事宜,也须要两个步骤:1、继续 org.springframework.data.redis.listener.adapter.MessageListenerAdapter 类,创建自己的监听器

@Componentpublic class OrderExpireEventListener extends MessageListenerAdapter { @Override public void onMessage(Message message, byte[] pattern) { byte[] body = message.getBody(); String msg = redisWrapper.getRedisTemplate().getStringSerializer().deserialize(body); // do something... // 如果关照失落败,须要重新打算下次关照韶光,设置 Redis // 至于数据类型,String 即可 }}
2、将创建的监听器,注册(委托设计模式)给 RedisMessageListenerContainer
@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory factory, OrderExpireEventListener adapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(factory); container.addMessageListener(adapter, new PatternTopic("__keyevent@0__:expired")); return container;}
这里有个点须要把稳下,那便是 Redis 的键设计。
代码中的 __keyevent@0__:expired 频道匹配意味着,编号为 0 的库中所有键过期韶光都会被订阅到。而这个 Redis 可能不单单只有这个业务在利用,有可能存在其他的业务也在利用。总不可能来个任意的键都会须要去做过期处理。最好是有个通用的设计规则,对 Key 的含义分割。比如:产品固定前缀:业务:业务属性:业务唯一标识
app1:trans:notice:1615283234
代表:系统名为 app1 的 在交易模块 的 订单号为 1615283234 的关照业务的。当监听器解析 Key 失落败时则解释是其他的键过期,不做处理。一旦解析成功,则对进行路由分发。
键搞定了,值就看业务情形而定。如果是关照的话,必须带上当前是第几次关照,根据这个再加上策略才能算出下次关照韶光(该键的过期韶光)。
一样平常大略的方法都存在多少的毛病,这种办法也不例外。引用 Redis 官网的一段话:
Because Redis Pub/Sub is fire and forget currently there is no way to use this feature if your application demands reliable notification of events, that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost
意思是说:Redis 目前发布订阅基于 发送即忘 策略,且没有 ACK 机制,意味着客户端重启掉线期间,会丢失。加上 Pub/Sub 没有持久化机制,如果当订阅客户端由于网络缘故原由没收到,想再次重试,这是没法实现的。
如果斯时我还想跟内存行列步队那样子能够 对的延迟韶光进行自动排序,该如何实现呢?除此之外,Pub/Sub 是广播机制,如果存在多个订阅者,那么就会 同时收到键过期的,此时又该如何处理 竞争 问题?
基于 Sorted Set 实现这时候我们要引入 Redis 的 Sorted Set 数据构造。关于这个数据构造大略来说是 支持排序的 Set,靠的是与之关联的浮点值,称为 score 来实现的。值得把稳的是,这个排序并不是放进去的时候排,是拿出来的时候(遐想到 性能 问题,后面有讲)。这里引用一段官网的话:
Moreover, elements in a sorted sets are taken in order (so they are not ordered on request, order is a peculiarity of the data structure used to represent sorted sets).
以是我们只须要将延迟实行的韶光戳作为分数值,就能办理上文所说的排序问题,当然由于该构造是 Redis 的基本功能,自然也支持持久化,也便是办理了丢失问题。
大概设计如下:
首先看看,消费者线程该如何实现(SpringBoot 环境下)
@Slf4j@Componentpublic class ConsumerTask { @Autowired RedisTemplate<String, Object> redisTemplate;// Sorted Set 行列步队键 private static String KEY = "TEST:ZSET"; @Scheduled(cron = "0/1 ?") public void run() { try { this.doRun(); } catch (Exception e) { log.error("消费非常", e); } } private void doRun() { // zrange 分数从小到大 zrevrange 分数从大到小 // 拿出最新的待处理 Set<ZSetOperations.TypedTuple<Object>> tuples = redisTemplate.opsForZSet().rangeWithScores(KEY, 0, 0); if (CollectionUtils.isEmpty(tuples)) { log.info("行列步队无数据"); return ; } ZSetOperations.TypedTuple<Object> typedTuple = tuples.iterator().next(); if (typedTuple == null) { log.info("行列步队无数据"); return ; } Double score = typedTuple.getScore(); Object value = typedTuple.getValue(); if (System.currentTimeMillis() < score) { log.info("未到实行韶光..."); return ; } Long zrem = redisTemplate.opsForZSet().remove(KEY, value); if (Long.compare(1L, zrem) == 0) { log.info("删除数据成功,开始处理,数据:{}", value.toString()); // do someting... // 如果关照失落败,须要重新打算关照韶光(score 值)并在 Redis 设置(ZADD)该 } else { log.info("被其他的消费端抢占,不处理..."); } }}
跟之前的 推模式 比较,这次采取的是 拉模式,只管在多个消费端可能同时拿到同一个,不过这里通过 Long zrem = redisTemplate.opsForZSet().remove(KEY, value) 这方法,利用了 rem 命令的原子性 办理了竞争问题,也便是说只会有一个客户端删除成功。
仔细不雅观察的话,可以看到我们拿到的韶光戳是 Long 类型的,但是 Spring 供应的 Sorted Set 操作 api 参数是 Double 类型
org.springframework.data.redis.core.ZSetOperations#add(K, V, double)org.springframework.data.redis.core.ZSetOperations#rangeByScore(K, double, double)
那会不会有精确丢失问题?以是输出看下最大最小值
System.out.println(Long.MAX_VALUE); // 2 的 64 次方-1,19 个数位System.out.println(Long.MIN_VALUE); // 负的 2 的 64 次方 System.out.println(Double.MAX_VALUE); // 2 的 1024 次方 -1,308 个数位System.out.println(Double.MIN_VALUE); // 2 的 -1074次方
可以看到 Double 最大值远远大于 Long 类型,加上韶光戳不会有负数,以是可以放心转换。
在这里不演示生产者代码,过于大略,便是调用 zadd 命令而已。这里也须要把稳,如果是异步关照场景 zadd 的值必须带上这是第几次关照,就如前面的方案一样。
到此为止,第一种方案存在的问题在第二种方案全部办理了。下面看一种网上的比较多的实现办法。
基于 Sorted Set、List 实现跟上一种比较多了一个 List 数据构造。先来看下加入 List 之后的全体设计图
不得不说刚开始瞥见这种方案时,是存在迷惑的。由于上面的 Sorted Set 已经实现了功能,为什么要引入 List 数据构造增加系统的繁芜度?唯一能看到的好处便是 List 数据构造供应了 壅塞 操作?经由与同事谈论后,得出下面几点结论:
客户端拉取消息 掌握并发的步骤减少。当利用 List 时,只须要调用一个命令就可以办理竞争问题,而利用 Sorted Set 则须要利用 zrange 和 zrem 两条命令来实现,比较之下,多交互一次网络,且实现更繁芜。客户端拉取消息的办法增多,同时,行列步队供应 壅塞式 访问,同样也 减少 了客户端由于无限循环造成的 CPU 摧残浪费蹂躏。行列步队 pop 操作比 zrange 操为难刁难 Redis 来说性能开销更小,在这种频繁拉取的情形下更加得当。这里须要把稳的一点是,搬运操作有多个命令一起完成,如下伪代码:
// 1、从 Sorted Set 中拿出 score 值在 前五秒 到 目前(包含现在)的所有元素Date now = new Date();Date fiveSecondBefore = DateUtils.addSeconds(now, -5);Set<Object> objects = redisTemplate.opsForZSet().rangeByScore("Sorted Set:Key", fiveSecondBefore.getTime(), now.getTime());if (CollectionUtils.isEmpty(objects)) {return ;}// 2、将这些元素从 Sorted Set 中删除Long removeResult = redisTemplate.opsForZSet().remove("Sorted Set:Key", objects);if (Long.compare(removeResult, objects.size()) != 0) {return ;}// 3、将这些元素放进 ListLong result = redisTemplate.opsForList().leftPushAll("List:Key", objects);
rangeByScore、remove、leftPushAll 这几个操作不具有原子性,可能在中途发生非常、宕机等情形,导致在搬运过程中丢失或重复搬运。好在 Redis 供应了实行 lua 脚本功能,会担保同一脚本以原子性(atomic) 的办法实行,以是我们只须要原子性操作的多个步骤整合在自定义 lua 脚本中即可,如下:
local list_key = KEYS[1];local sorted_set_key = KEYS[2];local now = ARGV[1];local sorted_set_size = redis.call('ZCARD', sorted_set_key)if (tonumber(sorted_set_size) <= 0) then returnendlocal members = redis.call('ZRANGEBYSCORE', sorted_set_key, 0, tonumber(now));if (next(members) == nil) then returnendfor key,value in ipairs(members)do local zscore = redis.call('ZSCORE',sorted_set_key,value); if (tonumber(now) < tonumber(zscore)) then return zscore; end redis.call('ZREM', sorted_set_key, value); redis.call('RPUSH', list_key, value);endlocal topmember = redis.call('ZRANGE', sorted_set_key, 0, 0);local nextvalue = next(topmember);if (nextvalue == nil) then returnendfor k,v in ipairs(topmember)do return redis.call('ZSCORE', sorted_set_key, v);end
下面是 SpringBoot 定时调用该 lua 脚本进行搬运的示例代码:
@Scheduled(cron = "0/1 ?")public void run4() { ClassPathResource resource = new ClassPathResource("sorted_set_to_list.lua"); String luaScript = FileUtils.readFileToString(resource.getFile()); DefaultRedisScript<String> redisScript = new DefaultRedisScript<>(luaScript, String.class); // List<String> keys = Lists.newArrayList("TEST:LIST", "TEST:ZSET"); String now = String.valueOf(System.currentTimeMillis()); // 把稳这里的序列化器,须要换成 StringSerializer // 更换的默认的 Jackson2JsonRedisSerializer String executeResult = redisTemplate.execute(redisScript, redisTemplate.getStringSerializer(), redisTemplate.getStringSerializer(), keys, now); log.info("lua 脚本实行结果:{}", executeResult);}
末了再来看看消费者该如何实现
@Component@Slf4jpublic class ListConsumer implements ApplicationListener<ContextRefreshedEvent> { @Override public void onApplicationEvent(ContextRefreshedEvent event) { Executors.newSingleThreadExecutor().submit(new PopEventRunner()); } private static class PopEventRunner implements Runnable { @Override public void run() { RedisTemplate<String, Object> redisTemplate = (RedisTemplate<String, Object>) SpringUtil.getBean3("redisTemplate"); while (true) { try { Object leftPop = redisTemplate.opsForList().leftPop("TEST:LIST", Integer.MAX_VALUE, TimeUnit.SECONDS); if (leftPop == null) { continue ; } // do something... // 当关照失落败时,重新打算关照韶光并设置(ZADD)Redis } catch (Exception e) { log.error("监听非常", e); sleep(5); // 发生非常睡五秒 } } } }}
监听容器的刷新事宜,创建监听单线程,无限循环壅塞监听行列步队。相对付前一种实现方案,该方案确实更加的贴合。但仍有优化的余地,比如:
搬运线程的机遇,目前频率为 1 秒,以是极度情形会有 1 秒韶光的延迟。且在 Sorted Set 为空情形下,对 CPU 是一种摧残浪费蹂躏。小结相对前一篇内存实现,Redis 这种办法更加的可靠,且在许可一点韶光的偏差和捐躯一点可靠性下,不失落为一种 性价比高 的选择。如果当前景便是不许可有这些丢失,那还有什么办理方案吗?到时候我们再来讲终极杀招,利用 RabbitMQ 来实现。
作者:午饭吃什么链接:https://juejin.cn/post/6938779316847116325来源:掘金