首页 » Web前端 » phprabitmq逝世信技巧_RabbitMQ的去世信与延迟队列你真的会用吗

phprabitmq逝世信技巧_RabbitMQ的去世信与延迟队列你真的会用吗

duote123 2024-12-14 0

扫一扫用手机浏览

文章目录 [+]

而在中间件RabbitMQ的架构组件中,也存在着跟"去世信行列步队"在功能特性方面险些相同的组件,那便是"延迟行列步队/延时行列步队",同样也具有"延迟、延时处理任务"的功效!

当然啦,这两者还是有一丢丢差异的,最直不雅观确当然是名字上啦,从名字上你就可以看出来两者的"处事风格"是不一样的,详细表示在:

phprabitmq逝世信技巧_RabbitMQ的去世信与延迟队列你真的会用吗

一、创建上的差异:

(1)RabbitMQ的去世信行列步队DeadQueue是由"去世信交流机DLX"+"去世信路由DLK"组成的,当然,可能还会有"TTL",而DLX和DLK又可以绑定指向真正的行列步队RealQueue,这个行列步队RealQueue便是"消费者"真正监听的工具.

phprabitmq逝世信技巧_RabbitMQ的去世信与延迟队列你真的会用吗
(图片来自网络侵删)

(2)而RabbitMQ的延迟/延时行列步队DelayedQueue 则是由普通的行列步队来创建即可,唯一不同的地方在于其绑定的交流机为自定义的交流机,即"CustomExchange",在创建该交流机时只须要指定其的类型为 "x-delayed-message"即可."消费者"真正监听的行列步队也是它本人,即DelayedQueue

画外音:从这一点上看,延迟/延时行列步队的创建相对而言大略一些!

二、功能特性上的差异:

(1)去世信行列步队在实际运用时虽然可以实现"延时、延迟处理任务"的功效,但进入去世信中的却依然保留了行列步队的特性,即"FIFO" ~ 前辈先出,而不管先后进入行列步队中的TTL的值. 即假设先后进入去世信的为A、B、C,各自的TTL分别为:10s、3s、5s,理论上TTL先后到达的顺序是:B、C、A,然后从去世信出来,终极被路由到真正的行列步队中,即被消费的先后顺序该当为:B、C、A,然而现实却是残酷的,其终极消费的的顺序为:A、B、C,即"是怎么进去的,就怎么出来",保留了所谓的FIFO特性.

(2)或许是由于去世信有这种毛病,以是RabbitMQ供应了另一种组件,即"延迟行列步队",它可以很完美的办理上面去世信涌现的问题,即终极消费的的顺序为:B、C、A,我们将不才面用实际的代码进行实战实现与演习训练.

三、插件安装上的差异:

(1)去世信不须要额外的插件

(2)但是延迟行列步队在实际项目利用时却须要在Mq Server中安装一个插件,它的名字叫做:"rabbitmq_delayed_message_exchange",其安装过程可以参考链接: 里面就供应了Windows环境和Linux环境下的插件的安装过程(很大略,只须要不到3步的步骤.)

四、代码的实战实现~RabbitMQ的去世信行列步队

说了这么多,想必有些小伙伴有点不耐烦了,下面我将采取实际的代码对上面所先容的几点差异进行实现与演习训练(代码都是基于Spring Boot2.0搭建的项目环境实现与测试的)

(1)首先,我们须要创建去世信行列步队以及真正的行列步队,并实现干系的绑定:

//构建订单超时未支付的去世信行列步队模型 @Bean public Queue successKillDeadQueue(){ Map<String, Object> argsMap= Maps.newHashMap(); argsMap.put("x-dead-letter-exchange",env.getProperty("mq.kill.item.success.kill.dead.exchange")); argsMap.put("x-dead-letter-routing-key",env.getProperty("mq.kill.item.success.kill.dead.routing.key")); return new Queue(env.getProperty("mq.kill.item.success.kill.dead.queue"),true,false,false,argsMap); } //基本交流机 @Bean public TopicExchange successKillDeadProdExchange(){ return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"),true,false); } //创建基本交流机+基本路由 -> 去世信行列步队 的绑定 @Bean public Binding successKillDeadProdBinding(){ return BindingBuilder.bind(successKillDeadQueue()).to(successKillDeadProdExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key")); } //真正的行列步队 @Bean public Queue successKillRealQueue(){ return new Queue(env.getProperty("mq.kill.item.success.kill.dead.real.queue"),true); } //去世信交流机 @Bean public TopicExchange successKillDeadExchange(){ return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.exchange"),true,false); } //去世信交流机+去世信路由->真正行列步队 的绑定 @Bean public Binding successKillDeadBinding(){ return BindingBuilder.bind(successKillRealQueue()).to(successKillDeadExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.routing.key")); }

(2)将项目运行起来,登录RabbitMQ的后端掌握台,可以看到成功创建了相应的去世信行列步队和真正的行列步队等组件,如下图所示:

(3)紧接着,我们在Controller中建立一个要求方法,用于吸收前端要求过来的,并将该附以TTL值,塞入去世信行列步队中,如下所示:

//去世信行列步队-生产者 @RequestMapping(value = "dead/msg/send",method = RequestMethod.GET) @ResponseBody public BaseResponse sendDQMsg(@RequestParam String msg,@RequestParam Long ttl){ BaseResponse response=new BaseResponse(StatusCode.Success); try { Message realMsg=MessageBuilder.withBody(msg.getBytes("UTF-8")).build(); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.convertAndSend(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"), env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"), realMsg, message -> { MessageProperties mp=message.getMessageProperties(); mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT); //TODO:动态设置TTL mp.setExpiration(String.valueOf(ttl)); log.info("去世信行列步队生产者-发出:{} TTL:{}",msg,ttl); return message; }); }catch (Exception e){ response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage()); } return response; }

(4)末了是写一个Spring Bean类充当消费者,在个中监听"实际行列步队"的:

@RabbitListener(queues = {"${mq.kill.item.success.kill.dead.real.queue}"},containerFactory = "singleListenerContainer") public void consumeExpireOrder(@Payload byte[] msg){ try { log.info("去世信行列步队-监听者-吸收:{}",new String(msg,"UTF-8")); }catch (Exception e){ log.error("去世信行列步队-监听者-发生非常:",e.fillInStackTrace()); } }

末了,我们进入测试环节,打开Postman,前后输入3次不同的要求信息,个中各自的TTL也不尽相同,即A的TTL=50s,B的TTL=20s,C的TTL=10s,终极在Console掌握台等待,你会创造消费者监听的的顺序为:A、B、C,而不是C、B、A,如下图所示:

五、代码的实战实现~RabbitMQ的延迟/延时行列步队

很明显,由于去世信存在的这个毛病,故而其在上面的运用处景中是不太适用的!
即去世信行列步队在 的TTL不一致,且后入去世信的TTL小于前入的TTL的运用处景中是不适用的,而像"订单超时未支付"的运用处景,由于大家都一样,都是固定的30min或者 1h,故而这种场景,去世信是相称适宜的。

因此,为理解决实际项目中"TTL不一致且不固定"的运用处景,我们须要搬上"延迟/延时行列步队"(当然啦,Redisson的延迟/延迟行列步队也是可以实现的!
),下面我们用代码加以实现!

(1)首先是创建"延迟/延时行列步队"等干系的组件,如下所示;

//TODO:RabbitMQ延迟行列步队 @Bean public Queue delayQueue(){ return QueueBuilder.durable(env.getProperty("mq.kill.delay.queue")).build(); } @Bean public CustomExchange delayExchange(){ Map<String,Object> map=Maps.newHashMap(); map.put("x-delayed-type","direct"); return new CustomExchange(env.getProperty("mq.kill.delay.exchange"),"x-delayed-message",true,false,map); } @Bean public Binding delayBinding(){ return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(env.getProperty("mq.kill.delay.routingKey")).noargs(); }

(2)其生产者发送的代码我们仍旧是放在一个Controller的要求方法中,如下所示:

//延迟行列步队-生产者 @RequestMapping(value = "delay/msg/send",method = RequestMethod.GET) @ResponseBody public BaseResponse sendDelayMsg(@RequestParam String msg,@RequestParam Long ttl){ BaseResponse response=new BaseResponse(StatusCode.Success); try { String info=msg; Message realMsg=MessageBuilder.withBody(info.getBytes("UTF-8")).build(); rabbitTemplate.convertAndSend(env.getProperty("mq.kill.delay.exchange"),env.getProperty("mq.kill.delay.routingKey"), realMsg, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { MessageProperties mp=message.getMessageProperties(); mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT); mp.setHeader("x-delay",ttl); log.info("延迟行列步队生产者-发出:{} TTL:{}",msg,ttl); return message; } }); }catch (Exception e){ response=new BaseResponse(StatusCode.Fail.getCode(),e.getMessage()); } return response; }

(3)末了是用于监听延迟行列步队中的消费者的代码,如下所示:

/ 延时行列步队-监听器-消费者 @Author:debug (SteadyJack) @Link: weixin-> debug0868 qq-> 1948831260/@Componentpublic class DelayQueueMqListener { private static final Logger log= LoggerFactory.getLogger(DelayQueueMqListener.class); //监听 @RabbitListener(queues = {"${mq.kill.delay.queue}"}) public void consumeMsg(@Payload byte[] msg){ try { String info=new String(msg,"UTF-8"); log.info("延时行列步队监听到:{} ",info); }catch (Exception e){ log.error("延时行列步队-监听器-消费者-监听-发生非常:",e.fillInStackTrace()); } }}

(4)将项目跑起来,可以看到RabbitMQ的后端掌握台已经建立了该行列步队,如下图所示:

(5)末了,我们打开postman,前后输入3次不同的要求信息,个中各自的TTL也不尽相同,即A的TTL=50s,B的TTL=20s,C的TTL=10s,终极在Console掌握台等待,你会创造消费者监听的的顺序为:A、B、C,而不是C、B、A,如下图所示:

从该运行结果上看,会创造这才是我们真正想要的结果,即按照韶光TTL的大小来决定被消费的先后顺序,而且,你可以看出消费时的韶光跟发出的韶光刚好差 TTL !

在文章的末了的,我们大略总结一下本文所讲的内容,即紧张先容、比拟并实战了RabbitMQ中两款具有"延时、延迟处理任务"功效的组件,即"去世信行列步队"和"延迟行列步队",其差异性紧张表示在:创建上的不同、功能特性的不同、插件安装上的不同等方面。

总体来说,如果是想追求传输的稳定性、可靠性且TTL是固定的话,那么建议选择"去世信行列步队",由于从一开始就在行列步队中待着,等到TTL一到才被路由到真正的行列步队!
而"延迟行列步队"则不同,即发送出去的须要等待 TTL 的韶光才进入"延迟行列步队",如果在等待的期间,Mq Server 宕机了,那很可能就丢失了…..

课程不雅观看: https://www.ixigua.com/i6806173584910713356/

标签:

相关文章

若何插入php文件技巧_php的文件上传

这里首先声明一下这一章的内容比较多,比较难,你要抱着和自己去世磕的态度。细微之处不放过,多敲多练是王道。 学习就像爬山,得一步一步...

Web前端 2024-12-16 阅读0 评论0

大数据时代,浩天科技引领行业新风向

随着互联网、物联网、云计算等技术的飞速发展,大数据已经成为当今世界最具影响力的新兴领域之一。浩天科技作为大数据领域的领军企业,凭借...

Web前端 2024-12-16 阅读0 评论0

大数据时代,理性与挑战

随着互联网技术的飞速发展,大数据已经成为当今时代最具影响力的技术之一。大数据作为一种全新的数据资源,具有规模庞大、类型多样、价值巨...

Web前端 2024-12-16 阅读0 评论0

大数据时代,百万级数据背后的智慧与挑战

随着科技的飞速发展,大数据时代已经到来。在这个时代,海量数据如同涌动的海洋,蕴含着无尽的智慧和机遇。本文将从大数据的定义、特点、应...

Web前端 2024-12-16 阅读0 评论0