首页 » Web前端 » php应用xmpp接口技巧_一篇文章带你彻底玩转RabbitMQ

php应用xmpp接口技巧_一篇文章带你彻底玩转RabbitMQ

访客 2024-12-17 0

扫一扫用手机浏览

文章目录 [+]

MQ(行列步队),顾名思义,实质上是一种先入先出的行列步队,只不过个中存放的是。
它也是一种跨进程的通信机制,用于高下游通报信息。
在互联网架构中,MQ 是一种常见的高下游“逻辑解耦+物理解耦”的通信做事。
利用 MQ 后,发送上游只需依赖 MQ,无需依赖其他做事。

2、为什么要用 MQ

php应用xmpp接口技巧_一篇文章带你彻底玩转RabbitMQ

1、流量消峰

php应用xmpp接口技巧_一篇文章带你彻底玩转RabbitMQ
(图片来自网络侵删)

举个例子,假设订单系统最多能处理一万次订单。
在正常时段,这个处理能力足够应对用户的下单需求,由于用户下单后一秒就能收到结果。
然而,在高峰期,如果有两万次下单,操作系统就无法处理这么多订单,只能限定定单超过一万后不许可用户下单。

为理解决这个问题,我们可以利用行列步队作为缓冲区。
通过这种办法,我们可以取消对订单数量的限定,将一秒内的订单分散到一段韶光内进行处理。
虽然有些用户可能须要等待十几秒才能收到下单成功的操作,但这种体验仍旧比无法下单要好得多。

2、运用解耦

以电商运用为例,运用中包括订单系统、库存系统、物流系统和支付系统。
当用户创建订单后,如果直接调用库存系统、物流系统和支付系统,一旦个中任何一个子系统涌现故障,都会导致下单操作非常。
然而,当采取基于行列步队的办法后,系统间调用的问题会大大减少。
例如,当物流系统发生故障并须要几分钟来修复时,物流系统须要处理的订单信息会被缓存在行列步队中。
这样,用户的下单操作可以正常完成,而不受物流系统故障的影响。
当物流系统规复后,它只需连续处理订单信息即可。
对付下单的用户来说,他们险些感想熏染不到物流系统的故障,从而提升了全体系统的可用性。

3、异步处理

有些做事之间的调用是异步的,例如A调用B,B须要花费很永劫光实行。
但是A须要知道B什么时候可以实行完。
以前一样平常有两种常见的办法:一种是A过一段韶光去调用B的查询API查询;另一种是A供应一个回调API,B实行完之后调用该API关照A做事。
然而,这两种办法都不是很优雅。

利用总线可以很方便地办理这个问题。
当A调用B做事后,只须要监听B处理完成的。
一旦B处理完成,它会发送一条给行列步队(MQ)。
然后,MQ会将此转发给A做事。
这样,A做事既不须要循环调用B的查询API,也不须要供应回调API。
同样地,B做事也不须要做这些操作。
通过这种办法,A做事能够及时地得到异步处理成功的。

3、MQ 的分类

ActiveMQ: 优点:单机吞吐量达到万级,时效性在毫秒级别,可用性高,基于主从架构实现高可用性,可靠性较低的概率丢失数据。
缺陷:官方社区对ActiveMQ 5.x的掩护越来越少,高吞吐量场景较少利用。
Kafka: 优点:性能卓越,单机写入吞吐量约在百万条/秒,最大的优点是吞吐量高。
时效性在毫秒级别,可用性非常高,Kafka是分布式的,一个数据多个副本,少数机器宕机不会丢失数据,不会导致不可用。
消费者采取Pull办法获取消息,有序,通过掌握能够担保所有被消费且仅被消费一次。
有精良的第三方Kafka Web管理界面Kafka-Manager。
在日志领域比较成熟,被多家公司和多个开源项目利用。
功能支持较为大略,紧张支持大略的MQ功能,在大数据领域的实时打算以及日志采集被大规模利用。
缺陷:Kafka单机超过64个行列步队/分区时,Load会发生明显的飙高征象,行列步队越多,load越高,发送相应韶光变长。
利用短轮询办法,实时性取决于轮询间隔韶光。
消费失落败不支持重试。
支持顺序,但是一台代理宕机后,就会产生乱序。
社区更新较慢。
RocketMQ: 优点:单机吞吐量达到十万级,可用性非常高,分布式架构,可以做到0丢失。
MQ功能较为完善,还是分布式的,扩展性好,支持10亿级别的堆积,不会由于堆积导致性能低落。
源码是Java,我们可以自己阅读源码,定制自己公司的MQ。
缺陷:支持的客户端措辞不多,目前是Java及C++,个中C++不成熟。
社区生动度一样平常,没有在MQ核心中去实现JMS等接口,有些系统要迁移须要修正大量代码。
RabbitMQ: 优点:由于Erlang措辞的高并发特性,性能较好;吞吐量达到万级,MQ功能比较完备。
健壮、稳定、易用、跨平台、支持多种措辞如Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等。
支持AJAX文档完好;开源供应的管理界面非常棒,用起来很好用。
社区生动度高;更新频率相称高。
缺陷:商业版须要收费,学习本钱较高。

