首页 » 网站建设 » ubuntuphprabbitmq技巧_NET Core运用RabbitMQ

ubuntuphprabbitmq技巧_NET Core运用RabbitMQ

访客 2024-11-26 0

扫一扫用手机浏览

文章目录 [+]

 RabbitMQ是一个开源的,基于AMQP(Advanced Message Queuing Protocol)协议的完全的可复用的企业级队,RabbitMQ可以实现点对点,发布订阅等处理模式。

RabbitMQ是一个开源的AMQP实现,做事器端用Erlang措辞编写,支持Linux,windows,macOS,FreeBSD等操作系统,同时也支持很多措辞,如:Python,Java,Ruby,PHP,C#,JavaScript,Go,Elixir,Objective-C,Swift等。

ubuntuphprabbitmq技巧_NET Core运用RabbitMQ

RabbitMQ安装

我利用的环境是ubuntu18.04,

ubuntuphprabbitmq技巧_NET Core运用RabbitMQ
(图片来自网络侵删)
RabbitMq须要Erlang措辞的支持,在安装RabbitMq之前须要安装Erlangsudo apt-get install erlang-nox更新源sudo apt-get update安装RabbitMqsudo apt-get install rabbitmq-server添加users用户,密码设置为adminsudo rabbitmqctl add_user users admin给添加的用户授予权限sudo rabbitmqctl set_user_tags users administrator授予virtual host中所有资源的配置、写、读权限以便管理个中的资源rabbitmqctl set_permissions -p users '.' '.' '.'官方供应的一个web管理工具(rabbitmq_management),定位到Rabbitmq安装目录然后启动web掌握台sudo rabbitmq-plugins enable rabbitmq_management成功后可以在浏览器输入http://localhost:15672/查看RabbitMq信息RabbitMQ常用命令启动sudo rabbitmq-server start停滞sudo rabbitmq-server stop重启sudo rabbitmq-server restart查看状态sudo rabbitmqctl status查看所有用户rabbitmqctl list_users查看用户权限rabbitmqctl list_user_permissions users删除用户权限rabbitmqctl clear_permissions [-p VHostPath] users删除用户rabbitmqctl delete_user users修正用户密码rabbitmqctl change_password users newpassword.NET Core 利用RabbitMQ通过install-package rabbitmq.client命令或nuget安装rabbitmq.client包

生产者实现

