首页 » 网站推广 » phprabbitmq通道号技巧_万字长文从 C 入门学会 RabbitMQ 消息队列编程

phprabbitmq通道号技巧_万字长文从 C 入门学会 RabbitMQ 消息队列编程

duote123 2024-11-07 0

扫一扫用手机浏览

文章目录 [+]

目录

RabbitMQ 教程

phprabbitmq通道号技巧_万字长文从 C 入门学会 RabbitMQ 消息队列编程

Qos 、谢绝吸收

phprabbitmq通道号技巧_万字长文从 C 入门学会 RabbitMQ 消息队列编程
(图片来自网络侵删)

确认模式

持久化

TTL 韶光

行列步队 TTL 韶光

DLX 去世信交流器

延迟行列步队

优先级

事务机制

发送方确认机制

生产者、消费者、交流器、行列步队

多事情行列步队

交流器类型

Direct

Fanout

Topic

交流器绑定交流器

安装 RabbitMQ

RabbitMQ 简介

安装与配置

发布与订阅模型

消费者、属性

本文已推送到 github :https://github.com/whuanle/learnrabbitmq如果文章排版未便利阅读,可以到仓库下载原版 markdown 文件阅读。

RabbitMQ 简介

RabbitMQ 是一个实现了 AMQP 协议的行列步队,AMQP 被定义为作为通报中间件的开放标准的运用层协议。
它代表高等行列步队协议,具有定位、路由、行列步队、安全性和可靠性等特点。

目前社区上比较盛行的行列步队有 kafka、ActiveMQ、Pulsar、RabbitMQ、RabbitMQ 等。

笔者也编写了 一系列的 Kafka 教程,欢迎阅读:https://kafka.whuanle.cn/

RabbitMQ 的优点、用场等,大概是可靠性高、灵巧的路由规则配置、支持分布式支配、遵守 AMQP 协议等。
可以用于异步通讯、日志网络(日志网络还是 Kafka 比较好)、事宜驱动架构系统、运用通讯解耦等。

RabbitMQ 社区版本的特点如下:

支持多种通报协议、行列步队、通报确认、灵巧的行列步队路由、多种交流类型(交流器)。

支持 Kubernetes 等分布式支配,供应多种措辞的 SDK,如 Java、Go、C#。

可插入的身份验证、授权,支持 TLS 和 LDAP。

支持持续集成、操作度量和与其他企业系统集成的各种工具和插件。

供应一套用于管理和监视 RabbitMQ 的 HTTP-API、命令行工具和 UI。

RabbitMQ 的基本工具有以下几点,但是读者现在并不须要记住,在后面的章节中,笔者将会逐个先容。

生产者(Producer):推送到 RabbitMQ 的程序。

消费者(Consumer):从 RabbitMQ 消费的程序。

行列步队(Queue):RabbitMQ 存储的地方,消费者可以从行列步队中获取消息。

交流器(Exchange):吸收来自生产者的,并将路由到一个或多个行列步队中。

绑定(Binding):将行列步队和交流器关联起来,当生产者推送时,交流器将路由到行列步队中。

路由键(Routing Key):用于交流器将路由到特定行列步队的匹配规则。

RabbitMQ 的技能知识点大概分为:

用户和权限:配置用户、角色和其对应的权限。

Virtual Hosts:配置虚拟主机,用于分隔不同的行列步队环境。

Exchange 和 Queue 的属性:配置交流器和行列步队的属性,比如持久化、自动删除等。

Policies:定义策略来自动设置行列步队、交流器和链接的参数。

连接和通道:配置连接和通道的属性,如心跳间隔、最大帧大小等。

插件:启用和配置各种插件,如管理插件、STOMP 插件等。

集群和高可用性:配置集群和镜像行列步队,以供应高可用性。

日志和监控:配置日志级别、目标和监控插件。

安全性:配置 SSL/TLS 选项、认证后端等安全干系的设置。

由于笔者技能有限以及篇幅限定,本文只讲解与 C# 编程干系的技能细节,从中理解 RabbitMQ 的编码技巧和运作机制。

安装与配置安装 RabbitMQ

读者可以在 RabbitMQ 官方文档中找到完全的安装教程:https://www.rabbitmq.com/download.html

本文利用 Docker 的办法支配。

RabbitMQ 社区镜像列表:https://hub.docker.com/_/rabbitmq

创建目录用于映射存储卷:

mkdir -p /opt/lib/rabbitmq

支配容器:

docker run -itd --name rabbitmq -p 5672:5672 -p 15672:15672 \-v /opt/lib/rabbitmq:/var/lib/rabbitmq \rabbitmq:3.12.8-management

支配时占用两个端口。
5672 是 MQ 通讯端口,15672 是 Management UI 工具端口。

打开 15672 端口,会进入 Web 登录页面,默认账号密码都是 guest。

关于 RabbitMQ Management UI 的利用方法,后续再先容。

打开管理界面后会,在 Exchanges 菜单中,可以看到如下图表格。
这些是默认的交流器。
现在可以不须要理解这些东西,后面会有先容。

Virtual hostNameTypeFeatures/(AMQP default)directD/amq.directdirectD/amq.fanoutfanoutD/amq.headersheadersD/amq.matchheadersD/amq.rabbitmq.tracetopicD I/amq.topictopicD

发布与订阅模型

利用 C# 开拓 RabbitMQ,须要利用 nuget 引入 RabbitMQ.Client,官网文档地址:.NET/C# RabbitMQ Client Library — RabbitMQ

在连续阅读文章之前,请先创建一个掌握台程序。

生产者、消费者、交流器、行列步队

为了便于理解,本文制作了几十张图片,约定一些图形表示的含义:

对应生产者,利用如下图表示:

对付消费者,利用如下图表示:

对付行列步队,利用如下图表示:

对付交流器,利用如下图表示:

在 RabbitMQ 中,生产者发布的是不会直接进入到行列步队中,而是经由交流器(Exchange) 分发到各个行列步队中。
前面提到,支配 RabbitMQ 后,默认有 七个交流器,如 (AMQP default)amq.direct 等。

当然,对付现在来说,我们不须要理解交流器,以是,在本节的教程中,会利用默认交流器完成实验。

在忽略交流器存在的情形下,我们可以将生产和消费的流程简化如下图所示:

请一定要把稳,图中省略了交流器的存在,由于利用的是默认的交流器。
但是生产者推送必须是推送到交流器,而不是行列步队,这一句一定要弄清楚。

对付消费者来说,要利用行列步队,必须确保行列步队已经存在。

利用 C# 声明(创建)一个行列步队的代码和参数如下所示:

// 声明一个行列步队channel.QueueDeclare(// 行列步队名称queue: \公众myqueue\"大众,// 持久化配置,行列步队是否能够在 broker 重启后存活durable: false,// 连接关闭时被删除该行列步队exclusive: false,// 当末了一个消费者(如果有的话)退订时,是否该当自动删除这个行列步队autoDelete: false,// 额外的参数配置arguments: );

