首页 » Web前端 » phpzerorpc技巧_安装Rabbitmq经由进程Rabbitmq实现RPC周全理解从入门到精晓

phpzerorpc技巧_安装Rabbitmq经由进程Rabbitmq实现RPC周全理解从入门到精晓

访客 2024-11-19 0

扫一扫用手机浏览

文章目录 [+]

# 扩展redis: 可以作为大略的行列步队celery: 本事便是基于行列步队进行的封装。
2.MQ办理了什么问题

MQ是一贯存在,不过随着微做事架构的盛行,成理解决微做事和微做事之间通信的常用工具。

# 扩展1.两个做事之间调用的办法:1.restful七层协议oss(http协议)2.rpc tcp socket层(远程过程调用)2.不管是利用restful还是rpc,都涉及到利用同步还是异步:1.异步: client利用rpc和server交互,client利用异步,不管有没有实行成功,就一直的异步的提交数据,数据在server行列步队排着队,等待开花费。
1.运用的解耦

1.以电商运用为例,运用中有订单系统,库存系统,物流系统,支付系统。
用户创建订单后,如果耦合调用库存系统,物流系统,支付系统,任何一个子系统涌现了故障,都会造成下单操作非常。
2.当转变成基于行列步队的办法后,系统间调用的问题会减少很多,比如物流系统由于发生故障,须要几分钟来修复。
在这几分钟的韶光里,物流系统要处理的内存被缓存在行列步队中,用户的下单操作可以正常完成。
当物流系统规复后,连续处理订单信息即可,订单用户感想熏染不到物流系统的故障。
提升系统的可用性。

2.流量削峰

1.举个列子,如果订单系统最多能处理一万次订单,这个处理能力搪塞正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。
但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限定定单超过一万后不许可用户下单。
2.利用行列步队做缓存,我们可以取消这个限定,把一秒内下的订单分散成一段韶光来处理,这时有些用户可能不才单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。
# 结:1.常日下比如有两万订单,这时我们server肯定消费不过来,我们将两万丢到行列步队中,进行消费即可。
--- 就叫流量消峰 = 如: 双十一,行列步队 多消费
3.分发(发布订阅: 不雅观察者模式)

多个做事对数据感兴趣,只须要监听同一类即可处理。

phpzerorpc技巧_安装Rabbitmq经由进程Rabbitmq实现RPC周全理解从入门到精晓

列如A产生数据,B对数据感兴趣。
如果没有行列步队A每次处理完须要调用一下B做事。
过了一段韶光C对数据也感兴趣,A就须要改代码,调用B做事,调用C做事。
只要有做事须要,A做事都要改动代码。
很未便利。

xxxxxxxxxx 有了行列步队后,A只管发送一次,B对感兴趣,只须要监听。
C感兴趣,C也去监听。
A做事作为根本做事完备不须要有改动。
4.异步(celery便是对行列步队的封装)

xxxxxxxxxx 有些做事间调用是异步的: 1.例如A调用B,B须要花费很永劫光实行,但是A须要知道B什么时候可以实行完,以前一样平常有两种办法,A过了一段韶光去调用B的查询api是否完成。
2.或者A供应一个callback api,B实行完之后调用api关照A做事。
这两种办法都不是很优雅。
python

phpzerorpc技巧_安装Rabbitmq经由进程Rabbitmq实现RPC周全理解从入门到精晓
(图片来自网络侵删)

1.利用总线,可以很方便办理这个问题,A调用B做事后,只须要监听B处理完成的,当B处理完成后,会发送一条给MQ,MQ会将此转发给A做事。
2.这样A做事既不用循环调用B的查询api,也不用供应callback api。
同样B做事也不用做这些操作。
A做事还能及时的得到异步处理成功的。
3.常见行列步队及比较

xxxxxxxxxx RabbitMQ: 支持持久化,断电后,重启,数据是不会丢的。
1. 吞吐量小: 几百万都是没问题的,确认: 我见告你,我消费完了,你在删 2.运用处景: 订单,对可靠性有哀求,就用它 Kafka: 吞吐量高,看重高吞吐量,不看重的可靠性 1.你拿走就没了,消费过程崩了,就没了。
2.运用处景,数据量特殊大。
# 结论: 1.Kafka在于分布式架构,RabbitMQ基于AMQP协议来实现,RocketMQ/思路来源于Kafka,改成了主从构造,在事务性可靠性方面做了优化。
2.广泛来说,电商,金融等对事物性哀求很高的,可以考虑RabbitMQ,对性能哀求或吞吐量高的可考虑Kafka。
python
二:Rabbitmq安装安装两种官网:https://www.rabbitmq.com/getstarted.htmldockerhub下载指定的rabbitmq:management的RabbitMQ

1.做事端原生安装

1 原生安装-安装扩展epel源-yum -y install erlang -yum -y install rabbitmq-server # 查询是否安装 rpm -qa rabbitmq-server -systemctl start rabbitmq-server # 以上也有web管理页面,只不过须要配置文件配置。
# 第一种办法客户端连接做事端,可以不用配置用户和密码,只须要ip连接。
第二种办法则须要配置用户名和密码。
2.做事端docker拉取

2 docker拉取-docker pull rabbitmq:management(自动开启了web管理界面) -docker run -di --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management # 直接 run 如果没有rabbitmq就会自动拉 """ docker run -di --name: 指定rabbitmq -e: 环境变量 -e RABBITMQ_DEFAULT_USER=admin:用户名 -e RABBITMQ_DEFAULT_PASS=admin:密码 -p 15672:15672: rabbitmq web管理界面端口 -p 5672:5672: rabbitmq默认的监听端口 """

docker ps

http://47.101.159.222:15672/

3.Rabbitmq可视化界面创建用户(设置用户和密码)

