首页 » 网站推广 » phpconsumecallback技巧_Java互联网架构分布式架构核心组件消息队列

phpconsumecallback技巧_Java互联网架构分布式架构核心组件消息队列

访客 2024-12-08 0

扫一扫用手机浏览

文章目录 [+]

什么是RabbitMQ?

RabbitMQ 是一个基于AMQP协议,做事端用Erlang开拓的行列步队,支持Java、Python、Ruby、.NET、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。
所谓行列步队便是用来实现系统与系统之间,程序与程序之间进行通信的中间件。
整体来看是一个异步的过程,由生产者(Publish)光降盆,这个会被先放到一个容器中,当知足一定条件时,这个会被消费者(Subscribe )拿走去消费。
这个容器便是行列步队。
生产者和消费者之间遵守的协议便是AMQP协议。
其次还可以对消费者设置一个优先级(Priority),以及对消费者的要求进行限流,对负载进行有效均衡。

phpconsumecallback技巧_Java互联网架构分布式架构核心组件消息队列

02

phpconsumecallback技巧_Java互联网架构分布式架构核心组件消息队列
(图片来自网络侵删)

运用处景有哪些?

1、用户注册时须要发送验证邮箱或者短信验证。
利用行列步队之后,运用程序只须要关心注册成功即可。
不须要等待邮件或者短信发送成功的相应。
由于这个是由消费者去完成的。

2、电商系统,用户下单消费成功之后,对应的库存须要进行更新。
我们可以调用库存系统供应的接口,但是这样如果库存系统涌现故障就会导致库存不能准确的更新。
而且耦合性非常高。
我们可以通过行列步队进行解耦,而且行列步队具有持久化功能。
担保数据的准确性

3、秒杀系统,可以通过行列步队过滤掉部分要求,缓解做事的压力。

4、注册、日志、监控系统等大部分只哀求终极同等性的场景

03

行列步队常见关键词

AMQP的核心是Producer(生产者)、Broker(行列步队的做事器实体)、Consumer(消费者)

Producer/Consumer观点比较好理解,无非便是一个生产者创建一个信息去由消费者去进行干系的逻辑处理。

Broker行列步队的做事器,一个Broker可以包含多个VirtualHost(虚拟主机),紧张起到了一个隔离的浸染。
而一个VirtualHost又包括以下三部分

Exchange(交流机):由它按照某些规则 去决定终极路由到哪个行列步队。

Binding:绑定,它的浸染便是把 Exchange 和 Queue 按照路由规则绑定起来。
如果没有bind,会直接被丧失落。

Queue:存储的地方,每个都会被投入到一个或多个行列步队。

整体流程如下图(源自网络)

04

Java操作RabbitMQ

1、创建链接工厂的工具类

import com.rabbitmq.client.ConnectionFactory;public class RabbitFactory { public ConnectionFactory getFactory() { // 创建链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(\"大众127.0.0.1\公众); factory.setUsername(\公众guest\"大众); factory.setPassword(\"大众guest\"大众); factory.setPort(5672); return factory; }}

2、生产者

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/ 生产者/public class Producer { / 声明行列步队名 / public final static String QUEUENAME = \"大众rabbitMQ\"大众; public static void main(String[] args) throws IOException, TimeoutException { // 获取连接工厂 RabbitFactory rabbitFactory = new RabbitFactory(); ConnectionFactory factory = rabbitFactory.getFactory(); // 创建一个新的链接 Connection connection = factory.newConnection(); // 创建一个通道 Channel channel = connection.createChannel(); // 声明一个行列步队 channel.queueDeclare(QUEUENAME, false, false, false, null); //发送一个至行列步队中 channel.basicPublish(\"大众\公众, QUEUENAME, null,\"大众Hello World\"大众.getBytes()); System.out.println(\"大众Producer Send Message Over\"大众); // 关闭通道和连接 channel.close(); connection.close(); }}

3、消费者代码

import com.rabbitmq.client.;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Customer { / 声明行列步队名 / public final static String QUEUENAME = \公众rabbitMQ\"大众; public static void main(String[] args) throws IOException, TimeoutException { RabbitFactory rabbitFactory = new RabbitFactory(); ConnectionFactory factory = rabbitFactory.getFactory(); // 创建一个新的链接 Connection connection = factory.newConnection(); // 创建一个通道 Channel channel = connection.createChannel(); // 声明一个行列步队 channel.queueDeclare(QUEUENAME, false, false, false, null); //创建一个消费者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, \"大众UTF-8\"大众); System.out.println(\公众[x]Received : \"大众 + message); } }; //channel绑定行列步队、消费者、autoAck为true表示一旦收到则自动回答确认 channel.basicConsume(QUEUENAME, true, consumer); }}

05

Python操作RabbitMQ

1、定义一个生产者

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))# 创建频道channel = connection.channel()# 声明一个行列步队channel.queue_declare('mq_test')# 不能直接到达queue,须要经由exchange,exchange='' 表示利用默认的exchangechannel.basic_publish(exchange='', routing_key='mq_test', body='hello world')print('push end')# 关闭发送的mq连接connection.close()

2、定义一个消费者

import pika# 连接mqconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672))# 创建一个通道channel = connection.channel()# 声明mq_test的行列步队,如果不存在 则自动创建channel.queue_declare(queue='mq_test')# 定义一个回调函数,打印收到的信息def callback(ch, method, properties, body): print('receive msg: %s' % body)# 指明从哪个行列步队(queue)吸收message# no_ack=True,表示不对进行确认channel.basic_consume(callback, queue='mq_test', no_ack=True)print('waiting for msg')channel.start_consuming()

标签:

相关文章