完全代码示例:

ConnectionFactory factory = new ConnectionFactory{HostName = \"大众localhost\"大众};// 连接using IConnection connection = factory.CreateConnection();// 通道using IModel channel = connection.CreateModel();channel.QueueDeclare(// 行列步队名称queue: \"大众myqueue\"大众,// 持久化配置,行列步队是否能够在 broker 重启后存活durable: false,// 连接关闭时被删除该行列步队exclusive: false,// 当末了一个消费者(如果有的话)退订时,是否该当自动删除这个行列步队autoDelete: false,// 额外的参数配置arguments: );

queue:行列步队的名称。

durable:设置是否持久化。
持久化的行列步队会存盘,在做事看重启的时候可以担保不丢失干系信息。

exclusive 设置是否排他。
如果一个行列步队被声明为排他行列步队,该行列步队仅对首次声明它的连接可见,并在连接断开时自动删除。

该配置是基于 IConnection 的,同一个 IConnection 创建的不同通道 (IModel) ,也会遵守此规则。

autoDelete:设置是否自动删除。
自动删除的条件是至少有一个消费者连接到这个行列步队,之后所有与这个行列步队连接的消费者都断开时,才会自动删除。

argurnents: 设置行列步队的其他一些参数,如行列步队的过期韶光等。

如果行列步队已经存在,不须要再实行 QueueDeclare()
重复调用 QueueDeclare(),如果参数相同,不会涌现副浸染,已经推送的也不会出问题。

但是,如果 QueueDeclare() 参数如果跟已存在的行列步队配置有差异,则可能会报错。

一样平常情形下,为了合理架构和可靠性,会由架构师等在行列步队中提前创建好交流器、行列步队,然后客户端直策应用即可。
一样平常不让程序启动时设置,这样会带来很大的不愿定性和副浸染。

生产者发送时的代码也很大略,指定要发送到哪个交流器或路由中即可。

请一定要把稳,RabbitMQ 生产者发送,推送到的是交流器,而不是直接推送到行列步队!

channel.BasicPublish(// 利用默认交流器exchange: string.Empty,// 推送到哪个行列步队中routingKey: \"大众myqueue\"大众,// 行列步队属性basicProperties: ,// 要发送的须要先转换为 byte[]body: Encoding.UTF8.GetBytes(\公众测试\公众));

BasicPublish 有三个重载:

BasicPublish( PublicationAddress addr, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)

BasicPublish(string exchange, string routingKey, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)

BasicPublish(string exchange, string routingKey, bool mandatory = false, IBasicProperties basicProperties = , ReadOnlyMemory<byte> body = default)

exchange: 交流器的名称,如果留空则会推送到默认交流器。

routingKey: 路由键,交流器根据路由键将存储到相应的行列步队之中。

basicProperties:属性,如过期韶光等。

mandatory:值为 false 时,如果交流器没有绑定得当的行列步队,则该会丢失。
值为 true 时,如果交流器没有绑定得当的行列步队,则会触发IModel.BasicReturn 事宜。

IBasicProperties basicProperties 参数是接口,我们可以利用 IModel.CreateBasicProperties() 创建一个接口工具。

IBasicProperties 接口中封装了很多属性,使得我们不须要利用字符串的显示通报配置。

IBasicProperties 其完全属性如下:

// 标识运用程序的 IDpublic String AppId { set; get; }// 标识集群的 IDpublic String ClusterId { set; get; }// 指定内容的编码办法,例如 \公众utf-8\公众public String ContentEncoding { set; get; }// 指定内容的 MIME 类型,例如 \"大众application/json\公众public String ContentType { set; get; }// 用于关联之间的关系,常日用于 RPC(远程过程调用)场景public String CorrelationId { set; get; }// 指定的持久化办法,值 1:不持久化,值 2:持久化public Byte DeliveryMode { set; get; }// 单位毫秒,指定该的过期韶光public String Expiration { set; get; }// 自定义的头部信息public IDictionary`2 Headers { set; get; }// 指定的唯一标识符public String MessageId { set; get; }// 是否持久化public Boolean Persistent { set; get; }// 指定的优先级,范围从 0 到 9public Byte Priority { set; get; }// 指定用于回答的行列步队名称public String ReplyTo { set; get; }// 指定用于回答的地址信息public PublicationAddress ReplyToAddress { set; get; }// 指定的韶光戳public AmqpTimestamp Timestamp { set; get; }// 的类型public String Type { set; get; }// 标识用户的 IDpublic String UserId { set; get; }

推送时,可以对单个细粒度地设置 IBasicProperties :

using IConnection connection = factory.CreateConnection();using IModel channel = connection.CreateModel();// 创建两个行列步队channel.QueueDeclare(queue: \"大众q1\公众, durable: false, exclusive: false, autoDelete: false);var properties = channel.CreateBasicProperties();// 示例 1:properties.Persistent = true;properties.ContentType = \公众application/json\"大众;properties.ContentEncoding = \"大众UTF-8\"大众;// 示例 2://properties.Persistent = true;//properties.ContentEncoding = \"大众gzip\公众;//properties.Headers = new Dictionary<string, object>();channel.BasicPublish(exchange: string.Empty,routingKey: \"大众q1\公众,basicProperties: properties,body: Encoding.UTF8.GetBytes($\"大众测试{i}\"大众));

对付 IBasicProperties 的利用,文章后面会有更加详细的先容。

现在,我们推送了 10 条到行列步队中,然后在 Management UI 中不雅观察。

int i = 0;while (i < 10){channel.BasicPublish(exchange: string.Empty,routingKey: \"大众myqueue\公众,basicProperties: ,body: Encoding.UTF8.GetBytes($\"大众测试{i}\"大众));i++;}

我们可以在 UI 的 Queues and Streams 中看到当前所有的行列步队。

可以看到当前行列步队中的 Ready 状态 Unacked 状态的数,分别对应上文中的等待投递给消费者的数和己经投递给消费者但是未收到确认旗子暗记的数

点击该行列步队后,会打开如下图所示的界面。

首先看 Overview。

Ready 指还没有被消费的数量。

Unacked 指消费但是没有 ack 的数量。

另一个 Message rates 图表,指的是发布、消费的速率,由于不主要,因此这里不解释。

在 Bindings 中,可以看到该行列步队绑定了默认的交流器。

然后编写一个消费者,消费该行列步队中的,其完全代码如下:

using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;ConnectionFactory factory = new ConnectionFactory{HostName = \"大众localhost\"大众};using IConnection connection = factory.CreateConnection();using IModel channel = connection.CreateModel();channel.QueueDeclare(// 行列步队名称queue: \公众myqueue\"大众,// 持久化配置,行列步队是否能够在 broker 重启后存活durable: false,// 连接关闭时被删除该行列步队exclusive: false,// 当末了一个消费者(如果有的话)退订时,是否该当自动删除这个行列步队autoDelete: false,// 额外的参数配置arguments: );// 定义消费者var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body.Span);Console.WriteLine($\公众 [x] Received {message}\公众);};// 开始消费channel.BasicConsume(queue: \"大众myqueue\"大众, autoAck: true, consumer: consumer);Console.ReadLine();

把稳,如果填写了一个不存在的行列步队,那么程序会报非常。

在消费者程序未退出前,即 IConnection 未被 Dispose() 之前,可以在 Consumers 中看到消费者客户端程序信息。

那么,如果我们只消费,不设置自动 ack 呢?

将消费者代码改成:

channel.BasicConsume(queue: \"大众myqueue\"大众, autoAck: false, consumer: consumer);

完全代码如下:

using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;ConnectionFactory factory = new ConnectionFactory{HostName = \"大众localhost\"大众};using IConnection connection = factory.CreateConnection();using IModel channel = connection.CreateModel();channel.QueueDeclare(queue: \公众myqueue\"大众,durable: false,exclusive: false,autoDelete: false,arguments: );int i = 0;while (i < 10){channel.BasicPublish(exchange: string.Empty,routingKey: \"大众myqueue\"大众,basicProperties: ,body: Encoding.UTF8.GetBytes($\"大众测试{i}\"大众));i++;}// 定义消费者var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body.Span);Console.WriteLine($\公众 [x] Received {message}\"大众);};// 开始消费channel.BasicConsume(queue: \公众myqueue\"大众, autoAck: false, consumer: consumer);Console.ReadLine();

此时会创造,所有的都已经读了,但是 Unacked 为 10。

如下图所示,autoAck: false 之后,如果重新启动程序(只消费,不推送),那么程序会连续重新消费一遍。

对付未 ack 的,消费者重新连接后,RabbitMQ 会再次推送。

与 Kafka 不同的是,Kafka 如果没有 ack 当前,则做事器会自动重新发送该条给消费者,如果该条未完成,则会一贯堵塞在这里。
而对付 RabbitMQ,未被 ack 的会被暂时忽略,自动消费下一条。
以是基于这一点,默认情形下,RabbitMQ 是不能担保顺序性。

当然, RabbitMQ 是很灵巧的,我们可以选择性地消费部分,避免当前壅塞导致程序不能往下消费:

// 定义消费者int i = 0;var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body.Span);Console.WriteLine($\"大众 [x] Received {message}\"大众);i++;// 确认该被精确消费if (i % 2 == 0)channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);};// 开始消费channel.BasicConsume(queue: \"大众myqueue\"大众, autoAck: false, consumer: consumer);

在某些场景下,这个特性很有用,我们可以将多次实行失落败的先放一放,转而消费下一条,从而避免堆积。

多事情行列步队

如果同一个行列步队的不同客户端绑定到交流器中,多个消费者一起事情的话,那么会发生什么情形?

对付第一种情形,RabbitMQ 会将均匀分发给每个客户端。

该条件成立的根本是,两个消费者是不同的消费者,如果在同一个程序里面参加不同的实例去消费,但是由于其被识别为同一个消费者,则规则无效。

但是,RabbitMQ 并不会看未确认的数量,它只是盲目地将第 n 个发送给第 n 个消费者。

其余在指定交流器名称的情形下,我们可以将 routingKey 设置为空,这样发布的会由交流器转发到对应的行列步队中。

channel.BasicPublish(exchange: \"大众logs\"大众,routingKey: string.Empty,basicProperties: ,body: Encoding.UTF8.GetBytes($\"大众测试{i}\"大众));

而多行列步队对应一个交流器的情形比较繁芜,后面的章节会提到。

生产者和消费者都能够利用 QueueDeclare() 来声明一个行列步队。
所谓的声明,实际上是对 RabbitMQ Broker 要求创建一个行列步队,因此谁来创建都是一样的。

跟声明行列步队干系的,还有两个函数:

// 无论创建失落败与否,都不理会channel.QueueDeclareNoWait();// 判断行列步队是否存在,如果不存在则弹出非常,存在则什么也不会发生channel.QueueDeclarePassive();

此外,我们还可以删除行列步队:

// ifUnused: 行列步队没有被利用时// ifEmpty: 行列步队中没有堆积的时channel.QueueDelete(queue: \"大众aaa\公众, ifUnused: true, ifEmpty: true);交流器类型

生产者只能向交流器推送,而不能向行列步队推送。

推送时,可以指定交流器名称和路由键。

如下面代码所示:

channel.BasicPublish(exchange: string.Empty,routingKey: \"大众myqueue\公众,basicProperties: ,body: Encoding.UTF8.GetBytes($\"大众测试{i}\公众));

ExchangeType 中定义了几种交流器类型的名称。

public static class ExchangeType {public const string Direct = \公众direct\"大众;public const string Fanout = \"大众fanout\公众;public const string Headers = \"大众headers\"大众;public const string Topic = \公众topic\公众;private static readonly string[] s_all = {Fanout, Direct, Topic, Headers}; }

在利用一个交流器之前,须要先声明一个交流器:

channel.ExchangeDeclare(\"大众logs\"大众, ExchangeType.Fanout);

如果交流器已存在,重复实行声明代码,只要配置跟现存的交流器配置区配,则 RabbitMQ 啥也不干,不会涌现副浸染。

但是,不能涌现不一样的配置,例如已存在的交流器是 Fanout 类型,但是重新实行代码声明行列步队为 Direct 类型。

ExchangeDeclare 函数的定义如下:

ExchangeDeclare(string exchange, string type, bool durable = false, bool autoDelete = false, IDictionary<string, object> arguments = )

exchange: 交流器的名称。

type 交流器的类型,如 fanout、direct、topic。

durable: 设置是否持久 durab ,如果值为 true,则做事看重启后也不会丢失。

autoDelete:设置是否自动删除。

argument:其他一些构造化参数。

当然,交流器也可以被删除。

// ifUnused 只有在行列步队未被利用的情形下,才会删除channel.ExchangeDelete(exchange: \公众log\"大众, ifUnused: true);

还有一个 NotWait 方法。

channel.ExchangeDeclareNoWait(\公众logs\"大众, ExchangeType.Direct);//channel.ExchangeDeclareNoWait(...);

纵然重新声明交流器和删除时有问题,由于其返回 void,因此操作失落败也不会报非常。

也有个判断交流器是否存在的方法。
如果交流器不存在,则会抛出非常,如果交流器存在,则什么也不会发生。

channel.ExchangeDeclarePassive(\公众logs\"大众)

创建多个行列步队后,还须要将行列步队和交流器绑定起来。

如下代码所示,其交流器绑定了两个行列步队,生产者推送到交流器时,两个行列步队都会收到相同的。

ConnectionFactory factory = new ConnectionFactory{HostName = \"大众localhost\"大众};using IConnection connection = factory.CreateConnection();using IModel channel = connection.CreateModel();// 创建交流器channel.ExchangeDeclare(\公众logs\"大众, ExchangeType.Fanout);// 创建两个行列步队channel.QueueDeclare(queue: \"大众myqueue1\"大众,durable: false,exclusive: false,autoDelete: false,arguments: );channel.QueueDeclare(queue: \公众myqueue2\公众,durable: false,exclusive: false,autoDelete: false,arguments: );channel.QueueBind(queue: \"大众myqueue1\"大众, exchange: \"大众logs\"大众, routingKey: string.Empty);channel.QueueBind(queue: \"大众myqueue2\"大众, exchange: \"大众logs\"大众, routingKey: string.Empty);int i = 0;while (i < 10){channel.BasicPublish(exchange: \"大众logs\公众,routingKey: string.Empty,basicProperties: ,body: Encoding.UTF8.GetBytes($\公众测试{i}\公众));i++;}

推送后,每个绑定了 logs 交流器的行列步队都会收到相同的。

把稳,由于交流器不会存储,因此,再创建一个 myqueue3 的行列步队绑定 logs 交流器时,myqueue3 只会吸收到绑定之后推送的,不能得到更早之前的。

交流器有以下类型:

direct:根据 routingKey 将通报到行列步队。

topic:有点繁芜。
根据路由键与用于将行列步队绑定到交流器的模式之间的匹配将路由到一个或多个行列步队。

headers:本文不讲,以是不做阐明。

fanout:只要绑定即可,不须要理会路由。

Direct

direct 是根据 routingKey 将推送到不同的行列步队中。

首先,创建多个行列步队。

// 创建两个行列步队channel.QueueDeclare(queue: \"大众direct1\"大众);channel.QueueDeclare(queue: \"大众direct2\公众);

然后将行列步队绑定交流器时,绑定关系须要设置 routingKey。

// 利用 routingKey 绑定交流器channel.QueueBind(exchange: \"大众logs\公众, queue: \"大众direct1\公众, routingKey: \"大众debug\公众);channel.QueueBind(exchange: \"大众logs\"大众, queue: \"大众direct2\"大众, routingKey: \"大众info\"大众);

末了,推送时,须要指定交流器名称,以及 routingKey。

// 发送时,须要指定 routingKeychannel.BasicPublish(exchange: \公众logs\公众,routingKey: \公众debug\"大众,basicProperties: ,body: Encoding.UTF8.GetBytes($\公众测试\公众));

当推送到 logs 交流器时,交流器会根据 routingKey 将转发到对应的行列步队中。

完全的代码示例如下:

// 创建交流器channel.ExchangeDeclare(\"大众logs\"大众, ExchangeType.Direct);// 创建两个行列步队channel.QueueDeclare(queue: \"大众direct1\"大众);channel.QueueDeclare(queue: \公众direct2\公众);// 利用 routingKey 绑定交流器channel.QueueBind(exchange: \公众logs\"大众, queue: \公众direct1\"大众, routingKey: \"大众debug\"大众);channel.QueueBind(exchange: \"大众logs\"大众, queue: \"大众direct2\"大众, routingKey: \"大众info\"大众);// 发送时,须要指定 routingKeychannel.BasicPublish(exchange: \"大众logs\公众,routingKey: \公众debug\"大众,basicProperties: ,body: Encoding.UTF8.GetBytes($\"大众测试\"大众));

启动后,创造只有 direct1 行列步队可以收到,由于这是根据绑定时利用的 routingKey=debug 决定的。

Fanout

只要行列步队绑定了交流器,则每个交流器都会收到一样的,Fanout 会忽略 routingKey。

如下代码所示:

// 创建交流器channel.ExchangeDeclare(\公众logs1\"大众, ExchangeType.Fanout);// 创建两个行列步队channel.QueueDeclare(queue: \公众fanout1\"大众);channel.QueueDeclare(queue: \公众fanout2\"大众);// 利用 routingKey 绑定交流器channel.QueueBind(exchange: \公众logs1\"大众, queue: \公众fanout1\"大众, routingKey: \公众debug\公众);channel.QueueBind(exchange: \"大众logs1\"大众, queue: \"大众fanout2\"大众, routingKey: \公众info\"大众);// 发送时,须要指定 routingKeychannel.BasicPublish(exchange: \"大众logs1\"大众,routingKey: \公众debug\"大众,basicProperties: ,body: Encoding.UTF8.GetBytes($\公众测试\公众));

Topic

Topic 会根据 routingKey 查找符合条件的行列步队,行列步队可以利用 .# 三种符号进行区配,Topic 的区配规则比较灵巧,

在创建行列步队之后,绑定交流器时,routingKey 利用表达式。

// 利用 routingKey 绑定交流器channel.QueueBind(exchange: \"大众logs3\公众, queue: \"大众topic1\公众, routingKey: \公众red.#\"大众);channel.QueueBind(exchange: \公众logs3\"大众, queue: \公众topic2\"大众, routingKey: \"大众red.yellow.#\"大众);

推送时,routingKey 须要设置完全的名称。

// 发送channel.BasicPublish(exchange: \公众logs3\公众,routingKey: \"大众red.green\"大众,basicProperties: ,body: Encoding.UTF8.GetBytes($\"大众测试\公众));

首先,routingKey 会根据 . 符号进行划分。

比如 red.yellow.green 会被拆成 [red,yellow,green] 三个部分。

如果想模糊区配一个部分,则可以利用
比如 red..green ,可以区配到 red.aaa.greenred.666.green

可以在任何一部分利用,比如 .yellow...green

# 可以区配多个部分,比如 red.# 可以区配到 red.ared.a.ared.a.a.a

完全的代码示例如下:

// 创建交流器channel.ExchangeDeclare(\"大众logs3\"大众, ExchangeType.Topic);// 创建两个行列步队channel.QueueDeclare(queue: \"大众topic1\公众);channel.QueueDeclare(queue: \"大众topic2\"大众);// 利用 routingKey 绑定交流器channel.QueueBind(exchange: \公众logs3\"大众, queue: \公众topic1\公众, routingKey: \公众red.#\"大众);channel.QueueBind(exchange: \公众logs3\公众, queue: \公众topic2\"大众, routingKey: \"大众red.yellow.#\"大众);// 发送channel.BasicPublish(exchange: \公众logs3\公众,routingKey: \公众red.green\"大众,basicProperties: ,body: Encoding.UTF8.GetBytes($\公众测试\"大众));channel.BasicPublish(exchange: \"大众logs3\"大众,routingKey: \"大众red.yellow.green\公众,basicProperties: ,body: Encoding.UTF8.GetBytes($\"大众测试\"大众));

上面推送了两条到 logs 交流器中,个中 routingKey=red.green 的,被 red.# 区配到,因此会被转发到 topic1 行列步队中。

routingKey=red.yellow.green 的,可以被两个行列步队区配,因此 topic1 和 topic 2 都可以吸收到。

交流器绑定交流器

交流器除了可以绑定行列步队,也可以绑定交流器。

示例:

将 b2 绑定到 b1 中,b2 可以得到 b1 的。

channel.ExchangeBind(destination: \公众b2\公众, source: \公众b1\"大众, routingKey: string.Empty);

绑定之后,推送到 b1 交流器的,会被转发到 b2 交流器。

完全示例代码如下:

ConnectionFactory factory = new ConnectionFactory{HostName = \公众localhost\公众};using IConnection connection = factory.CreateConnection();using IModel channel = connection.CreateModel();channel.ExchangeDeclare(exchange: \公众b1\公众, ExchangeType.Fanout);channel.ExchangeDeclare(exchange: \"大众b2\公众, ExchangeType.Fanout);// 由于两者都是 ExchangeType.Fanout,// 以是 routingKey 利用 string.Emptychannel.ExchangeBind(destination: \"大众b2\"大众, source: \"大众b1\公众, routingKey: string.Empty);// 创建行列步队channel.QueueDeclare(queue: \公众q1\"大众, durable: false, exclusive: false, autoDelete: false);channel.QueueBind(queue: \公众q1\"大众, exchange: \"大众b2\"大众, routingKey: string.Empty);int i = 0;while (i < 10){channel.BasicPublish(exchange: \"大众b1\"大众,routingKey: string.Empty,basicProperties: ,body: Encoding.UTF8.GetBytes($\"大众测试{i}\公众));i++;}

当然,可以将交流器、行列步队同时绑定到 b1 交流器中。

其余,两个交流器的类型可以不同。
不过这样会导致区配规则有点繁芜。

channel.ExchangeDeclare(exchange: \"大众b1\公众, ExchangeType.Direct);channel.ExchangeDeclare(exchange: \"大众b2\"大众, ExchangeType.Fanout);

我们可以理解成在交流器绑定时,b2 相对付一个行列步队。
当 b1 设置成 Direct 交流器时,绑定交流器时还须要指定 routingKey。

channel.ExchangeBind(destination: \公众b2\"大众, source: \公众b1\"大众, routingKey: \公众demo\"大众);

而 b2 交流器和 q2 行列步队,依然是 Fanout 关系,不受影响。

意思是说,b1、b2 是一个关系,它们的映射关系不会影响到别人,也不会影响到下一层。

完全代码示例如下:

using RabbitMQ.Client;using System.Text;ConnectionFactory factory = new ConnectionFactory{HostName = \"大众localhost\公众};using IConnection connection = factory.CreateConnection();using IModel channel = connection.CreateModel();channel.ExchangeDeclare(exchange: \"大众b1\"大众, ExchangeType.Direct);channel.ExchangeDeclare(exchange: \"大众b2\公众, ExchangeType.Fanout);// 由于两者都是 ExchangeType.Fanout,// 以是 routingKey 利用 string.Emptychannel.ExchangeBind(destination: \"大众b2\"大众, source: \公众b1\"大众, routingKey: \"大众demo\"大众);// 创建两个行列步队channel.QueueDeclare(queue: \"大众q1\"大众, durable: false, exclusive: false, autoDelete: false);channel.QueueBind(queue: \"大众q1\"大众, exchange: \"大众b2\"大众, routingKey: string.Empty);int i = 0;while (i < 10){channel.BasicPublish(exchange: \"大众b1\公众,routingKey: \公众demo\"大众,basicProperties: ,body: Encoding.UTF8.GetBytes($\公众测试{i}\"大众));i++;}消费者、属性

消费者 BasicConsume 函数定义如下:

BasicConsume(string queue,bool autoAck,string consumerTag, IDictionary<string, object> arguments, IBasicConsumer consumer)

不同的消费订阅采取不同消费者标签 (consumerTag) 来区分彼 ,在同一个通道(IModel)中的消费者 须要通过消费者标签作区分,默认情形下不须要设置。

queue:行列步队的名称。

autoAck:设置是否自动确认。

consumerTag: 消费者标签,用来区分多个消费者。

arguments:设置消费者的其他参数。

前面,我们利用了 EventingBasicConsumer 创建 IBasicConsumer 接口的消费者程序,个中,EventingBasicConsumer 包含了以下事宜:

public event EventHandler<BasicDeliverEventArgs> Received;public event EventHandler<ConsumerEventArgs> Registered;public event EventHandler<ShutdownEventArgs> Shutdown;public event EventHandler<ConsumerEventArgs> Unregistered;

这些事宜会在处理的不同阶段被触发。

消费者程序有推、拉两莳花费模式,前面所提到的代码都是推模式,即涌现新的时,RabbitMQ 会自动推送到消费者程序中。

var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body.Span);Console.WriteLine($\公众 [x] Received {message}\公众);channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);};// 开始消费channel.BasicConsume(queue: \"大众myqueue5\"大众, autoAck: false, consumer: consumer, consumerTag: \"大众demo\公众);

如果利用拉模式(BasicGet() 函数),那么在 RabbitMQ Broker 的行列步队中没有时,会返回 。

// 开始消费while (true){var result = channel.BasicGet(queue: \"大众q1\"大众, autoAck: false);// 如果没有拉到时if (result == ) {// 没有时,避免无限拉取 Thread.Sleep(100);continue; }Console.WriteLine(Encoding.UTF8.GetString(result.Body.Span));channel.BasicAck(deliveryTag: result.DeliveryTag, multiple: false);}

当利用 BasicGet() 手动拉取消息时,该程序不会作为消费者程序存在,也便是 RabbitMQ 的 Consumer 中看不到。

两种推拉模式之下,ack 时,均有一个 multiple 参数。

如果将 multiple 设为 false,则只确认指定 deliveryTag 的一条。

如果将 multiple 设为 true,则会确认所有比指定 deliveryTag 小的并且未被确认的。

的 deliveryTag 属性是 ulong 类型,表示的偏移量,从 1.... 开始算起。

在大批量吸收并进行处理时,可以利用 multiple 来确认一组,而不必逐条确认,这样可以提高效率。

Qos 、谢绝吸收

消费者程序可以设置 Qos。

channel.BasicQos(prefetchSize: 10, prefetchCount: 10, global: false);

prefetchSize:这个参数表示消费者所能吸收未确认的总体大小的上限,设置为 0 则表示没有上限。

prefetchCount: 的方法来设置消费者客户端最大能吸收的未确认的数。
这个配置跟滑动窗口数量意思差不多。

global 则有些分外。

当 global 为 false 时,只有新的消费者须要遵守规则。

如果是 global 为 true 时,同一个 IConnection 中的消费者均会被修正配置。

// 不受影响// var result = channel.BasicConsume(queue: \公众q1\公众, autoAck: false,... ...);channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);// 新的消费者受影响// var result = channel.BasicConsume(queue: \"大众q1\"大众, autoAck: false,... ...);

当收到时,如果须要明确谢绝该,可以利用 BasicReject,RabbitMQ 会将该从行列步队中移除。

BasicReject() 会触发去世信。

while (true){var result = channel.BasicGet(queue: \"大众q1\"大众, autoAck: false);if (result == ) continue;Console.WriteLine(Encoding.UTF8.GetString(result.Body.Span));channel.BasicReject(deliveryTag: result.DeliveryTag, requeue: true);}

如果 requeue 参数设置为 true ,则 RabbitMQ 会重新将这条存入行列步队,以便可以发送给下个订阅的消费者,或者说该程序重启后可以重新吸收。

如果 requeue 参数设置为 false ,则 RabbitMQ立即会把从行列步队中移除,而不会把它发送给新的消费者。

如果想批量谢绝。

channel.BasicNack(deliveryTag: result.DeliveryTag, multiple: true, requeue: true);

multiple 为 true 时,则表示谢绝 deliveryTag 编号之前所有未被当前消费者确认的。

BasicRecover() 方法用来从 RabbitMQ 重新获取还未被确认的

requeue=true 时,未被确认的会被重新加入到行列步队中,对付同一条来说,其会被分配给给其它消费者。

requeue=false,同条会被分配给与之前相同的消费者。

channel.BasicRecover(requeue: true);// 异步channel.BasicRecoverAsync(requeue: true);确认模式

前面提到,当 autoAck=false 时,虽然没有 ack,但是 RabbitMQ 还是会跳到下一个。

为了担保的顺序性,在未将当前消费完成的情形下,不许可自动消费下一个。

只须要利用 BasicQos 配置即可:

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

ConnectionFactory factory = new ConnectionFactory{HostName = \"大众localhost\"大众};using IConnection connection = factory.CreateConnection();using IModel channel = connection.CreateModel();// 创建交流器channel.ExchangeDeclare(\公众acktest\公众, ExchangeType.Fanout);// 创建两个行列步队channel.QueueDeclare(queue: \"大众myqueue5\"大众);// 利用 routingKey 绑定交流器channel.QueueBind(exchange: \"大众acktest\"大众, queue: \"大众myqueue5\"大众, routingKey: string.Empty);int i = 0;while (i < 10){// 发送channel.BasicPublish(exchange: \"大众acktest\"大众,routingKey: string.Empty,basicProperties: ,body: Encoding.UTF8.GetBytes($\公众测试\"大众));i++;}// 未 ack 之前,不能消费下一个channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body.Span);Console.WriteLine($\公众 [x] Received {message}\公众);// channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);};// 开始消费channel.BasicConsume(queue: \公众myqueue5\"大众, autoAck: false, consumer: consumer);

之前这段代码后,你会创造,第一条未被 ack 时,程序不会自动读取下一条,也不会重新拉取未被 ack 的。

如果我们想重新读取未被 ack 的,可以重新启动程序,或利用 BasicRecover() 让做事看重新推送。

持久化

前面提到了 BasicPublish 函数的定义:

BasicPublish(string exchange, string routingKey, bool mandatory = false, IBasicProperties basicProperties = , ReadOnlyMemory<byte> body = default)

当设置 mandatory = true 时,如果交流器无法根据自身的类型和路由键找到一个符合条件的行列步队,那么 RabbitMQ 触发客户真个 IModel.BasicReturn 事宜, 将返回给生产者 。

从设计上看,一个 IConnection 虽然可以创建多个 IModel(通道),但是只建议编写一个消费者程序或生产者程序,不建议稠浊多用。

由于各种事宜和行列步队配置,是针对一个 IModel(通道) 来设置的。

using IConnection connection = factory.CreateConnection();using IModel channel = connection.CreateModel();channel.BasicReturn += (object sender, BasicReturnEventArgs e) =>{};

当设置了 mandatory = true 时,如果该找不到行列步队存储,那么就会触发客户真个 BasicReturn 事宜吸收 BasicPublish 失落败的。

完全示例代码如下:

using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Runtime;using System.Text;ConnectionFactory factory = new ConnectionFactory{HostName = \公众localhost\"大众};using IConnection connection = factory.CreateConnection();using IModel channel = connection.CreateModel();channel.ExchangeDeclare(exchange: \"大众e2\"大众, type: ExchangeType.Fanout, durable: false, autoDelete: false);channel.BasicReturn += (object? s, BasicReturnEventArgs e) =>{Console.WriteLine($\公众无效:{Encoding.UTF8.GetString(e.Body.Span)}\"大众);};int i = 0;while (i < 10){channel.BasicPublish(exchange: \公众e2\"大众,routingKey: string.Empty,// mandatory=true,当没有行列步队吸收时,会触发 BasicReturn 事宜mandatory: true,basicProperties: ,body: Encoding.UTF8.GetBytes($\"大众测试{i}\"大众));i++;}Console.ReadLine();

在实际开拓中,当 mandatory=false 时,如果一条推送到交流器,但是却没有绑定行列步队,那么该条就会丢失,可能会导致严重的后果。

而在 RabbitMQ 中,供应了一种被称为备胎交流器的方案,这是通过在定义交流器时添加 alternate-exchange 参数来实现。
其浸染是当 A 交流器无法找到行列步队转发时,就会将转发到 B 行列步队中。

完全代码示例如下:

首先创建 e3_bak 行列步队,接着创建 e3 行列步队时设置其备胎交流器为 e3_bak。

然后,e3_bak 须要绑定一个行列步队消费。

ConnectionFactory factory = new ConnectionFactory{HostName = \"大众localhost\"大众};using IConnection connection = factory.CreateConnection();using IModel channel = connection.CreateModel();channel.ExchangeDeclare(exchange: \"大众e3_bak\"大众,type: ExchangeType.Fanout,durable: false,autoDelete: false);// 声明 e3 交流器,当 e3 交流器没有绑定行列步队时,将会被转发到 e3_bak 交流器channel.ExchangeDeclare(exchange: \"大众e3\公众,type: ExchangeType.Fanout,durable: false,autoDelete: false,arguments: new Dictionary<string, object> {{ \"大众alternate-exchange\公众, \"大众e3_bak\公众 }});channel.QueueDeclare(queue: \"大众q3\公众, durable: false, exclusive: false, autoDelete: false);channel.QueueBind(queue: \"大众q3\"大众, \公众e3_bak\"大众, routingKey: string.Empty);// 由于已经设置了 e3 的备用交流器,以是不会触发 BasicReturnchannel.BasicReturn += (object? s, BasicReturnEventArgs e) =>{Console.WriteLine($\"大众无效:{Encoding.UTF8.GetString(e.Body.Span)}\"大众);};int i = 0;while (i < 10){channel.BasicPublish(exchange: \公众e3\"大众,routingKey: string.Empty,// 由于已经设置了 e3 的备用交流器,以是开启这个不会触发 BasicReturnmandatory: true,basicProperties: ,body: Encoding.UTF8.GetBytes($\公众测试{i}\"大众));i++;}Console.ReadLine();

把稳,如果备胎交流器有没有绑定得当行列步队的话,那么该就会丢失。

如果 e3 是 Direct,e3_bak 也是 Direct,那么须要两者具有相同的 routingKey,如果 e3 中有个 routingKey = cat,但是 e3_bak 中不存在对应的 routingKey,那么该还是会丢失的。
还有其它一些情形,这里不再赘述。

推送时,有一个 IBasicProperties basicProperties 属性,前面的小节中已经先容过该接口的属性,当 IBasicProperties.DeliveryMode=2 时,将被标记为持久化,纵然 RabbitMQ 做事看重启,也不会丢失。

相对来说,通过前面的实验,你可以不雅观察到客户端把行列步队的都消费完毕后,行列步队中的都会消逝。
而对应 Kafka 来说,一个 topic 中的被消费,其依然会被保留。
这一点要把稳,利用 RabbitMQ 时,须要提前设置好行列步队的持久化,避免消费或未成功消费时,丢失。

生产者在推送时,可以利用 IBasicProperties.DeliveryMode=2 将该设置为持久化。

var ps = channel.CreateBasicProperties();ps.DeliveryMode = 2;channel.BasicPublish(exchange: \"大众e3\"大众,routingKey: string.Empty,mandatory: false,basicProperties: ps,body: Encoding.UTF8.GetBytes($\"大众测试{i}\公众)); TTL 韶光

设置 TTL 韶光后,该如果在一定韶光内没有被消费,那么该就成为了去世信。
对付这种,会有大概这么两个处理情形。

第一种,如果行列步队设置了 \"大众x-dead-letter-exchange\"大众 ,那么该会被从行列步队转发到另一个交流器中。
这种方法在去世信交流器一节中会先容。

第二种,被丢弃。

目前有两种方法可以设置的 TTL 。

第一种方法是通过行列步队属性设置,这样一来行列步队中所有都有相同的过期韶光。

第二种方法是对单条进行单独设置,每条的 TTL 可以不同。

如果两种设置一起利用,则的 TTL 以两者之间较小的那个数值为准。
在行列步队中的生存时一旦超过设置 TTL 值时,消费者将无法再收到该,以是最好设置去世信交流器。

第一种,对行列步队设置:

channel.QueueDeclare(queue: \"大众q4\"大众,durable: false,exclusive: false,autoDelete: false,arguments: new Dictionary<string, object>() { { \"大众x-message-ttl\公众, 6000 } });

第二种通过设置属性配置过期韶光。

var ps = channel.CreateBasicProperties();// 单位毫秒ps.Expiration = \"大众6000\公众;

对付第一种设置行列步队属性的方法,一旦过期就会从行列步队中抹去(如果设置了去世信交流器,会被转发到去世信交流器中)。
而在第二种方法中,纵然过期,也不会立时从行列步队中抹去,由于该条在即将投递到消费者之前,才会检讨是否过期。
对付第二种情形,当行列步队进行任何一次轮询操作时,才会被真正移除。

对付第二种情形,虽然是在被轮询时,过期了才会被真正移除,但是一旦过期,就会被转发到去世信行列步队中,只是不会立即移除。

行列步队 TTL 韶光

当对一个行列步队设置 TTL 时,如果该行列步队在规定韶光内没被利用,那么该行列步队就会被删除。
这个约束包括一段韶光内没有被消费(包括 BasicGet() 办法消费的)、没有被重新声明、没有消费者连接,否则被删除的倒计韶光会被重置。

channel.QueueDeclare(queue: \"大众q6\"大众,durable: false,exclusive: false,autoDelete: false,arguments: new Dictionary<string, object>{// 单位是毫秒,设置 行列步队过期韶光是 1 小时{\"大众x-expires\公众,136001000}});

DLX 去世信交流器

DLX(Dead-Letter-Exchange) 去世信交流器,在一个行列步队 A 中变成去世信之后,它能被重新被发送到另一个 B 交流器中。
个中 A 行列步队绑定了去世信交流器,那么在Management UI 界面会看到 DLX 标识,而 B 交流器便是一个普通的交流器,无需配置。

变成去世信 般是由于以下几种情形:

被消费者谢绝,BasicReject()BasicNack() 两个函数可以谢绝。

过期。

行列步队达到最大长度。

当这个行列步队 A 中存在去世信时,RabbitMQ 就会自动地将这个重新发布到设置的交流器 B 中。
一样平常会专门给主要的行列步队设置去世信交流器 B,而交流器 B 也须要绑定一个行列步队 C 才行,不然也会丢失。

设置行列步队涌现去世信时,将转发到哪个交流器中:

channel.QueueDeclare(queue: \"大众q7\公众, durable: false, exclusive: false, autoDelete: false,arguments: new Dictionary<string, object> {{ \"大众x-dead-letter-exchange\"大众, \公众e7_bak\"大众 } });

完全示例代码如下所示:

using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;ConnectionFactory factory = new ConnectionFactory{HostName = \公众localhost\公众};using IConnection connection = factory.CreateConnection();using IModel channel = connection.CreateModel();channel.ExchangeDeclare(exchange: \"大众e7_bak\"大众,type: ExchangeType.Fanout,durable: false,autoDelete: false);channel.QueueDeclare(queue: \"大众q7_bak\"大众, durable: false, exclusive: false, autoDelete: false);channel.QueueBind(queue: \"大众q7_bak\"大众, \"大众e7_bak\"大众, routingKey: string.Empty);channel.ExchangeDeclare(exchange: \"大众e7\"大众,type: ExchangeType.Fanout,durable: false,autoDelete: false);channel.QueueDeclare(queue: \公众q7\"大众, durable: false, exclusive: false, autoDelete: false,arguments: new Dictionary<string, object> {{ \公众x-dead-letter-exchange\公众, \"大众e7_bak\公众 } });channel.QueueBind(queue: \"大众q7\"大众, \公众e7\公众, routingKey: string.Empty);int i = 0;while (i < 10){channel.BasicPublish(exchange: \"大众e7\"大众,routingKey: string.Empty,mandatory: false,basicProperties: ,body: Encoding.UTF8.GetBytes($\"大众测试{i}\"大众));i++;}Thread.Sleep(1000);int y = 0;// 定义消费者channel.BasicQos(0, prefetchCount: 1, true);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body.Span);Console.WriteLine($\公众 [x] Received {message}\"大众);if (y % 2 == 0)channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);// requeue 要设置为 false 才行,// 否则此被谢绝后还会被放回行列步队。
elsechannel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);Interlocked.Add(ref y, 1);};// 开始消费channel.BasicConsume(queue: \公众q7\"大众, autoAck: false, consumer: consumer);Console.ReadLine();

延迟行列步队

RabbitMQ 本身没有直接支持延迟行列步队的功能。

那么为什么会涌现延迟行列步队这种东西呢?

紧张是由于推送后,不想立即被消费。
比如说,用户下单后,如果 10 分钟内没有支付,那么该订单会被自动取消。
以是须要做一个被延迟消费的功能。

以是说,实际需求是,该在一定韶光之后才能被消费者消费。

在 RabbitMQ 中做这个功能,须要利用两个交流器,以及至少两个行列步队。

思路是定义两个交流器 e8、e9 和两个行列步队 q8、q9,交流器 e8 和行列步队 q8 绑定、交流器 e9 和 q9 绑定。

最主要的一点来了,q9 设置了去世信行列步队,当 TTL 韶光到时,转发到 e9 交流器中。
以是,e9 交流器 - q9 行列步队 吸收到的都是到期(或者说过期)的。

在发送到 e8 交流器时,设置 TTL 韶光。
当 q8 行列步队中的过期时,会被转发到 e9 交流器,然后存入 q9 行列步队。

消费者只须要订阅 q9 行列步队,即可消费到期后的。

全部完全代码示例如下:

using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;ConnectionFactory factory = new ConnectionFactory{HostName = \"大众localhost\"大众};using IConnection connection = factory.CreateConnection();using IModel channel = connection.CreateModel();channel.ExchangeDeclare(exchange: \"大众e8\"大众,type: ExchangeType.Fanout,durable: false,autoDelete: false);channel.ExchangeDeclare(exchange: \"大众e9\"大众,type: ExchangeType.Fanout,durable: false,autoDelete: false);channel.QueueDeclare(queue: \公众q9\公众, durable: false, exclusive: false, autoDelete: false);channel.QueueBind(queue: \"大众q9\公众, \公众e9\"大众, routingKey: string.Empty);channel.QueueDeclare(queue: \公众q8\"大众, durable: false, exclusive: false, autoDelete: false,arguments: new Dictionary<string, object> {{ \"大众x-dead-letter-exchange\"大众, \"大众e9\"大众 } });channel.QueueBind(queue: \"大众q8\"大众, \"大众e8\"大众, routingKey: string.Empty);int i = 0;while (i < 10){var ps = channel.CreateBasicProperties();ps.Expiration = \公众6000\"大众;channel.BasicPublish(exchange: \"大众e8\公众,routingKey: string.Empty,mandatory: false,basicProperties: ps,body: Encoding.UTF8.GetBytes($\公众测试{i}\"大众));i++;}var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var message = Encoding.UTF8.GetString(ea.Body.Span);Console.WriteLine($\"大众 [x] 已到期 {message}\公众);};// 开始消费channel.BasicConsume(queue: \公众q9\公众, autoAck: true, consumer: consumer);Console.ReadLine();优先级

优先级越高,就会越快被消费者消费。

代码示例如下:

var ps = channel.CreateBasicProperties();// 优先级 0-9 ps.Priority = 9;channel.BasicPublish(exchange: \公众e8\公众,routingKey: string.Empty,mandatory: false,basicProperties: ps,body: Encoding.UTF8.GetBytes($\"大众测试{i}\"大众));

以是说,RabbitMQ 不一定可以担保的顺序性,这一点跟 Kafka 是有差异的。

事务机制

事务机制是,发布者确定一定推送到 RabbitMQ Broker 中,每每会跟业务代码一起利用。

比如说,用户成功支付之后,推送一个关照到 RabbitMQ 行列步队中。

数据库当然要干事务,这样在支付失落败后修正的数据会被回滚。
但是问题来了,如果已经推送了,但是数据库却回滚了。

这个时候会涉及到同等性,可以利用 RabbitMQ 的事务机制来处理,其思路跟数据库事务过程差不多,也是有提交和回滚操作。

其目的是确保成功推送到 RabbitMQ Broker 以及跟客户端其它代码保持数据同等,推送跟代码操作同时成功或同时回滚。

其完全的代码示例如下:

ConnectionFactory factory = new ConnectionFactory{HostName = \"大众localhost\"大众};using IConnection connection = factory.CreateConnection();using IModel channel = connection.CreateModel();// 客户端发送 Tx.Select.将信道置为事务模式;channel.TxSelect();try{// 发送channel.QueueDeclare(queue: \"大众transaction_queue\"大众, durable: false, exclusive: false, autoDelete: false, arguments: );string message = \公众Hello, RabbitMQ!\"大众;var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: \公众\公众, routingKey: \"大众transaction_queue\公众, basicProperties: , body: body);// 实行一系列操作// 提交事务channel.TxCommit();Console.WriteLine(\"大众 [x] Sent '{0}'\"大众, message);}catch (Exception e){// 回滚事务channel.TxRollback();Console.WriteLine(\"大众An error occurred: \公众 + e.Message);}Console.ReadLine();发送方确认机制

发送方确认机制,是担保一定推送到 RabbitMQ 的方案。

而事务机制,一样平常是为了担保同等性,推送和其它操作同时成功或同时失落败,不能涌现两者不一致的情形。

其完全代码示例如下:

using IConnection connection = factory.CreateConnection();using IModel channel = connection.CreateModel();// 开启示送方确认模式channel.ConfirmSelect();string exchangeName = \"大众exchange_name\公众;string routingKey = \公众routing_key\"大众;// 定义交流器channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct);// 发送string message = \"大众Hello, RabbitMQ!\"大众;var body = Encoding.UTF8.GetBytes(message);// 发布channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, basicProperties: , body: body);// 等待确认已推送到 RabbitMQif (channel.WaitForConfirms()){Console.WriteLine(\"大众 [x] Sent '{0}'\"大众, message);}else{Console.WriteLine(\"大众Message delivery failed.\"大众);}Console.ReadLine();

文章写到这里,恰好一万词。

对付 RabbitMQ 集群、运维等技能,本文不再赘述。

标签:

相关文章

phphash缓存技巧_Redis 哈希Hash最全介绍

小编Redis的哈希(Hash)是一种非常有用的数据构造,它许可用户将多个字段和值组合成一个单独的数据项。每个哈希可以包含多个字段...

网站推广 2024-12-12 阅读0 评论0

php数据直方图技巧_拉勾网上Java vs PHP

step1:首先剖析一下要抓取的页面信息进行翻页操作的时候,post要求的参数紧张几个参数是city,pn,kd分别对应关键词城市...

网站推广 2024-12-12 阅读0 评论0