4.命令创建Rabbitmq用户(设置用户和密码)

4 创建用户rabbitmqctl add_user lqz 1235 分配权限# 设置用户为admin角色rabbitmqctl set_user_tags lqz administrator# 设置权限rabbitmqctl set_permissions -p "/" lqz "." "." "."# rabbitmqctl set_permissions -p "/" 用户名 "." "." "."三:客户端安装

pip3 install pika四: 基本利用(生产者消费者模型)

对付RabbitMQ来说,生产和消费不再针对内存里的一个Queue工具,而是某台做事器上的RabbitMQ Server实现的行列步队。
生产者

import pika# 创建连接工具# connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22')) # host指定rabbitmq做事器ip地址# 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定credentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials)) # host指定rabbitmq做事器ip地址,credentials指定用户名和密码# 创建channel工具,用于发送,吸收,声明行列步队channel = connection.channel() # connection.channel(): 在连接上创建一个频道,这个频道便是我们要通过它来发送,吸收的工具,类似于TCP中的socket工具,我们通过它来收发,声明行列步队# 声明行列步队,如果行列步队不存在,则创建行列步队,如果行列步队存在,则不创建channel.queue_declare(queue='datalog')# 生产者向行列步队中放一条channel.basic_publish(exchange='', # 交流机,如果不指定,则利用默认的交流机, 默认的交流机 routing_key='datalog', # 行列步队名称 body='zll nb!' # 发送的 )print("Sent 'Hello World!'")# 关闭连接connection.close()消费者

import pika, sys, osdef main(): # 创建连接工具 # connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22')) # host指定rabbitmq做事器ip地址 # 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials)) # host指定rabbitmq做事器ip地址,credentials指定用户名和密码 # 创建channel工具,用于发送,吸收,声明行列步队 channel = connection.channel() # connection.channel(): 在连接上创建一个频道,这个频道便是我们要通过它来发送,吸收的工具,类似于TCP中的socket工具,我们通过它来收发,声明行列步队 """消费者也声明了行列步队,由于如果是消费者先启动,那么行列步队就不存在了,消费者就无法消费了,以是消费者也要声明行列步队""" channel.queue_declare(queue='datalog') def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 消费者从行列步队queue指定的消费行列步队hello中取消息,拿到数据了之前将hello行列步队的数据丢到callback里面,如果行列步队中没有,则会一贯等待,直到有为止 # auto_ack=True:自动确认,如果不设置为True,那么会一贯处于未确认状态,纵然消费者已经消费了,也不会从行列步队中删除,这样就会造成的重复消费,以是一样平常都会设置为True # auto_ack=true: 行列步队吸收到既直接确认,就会删除行列步队中的,不会管后面数据会不会消费完。
channel.basic_consume(queue='datalog', on_message_callback=callback, auto_ack=True) # 默认为false,不自动确认,须要手动确认 print(' [] Waiting for messages. To exit press CTRL+C') channel.start_consuming() # 开始消费,如果行列步队中没有,那么就会一贯等待,直到有为止,如果行列步队中有,那么就会消费if __name__ == '__main__': main()

五: 确认机制 (安全之ack)

# auto_ack: 自动确认(行列步队吸收到就会确认消费,会丢失数据的可能性) 默认为falseauto_ack=true: 行列步队吸收到既直接确认,就会删除行列步队中的,不会管后面数据会不会消费完。
auto_ack=false: 设置为false的情形,那么会一贯处于未确认状态,纵然消费者已经消费了,也不会从行列步队中删除,这样就会造成的重复消费 # ch.basic_ack: 消费完后,自动确认消费(可靠性,担保数据都完全的消费): 常用推举ch.basic_ack(delivery_tag=method.delivery_tag): 真正的将消费完了后,再发确认,就会删除掉行列步队中的。
生产者

import pika# 创建连接工具# connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22')) # host指定rabbitmq做事器ip地址# 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定credentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials)) # host指定rabbitmq做事器ip地址,credentials指定用户名和密码# 创建channel工具,用于发送,吸收,声明行列步队channel = connection.channel() # connection.channel(): 在连接上创建一个频道,这个频道便是我们要通过它来发送,吸收的工具,类似于TCP中的socket工具,我们通过它来收发,声明行列步队# 声明行列步队,如果行列步队不存在,则创建行列步队,如果行列步队存在,则不创建channel.queue_declare(queue='datalog')# 生产者向行列步队中放一条channel.basic_publish(exchange='', # 交流机,如果不指定,则利用默认的交流机, 默认的交流机 routing_key='datalog', # 行列步队名称 body='zll nb!' # 发送的 )print("Sent 'Hello World!'")# 关闭连接connection.close()消费者

import pika, sys, osdef main(): # 创建连接工具 # connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22')) # host指定rabbitmq做事器ip地址 # 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials)) # host指定rabbitmq做事器ip地址,credentials指定用户名和密码 # 创建channel工具,用于发送,吸收,声明行列步队 channel = connection.channel() # connection.channel(): 在连接上创建一个频道,这个频道便是我们要通过它来发送,吸收的工具,类似于TCP中的socket工具,我们通过它来收发,声明行列步队 """消费者也声明了行列步队,由于如果是消费者先启动,那么行列步队就不存在了,消费者就无法消费了,以是消费者也要声明行列步队""" channel.queue_declare(queue='datalog') def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 真正的将消费完了,再发确认 ch.basic_ack(delivery_tag=method.delivery_tag) # 消费者从行列步队queue指定的消费行列步队hello中取消息,拿到数据了之前将hello行列步队的数据丢到callback里面,如果行列步队中没有,则会一贯等待,直到有为止 # auto_ack=True:自动确认,如果不设置为True,那么会一贯处于未确认状态,纵然消费者已经消费了,也不会从行列步队中删除,这样就会造成的重复消费,以是一样平常都会设置为True # auto_ack=true: 行列步队吸收到既直接确认,就会删除行列步队中的,不会管后面数据会不会消费完。
channel.basic_consume(queue='datalog', on_message_callback=callback, auto_ack=False) # 默认为false,不自动确认,须要手动确认 print(' [] Waiting for messages. To exit press CTRL+C') channel.start_consuming() # 开始消费,如果行列步队中没有,那么就会一贯等待,直到有为止,如果行列步队中有,那么就会消费if __name__ == '__main__': main()
六: 持久化(安全之durable持久化)

