首页 » SEO优化 » rabbitmqphp花费技巧_快速掌握RabbitMQ四两莳花费模式和QOS的C实现

rabbitmqphp花费技巧_快速掌握RabbitMQ四两莳花费模式和QOS的C实现

访客 2024-12-06 0

扫一扫用手机浏览

文章目录 [+]

  生产者代码如下:

static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在设备ip,这里便是本机 HostName = "127.0.0.1", UserName = "wyy",//用户名 Password = "123321"//密码 }; //创建连接connection using (var connection = factory.CreateConnection()) { //创建通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("生产者准备就绪...."); string message = ""; //在掌握台输入,按enter键发送 while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) { message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: null, body: body); Console.WriteLine($"【{message}】发送到Broke成功!
"); } } } Console.ReadKey(); }
1 EventingBasicConsumer先容

  EventingBasicConsumer是发布/订阅模式的消费者,即只要订阅的queue中有了新,Broker就会立即把推送给消费者,这种模式可以担保及时地被消费者吸收到。
EventingBasicConsumer是长连接的:只须要创建一个Connection,然后在Connection的根本上创建通道channel,的发送都是通过channel来实行的,这样可以减少Connection的创建,比较节省资源。
前边我们已经利用了很多次EventingBaiscConsumer,这里大略展示一下利用的办法,注释比较详细,就不多先容了。

rabbitmqphp花费技巧_快速掌握RabbitMQ四两莳花费模式和QOS的C实现

static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在设备ip,这里便是本机 HostName = "127.0.0.1", UserName = "wyy",//用户名 Password = "123321"//密码 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { #region EventingBasicConsumer //定义一个EventingBasicConsumer消费者 var consumer = new EventingBasicConsumer(channel); //吸收到时触发的事宜 consumer.Received += (model, ea) => { Console.WriteLine(Encoding.UTF8.GetString(ea.Body)); }; Console.WriteLine("消费者准备就绪...."); //调用消费方法 queue指定消费的行列步队,autoAck指定是否自动确认,consumer便是消费者工具 channel.BasicConsume(queue: "myqueue", autoAck: true, consumer: consumer); Console.ReadKey(); #endregion } } }

  实行程序,结果如下,只要我们在生产者端发送一条到Broker,Broker就会立即推送到消费者。

rabbitmqphp花费技巧_快速掌握RabbitMQ四两莳花费模式和QOS的C实现
(图片来自网络侵删)

2 BasicGet方法先容

  我们知道利用EventingBasicConsumer可以让消费者最及时地获取到,利用EventingBasicConsumer模式时消费者在被动的吸收,等于推送过来的,Broker是主动的一方。
那么能不能让消费者作为主动的一方,消费者什么时候想要了,就自己发送一个要求去找Broker要?答案利用Get办法。
Get办法是短连接的,消费者每次想要的时候,首先建立一个Connection,发送一次要求,Broker吸收到要求后,相应一条给消费者,然后断开连接。
RabbitMQ中Get办法和HTTP的要求相应流程基本一样,Get办法的实时性比较差,也比较耗费资源。
我们看一个Get办法的栗子:

static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在设备ip,这里便是本机 HostName = "127.0.0.1", UserName = "wyy",//用户名 Password = "123321"//密码 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { #region BasicGet //通过BasicGet获取消息,开启自动确认 BasicGetResult result = channel.BasicGet(queue:"myqueue",autoAck:true); Console.WriteLine($"吸收到【{Encoding.UTF8.GetString(result.Body)}】"); //打印exchange和routingKey Console.WriteLine($"exchange:{result.Exchange},routingKey:{result.RoutingKey}"); Console.ReadLine(); #endregion } } }

  实行天生者和消费者程序,生产者发送三条,而消费者只获取了一条,这是由于channel.BasicGet()一次只获取一条,获取到后就把连接断开了。

  补充:RabbitMQ还有一莳花费者QueueBaicConsumer,用法和Get办法类似,QueueBaicConsumer在官方API中标记已过期,这里不再先容,有兴趣的小伙伴可以自己研究下。

2 Qos先容

  在先容Qos(做事质量)前我们先看一下利用EventingBasicConsumer的一个坑,利用代码演示一下,大略修正一下上边栗子的代码

  生产者代码如下,这里生产者发送了100条消费到Broker

static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在设备ip,这里便是本机 HostName = "127.0.0.1", UserName = "wyy",//用户名 Password = "123321"//密码 }; //创建连接connection using (var connection = factory.CreateConnection()) { //创建通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("生产者准备就绪...."); #region 添加100条数据 for (int i = 0; i < 100; i++) { channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: null, body: Encoding.UTF8.GetBytes($"第{i}条")); } #endregion } } Console.ReadKey(); }

  消费端代码如下,消费端采取的是自动确认(autoAck=true),即Broker把发送给消费者就会确认成功,不关心有没有处理完成,假设每条处理须要5s

