首页 » Web前端 » phpredispubsub技巧_异步结果通知实现基于Redis实现我这操作很可以

phpredispubsub技巧_异步结果通知实现基于Redis实现我这操作很可以

访客 2024-12-03 0

扫一扫用手机浏览

文章目录 [+]

大概逻辑:当首次关照、或关照失落败时,设置(重新设置)在 Redis 对应的 Key 的过期韶光,Redis 会监听过期事宜,发生事宜时关照订阅者,订阅者吸收到事宜,做逻辑处理。
下面看详细的实现。

首先,修正 Redis 端配置打开功能。
由于该功能会花费一些 CPU 性能,以是在配置文件中是 默认关闭 的。
Ex表示打开 键过期事宜关照,每当有过期键被删除时发送,订阅者能收到 吸收到被实行事宜的键的名字

phpredispubsub技巧_异步结果通知实现基于Redis实现我这操作很可以

notify-keyspace-events Ex

其次,想要在 SpringBoot 中,订阅到 Redis 的事宜,也须要两个步骤:1、继续 org.springframework.data.redis.listener.adapter.MessageListenerAdapter 类,创建自己的监听器

phpredispubsub技巧_异步结果通知实现基于Redis实现我这操作很可以
(图片来自网络侵删)

@Componentpublic class OrderExpireEventListener extends MessageListenerAdapter { @Override public void onMessage(Message message, byte[] pattern) { byte[] body = message.getBody(); String msg = redisWrapper.getRedisTemplate().getStringSerializer().deserialize(body); // do something... // 如果关照失落败,须要重新打算下次关照韶光,设置 Redis // 至于数据类型,String 即可 }}

2、将创建的监听器,注册(委托设计模式)给 RedisMessageListenerContainer

@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory factory, OrderExpireEventListener adapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(factory); container.addMessageListener(adapter, new PatternTopic("__keyevent@0__:expired")); return container;}

这里有个点须要把稳下,那便是 Redis 的键设计。

代码中的 __keyevent@0__:expired 频道匹配意味着,编号为 0 的库中所有键过期韶光都会被订阅到。
而这个 Redis 可能不单单只有这个业务在利用,有可能存在其他的业务也在利用。
总不可能来个任意的键都会须要去做过期处理。
最好是有个通用的设计规则,对 Key 的含义分割。
比如:产品固定前缀:业务:业务属性:业务唯一标识

app1:trans:notice:1615283234

代表:系统名为 app1 的 在交易模块 的 订单号为 1615283234 的关照业务的。
当监听器解析 Key 失落败时则解释是其他的键过期,不做处理。
一旦解析成功,则对进行路由分发。

键搞定了,值就看业务情形而定。
如果是关照的话,必须带上当前是第几次关照,根据这个再加上策略才能算出下次关照韶光(该键的过期韶光)。

一样平常大略的方法都存在多少的毛病,这种办法也不例外。
引用 Redis 官网的一段话:

Because Redis Pub/Sub is fire and forget currently there is no way to use this feature if your application demands reliable notification of events, that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost

意思是说:Redis 目前发布订阅基于 发送即忘 策略,且没有 ACK 机制,意味着客户端重启掉线期间,会丢失。
加上 Pub/Sub 没有持久化机制,如果当订阅客户端由于网络缘故原由没收到,想再次重试,这是没法实现的。

如果斯时我还想跟内存行列步队那样子能够 对的延迟韶光进行自动排序,该如何实现呢?除此之外,Pub/Sub 是广播机制,如果存在多个订阅者,那么就会 同时收到键过期的,此时又该如何处理 竞争 问题?

基于 Sorted Set 实现

这时候我们要引入 Redis 的 Sorted Set 数据构造。
关于这个数据构造大略来说是 支持排序的 Set,靠的是与之关联的浮点值,称为 score 来实现的。
值得把稳的是,这个排序并不是放进去的时候排,是拿出来的时候(遐想到 性能 问题,后面有讲)。
这里引用一段官网的话:

Moreover, elements in a sorted sets are taken in order (so they are not ordered on request, order is a peculiarity of the data structure used to represent sorted sets).

以是我们只须要将延迟实行的韶光戳作为分数值,就能办理上文所说的排序问题,当然由于该构造是 Redis 的基本功能,自然也支持持久化,也便是办理了丢失问题。

大概设计如下:

首先看看,消费者线程该如何实现(SpringBoot 环境下)

@Slf4j@Componentpublic class ConsumerTask { @Autowired RedisTemplate<String, Object> redisTemplate;// Sorted Set 行列步队键 private static String KEY = "TEST:ZSET"; @Scheduled(cron = "0/1 ?") public void run() { try { this.doRun(); } catch (Exception e) { log.error("消费非常", e); } } private void doRun() { // zrange 分数从小到大 zrevrange 分数从大到小 // 拿出最新的待处理 Set<ZSetOperations.TypedTuple<Object>> tuples = redisTemplate.opsForZSet().rangeWithScores(KEY, 0, 0); if (CollectionUtils.isEmpty(tuples)) { log.info("行列步队无数据"); return ; } ZSetOperations.TypedTuple<Object> typedTuple = tuples.iterator().next(); if (typedTuple == null) { log.info("行列步队无数据"); return ; } Double score = typedTuple.getScore(); Object value = typedTuple.getValue(); if (System.currentTimeMillis() < score) { log.info("未到实行韶光..."); return ; } Long zrem = redisTemplate.opsForZSet().remove(KEY, value); if (Long.compare(1L, zrem) == 0) { log.info("删除数据成功,开始处理,数据:{}", value.toString()); // do someting... // 如果关照失落败,须要重新打算关照韶光(score 值)并在 Redis 设置(ZADD)该 } else { log.info("被其他的消费端抢占,不处理..."); } }}

跟之前的 推模式 比较,这次采取的是 拉模式,只管在多个消费端可能同时拿到同一个,不过这里通过 Long zrem = redisTemplate.opsForZSet().remove(KEY, value) 这方法,利用了 rem 命令的原子性 办理了竞争问题,也便是说只会有一个客户端删除成功。

仔细不雅观察的话,可以看到我们拿到的韶光戳是 Long 类型的,但是 Spring 供应的 Sorted Set 操作 api 参数是 Double 类型

org.springframework.data.redis.core.ZSetOperations#add(K, V, double)org.springframework.data.redis.core.ZSetOperations#rangeByScore(K, double, double)

那会不会有精确丢失问题?以是输出看下最大最小值

System.out.println(Long.MAX_VALUE); // 2 的 64 次方-1,19 个数位System.out.println(Long.MIN_VALUE); // 负的 2 的 64 次方 System.out.println(Double.MAX_VALUE); // 2 的 1024 次方 -1,308 个数位System.out.println(Double.MIN_VALUE); // 2 的 -1074次方

可以看到 Double 最大值远远大于 Long 类型,加上韶光戳不会有负数,以是可以放心转换。

在这里不演示生产者代码,过于大略,便是调用 zadd 命令而已。
这里也须要把稳,如果是异步关照场景 zadd 的值必须带上这是第几次关照,就如前面的方案一样。

到此为止,第一种方案存在的问题在第二种方案全部办理了。
下面看一种网上的比较多的实现办法。

基于 Sorted Set、List 实现

跟上一种比较多了一个 List 数据构造。
先来看下加入 List 之后的全体设计图

不得不说刚开始瞥见这种方案时,是存在迷惑的。
由于上面的 Sorted Set 已经实现了功能,为什么要引入 List 数据构造增加系统的繁芜度?唯一能看到的好处便是 List 数据构造供应了 壅塞 操作?经由与同事谈论后,得出下面几点结论:

客户端拉取消息 掌握并发的步骤减少。
当利用 List 时,只须要调用一个命令就可以办理竞争问题,而利用 Sorted Set 则须要利用 zrange 和 zrem 两条命令来实现,比较之下,多交互一次网络,且实现更繁芜。
客户端拉取消息的办法增多,同时,行列步队供应 壅塞式 访问,同样也 减少 了客户端由于无限循环造成的 CPU 摧残浪费蹂躏。
行列步队 pop 操作比 zrange 操为难刁难 Redis 来说性能开销更小,在这种频繁拉取的情形下更加得当。

这里须要把稳的一点是,搬运操作有多个命令一起完成,如下伪代码:

// 1、从 Sorted Set 中拿出 score 值在 前五秒 到 目前(包含现在)的所有元素Date now = new Date();Date fiveSecondBefore = DateUtils.addSeconds(now, -5);Set<Object> objects = redisTemplate.opsForZSet().rangeByScore("Sorted Set:Key", fiveSecondBefore.getTime(), now.getTime());if (CollectionUtils.isEmpty(objects)) {return ;}// 2、将这些元素从 Sorted Set 中删除Long removeResult = redisTemplate.opsForZSet().remove("Sorted Set:Key", objects);if (Long.compare(removeResult, objects.size()) != 0) {return ;}// 3、将这些元素放进 ListLong result = redisTemplate.opsForList().leftPushAll("List:Key", objects);

rangeByScore、remove、leftPushAll 这几个操作不具有原子性,可能在中途发生非常、宕机等情形,导致在搬运过程中丢失或重复搬运。
好在 Redis 供应了实行 lua 脚本功能,会担保同一脚本以原子性(atomic) 的办法实行,以是我们只须要原子性操作的多个步骤整合在自定义 lua 脚本中即可,如下:

local list_key = KEYS[1];local sorted_set_key = KEYS[2];local now = ARGV[1];local sorted_set_size = redis.call('ZCARD', sorted_set_key)if (tonumber(sorted_set_size) <= 0) then returnendlocal members = redis.call('ZRANGEBYSCORE', sorted_set_key, 0, tonumber(now));if (next(members) == nil) then returnendfor key,value in ipairs(members)do local zscore = redis.call('ZSCORE',sorted_set_key,value); if (tonumber(now) < tonumber(zscore)) then return zscore; end redis.call('ZREM', sorted_set_key, value); redis.call('RPUSH', list_key, value);endlocal topmember = redis.call('ZRANGE', sorted_set_key, 0, 0);local nextvalue = next(topmember);if (nextvalue == nil) then returnendfor k,v in ipairs(topmember)do return redis.call('ZSCORE', sorted_set_key, v);end

下面是 SpringBoot 定时调用该 lua 脚本进行搬运的示例代码:

@Scheduled(cron = "0/1 ?")public void run4() { ClassPathResource resource = new ClassPathResource("sorted_set_to_list.lua"); String luaScript = FileUtils.readFileToString(resource.getFile()); DefaultRedisScript<String> redisScript = new DefaultRedisScript<>(luaScript, String.class); // List<String> keys = Lists.newArrayList("TEST:LIST", "TEST:ZSET"); String now = String.valueOf(System.currentTimeMillis()); // 把稳这里的序列化器,须要换成 StringSerializer // 更换的默认的 Jackson2JsonRedisSerializer String executeResult = redisTemplate.execute(redisScript, redisTemplate.getStringSerializer(), redisTemplate.getStringSerializer(), keys, now); log.info("lua 脚本实行结果:{}", executeResult);}

末了再来看看消费者该如何实现

@Component@Slf4jpublic class ListConsumer implements ApplicationListener<ContextRefreshedEvent> { @Override public void onApplicationEvent(ContextRefreshedEvent event) { Executors.newSingleThreadExecutor().submit(new PopEventRunner()); } private static class PopEventRunner implements Runnable { @Override public void run() { RedisTemplate<String, Object> redisTemplate = (RedisTemplate<String, Object>) SpringUtil.getBean3("redisTemplate"); while (true) { try { Object leftPop = redisTemplate.opsForList().leftPop("TEST:LIST", Integer.MAX_VALUE, TimeUnit.SECONDS); if (leftPop == null) { continue ; } // do something... // 当关照失落败时,重新打算关照韶光并设置(ZADD)Redis } catch (Exception e) { log.error("监听非常", e); sleep(5); // 发生非常睡五秒 } } } }}

监听容器的刷新事宜,创建监听单线程,无限循环壅塞监听行列步队。
相对付前一种实现方案,该方案确实更加的贴合。
但仍有优化的余地,比如:

搬运线程的机遇,目前频率为 1 秒,以是极度情形会有 1 秒韶光的延迟。
且在 Sorted Set 为空情形下,对 CPU 是一种摧残浪费蹂躏。
小结

相对前一篇内存实现,Redis 这种办法更加的可靠,且在许可一点韶光的偏差和捐躯一点可靠性下,不失落为一种 性价比高 的选择。
如果当前景便是不许可有这些丢失,那还有什么办理方案吗?到时候我们再来讲终极杀招,利用 RabbitMQ 来实现。

作者:午饭吃什么链接:https://juejin.cn/post/6938779316847116325来源:掘金

标签:

相关文章

谷歌外链SEO提升网站排名的关键步骤

搜索引擎优化(SEO)已经成为网站建设和运营中不可或缺的一部分。其中,谷歌外链SEO更是备受关注。本文将从关键词布局、内容创作、外...

Web前端 2025-04-09 阅读0 评论0

贵州SEO优化步骤关键词布局的艺术与方法

搜索引擎优化(SEO)已成为企业提升网站排名、吸引潜在客户的重要手段。贵州作为我国西南地区的重要省份,拥有丰富的自然资源和独特的民...

Web前端 2025-04-09 阅读0 评论0

连江SEO优化哪家强本地优质服务提供商

搜索引擎优化(SEO)已经成为企业提升网络曝光度、获取潜在客户的重要手段。在众多SEO服务商中,如何选择一家优质的SEO优化公司成...

Web前端 2025-04-09 阅读0 评论0