what?消费-生产者模型?对,没错!
便是大学操作系统课程里面的“消费者-生产者模式”,记得当时被这个问题坑的不轻啊。
在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步操作,而这种异步处理的办法大大的节省了做事器的要求韶光,从而提高了系统的吞吐量。而且不影响做事器做其他相应,不独占做事器资源。
如:注册用户这种做事,它可能解耦成好几种独立的做事(账号验证,邮箱验证码,手机短信码等)。它们作为消费者,等待用户输入数据,在前台数据提交之后会经由分解并发送到各个做事所在的url,分发的那个角色就相称于生产者。消费者在获取数据时候有可能一次不能处理完,那么它们各自有一个要求行列步队,那便是内存缓冲区了。做这项事情的框架叫做行列步队。

又比如:电商系统中的订单处理系统,传统处理模式是:下订单的时候,订单系统可能会调用库存系统的接口,这样两个别系之间存在一个严重依赖关系,如果库存系统宕机,那么全体流程都会受到影响。现在大多公司的处理方法是:引入行列步队,下完订单,订单系统完成持久化处理,将写入行列步队,返回用户订单下单成功。
对库存系统来说,采取拉/推的办法,获取下单信息,库存系统根据下单信息,进行库存操作。这样实现了两个别系间的解耦。
纵然不才单时库存系统不能正常利用。也不影响正常下单,由于下单后,订单系统写入行列步队就不再关心其他的后续操作了。
给一张构造图:
几个观点解释:
Broker:大略来说便是行列步队做事器实体。
Exchange:交流机,它指定按什么规则,路由到哪个行列步队。
Queue:行列步队载体,每个都会被投入到一个或多个行列步队。
Binding:绑定,它的浸染便是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:生产者,便是投递的程序。
consumer:消费者,便是接管的程序。
channel:通道,在客户真个每个连接里,可建立多个channel,每个channel代表一个会话任务。
行列步队的利用过程大概如下:
(1)客户端连接到行列步队做事器,打开一个channel。
(2)客户端声明一个exchange,并设置干系属性。
(3)客户端声明一个queue,并设置干系属性。
(4)客户端利用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递到exchange。
exchange吸收到后,就根据的key和已经设置的binding,进行路由,将投递到一个或多个行列步队里。
exchange也有几个类型,完备根据key进行投递的叫做Direct交流机,例如,绑定时设置了routing key为”abc”,那么客户端提交的,只有设置了key为”abc”的才会投递到行列步队。对key进行模式匹配后进行投递的叫做Topic交流机,符号”#”匹配一个或多个词,符号””匹配恰好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.”只匹配”abc.def”。还有一种不须要key的,叫做Fanout交流机,它采纳广播模式,一个进来时,投递到与该交流机绑定的所有行列步队。
RabbitMQ支持的持久化,也便是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。行列步队持久化包括3个部分:
(1)exchange持久化,在声明时指定durable => 1
(2)queue持久化,在声明时指定durable => 1
(3)持久化,在投递时指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不许可建立绑定。
好了,讲了这么多基本讲清楚了RabbitMQ的运用处景和好处,下面我们在windows平台上练一把手,更直不雅观的来看看RabbitMQ到底是什么?
那么我蛮来安装RabbitMQ+PHP环境:
1.安装RabbitMQ
安装RabbitMQ之前首先要安装Erlang措辞开拓包,下载地址:http://www.erlang.org/download/otp_win32_R15B.exe 默认安装即可
配置环境变量 ERLANG_HOME C:\Program Files (x86)\erl5.9
添加到PATH %ERLANG_HOME%\bin;
下载安装RabbitMQ,下载地址:http://www.rabbitmq.com/releases/rabbitmq-server/v3.3.4/rabbitmq-server-3.3.4.exe
配置环境变量 C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-2.8.0
添加到PATH %RABBITMQ_SERVER%\sbin;
然后到dos里面切换到RabbitMQ目录下,实行rabbitmq-plugins.bat enable rabbitmq_management, 安装完成之后以管理员身份启动 rabbitmq:输入命令:
rabbitmq-service.bat stop
rabbitmq-service.bat install
rabbitmq-service.bat start
然后,浏览器中输入:127.0.0.1:15672,用户名密码是guest ,如果能上岸就解释安装成功。
2.接下来要安装php的amqp扩展
先用phpinfo()查看php版本信息,及信息
末了根据上面的信息去下载相应的amqp版本:http://pecl.php.net/package/amqp
据上面信息我们的是32位非线程安全版本
加压后:
将php_amqp.dll复制到php/ext,同时在php.ini中添加如下代码:
extension=php_amqp.dll
然后将rabbitmq.1.dll复制到php根目录C:/xampp/php/,同时修正apache配置文件httpd.conf,添加如下代码:
# rabbitmq
LoadFile \"大众C:/xampp/php/rabbitmq.1.dll\"大众
末了重启看看是否已经加载了amqp模块:
RabbitMQ+PHP展示实例
新建rabbit_consumer.php作为消费者
<?php //配置信息 $conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost'=>'/' ); $e_name = 'e_linvo'; //交流机名 $q_name = 'q_linvo'; //行列步队名 $k_route = 'key_1'; //路由key //创建连接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die(\公众Cannot connect to the broker!\n\"大众); } $channel = new AMQPChannel($conn); //创建交流机 $ex = new AMQPExchange($channel); $ex->setName($e_name); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $ex->setFlags(AMQP_DURABLE); //持久化 echo \"大众Exchange Status:\"大众.$ex->declare().\公众\n\"大众; //创建行列步队 $q = new AMQPQueue($channel); $q->setName($q_name); $q->setFlags(AMQP_DURABLE); //持久化 echo \公众Message Total:\公众.$q->declare().\"大众\n\"大众; //绑定交流机与行列步队,并指定路由键 echo 'Queue Bind: '.$q->bind($e_name, $k_route).\"大众\n\"大众; //壅塞模式吸收 echo \"大众Message:\n\"大众; while(True){ $q->consume('processMessage'); //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答 } $conn->disconnect(); / 消费回调函数 处理 / function processMessage($envelope, $queue) { $msg = $envelope->getBody(); echo $msg.\"大众\n\公众; //处理 $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 }?>
新建rabbit_publisher.php作为生产者
<?php//配置信息 $conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost'=>'/' ); $e_name = 'e_linvo'; //交流机名 //$q_name = 'q_linvo'; //无需行列步队名 $k_route = 'key_1'; //路由key //创建连接和channel $conn = new AMQPConnection($conn_args); if (!$conn->connect()) { die(\"大众Cannot connect to the broker!\n\"大众); } $channel = new AMQPChannel($conn); //创建交流机工具 $ex = new AMQPExchange($channel); $ex->setName($e_name); date_default_timezone_set(\"大众Asia/Shanghai\公众);//发送 //$channel->startTransaction(); //开始事务 for($i=0; $i<5; ++$i){ sleep(1);//休眠1秒 //内容 $message = \"大众TEST MESSAGE!\"大众.date(\"大众h:i:sa\"大众); echo \"大众Send Message:\公众.$ex->publish($message, $k_route).\公众\n\"大众; } //$channel->commitTransaction(); //提交事务 $conn->disconnect();?>
测试一下:
先起一个窗口同样切换到php目录,输入:php c:/xampp/htdocs/RabbitMQ/rabbit_consumer.php
运行消费者
然后中兴一个dos窗口,切换到php根目录,输入以下命令:php c:/xampp/htdocs/RabbitMQ/rabbit_publisher.php
运行生产者
消费者吸收到
这样就仿照了行列步队对的处理,希望我们通过这篇文章对RabbitMQ的认识都能有一定的提升。