static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在设备ip,这里便是本机 HostName = "127.0.0.1", UserName = "wyy",//用户名 Password = "123321"//密码 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { #region EventingBasicConsumer //定义消费者 var consumer = new EventingBasicConsumer(channel); //吸收到时实行的任务 consumer.Received += (model, ea) => { Thread.Sleep(1000 5); Console.WriteLine($"处理【{Encoding.UTF8.GetString(ea.Body)}】完成"); }; Console.WriteLine("消费者准备就绪...."); //处理 channel.BasicConsume(queue: "myqueue", autoAck: true, consumer: consumer); Console.ReadKey(); #endregion } } }

  我们先实行生产者程序,实行完成后创造queue中有了100条ready状态的,表示成功发送到了行列步队

  接着我们实行消费者,消费者实行后,Broker会把一股脑发送过去,通过Web管理界面我们看到queue中已经没有了,如下:

  我们再看一下消费者的实行情形,创造消费者仅仅处理了4条,还有96条没有处理,这便是说消费者没有处理完,但是queue中的都已经删除了。
如果这时消费者挂掉了,所有未处理的都会丢失,在某些场合中,丢失数据的后果是十分严重的。

  对付上边的问题,我们可能会想到利用显示确认来担保不会丢失:将BasicConsume方法的autoAck设置为false,然后处理一条夹帐动确认一下,这样的话已处理的在吸收到确认回执时被删除,未处理的以Unacked状态存放在queue中。
如果消费者挂了,Unacked状态的会自动重新变成Ready状态,如此一来就不用担心丢失了,修正消费者代码如下:

static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在设备ip,这里便是本机 HostName = "127.0.0.1", UserName = "wyy",//用户名 Password = "123321"//密码 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { #region EventingBasicConsumer //定义消费者 var consumer = new EventingBasicConsumer(channel); //吸收到时实行的任务 consumer.Received += (model, ea) => { Thread.Sleep(1000 5); //处理完成,手动确认 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine($"处理【{Encoding.UTF8.GetString(ea.Body)}】完成"); }; Console.WriteLine("消费者准备就绪...."); //处理 channel.BasicConsume(queue: "myqueue", autoAck: false, consumer: consumer); Console.ReadKey(); #endregion } } }

  重新实行生产者,然后实行消费者,Web管理个中看到结果如下:在实行消费者时,会一股脑的发送给消费者,然后状态都变成Unacked,消费者实行完一条数据手动确认后,这条从queue中删除。
当消费者挂了(我们可以直接把消费者关掉来仿照挂掉的情形),没有处理的会自动从Unacked状态变成Ready状态,不用担心丢失了!
打开Web管理界面看到状态如下:

  通过显式确认的办法可以办理丢失的问题,但这种办法也存在一些问题:①当有十万,百万条时,一股脑的把发送给消费者,可能会造成消费者内存爆满;②当处理比较慢的时,单一的消费者处理这些可能很永劫光,我们自然想到再添加一个消费者加快的处理速率,但是这些都被原来的消费者吸收了,状态为Unacked,以是这些不会再发送给新添加的消费者。
针对这些问题怎么去办理呢?

  RabbitMQ供应的Qos(做事质量)可以完美办理上边的问题,利用Qos时,Broker不会再把一股脑的发送给消费者,我们可以设置每次传输给消费者的条数n,消费者把这n条处理完成后,再获取n条数据进行处理,这样就不用担心丢失、做事端内存爆满的问题了,由于没有发送的状态都是Ready,以是当我们新增一个消费者时,也可以立即发送给新增的消费者。
把稳Qos只有在消费端利用显示确认时才有效,利用Qos的办法十分大略,在消费端调用 channel.BasicQos() 方法即可,修正做事端代码如下:

static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在设备ip,这里便是本机 HostName = "127.0.0.1", UserName = "wyy",//用户名 Password = "123321"//密码 }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false); #region EventingBasicConsumer //定义消费者 var consumer = new EventingBasicConsumer(channel); //吸收到时实行的任务 consumer.Received += (model, ea) => { Thread.Sleep(1000 5); //处理完成,手动确认 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine($"处理【{Encoding.UTF8.GetString(ea.Body)}】完成"); }; Console.WriteLine("消费者准备就绪...."); //处理 channel.BasicConsume(queue: "myqueue", autoAck: false, consumer: consumer); Console.ReadKey(); #endregion } } }

  清空一下queue中的,重新启动生产者,然后启动消费者,打开Web管理界面,看到状态如下所示:

   channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false) 方法中参数prefetchSize为预取的长度,一样平常设置为0即可,表示长度不限;prefetchCount表示预取的条数,即发送的最大条数;global表示是否在Connection中全局设置,true表示Connetion下的所有channel都设置为这个配置。

3 小结

  本节演示了RabbitMQ的两莳花费者:EventingBasicConsumer和BasicGet。
EventingBasicConsumer是基于长连接,发布订阅模式的消费办法,节省资源且实时性好,这是开拓中最常用的消费模式。
在一些须要消费者主动获取消息的场合,我们可以利用Get办法,Get办法是基于短连接的,要求相应模式的消费办法。

  Qos可以设置消费者一次吸收的最大条数,能够办理拥堵时造成的消费者内存爆满问题。
Qos也比较适用于耗时任务行列步队,当任务行列步队中的任务很多时,利用Qos后我们可以随时添加新的消费者来提高任务的处理效率。

标签:

相关文章

php的dom详解技巧_PHP 编程XML DOM

DOM 是什么?W3C DOM 供应了针对 HTML 和 XML 文档的标准工具集,以及用于访问和操作这些文档的标准接口。W3C...

SEO优化 2024-12-14 阅读0 评论0