4、MQ 的选择

ActiveMQ 是一个开源的行列步队系统,具有高可靠性和高性能的特点。
它支持多种通报模式,包括点对点、发布/订阅和要求/相应等。
ActiveMQ 供应了丰富的功能和灵巧的配置选项,适用于各种企业级运用程序的开拓和支配。
RabbitMQ 是一个用 Erlang 措辞编写的开源行列步队系统,具有高可靠性和可扩展性。
它支持多种通报模式,包括点对点、发布/订阅和路由等。
RabbitMQ 供应了丰富的插件和工具,可以方便地与其他系统集成,并供应了强大的管理和监控功能。
Kafka 是一个分布式流处理平台,紧张用于大规模数据流的处理和存储。
它具有高吞吐量、低延迟和可扩展性的特点,适用于构建实时数据处理和剖析系统。
Kafka 支持多个消费者组和分区,可以实现负载均衡和容错机制。
RocketMQ 是阿里巴巴开源的一款分布式中间件,具有高可靠性、高吞吐量和低延迟的特点。
它支持多种通报模式,包括事务、定时和顺序等。
RocketMQ 供应了丰富的配置选项和监控工具,适用于构建大规模的分布式系统。

5、RabbitMQ 的观点

在实际运用中,RabbitMQ扮演了一个可靠的通信中介的角色。
可以把它想象成一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员终极会把你的快递送到收件人那里。
同理,当您须要发送一条时,你只须要将该发送到RabbitMQ做事器,RabbitMQ就会卖力将这个路由到精确的目的地。

6、四大核心观点

生产者

产生数据发送的程序是生产者

交流机

交流机是 RabbitMQ 非常主要的一个部件,一方面它吸收来自生产者的,另一方面它将推送到行列步队中。
交流机必须确切知道如何处理它吸收到的,是将这些推送到特定行列步队还是推送到多个行列步队,亦或者是把丢弃,这个得有交流机类型决定

行列步队

行列步队是 RabbitMQ 内部利用的一种数据构造,只管流经 RabbitMQ 和运用程序,但它们只能存储在行列步队中。
行列步队仅受主机的内存和磁盘限定的约束,实质上是一个大的缓冲区。
许多生产者可以将发送到一个行列步队,许多消费者可以考试测验从一个行列步队吸收数据。
这便是我们利用行列步队的办法

消费者

消费与吸收具有相似的含义。
消费者大多时候是一个等待吸收的程序。
请把稳生产者,消费者和中间件很多时候并不在同一机器上。
同一个运用程序既可以是生产者又是可以是消费者。

7、RabbitMQ的六种事情模式

7.1、simple大略模式

1、 产生着将放入行列步队;

2、 的消费者(consumer)监听(while)行列步队,如果行列步队中有,就消费掉,被拿走后,自动从行列步队中删除(隐患可能没有被消费者精确处理,已经从行列步队中消逝了,造成的丢失)运用处景:谈天(中间有一个过度的做事器;p端,c端);

7.2、work事情模式(资源的竞争)

1、 产生者将放入行列步队消费者可以有多个,消费者1,消费者2,同时监听同一个行列步队,被消费?C1C2共同争抢当前的行列步队内容,谁先拿到谁卖力消费(隐患,高并发情形下,默认会产生某一个被多个消费者共同利用,可以设置一个开关(syncronize,与同步锁的性能不一样)担保一条只能被一个消费者利用);

2、 运用处景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务实行系统在空闲,直接将任务扔到行列步队中,空闲的系统自动争抢);

7.3、publish/subscribe发布订阅(共享资源)

1、 X代表交流机rabbitMQ内部组件,erlang产生者是代码完成,代码的实行效率不高,产生者将放入交流机,交流机发布订阅把发送到所有行列步队中,对应行列步队的消费者拿到进行消费;

2、 干系场景:邮件群发,群谈天,广播(广告);

7.4、routing路由模式

1、 生产者将发送给交流机按照路由判断,路由是字符串(info)当前产生的携带路由字符(工具的方法),交流机根据路由的key,只能匹配上路由key对应的行列步队,对应的消费者才能消费;;

2、 根据业务功能定义路由字符串;

