RabbitMQ是一个开源的,基于AMQP(Advanced Message Queuing Protocol)协议的完全的可复用的企业级队,RabbitMQ可以实现点对点,发布订阅等处理模式。
RabbitMQ是一个开源的AMQP实现,做事器端用Erlang措辞编写,支持Linux,windows,macOS,FreeBSD等操作系统,同时也支持很多措辞,如:Python,Java,Ruby,PHP,C#,JavaScript,Go,Elixir,Objective-C,Swift等。
我利用的环境是ubuntu18.04,

生产者实现
using System;using System.Text;using RabbitMQ.Client;namespace RabbitMQ{ class MainClass { static void Main(string[] args) { Console.WriteLine("生产者"); IConnectionFactory factory = new ConnectionFactory//创建连接工厂工具 { HostName = "106.12.90.208",//IP地址 Port = 5672,//端口号 UserName = "admin",//用户账号 Password = "admin"//用户密码 }; IConnection con = factory.CreateConnection();//创建连接工具 IModel channel = con.CreateModel();//创建连接会话工具 string name = "demo"; //声明一个行列步队 channel.QueueDeclare( queue: name,//行列步队名称 durable: false,//是否持久化,true持久化,行列步队会保存磁盘,做事看重启时可以担保不丢失干系信息。 exclusive: false,//是否排他,true排他的,如果一个行列步队声明为排他行列步队,该行列步队仅对首次声明它的连接可见,并在连接断开时自动删除. autoDelete: false,//是否自动删除。true是自动删除。自动删除的条件是:致少有一个消费者连接到这个行列步队,之后所有与这个行列步队连接的消费者都断开时,才会自动删除. arguments: null //设置行列步队的一些其它参数 ); string str = string.Empty; do { Console.WriteLine("发送内容:"); str = Console.ReadLine(); //内容 byte[] body = Encoding.UTF8.GetBytes(str); //发送 channel.BasicPublish("", name, null, body); Console.WriteLine("成功发送:" + str); }while(str.Trim().ToLower() != "exit"); con.Close(); channel.Close(); } }}
消费者实现
using System;using System.Text;using RabbitMQ.Client;using RabbitMQ.Client.Events;namespace RabbitMQ{ class MainClass { static void Main(string[] args) { Console.WriteLine("消费者"); IConnectionFactory factory = new ConnectionFactory//创建连接工厂工具 { HostName = "106.12.90.208",//IP地址 Port = 5672,//端口号 UserName = "admin",//用户账号 Password = "admin"//用户密码 }; IConnection conn = factory.CreateConnection(); IModel channel = conn.CreateModel(); string name = "demo"; //声明一个行列步队 channel.QueueDeclare( queue: name,//行列步队名称 durable: false,//是否持久化,true持久化,行列步队会保存磁盘,做事看重启时可以担保不丢失干系信息。 exclusive: false,//是否排他,true排他的,如果一个行列步队声明为排他行列步队,该行列步队仅对首次声明它的连接可见,并在连接断开时自动删除. autoDelete: false,//是否自动删除。true是自动删除。自动删除的条件是:致少有一个消费者连接到这个行列步队,之后所有与这个行列步队连接的消费者都断开时,才会自动删除. arguments: null ////设置行列步队的一些其它参数 ); //创建消费者工具 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { byte[] message = ea.Body;//吸收到的 Console.WriteLine("吸收到为:" + Encoding.UTF8.GetString(message)); }; //消费者开启监听 channel.BasicConsume(name, true, consumer); Console.ReadKey(); channel.Dispose(); conn.Close(); } }}
同时运行两个项目看效果
RabbitMQ的Worker模式
Worker模式实在便是一对多模式,我们定义两个消费者来看看效果
默认情形下,RabbitMQ会顺序的将message发给下一个消费者。每个消费者会得到均匀数量的message。这种办法称之为round-robin(轮询).但是很多情形下并不肯望均匀分配,而是要消费快的多消费,消费少的少消费.还有很多情形下一旦个中一个宕机,那么其余吸收者的无法吸收原来这个吸收者所要吸收的数据。我们修正个中一个消费者代码,让其等待3秒。在等待中停滞运行 看看效果
consumer.Received += (model, ea) =>{ Thread.Sleep(3000); byte[] message = ea.Body; Console.WriteLine("吸收到为:" + Encoding.UTF8.GetString(message));};
当消费者宕机后消费者1并没有接管宕机后的数据。以是我们须要确认来办理这个问题。
RabbitMQ确认Rabbit中存在两种确认模式
自动模式 - 只要从行列步队获取,无论消费者获取到后是否成功消费,都认为是成功消费.手动模式 - 消费从行列步队中获取消息后,做事器会将该处于不可用状态,等待消费者反馈。如果消费者在消费过程中涌现非常,断开连接切没有发送应答,那么RabbitMQ会将这个重新投递。修正两个消费者代码,并在个中一个中延迟确认。
consumer.Received += (model, ea) =>{ byte[] message = ea.Body; Console.WriteLine("吸收到为:" + Encoding.UTF8.GetString(message)); Thread.Sleep(3000); //等待三秒手动确认 channel.BasicAck(ea.DeliveryTag, true);//返回确认};////将autoAck设置false 关闭自动确认.channel.BasicConsume(name, false, consumer);
如果在延迟中消费者断开连接,那么RabbitMQ会重新投递未确认的
‘能者多劳’模式能者多劳是给消费速率快的消费更多的.少的责消费少的.能者多劳是建立在手动确认根本上实现。在延迟确认的消费中添加BasicQos
channel.QueueDeclare( queue: name,//行列步队名称 durable: false,//是否持久化,true持久化,行列步队会保存磁盘,做事看重启时可以担保不丢失干系信息。 exclusive: false,//是否排他,true排他的,如果一个行列步队声明为排他行列步队,该行列步队仅对首次声明它的连接可见,并在连接断开时自动删除. autoDelete: false,//是否自动删除。true是自动删除。自动删除的条件是:致少有一个消费者连接到这个行列步队,之后所有与这个行列步队连接的消费者都断开时,才会自动删除. arguments: null ////设置行列步队的一些其它参数 );//每次只能向消费者发送一条信息,再消费者未确认之前,不再向他发送信息channel.BasicQos(0,1,false);
可以看出消费快的消费者接管了更多的,这便是能者多劳模式的表示
Exchange模式在RabbitMQ的Exchange模式中生产者并不会直接把发送到Queue中,而是将发送到Exchange(交流机),消费者创建各自的行列步队绑定到交流机.
发布订阅模式(fanout)生产者实现, 把行列步队更换成了交流机, 发布时把交流机名称见告RabbitMQ,把交流机设置成fanout发布订阅模式
using System;using System.Text;using RabbitMQ.Client;namespace RabbitMQ{ class MainClass { static void Main(string[] args) { Console.WriteLine("生产者"); IConnectionFactory factory = new ConnectionFactory//创建连接工厂工具 { HostName = "106.12.90.208",//IP地址 Port = 5672,//端口号 UserName = "admin",//用户账号 Password = "admin"//用户密码 }; IConnection con = factory.CreateConnection();//创建连接工具 IModel channel = con.CreateModel();//创建连接会话工具 sstring exchangeName = "exchange1";//交流机名称 //把交流机设置成fanout发布订阅模式 channel.ExchangeDeclare(name, type: "fanout"); string str; do { str = Console.ReadLine(); //内容 byte[] body = Encoding.UTF8.GetBytes(str); //发布, channel.BasicPublish(exchangeName, "", null, body); }while(str.Trim().ToLower() != "exit"); con.Close(); channel.Close(); } }}
消费者实现
using System;using System.Text;using System.Threading;using RabbitMQ.Client;using RabbitMQ.Client.Events;namespace mq{ class MainClass { static void Main(string[] args) { IConnectionFactory factory = new ConnectionFactory//创建连接工厂工具 { HostName = "106.12.90.208",//IP地址 Port = 5672,//端口号 UserName = "admin",//用户账号 Password = "admin"//用户密码 }; IConnection conn = factory.CreateConnection(); IModel channel = conn.CreateModel(); //交流机名称 string exchangeName = "exchange1"; //声明交流机 channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout); //行列步队名称 string queueName = DateTime.Now.Second.ToString(); //声明行列步队 channel.QueueDeclare(queueName, false, false, false, null); //将行列步队与交流机进行绑定 channel.QueueBind(queueName, exchangeName, "", null); //定义消费者 var consumer = new EventingBasicConsumer(channel); Console.WriteLine($"行列步队名称:{queueName}"); //吸收事宜 consumer.Received += (model, ea) => { byte[] message = ea.Body;//吸收到的 Console.WriteLine($"吸收到信息为:{Encoding.UTF8.GetString(message)}"); //返回确认 channel.BasicAck(ea.DeliveryTag, true); }; //开启监听 channel.BasicConsume(queueName, false, consumer); Console.ReadKey(); } }}
当消费者绑定同样的交流机,可以看到两个不同的消费者都能接管莅临盆者发送的所有。
路由模式(Direct Exchange)路由模式下,在发布时指定不同的routeKey,交流机会根据不同的routeKey分发到不同的行列步队中
生产者实现
using System;using System.Text;using RabbitMQ.Client;namespace RabbitMQ{ class MainClass { static void Main(string[] args) { Console.WriteLine("生产者"); IConnectionFactory factory = new ConnectionFactory//创建连接工厂工具 { HostName = "106.12.90.208",//IP地址 Port = 5672,//端口号 UserName = "admin",//用户账号 Password = "admin"//用户密码 }; IConnection con = factory.CreateConnection();//创建连接工具 IModel channel = con.CreateModel();//创建连接会话工具 string exchangeName = "exchange1"; //交流机名称 string routeKey = "key1"; //匹配的key, //把交流机设置成Direct模式 channel.ExchangeDeclare(exchangeName, ExchangeType.Direct); string str; do { str = Console.ReadLine(); //内容 byte[] body = Encoding.UTF8.GetBytes(str); //发送 channel.BasicPublish(exchangeName, routeKey, null, body); }while(str.Trim().ToLower() != "exit"); con.Close(); channel.Close(); } }}
申明一个routeKey值为key1,并在发布的时候见告了RabbitMQ,通报时routeKey必须匹配,才会被行列步队吸收否则会被抛弃。
消费者实现
using System;using System.Text;using RabbitMQ.Client;using RabbitMQ.Client.Events;namespace mq{ class MainClass { static void Main(string[] args) { Console.WriteLine($"输入接管key名称:"); string routeKey = Console.ReadLine(); IConnectionFactory factory = new ConnectionFactory//创建连接工厂工具 { HostName = "106.12.90.208",//IP地址 Port = 5672,//端口号 UserName = "admin",//用户账号 Password = "admin"//用户密码 }; IConnection conn = factory.CreateConnection(); IModel channel = conn.CreateModel(); //交流机名称 string exchangeName = "exchange11"; //声明交流机 channel.ExchangeDeclare(exchangeName, ExchangeType.Direct); //行列步队名称 string queueName = DateTime.Now.Second.ToString(); //声明行列步队 channel.QueueDeclare(queueName, false, false, false, null); //将行列步队,key与交流机进行绑定 channel.QueueBind(queueName, exchangeName, routeKey, null); //定义消费者 var consumer = new EventingBasicConsumer(channel); Console.WriteLine($"行列步队名称:{queueName}"); //吸收事宜 consumer.Received += (model, ea) => { byte[] message = ea.Body;//吸收到的 Console.WriteLine($"吸收到信息为:{Encoding.UTF8.GetString(message)}"); //返回确认 channel.BasicAck(ea.DeliveryTag, true); }; //开启监听 channel.BasicConsume(queueName, false, consumer); Console.ReadKey(); } }}
在routeKey匹配的时候才会吸收,吸收者行列步队可以声明多个routeKey与交流机进行绑定
routeKey不匹配则不吸收。
通配符模式(Topic Exchange)通配符模式和路由模式实在差不多,不同于配符模式中的路由可以声明为模糊查询.
符号“#”匹配一个或多个词.符号“”匹配一个词。
RabbitMQ中通配符的通配符是用"."来分割字符串的.比如a.只能匹配到a.b,a.c,而a.#可以匹配到a.a.c,a.a.b.
天生者实现
using System;using System.Text;using RabbitMQ.Client;namespace RabbitMQ{ class MainClass { static void Main(string[] args) { Console.WriteLine("生产者"); IConnectionFactory factory = new ConnectionFactory//创建连接工厂工具 { HostName = "106.12.90.208",//IP地址 Port = 5672,//端口号 UserName = "admin",//用户账号 Password = "admin"//用户密码 }; IConnection con = factory.CreateConnection();//创建连接工具 IModel channel = con.CreateModel();//创建连接会话工具 string exchangeName = "exchange114"; //交流机名称 string routeKey = "key.a"; //匹配的key, //把交流机设置成Topic模式 channel.ExchangeDeclare(exchangeName, ExchangeType.Topic); string str; do { str = Console.ReadLine(); //内容 byte[] body = Encoding.UTF8.GetBytes(str); //发送 channel.BasicPublish(exchangeName, routeKey, null, body); }while(str.Trim().ToLower() != "exit"); con.Close(); channel.Close(); } }}
消费者实现
using System;using System.Text;using RabbitMQ.Client;using RabbitMQ.Client.Events;namespace mq{ class MainClass { static void Main(string[] args) { Console.WriteLine($"输入接管key名称:"); string routeKey = "key."; //利用通配符来匹配key IConnectionFactory factory = new ConnectionFactory//创建连接工厂工具 { HostName = "106.12.90.208",//IP地址 Port = 5672,//端口号 UserName = "admin",//用户账号 Password = "admin"//用户密码 }; IConnection conn = factory.CreateConnection(); IModel channel = conn.CreateModel(); //交流机名称 string exchangeName = "exchange114"; //声明交流机 channel.ExchangeDeclare(exchangeName, ExchangeType.Topic); //行列步队名称 string queueName = DateTime.Now.Second.ToString(); //声明行列步队 channel.QueueDeclare(queueName, false, false, false, null); //将行列步队与交流机进行绑定 channel.QueueBind(queueName, exchangeName, routeKey, null); //定义消费者 var consumer = new EventingBasicConsumer(channel); Console.WriteLine($"行列步队名称:{queueName}"); //吸收事宜 consumer.Received += (model, ea) => { byte[] message = ea.Body;//吸收到的 Console.WriteLine($"吸收到信息为:{Encoding.UTF8.GetString(message)}"); //返回确认 channel.BasicAck(ea.DeliveryTag, true); }; //开启监听 channel.BasicConsume(queueName, false, consumer); Console.ReadKey(); } }}
只有在通配符匹配通过的情形下才会吸收,
原文地址:https://www.cnblogs.com/linhuiy/p/12017453.html