1.什么是rabbitmq持久化?数据支持持久化,运行过程中,rabbitmq宕机了,在重新启动起来,如果行列步队消费没被消费,那么就还是会存在。
2.配置行列步队持久化# 在创建行列步队的时候增加durable=True设置行列步队持久化,如果rabbitmq做事重启,行列步队不会丢失channel.queue_declare(queue='datalog',durable=True)3.配置持久化# 在发布的时候增加properties设置持久化,如果rabbitmq做事停滞,重启后,还在, 1:非持久化,2:持久化,默认为1 properties=pika.BasicProperties(delivery_mode=2,) # 把稳:1.没加持久化配置之前的行列步队不会支持持久化,须要加持久化配置之后重新创建。
生产者

import pika# 创建连接工具# connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22')) # host指定rabbitmq做事器ip地址# 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定credentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials)) # host指定rabbitmq做事器ip地址,credentials指定用户名和密码# 创建channel工具,用于发送,吸收,声明行列步队channel = connection.channel() # connection.channel(): 在连接上创建一个频道,这个频道便是我们要通过它来发送,吸收的工具,类似于TCP中的socket工具,我们通过它来收发,声明行列步队# 声明行列步队,如果行列步队不存在,则创建行列步队,如果行列步队存在,则不创建channel.queue_declare(queue='datalog', durable=True) # durable=True: 行列步队持久化,如果rabbitmq做事停滞,重启后,行列步队还在# 生产者向行列步队中放一条channel.basic_publish(exchange='', # 交流机,如果不指定,则利用默认的交流机, 默认的交流机 routing_key='datalog', # 行列步队名称 body='zll nb!', # 发送的 properties=pika.BasicProperties( delivery_mode=2, # 持久化,如果rabbitmq做事停滞,重启后,还在, 1:非持久化,2:持久化 ) )print("Sent 'Hello World!'")# 关闭连接connection.close()消费者

import pika, sys, osdef main(): # 创建连接工具 # connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22')) # host指定rabbitmq做事器ip地址 # 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials)) # host指定rabbitmq做事器ip地址,credentials指定用户名和密码 # 创建channel工具,用于发送,吸收,声明行列步队 channel = connection.channel() # connection.channel(): 在连接上创建一个频道,这个频道便是我们要通过它来发送,吸收的工具,类似于TCP中的socket工具,我们通过它来收发,声明行列步队 """消费者也声明了行列步队,由于如果是消费者先启动,那么行列步队就不存在了,消费者就无法消费了,以是消费者也要声明行列步队""" channel.queue_declare(queue='datalog', durable=True) # durable=True: 行列步队持久化,如果rabbitmq做事重启,行列步队不会丢失 def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 真正的将消费完了,再发确认 ch.basic_ack(delivery_tag=method.delivery_tag) # 消费者从行列步队queue指定的消费行列步队hello中取消息,拿到数据了之前将hello行列步队的数据丢到callback里面,如果行列步队中没有,则会一贯等待,直到有为止 # auto_ack=True:自动确认,如果不设置为True,那么会一贯处于未确认状态,纵然消费者已经消费了,也不会从行列步队中删除,这样就会造成的重复消费,以是一样平常都会设置为True # auto_ack=true: 行列步队吸收到既直接确认,就会删除行列步队中的,不会管后面数据会不会消费完。
channel.basic_consume(queue='datalog', on_message_callback=callback, auto_ack=False) # 默认为false,不自动确认,须要手动确认 print(' [] Waiting for messages. To exit press CTRL+C') channel.start_consuming() # 开始消费,如果行列步队中没有,那么就会一贯等待,直到有为止,如果行列步队中有,那么就会消费if __name__ == '__main__': main()

七: 闲置消费

1.什么是闲置消费?1.正常情形如果有多个消费者,是按照顺序第一个给第一个消费者,第二个给第二个消费者,以此类推,只能按照顺序。
2.但是可能第一个消费的消费者处理很耗时,一贯没结束,此时就可以让第二个消费者优先获取闲置的,次方法就称之为"闲置消费"。
2.配置闲置消费# 消费者配置,每次只吸收一条,处理完了再吸收下一条,这样可以担保的顺序性,不会涌现乱序的情形channel.basic_qos(prefetch_count=1) # 1代表每次只吸收一条,吸收完了再吸收下一条# 缺陷: 1.但是会降落效率,由于每次只处理一条,如果处理很快,那么效率就会降落
生产者

import pika# 创建连接工具# connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22')) # host指定rabbitmq做事器ip地址# 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定credentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials)) # host指定rabbitmq做事器ip地址,credentials指定用户名和密码# 创建channel工具,用于发送,吸收,声明行列步队channel = connection.channel() # connection.channel(): 在连接上创建一个频道,这个频道便是我们要通过它来发送,吸收的工具,类似于TCP中的socket工具,我们通过它来收发,声明行列步队# 声明行列步队,如果行列步队不存在,则创建行列步队,如果行列步队存在,则不创建channel.queue_declare(queue='datalog', durable=True) # durable=True: 行列步队持久化,如果rabbitmq做事停滞,重启后,行列步队还在# 生产者向行列步队中放一条channel.basic_publish(exchange='', # 交流机,如果不指定,则利用默认的交流机, 默认的交流机 routing_key='datalog', # 行列步队名称 body='zll nb!', # 发送的 properties=pika.BasicProperties( delivery_mode=2, # 持久化,如果rabbitmq做事停滞,重启后,还在, 1:非持久化,2:持久化 ) )print("Sent 'Hello World!'")# 关闭连接connection.close()消费者1