3、 从系统的代码逻辑中获取对应的功能字符串,将任务扔到对应的行列步队中业务场景:error关照;EXCEPTION;缺点关照的功能;传统意义的缺点关照;客户关照;利用key路由,可以将程序中的缺点封装成传入到行列步队中,开拓者可以自定义消费者,实时吸收缺点;;

7.5、topic 主题模式(路由模式的一种)

1、 星号井号代表通配符; 2、 星号代表多个单词,井号代表一个单词; 3、 路由功能添加模糊匹配; 4、 产生者产生,把交给交流机; 5、 交流机根据key的规则模糊匹配到对应的行列步队,由行列步队的监听消费者吸收消费;

7.6、RPC 模式

RPC即客户端远程调用做事真个方法 ,利用MQ可以实现RPC的异步调用,基于Direct交流机实现,流程如下:

1、 客户端即是生产者也是消费者,向RPC要求行列步队发送RPC调用,同时监听RPC相应行列步队;

2、 做事端监听RPC要求行列步队的,收到后实行做事真个方法,得到方法返回的结果;

3、 做事端将RPC方法的结果发送到RPC相应行列步队;

4、 客户端(RPC调用方)监听RPC相应行列步队,吸收到RPC调用结果;

8、RabbitMQ事情事理

Broker:吸收和分发的运用,RabbitMQ Server 便是 Message Broker

Virtual host:出于多租户和安全成分设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 观点。
当多个不同的用户利用同一个 RabbitMQ server 供应的做事时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。
Channel 是在 connection 内部建立的逻辑连接,如果运用程序支持多线程,常日每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,以是 channel 之间是完备隔离的。
Channel 作为轻量级的Connection 极大减少了操作系统建立 TCP connection 的开销

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发到 queue 中去。
常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout(multicast)

Queue:终极被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

第二部分

这两个知识点可以查看主页其他两个文章

RbbitMQ 持久化和权重分配信息

RaabitMq去世信行列步队

第三部分

1、导入依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>

2、simple大略模式

P:生产者,也便是要发送的程序

C:消费者:的接管者,会一贯等待到来。

queue:行列步队,图中赤色部分。
类似一个邮箱,可以缓存;生产者向个中投递,消费者从个中取出。

2.1、编写消费者

package com.littyxin;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component// 生产端没有指定交流机只有routingKey和Object。
//消费方产生hello行列步队,放在默认的交流机(AMQP default)上。
//而默认的交流机有一个特点,只要你的routerKey的名字与这个//交流机的行列步队有相同的名字,他就会自动路由上。
//生产端routingKey 叫hello ,消费端生产hello行列步队。
//他们就路由上了@RabbitListener(queuesToDeclare = @Queue(value = "hello")) //表示RabbitMQ消费者,声明一个行列步队public class HelloConsumer { @RabbitHandler //当消费者从行列步队取出时的回调方法 public void receive(String message){ System.out.println("message = " + message); }}

2.2、编写生产者测试类