using System;using System.Text;using RabbitMQ.Client;namespace RabbitMQ{ class MainClass { static void Main(string[] args) { Console.WriteLine("生产者"); IConnectionFactory factory = new ConnectionFactory//创建连接工厂工具 { HostName = "106.12.90.208",//IP地址 Port = 5672,//端口号 UserName = "admin",//用户账号 Password = "admin"//用户密码 }; IConnection con = factory.CreateConnection();//创建连接工具 IModel channel = con.CreateModel();//创建连接会话工具 string name = "demo"; //声明一个行列步队 channel.QueueDeclare( queue: name,//行列步队名称 durable: false,//是否持久化,true持久化,行列步队会保存磁盘,做事看重启时可以担保不丢失干系信息。
exclusive: false,//是否排他,true排他的,如果一个行列步队声明为排他行列步队,该行列步队仅对首次声明它的连接可见,并在连接断开时自动删除. autoDelete: false,//是否自动删除。
true是自动删除。
自动删除的条件是:致少有一个消费者连接到这个行列步队,之后所有与这个行列步队连接的消费者都断开时,才会自动删除. arguments: null //设置行列步队的一些其它参数 ); string str = string.Empty; do { Console.WriteLine("发送内容:"); str = Console.ReadLine(); //内容 byte[] body = Encoding.UTF8.GetBytes(str); //发送 channel.BasicPublish("", name, null, body); Console.WriteLine("成功发送:" + str); }while(str.Trim().ToLower() != "exit"); con.Close(); channel.Close(); } }}

消费者实现

using System;using System.Text;using RabbitMQ.Client;using RabbitMQ.Client.Events;namespace RabbitMQ{ class MainClass { static void Main(string[] args) { Console.WriteLine("消费者"); IConnectionFactory factory = new ConnectionFactory//创建连接工厂工具 { HostName = "106.12.90.208",//IP地址 Port = 5672,//端口号 UserName = "admin",//用户账号 Password = "admin"//用户密码 }; IConnection conn = factory.CreateConnection(); IModel channel = conn.CreateModel(); string name = "demo"; //声明一个行列步队 channel.QueueDeclare( queue: name,//行列步队名称 durable: false,//是否持久化,true持久化,行列步队会保存磁盘,做事看重启时可以担保不丢失干系信息。
exclusive: false,//是否排他,true排他的,如果一个行列步队声明为排他行列步队,该行列步队仅对首次声明它的连接可见,并在连接断开时自动删除. autoDelete: false,//是否自动删除。
true是自动删除。
自动删除的条件是:致少有一个消费者连接到这个行列步队,之后所有与这个行列步队连接的消费者都断开时,才会自动删除. arguments: null ////设置行列步队的一些其它参数 ); //创建消费者工具 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { byte[] message = ea.Body;//吸收到的 Console.WriteLine("吸收到为:" + Encoding.UTF8.GetString(message)); }; //消费者开启监听 channel.BasicConsume(name, true, consumer); Console.ReadKey(); channel.Dispose(); conn.Close(); } }}

同时运行两个项目看效果

RabbitMQ的Worker模式

Worker模式实在便是一对多模式,我们定义两个消费者来看看效果

默认情形下,RabbitMQ会顺序的将message发给下一个消费者。
每个消费者会得到均匀数量的message。
这种办法称之为round-robin(轮询).但是很多情形下并不肯望均匀分配,而是要消费快的多消费,消费少的少消费.还有很多情形下一旦个中一个宕机,那么其余吸收者的无法吸收原来这个吸收者所要吸收的数据。
我们修正个中一个消费者代码,让其等待3秒。
在等待中停滞运行 看看效果

consumer.Received += (model, ea) =>{ Thread.Sleep(3000); byte[] message = ea.Body; Console.WriteLine("吸收到为:" + Encoding.UTF8.GetString(message));};

当消费者宕机后消费者1并没有接管宕机后的数据。
以是我们须要确认来办理这个问题。

RabbitMQ确认

Rabbit中存在两种确认模式

自动模式 - 只要从行列步队获取,无论消费者获取到后是否成功消费,都认为是成功消费.手动模式 - 消费从行列步队中获取消息后,做事器会将该处于不可用状态,等待消费者反馈。
如果消费者在消费过程中涌现非常,断开连接切没有发送应答,那么RabbitMQ会将这个重新投递。

修正两个消费者代码,并在个中一个中延迟确认。

consumer.Received += (model, ea) =>{ byte[] message = ea.Body; Console.WriteLine("吸收到为:" + Encoding.UTF8.GetString(message)); Thread.Sleep(3000); //等待三秒手动确认 channel.BasicAck(ea.DeliveryTag, true);//返回确认};////将autoAck设置false 关闭自动确认.channel.BasicConsume(name, false, consumer);

如果在延迟中消费者断开连接,那么RabbitMQ会重新投递未确认的

‘能者多劳’模式

能者多劳是给消费速率快的消费更多的.少的责消费少的.能者多劳是建立在手动确认根本上实现。
在延迟确认的消费中添加BasicQos

channel.QueueDeclare( queue: name,//行列步队名称 durable: false,//是否持久化,true持久化,行列步队会保存磁盘,做事看重启时可以担保不丢失干系信息。
exclusive: false,//是否排他,true排他的,如果一个行列步队声明为排他行列步队,该行列步队仅对首次声明它的连接可见,并在连接断开时自动删除. autoDelete: false,//是否自动删除。
true是自动删除。
自动删除的条件是:致少有一个消费者连接到这个行列步队,之后所有与这个行列步队连接的消费者都断开时,才会自动删除. arguments: null ////设置行列步队的一些其它参数 );//每次只能向消费者发送一条信息,再消费者未确认之前,不再向他发送信息channel.BasicQos(0,1,false);

可以看出消费快的消费者接管了更多的,这便是能者多劳模式的表示

Exchange模式

在RabbitMQ的Exchange模式中生产者并不会直接把发送到Queue中,而是将发送到Exchange(交流机),消费者创建各自的行列步队绑定到交流机.

发布订阅模式(fanout)

生产者实现, 把行列步队更换成了交流机, 发布时把交流机名称见告RabbitMQ,把交流机设置成fanout发布订阅模式

using System;using System.Text;using RabbitMQ.Client;namespace RabbitMQ{ class MainClass { static void Main(string[] args) { Console.WriteLine("生产者"); IConnectionFactory factory = new ConnectionFactory//创建连接工厂工具 { HostName = "106.12.90.208",//IP地址 Port = 5672,//端口号 UserName = "admin",//用户账号 Password = "admin"//用户密码 }; IConnection con = factory.CreateConnection();//创建连接工具 IModel channel = con.CreateModel();//创建连接会话工具 sstring exchangeName = "exchange1";//交流机名称 //把交流机设置成fanout发布订阅模式 channel.ExchangeDeclare(name, type: "fanout"); string str; do { str = Console.ReadLine(); //内容 byte[] body = Encoding.UTF8.GetBytes(str); //发布, channel.BasicPublish(exchangeName, "", null, body); }while(str.Trim().ToLower() != "exit"); con.Close(); channel.Close(); } }}

消费者实现

using System;using System.Text;using System.Threading;using RabbitMQ.Client;using RabbitMQ.Client.Events;namespace mq{ class MainClass { static void Main(string[] args) { IConnectionFactory factory = new ConnectionFactory//创建连接工厂工具 { HostName = "106.12.90.208",//IP地址 Port = 5672,//端口号 UserName = "admin",//用户账号 Password = "admin"//用户密码 }; IConnection conn = factory.CreateConnection(); IModel channel = conn.CreateModel(); //交流机名称 string exchangeName = "exchange1"; //声明交流机 channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout); //行列步队名称 string queueName = DateTime.Now.Second.ToString(); //声明行列步队 channel.QueueDeclare(queueName, false, false, false, null); //将行列步队与交流机进行绑定 channel.QueueBind(queueName, exchangeName, "", null); //定义消费者 var consumer = new EventingBasicConsumer(channel); Console.WriteLine($"行列步队名称:{queueName}"); //吸收事宜 consumer.Received += (model, ea) => { byte[] message = ea.Body;//吸收到的 Console.WriteLine($"吸收到信息为:{Encoding.UTF8.GetString(message)}"); //返回确认 channel.BasicAck(ea.DeliveryTag, true); }; //开启监听 channel.BasicConsume(queueName, false, consumer); Console.ReadKey(); } }}

当消费者绑定同样的交流机,可以看到两个不同的消费者都能接管莅临盆者发送的所有。

路由模式(Direct Exchange)

路由模式下,在发布时指定不同的routeKey,交流机会根据不同的routeKey分发到不同的行列步队中

生产者实现

using System;using System.Text;using RabbitMQ.Client;namespace RabbitMQ{ class MainClass { static void Main(string[] args) { Console.WriteLine("生产者"); IConnectionFactory factory = new ConnectionFactory//创建连接工厂工具 { HostName = "106.12.90.208",//IP地址 Port = 5672,//端口号 UserName = "admin",//用户账号 Password = "admin"//用户密码 }; IConnection con = factory.CreateConnection();//创建连接工具 IModel channel = con.CreateModel();//创建连接会话工具 string exchangeName = "exchange1"; //交流机名称 string routeKey = "key1"; //匹配的key, //把交流机设置成Direct模式 channel.ExchangeDeclare(exchangeName, ExchangeType.Direct); string str; do { str = Console.ReadLine(); //内容 byte[] body = Encoding.UTF8.GetBytes(str); //发送 channel.BasicPublish(exchangeName, routeKey, null, body); }while(str.Trim().ToLower() != "exit"); con.Close(); channel.Close(); } }}

申明一个routeKey值为key1,并在发布的时候见告了RabbitMQ,通报时routeKey必须匹配,才会被行列步队吸收否则会被抛弃。

消费者实现

using System;using System.Text;using RabbitMQ.Client;using RabbitMQ.Client.Events;namespace mq{ class MainClass { static void Main(string[] args) { Console.WriteLine($"输入接管key名称:"); string routeKey = Console.ReadLine(); IConnectionFactory factory = new ConnectionFactory//创建连接工厂工具 { HostName = "106.12.90.208",//IP地址 Port = 5672,//端口号 UserName = "admin",//用户账号 Password = "admin"//用户密码 }; IConnection conn = factory.CreateConnection(); IModel channel = conn.CreateModel(); //交流机名称 string exchangeName = "exchange11"; //声明交流机 channel.ExchangeDeclare(exchangeName, ExchangeType.Direct); //行列步队名称 string queueName = DateTime.Now.Second.ToString(); //声明行列步队 channel.QueueDeclare(queueName, false, false, false, null); //将行列步队,key与交流机进行绑定 channel.QueueBind(queueName, exchangeName, routeKey, null); //定义消费者 var consumer = new EventingBasicConsumer(channel); Console.WriteLine($"行列步队名称:{queueName}"); //吸收事宜 consumer.Received += (model, ea) => { byte[] message = ea.Body;//吸收到的 Console.WriteLine($"吸收到信息为:{Encoding.UTF8.GetString(message)}"); //返回确认 channel.BasicAck(ea.DeliveryTag, true); }; //开启监听 channel.BasicConsume(queueName, false, consumer); Console.ReadKey(); } }}

在routeKey匹配的时候才会吸收,吸收者行列步队可以声明多个routeKey与交流机进行绑定

routeKey不匹配则不吸收。

通配符模式(Topic Exchange)

通配符模式和路由模式实在差不多,不同于配符模式中的路由可以声明为模糊查询.

符号“#”匹配一个或多个词.符号“”匹配一个词。

RabbitMQ中通配符的通配符是用"."来分割字符串的.比如a.只能匹配到a.b,a.c,而a.#可以匹配到a.a.c,a.a.b.

天生者实现

using System;using System.Text;using RabbitMQ.Client;namespace RabbitMQ{ class MainClass { static void Main(string[] args) { Console.WriteLine("生产者"); IConnectionFactory factory = new ConnectionFactory//创建连接工厂工具 { HostName = "106.12.90.208",//IP地址 Port = 5672,//端口号 UserName = "admin",//用户账号 Password = "admin"//用户密码 }; IConnection con = factory.CreateConnection();//创建连接工具 IModel channel = con.CreateModel();//创建连接会话工具 string exchangeName = "exchange114"; //交流机名称 string routeKey = "key.a"; //匹配的key, //把交流机设置成Topic模式 channel.ExchangeDeclare(exchangeName, ExchangeType.Topic); string str; do { str = Console.ReadLine(); //内容 byte[] body = Encoding.UTF8.GetBytes(str); //发送 channel.BasicPublish(exchangeName, routeKey, null, body); }while(str.Trim().ToLower() != "exit"); con.Close(); channel.Close(); } }}

消费者实现

using System;using System.Text;using RabbitMQ.Client;using RabbitMQ.Client.Events;namespace mq{ class MainClass { static void Main(string[] args) { Console.WriteLine($"输入接管key名称:"); string routeKey = "key."; //利用通配符来匹配key IConnectionFactory factory = new ConnectionFactory//创建连接工厂工具 { HostName = "106.12.90.208",//IP地址 Port = 5672,//端口号 UserName = "admin",//用户账号 Password = "admin"//用户密码 }; IConnection conn = factory.CreateConnection(); IModel channel = conn.CreateModel(); //交流机名称 string exchangeName = "exchange114"; //声明交流机 channel.ExchangeDeclare(exchangeName, ExchangeType.Topic); //行列步队名称 string queueName = DateTime.Now.Second.ToString(); //声明行列步队 channel.QueueDeclare(queueName, false, false, false, null); //将行列步队与交流机进行绑定 channel.QueueBind(queueName, exchangeName, routeKey, null); //定义消费者 var consumer = new EventingBasicConsumer(channel); Console.WriteLine($"行列步队名称:{queueName}"); //吸收事宜 consumer.Received += (model, ea) => { byte[] message = ea.Body;//吸收到的 Console.WriteLine($"吸收到信息为:{Encoding.UTF8.GetString(message)}"); //返回确认 channel.BasicAck(ea.DeliveryTag, true); }; //开启监听 channel.BasicConsume(queueName, false, consumer); Console.ReadKey(); } }}

只有在通配符匹配通过的情形下才会吸收,

原文地址:https://www.cnblogs.com/linhuiy/p/12017453.html

相关文章

今日头条算法如何打造个化推荐系统

信息爆炸的时代已经到来。人们每天都要面对海量的信息,如何在海量信息中找到自己感兴趣的内容,成为了许多人关注的焦点。今日头条作为一款...

网站建设 2025-01-31 阅读1 评论0