import timeimport pika, sys, osdef main(): # 创建连接工具 # connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22')) # host指定rabbitmq做事器ip地址 # 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials)) # host指定rabbitmq做事器ip地址,credentials指定用户名和密码 # 创建channel工具,用于发送,吸收,声明行列步队 channel = connection.channel() # connection.channel(): 在连接上创建一个频道,这个频道便是我们要通过它来发送,吸收的工具,类似于TCP中的socket工具,我们通过它来收发,声明行列步队 """消费者也声明了行列步队,由于如果是消费者先启动,那么行列步队就不存在了,消费者就无法消费了,以是消费者也要声明行列步队""" channel.queue_declare(queue='datalog', durable=True) # durable=True: 行列步队持久化,如果rabbitmq做事重启,行列步队不会丢失 def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(50) # 仿照处理任务,耗时50秒 # 真正的将消费完了,再发确认 ch.basic_ack(delivery_tag=method.delivery_tag) # 闲置消费 channel.basic_qos(prefetch_count=1) # 每次只吸收一条,处理完了再吸收下一条,这样可以担保的顺序性,不会涌现乱序的情形,但是会降落效率,由于每次只处理一条,如果处理很快,那么效率就会降落 # 消费者从行列步队queue指定的消费行列步队hello中取消息,拿到数据了之前将hello行列步队的数据丢到callback里面,如果行列步队中没有,则会一贯等待,直到有为止 # auto_ack=True:自动确认,如果不设置为True,那么会一贯处于未确认状态,纵然消费者已经消费了,也不会从行列步队中删除,这样就会造成的重复消费,以是一样平常都会设置为True # auto_ack=true: 行列步队吸收到既直接确认,就会删除行列步队中的,不会管后面数据会不会消费完。
channel.basic_consume(queue='datalog', on_message_callback=callback, auto_ack=False) # 默认为false,不自动确认,须要手动确认 print(' [] Waiting for messages. To exit press CTRL+C') channel.start_consuming() # 开始消费,如果行列步队中没有,那么就会一贯等待,直到有为止,如果行列步队中有,那么就会消费if __name__ == '__main__': main()
消费者2

import pika, sys, osdef main(): # 创建连接工具 # connection = pika.BlockingConnection(pika.ConnectionParameters(host='47.101.159.222:22')) # host指定rabbitmq做事器ip地址 # 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定 credentials = pika.PlainCredentials("admin", "admin") connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials)) # host指定rabbitmq做事器ip地址,credentials指定用户名和密码 # 创建channel工具,用于发送,吸收,声明行列步队 channel = connection.channel() # connection.channel(): 在连接上创建一个频道,这个频道便是我们要通过它来发送,吸收的工具,类似于TCP中的socket工具,我们通过它来收发,声明行列步队 """消费者也声明了行列步队,由于如果是消费者先启动,那么行列步队就不存在了,消费者就无法消费了,以是消费者也要声明行列步队""" channel.queue_declare(queue='datalog', durable=True) # durable=True: 行列步队持久化,如果rabbitmq做事重启,行列步队不会丢失 def callback(ch, method, properties, body): print(" [x] Received %r" % body) # 真正的将消费完了,再发确认 ch.basic_ack(delivery_tag=method.delivery_tag) # 闲置消费 channel.basic_qos(prefetch_count=1) # 每次只吸收一条,处理完了再吸收下一条,这样可以担保的顺序性,不会涌现乱序的情形,但是会降落效率,由于每次只处理一条,如果处理很快,那么效率就会降落 # 消费者从行列步队queue指定的消费行列步队hello中取消息,拿到数据了之前将hello行列步队的数据丢到callback里面,如果行列步队中没有,则会一贯等待,直到有为止 # auto_ack=True:自动确认,如果不设置为True,那么会一贯处于未确认状态,纵然消费者已经消费了,也不会从行列步队中删除,这样就会造成的重复消费,以是一样平常都会设置为True # auto_ack=true: 行列步队吸收到既直接确认,就会删除行列步队中的,不会管后面数据会不会消费完。
channel.basic_consume(queue='datalog', on_message_callback=callback, auto_ack=False) # 默认为false,不自动确认,须要手动确认 print(' [] Waiting for messages. To exit press CTRL+C') channel.start_consuming() # 开始消费,如果行列步队中没有,那么就会一贯等待,直到有为止,如果行列步队中有,那么就会消费if __name__ == '__main__': main()

八: 发布订阅(fanout)

发布订阅: 可以有多个订阅者来订阅发布者的# fanout:不须要routing_key,只须要将发送到交流机中,交流机会将发送到所有绑定到它的行列步队中# 实现发布订阅逻辑1.发布者 P 将发送到 X 交流机上面, 2.C1,C2,多个订阅者随机创建出多个行列步队,将订阅者行列步队绑定给 X 交流机, 3.X 交流机通过行列步队将数据发送给所有绑定 X 交流机的订阅者。

发布订阅/生产者