package com.littyxin;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@SpringBootTest(classes = RabbitmqSpringbootApplication.class)@RunWith(SpringRunner.class)public class TestRabbitMQ { //注入rabbitTemplate @Autowired private RabbitTemplate rabbitTemplate; //hello world @Test public void testHelloWorld(){ //转换和发送 1.routingKey 2. rabbitTemplate.convertAndSend("hello","hello world"); }}

2.3、运行生产者测试类

3、work事情模式(资源的竞争)

Work queues,也被称为(Task queues),任务模型。
当处理比较耗时的时候,可能生产的速率会远远大于的消费速率。
长此以往,就会堆积越来越多,无法及时处理。
此时就可以利用work 模型:让多个消费者绑定到一个行列步队,共同消费行列步队中的。
行列步队中的一旦消费,就会消逝,因此任务是不会被重复实行的。

轮询分发

一个生产者发送,由多个事情线程(消费者)轮询吸收

3.1、消费者

package com.littyxin;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class WorkConsumer { //第一个消费者 @RabbitListener(queuesToDeclare = @Queue("work")) //@RabbitListener在方法上代表它监听这个方法作为行列步队消费回调 public void receive1(String message){ System.out.println("message1 = " + message); } //第二个消费者 @RabbitListener(queuesToDeclare = @Queue("work")) //@RabbitListener在方法上代表它监听这个方法作为行列步队消费回调 public void receive2(String message){ System.out.println("message2 = " + message); }}

3.2、生产者

@Testpublic void testWork(){ for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("work","work模型"); }}

3.3、运行测试

4、fanout广播模型

publish/subscribe发布订阅(共享资源)

4.1、消费者

package com.littyxin;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class FanoutConsumer { @RabbitListener(bindings = { @QueueBinding( value = @Queue, //创建临时行列步队 exchange =@Exchange(value = "logs",type = "fanout") //绑定的交流机 ) }) public void receive1(String message){ System.out.println("message1 = " + message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue, //创建临时行列步队 exchange =@Exchange(value = "logs",type = "fanout") //绑定的交流机 ) }) public void receive2(String message){ System.out.println("message2 = " + message); }}

4.2、生产者

@Testpublic void testFanout(){ rabbitTemplate.convertAndSend("logs","","Fanout模型发送的");}

4.3、运行测试

5、route路由模型

在Fanout模式中,一条,会被所有订阅的行列步队都消费。
但是,在某些场景下,我们希望不同的被不同的行列步队消费。
这时就要用到Direct类型的Exchange。

在Direct模型下:

行列步队与交流机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)的发送方在 向 Exchange发送时,也必须指定的 RoutingKey。
Exchange不再把交给每一个绑定的行列步队,而是根据的Routing Key进行判断,只有行列步队的Routingkey与的 Routing key完备同等,才会吸收到

P:生产者,向Exchange发送,发送时,会指定一个routing key。
X:Exchange(交流机),吸收生产者的,然后把递交给与routing key完备匹配的行列步队C1:消费者,其所在行列步队指定了须要routing key 为 error 的C2:消费者,其所在行列步队指定了须要routing key 为 info、error、warning 的

在上面这张图中,我们可以看到 X 绑定了两个行列步队,绑定类型是 direct。
行列步队 Q1 绑定键为 orange,行列步队 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green

在这种绑定情形下,生产者发布到 exchange 上,绑定键为 orange 的会

5.1、消费者

package com.littyxin;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class RouteConsumer { @RabbitListener(bindings = { @QueueBinding( value = @Queue,//临时行列步队 exchange = @Exchange(value = "directs",type = "direct"),//指定交流机名称和类型 key = {"info","error","warn"} ) }) public void receive1(String message){ System.out.println("message1 = " + message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue,//临时行列步队 exchange = @Exchange(value = "directs",type = "direct"),//指定交流机名称和类型 key = {"error"} ) }) public void receive2(String message){ System.out.println("message2 = " + message); }}

5.2、生产者

@Testpublic void testRoute(){ rabbitTemplate.convertAndSend("directs","error","发送info的key的路由信息");}

5.3、运行测试

6、Topic动态路由模型

Topic类型的Exchange与Direct比较,都是可以根据RoutingKey把路由到不同的行列步队。
只不过Topic类型Exchange可以让行列步队在绑定Routing key 的时候利用通配符!
这种模型Routingkey 一样平常都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

Topic 的哀求

发送到类型是 topic 交流机的的 routing_key 不能随意写,必须知足一定的哀求,它必须是一个单词列表,以点号分别隔。
这些单词可以是任意单词,比如说:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".这种类型的。
当然这个单词列表最多不能超过 255 个字节。

在这个规则列表中,个中有两个更换符须要把稳:

(星号)可以代替一个单词

#(井号)可以替代零个或多个单词

Topic 匹配案例

下图绑定关系如下:

Q1-->绑定的是

中间带 orange 带 3 个单词的字符串(.orange.)

Q2-->绑定的是

末了一个单词是 rabbit 的 3 个单词(..rabbit)第一个单词是 lazy 的多个单词(lazy.#)

上图是一个行列步队绑定关系图,我们来看看他们之间数据吸收情形是怎么样的quick.orange.rabbit------->被行列步队 Q1Q2 吸收到lazy.orange.elephant ------->被行列步队 Q1Q2 吸收到quick.orange.fox ------->被行列步队 Q1 吸收到lazy.brown.fox ------->被行列步队 Q2 吸收到lazy.pink.rabbit-------> 虽然知足两个绑定但只被行列步队 Q2 吸收一次quick.brown.fox ------->不匹配任何绑定不会被任何行列步队吸收到会被丢弃quick.orange.male.rabbit-------> 是四个单词不匹配任何绑定会被丢弃lazy.orange.male.rabbit-------> 是四个单词但匹配 Q2

6.1、消费者

package com.littyxin;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Componentpublic class TopicConsumer { @RabbitListener(bindings = { @QueueBinding( value = @Queue,//临时行列步队 exchange = @Exchange(type = "topic",name = "topics"), key = {"user.save","user."} ) }) public void receive1(String message){ System.out.println("message1 = " + message); } @RabbitListener(bindings = { @QueueBinding( value = @Queue,//临时行列步队 exchange = @Exchange(type = "topic",name = "topics"), key = {"order.#","produce.#","user."} ) }) public void receive2(String message){ System.out.println("message2 = " + message); }}

6.2、生产者

@Testpublic void testTopic(){ rabbitTemplate.convertAndSend("topics","user.save","user.save 路由");}

6.3、运行测试

后边会有Rabbitmq和Springboot干系的文章,关注我!

相关文章