import pika# 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定credentials = pika.PlainCredentials("admin", "admin")# host指定rabbitmq做事器ip地址,credentials指定用户名和密码connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials))# 在连接上创建一个频道,这个频道便是我们要通过它来发送,吸收的工具,类似于TCP中的socket工具,我们通过它来收发,声明行列步队channel = connection.channel()# 不须要声明行列步队,由于生产者不须要将放到行列步队中,只须要将发送到交流机中即可channel.exchange_declare(exchange='logs', exchange_type='fanout') # 声明交流机,交流机类型为fanout# exchange_type的三种类型:# 1、direct:根据routing_key将放到对应的行列步队中# 2、topic:根据routing_key和binding_key将放到对应的行列步队中# 3、fanout:不须要routing_key,只须要将发送到交流机中,交流机会将发送到所有绑定到它的行列步队中message = "info: Hello World!"# 发送到交流机中channel.basic_publish(exchange='logs', routing_key='', body=message) # 发送到交流机connection.close()订阅者/消费者

import pika# 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定credentials = pika.PlainCredentials("admin", "admin")# host指定rabbitmq做事器ip地址,credentials指定用户名和密码connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials))# 在连接上创建一个频道,这个频道便是我们要通过它来发送,吸收的工具,类似于TCP中的socket工具,我们通过它来收发,声明行列步队channel = connection.channel()# 不须要声明行列步队,由于生产者不须要将放到行列步队中,只须要将发送到交流机中即可channel.exchange_declare(exchange='logs', exchange_type='fanout') # 声明交流机,交流机类型为fanout# 声明一个随机行列步队,exclusive=True表示这个行列步队只能被当前连接利用,当连接关闭时,行列步队会被删除,exclusive=True是为了防止多个消费者同时消费一个行列步队,导致重复消费result = channel.queue_declare(queue='', exclusive=True)# 获取随机行列步队的名称,随机的意义是什么: 每次运行程序都会创建一个新的行列步队,这样就不会有多个消费者同时消费同一个行列步队中的,这样就可以实现的负载均衡,每个消费者都会均匀的消费行列步队中的queue_name = result.method.queue# 默认会创建一个随机行列步队,行列步队名称是随机的。
这个行列步队只能被当前连接利用,当连接关闭时,行列步队会被删除。
# 也可以指定行列步队名称,但是要确保行列步队名称是唯一的,不然会报错print(queue_name)channel.queue_bind(exchange='logs', queue=queue_name) # 将行列步队绑定到交流机上,交流机类型为fanout,以是不须要指定routing_key,交流机会将发送到所有绑定到它上面的行列步队print(' [] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body): print(" [x] %r" % body)channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True)channel.start_consuming()

九:关键字(direct)

direct:根据routing_key将放到对应的行列步队中1.关键字1.将随机行列步队绑定到交流机上,routing_key指定路由键,这里指定的是key,2.表示只有路由键为info的才会被发送到该随机行列步队中,也便是说只有生产者发送的的路由键为key的才会被消费。
# 总结:将随机行列步队绑定到交流机上,routing_key为指定消费交流机的行列步队名称,从而实现指定消费,然后将从绑定的交流机的行列步队获取, 消费。
routing_key监听的行列步队名称
发布订阅/生产者

import pika# 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定credentials = pika.PlainCredentials("admin", "admin")# host指定rabbitmq做事器ip地址,credentials指定用户名和密码connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials))# 在连接上创建一个频道,这个频道便是我们要通过它来发送,吸收的工具,类似于TCP中的socket工具,我们通过它来收发,声明行列步队channel = connection.channel()# 不须要声明行列步队,由于生产者不须要将放到行列步队中,只须要将发送到交流机中即可channel.exchange_declare(exchange='zll', exchange_type='direct') # 声明交流机,交流机类型为direct# exchange_type的三种类型:# 1、direct:根据routing_key将放到对应的行列步队中# 2、topic:根据routing_key和binding_key将放到对应的行列步队中# 3、fanout:不须要routing_key,只须要将发送到交流机中,交流机会将发送到所有绑定到它的行列步队中message = "info: Hello World!"# 发送到交流机中channel.basic_publish(exchange='zll', routing_key='bnb', body=message) # routing_key为bnb,会被发送到bnb行列步队中,如果没有bnb行列步队,会被丢弃,由于没有行列步队可以吸收connection.close()订阅者/消费者

import pika# 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定credentials = pika.PlainCredentials("admin", "admin")# host指定rabbitmq做事器ip地址,credentials指定用户名和密码connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials))# 在连接上创建一个频道,这个频道便是我们要通过它来发送,吸收的工具,类似于TCP中的socket工具,我们通过它来收发,声明行列步队channel = connection.channel()# 不须要声明行列步队,由于生产者不须要将放到行列步队中,只须要将发送到交流机中即可channel.exchange_declare(exchange='zll', exchange_type='direct') # 声明交流机,交流机类型为fanout# 声明一个随机行列步队,exclusive=True表示这个行列步队只能被当前连接利用,当连接关闭时,行列步队会被删除,exclusive=True是为了防止多个消费者同时消费一个行列步队,导致重复消费result = channel.queue_declare(queue='', exclusive=True)# 获取随机行列步队的名称,随机的意义是什么: 每次运行程序都会创建一个新的行列步队,这样就不会有多个消费者同时消费同一个行列步队中的,这样就可以实现的负载均衡,每个消费者都会均匀的消费行列步队中的queue_name = result.method.queue# 默认会创建一个随机行列步队,行列步队名称是随机的。
这个行列步队只能被当前连接利用,当连接关闭时,行列步队会被删除。
# 也可以指定行列步队名称,但是要确保行列步队名称是唯一的,不然会报错print(queue_name)# 将行列步队绑定到交流机上,routing_key指定路由键,这里指定的是info,表示只有路由键为info的才会被发送到该随机行列步队中,也便是说只有生产者发送的的路由键为info的才会被消费。
channel.queue_bind(exchange='zll', queue=queue_name, routing_key='nb') # 将行列步队绑定到交流机上,routing_key为指定消费交流机的行列步队名称,从而实现指定消费,然后将从绑定到交流机的行列步队获取channel.queue_bind(exchange='zll', queue=queue_name, routing_key='bnb')print(' [] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body): print(" [x] %r" % body)channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True)channel.start_consuming()
十:模糊匹配(topic)

topic:根据routing_key和binding_key将放到对应的行列步队中# 模糊匹配的关键# : 表示后面可以追随意任性字符 : 表示后面只能跟一个单词发布订阅/生产者

import pika# 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定credentials = pika.PlainCredentials("admin", "admin")# host指定rabbitmq做事器ip地址,credentials指定用户名和密码connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials))# 在连接上创建一个频道,这个频道便是我们要通过它来发送,吸收的工具,类似于TCP中的socket工具,我们通过它来收发,声明行列步队channel = connection.channel()# 不须要声明行列步队,由于生产者不须要将放到行列步队中,只须要将发送到交流机中即可channel.exchange_declare(exchange='aaa', exchange_type='topic') # 声明交流机,交流机类型为direct# exchange_type的三种类型:# 1、direct:根据routing_key将放到对应的行列步队中# 2、topic:根据routing_key和binding_key将放到对应的行列步队中# 3、fanout:不须要routing_key,只须要将发送到交流机中,交流机会将发送到所有绑定到它的行列步队中message = "info: Hello World!"# 发送到交流机中channel.basic_publish(exchange='aaa', routing_key='bnb.xxxx', body=message) # routing_key为bnb,会被发送到bnb行列步队中,如果没有bnb行列步队,会被丢弃,由于没有行列步队可以吸收connection.close()消费者

import pika# 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定credentials = pika.PlainCredentials("admin", "admin")# host指定rabbitmq做事器ip地址,credentials指定用户名和密码connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials))# 在连接上创建一个频道,这个频道便是我们要通过它来发送,吸收的工具,类似于TCP中的socket工具,我们通过它来收发,声明行列步队channel = connection.channel()# 不须要声明行列步队,由于生产者不须要将放到行列步队中,只须要将发送到交流机中即可channel.exchange_declare(exchange='aaa', exchange_type='topic') # 声明交流机,交流机类型为fanout# 声明一个随机行列步队,exclusive=True表示这个行列步队只能被当前连接利用,当连接关闭时,行列步队会被删除,exclusive=True是为了防止多个消费者同时消费一个行列步队,导致重复消费result = channel.queue_declare(queue='', exclusive=True)# 获取随机行列步队的名称,随机的意义是什么: 每次运行程序都会创建一个新的行列步队,这样就不会有多个消费者同时消费同一个行列步队中的,这样就可以实现的负载均衡,每个消费者都会均匀的消费行列步队中的queue_name = result.method.queue# 默认会创建一个随机行列步队,行列步队名称是随机的。
这个行列步队只能被当前连接利用,当连接关闭时,行列步队会被删除。
# 也可以指定行列步队名称,但是要确保行列步队名称是唯一的,不然会报错print(queue_name)# 将随机行列步队绑定到交流机上,routing_key为指定消费交流机的行列步队名称,从而实现指定消费,然后将从绑定的交流机的行列步队获取, routing_key监听的行列步队名称channel.queue_bind(exchange='aaa', queue=queue_name, routing_key='nb') # 将行列步队绑定到交流机上,routing_key为指定消费交流机的行列步队名称,从而实现指定消费,然后将从绑定到交流机的行列步队获取channel.queue_bind(exchange='aaa', queue=queue_name, routing_key='bnb.#')print(' [] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body): print(" [x] %r" % body)channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True)channel.start_consuming()

十一:通过rabbitmq实现rpc(基于RabbitMQ封装RPC)

# 通过RabbitMQ实现rpc# 实现逻辑1.做事端启动吸收,监听queue行列步队。
2.实列化客户端,调用call方法,将属性内包含: 1.回调函数随机行列步队,吸收做事端返回结果,做事端会将结果发送到这个行列步队。
2.客户但的随机uuid,标识唯一。
然后将body发送给做事端。
3.客户端,发布完后,进入非壅塞状态,如果没有吸收到做事端返回的结果,会一贯等待,直到收到结果,然后返回结果。
4.做事端吸收queue行列步队,调用函数将进行处理,获取裴波那契数列。
5.然后做事端进行发布,将发送到客户真个回调函数行列步队,客户真个uuid。
6.客户端监听吸收行列步队,调用函数处理,判断唯一uuid,确认body,然后成功收到并返回。
做事端

import pika# 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定credentials = pika.PlainCredentials("admin", "admin")# host指定rabbitmq做事器ip地址,credentials指定用户名和密码connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=credentials))# 在连接上创建一个频道,这个频道便是我们要通过它来发送,吸收的工具,类似于TCP中的socket工具,我们通过它来收发,声明行列步队channel = connection.channel()channel.queue_declare(queue='rpc_queue') # 声明行列步队,如果行列步队不存在,会自动创建def fib(n): if n == 0: return 0 elif n == 1: return 1 else: # 递归调用,打算斐波那契数列 return fib(n-1) + fib(n-2)def on_request(ch, method, props, body): # ch为频道,method为方法,props为属性,body为体 n = int(body) print(" [.] fib(%s)" % n) response = fib(n) ch.basic_publish(exchange='', routing_key=props.reply_to, # 将发送到客户真个回调函数 properties=pika.BasicProperties(correlation_id = \ props.correlation_id), # 将客户真个correlation_id通报给客户端 body=str(response)) # 发送ack,见告rabbitmq,已经被处理 ch.basic_ack(delivery_tag=method.delivery_tag)# 每次只吸收一个channel.basic_qos(prefetch_count=1)# 消费channel.basic_consume(queue='rpc_queue', on_message_callback=on_request) # queue为行列步队名,on_message_callback为回调函数,收到后,会调用回调函数print(" [x] Awaiting RPC requests")channel.start_consuming() # 开始吸收,进入壅塞状态,等待,直到收到为止,收到后,会调用on_request函数客户端

import pikaimport uuidclass FibonacciRpcClient(object): def __init__(self): # 指定用户名和密码,如果rabbitmq没有设置用户名和密码,可以不指定 self.credentials = pika.PlainCredentials("admin", "admin") # host指定rabbitmq做事器ip地址,credentials指定用户名和密码 self.connection = pika.BlockingConnection(pika.ConnectionParameters('47.101.159.222', credentials=self.credentials)) self.channel = self.connection.channel() # 声明一个随机行列步队,用来吸收rpc_server返回的结果 result = self.channel.queue_declare(queue='', exclusive=True) # exclusive=True表示这个行列步队只能被当前连接利用,当连接关闭时,行列步队会被删除,exclusive=True是为了防止多个客户端同时利用一个行列步队 # 获取随机行列步队的名称 self.callback_queue = result.method.queue self.channel.basic_consume( queue=self.callback_queue, on_message_callback=self.on_response, # 消费 auto_ack=True # 自动发送ack,见告rabbitmq,已经被处理 ) def on_response(self, ch, method, props, body): if self.corr_id == props.correlation_id: self.response = body def call(self, n): self.response = None # 天生一个随机的correlation_id, 用来标识, 客户端和做事端都会用这个id来标识, # 客户端会将这个id通报给做事端, 做事端会将这个id通报给客户端, 客户端和做事端都会将这个id与自己的id进行比较, 如果不一致, 则丢弃这个 self.corr_id = str(uuid.uuid4()) self.channel.basic_publish( exchange='', routing_key='rpc_queue', # 将发送到rpc_queue行列步队 properties=pika.BasicProperties( # 属性, 用来标识 reply_to=self.callback_queue, # 将发送到客户真个回调函数, 用来吸收做事端返回的结果, 做事端会将结果发送到这个行列步队 correlation_id=self.corr_id, # 将客户真个crrelation_id发送给做事端 ), body=str(n) # 将发送给做事端, 做事端会将这个作为参数通报给fib函数 ) while self.response is None: # 如果没有收到做事端返回的结果, 则一贯等待, 直到收到结果, 然后返回结果 self.connection.process_data_events() # 非壅塞版的start_consuming(), 用来吸收 return int(self.response)fibonacci_rpc = FibonacciRpcClient() # 实例化客户端, 用来发送, 并吸收做事端返回的结果, 并返回结果, 用来调用做事真个方法print(" [x] Requesting fib(30)")response = fibonacci_rpc.call(10) # 调用call方法, 发送, 并吸收做事端返回的结果, 然后打印结果print(" [.] Got %r" % response)

十二:python中的rpc框架

python自带的: SimpleXMLRPCServer(数据包大,数据慢) - HTTP通信第三方: ZeroRPC(底层利用ZeroMQ和MessagePack,速率快,相应韶光短,并发高),grpc(谷歌推出支持夸措辞) - TCP通信十三:SimpleXMLRPCServer做事端

from xmlrpc.server import SimpleXMLRPCServerclass RPCServer(object): def __init__(self): # 初始化父类,python3中不须要,python2中须要,否则会报错 super(RPCServer, self).__init__() print(self) self.send_data = {'server:' + str(i): i for i in range(100)} self.recv_data = None def getObj(self): # 吸收数据 return self.send_data def sendObj(self, data): # 发送数据 print('send data') self.recv_data = data print(self.recv_data)# 创建一个做事器,监听本机的8000端口,并设置许可访问的ip地址,如果不设置,默认只能本机访问server = SimpleXMLRPCServer(('localhost', 4242), allow_none=True)# 注册自察函数,可以查看做事器供应的方法,不注册的话,客户端只能调用register_function注册的方法# 为什么要注册自察函数呢?由于客户端调用方法时,是通过方法名来调用的,如果不注册自察函数,客户端就不知道做事器供应了哪些方法server.register_introspection_functions()# 注册实例,可以调用实例的方法,不注册的话,客户端只能调用register_function注册的方法server.register_instance(RPCServer())# 开始监听要求,进入壅塞状态,等待要求,直到收到要求为止,收到要求后,会调用注册的方法server.serve_forever()客户端

import timefrom xmlrpc.client import ServerProxydef xmlrpc_client(): print('xmlrpc client start') # 创建一个做事器代理,指定做事器的ip地址和端口 c = ServerProxy('http://localhost:4242') # 调用做事器的方法 data = {'client:' + str(i): i for i in range(100)} start = time.clock() # 计时 for i in range(5): # 重复调用50次 a = c.getObj() # 调用做事器的方法 print(a) for i in range(5): # 重复调用50次 c.sendObj(data) # 调用做事器的方法 print('xmlrpc total time %s' % (time.clock() - start))if __name__ == '__main__': xmlrpc_client()

十四:ZeroRPC实现rpc做事端

import zerorpcclass RPCServer(object): def __init__(self): print(self) self.send_data = {'server:'+str(i): i for i in range(100)} self.recv_data = None def getObj(self): print('get data') return self.send_data def sendObj(self, data): print('send data') self.recv_data = data print(self.recv_data)# 创建一个做事器,监听本机的8000端口,并设置许可访问的ip地址,如果不设置,默认只能本机访问s = zerorpc.Server(RPCServer())# 绑定端口,并设置许可访问的ip地址,如果不设置,默认只能本机访问s.bind('tcp://0.0.0.0:4243')# 开始监听要求,进入壅塞状态,等待要求,直到收到要求为止,收到要求后,会调用注册的方法s.run()客户端

import zerorpcclass RPCServer(object): def __init__(self): print(self) self.send_data = {'server:'+str(i): i for i in range(100)} self.recv_data = None def getObj(self): print('get data') return self.send_data def sendObj(self, data): print('send data') self.recv_data = data print(self.recv_data)# 创建一个做事器,监听本机的8000端口,并设置许可访问的ip地址,如果不设置,默认只能本机访问s = zerorpc.Server(RPCServer())# 绑定端口,并设置许可访问的ip地址,如果不设置,默认只能本机访问s.bind('tcp://0.0.0.0:4243')# 开始监听要求,进入壅塞状态,等待要求,直到收到要求为止,收到要求后,会调用注册的方法s.run()

十五:什么是RPC?1.RPC先容?

RPC 是指远程过程调用,也便是说两台做事器,A 和 B,一个运用支配在A 做事器上,想要调用B 做事器上运用供应的函数或方法,由于不在一个内存空间,不能直接调用,须要通过网络来表达调用的语句和传达调用的数据。
2.RPC是如何调用的?

1.要办理通讯的问题,紧张是通过在客户端和做事器之间建立TCP连接,远程过程调用的所有交互的数据都在这个连接里传输。
连接可以是按需连接,调用结束后就断掉,也可以是长连接,多个远程调用共享同一个连接。
2.要办理寻址的问题,也便是说,A做事器上的运用怎么怎么见告底层的 RPC 框架,如何连接到 B 做事器(如主机或IP地址)以及特定的端口,方法的名称是什么,这样才能完成调用。
比如基于Wbe做事协议栈的RPC,就要供应一个endpoint URl, 或者是 UDDI做事上查找。
如果是RMI调用的话,还须要一个RMI Registry 来注册做事的地址。
3.当A做事器上的运用发起远程过程调用时,方法的参数须要通过底层的网络协议如TCP传输到B做事器。
由于网络协议是基于二进制的,内存中的参数的值要序列化成二进制形式,也便是序列化(Serialize) 或编组(marshal),通过寻址和传输序列化的二进制发送给B做事器。
4.B做事器收到要求后,须要对参数进行反序列化(序列化的逆操作),规复为内存中的表达办法,然后找到对应的方法(寻址的一部分)进行本地调用,然后得到返回值。
5.返回值还要发送回做事器A上的运用,也要经由序列化的办法发送,做事器A吸收到后,再反序列化,规复为内存中的表达办法,交给A做事器上的运用。
3.为什么要利用RPC?

便是无法在一个进程内,乃至一个打算机内通过本地调用的办法完成需求,比如不同的系统间的通讯,乃至不同的组织间的通讯。
由于打算能力须要横向扩展,须要在多台机器组成的集群上支配运用。
4.常见的RPC框架

功能

Hessian

Montan

rpcx

gRPC

Thrift

Dubbo

Dubbox

Spring Cloud

开拓措辞

跨措辞

Java

Go

跨措辞

跨措辞

Java

Java

Java

分布式(做事管理)

×

×

×

多序列化框架支持

hessian

√(支持Hessian2、Json,可扩展)

× 只支持protobuf)

×(thrift格式)

多种注册中央

×

×

×

管理中央

×

×

×

跨编程措辞

×(支持php client和C server)

×

×

×

×

支持REST

×

×

×

×

×

×

关注度

上手难度

运维本钱

开源机构

Caucho

Weibo

Apache

Google

Apache

Alibaba

Dangdang

Apache

5.实际的场景中的选择

copy# Spring Cloud : Spring百口桶,用起来很舒畅,只有你想不到,没有它做不到。
可惜由于发布的比较晚,海内还没涌现比较成功的案例,大部分都是试水,不过毕竟有Spring作背书,还是比较看好。
# Dubbox: 相对付Dubbo支持了REST,估计是很多公司选择Dubbox的一个主要缘故原由之一,但如果利用Dubbo的RPC调用办法,做事间仍旧会存在API强依赖,各有利弊,懂的取舍吧。
# Thrift: 如果你比较高冷,完备可以基于Thrift自己搞一套抽象的自定义框架吧。
# Montan: 可能由于出来的比较晚,目前除了新浪微博16年初发布的,# Hessian: 如果是初创公司或系统数量还没有超过5个,推举选择这个,毕竟在开拓速率、运维本钱、上手难度等都是比较轻量、大略的,纵然在往后迁移至SOA,也是无缝迁移。
# rpcx/gRPC: 在做事没有涌现严重性能的问题下,或技能栈没有变更的情形下,可能一贯不会引入,纵然引入也只是小部分模块优化利用。

原文链接:https://www.cnblogs.com/My-IronMan/p/17015001.html

标签:

相关文章

介绍百度码,技术革新背后的智慧之光

随着科技的飞速发展,互联网技术已经成为我们生活中不可或缺的一部分。而在这个信息爆炸的时代,如何快速、准确地获取信息,成为了人们关注...

Web前端 2025-01-03 阅读1 评论0

介绍皮箱密码,开启神秘之门的钥匙

皮箱,作为日常生活中常见的收纳工具,承载着我们的珍贵物品。面对紧闭的皮箱,许多人却束手无策。如何才能轻松打开皮箱呢?本文将为您揭秘...

Web前端 2025-01-03 阅读1 评论0

介绍盗号器,网络安全的隐忧与应对步骤

随着互联网的快速发展,网络安全问题日益突出。盗号器作为一种非法工具,对网民的个人信息安全构成了严重威胁。本文将深入剖析盗号器的原理...

Web前端 2025-01-03 阅读1 评论0