1、zmq先容:创建和销毁套接字:zmq.socket(), zmq.close()配置和读取套接字:zmq.setsockopt(), zmq.getsockopt()为套接字建立连接:zmq.bind(), zmq.connect()发送和吸收: zmq.send(), zmq.recv()注:利用zmq.bind()连接的节点称之为做事端,它有着一个较为固定的网络地址;利用zmq.connect()连接的节点称为客户端,其地址不固定。2、zmq模式:紧张有三种常用模式: req/rep(要求答复模式):紧张用于远程调用及任务分配等。 pub/sub(订阅模式): 紧张用于数据分发。 push/pull(管道模式): 紧张用于多任务并行。3、zmq内置的有效绑定对:PUB and SUBREQ and REPREQ and XREPXREQ and REPXREQ and XREPXREQ and XREQXREP and XREPPUSH and PULLPAIR and PAIR4、详细模式举例(1)、req/rep(要求/答复模式):一对一模式,一问一答#server做事端import zmqcontext=zmq.Context()socket=context.socket(zmq.REP) #设置socket的类型,zmq.REP答复socket.bind("tcp://:15000") #绑定做事真个IP和端口while True: #循环吸收客户端发来的 message=socket.recv() #吸收客户端发送来的,注:是byte类型 print(message) socket.send_string("copy!") #再发回客户端结果:客户单没要求一次就打印一次体b'request'b'request'b'request'b'request'#client客户端import zmq, syscontext = zmq.Context()socket=context.socket(zmq.REQ) #设置socket类型,要求端socket.connect("tcp://localhost:15000") #连接做事真个IP和端口while True: data=input("input your request:") if data == "q": sys.exit() socket.send_string(data) #向做事端发送 message=socket.recv() #吸收做事端返回的,注:是byte类型 print(message)"""结果:没输入要求一次,就得到做事真个一次返回input your data:123b'copy!'input your data:456b'copy!'"""(2)、pub/sub(订阅模式):一对多模式一个发布者,多个订阅者,订阅者可以通过设置过滤器过滤数据。#publisher发布者import zmqcontext=zmq.Context()socket=context.socket(zmq.PUB)socket.bind("tcp://:15000")while True: data = input("input your data:") print(data) socket.send_string(data)"""结果:循环提示输入数据,当输入一次,就发送一次到订阅者input your data:123123input your data:456456input your data:789789input your data:"""#Subscriber订阅者import sysimport zmqcontext=zmq.Context()socket=context.socket(zmq.SUB)socket.connect("tcp://localhost:15000")socket.setsockopt_string(zmq.SUBSCRIBE,'')或者:socket.setsockopt_string(zmq.SUBSCRIBE,'123') #表示只过滤出收到为'123'的或者:socket.subscribe('topic') #订阅一个主题while True: message=socket.recv() print(message)"""结果:发布者每发布一次,都能订阅到b'123'b'456'b'789'"""(3)、push/pull(管道模式):管道是单向的,从PUSH端单向的向PULL端单向的推送数据流。由三部分组成,push进行数据推送,work进行数据缓存,pull进行数据竞争获取处理。差异于Publish-Subscribe存在一个数据缓存和处理负载。当连接被断开,数据不会丢失,重连后数据连续发送到对端。#推送端import zmqcontext=zmq.Context()socket=context.socket(zmq.PUSH) #设置socket类型PUSH推送socket.bind("tcp://:5557") #绑定IP和端口while True: data=input("input your data:") socket.send_string(data)"""input your data:123input your data:456input your data:789"""#worker端import zmqcontext=zmq.Context()socket_receive=context.socket(zmq.PULL) #设置socket类型PULL拉取推送真个socket_receive.connect("tcp://localhost:5557") #连接推送端IP和端口socket_sender=context.socket(zmq.PUSH) #再设置一个socket类型PUSH推送socket_sender.connect("tcp://localhost:5558") #连接IP和端口向其推送while True: data=socket_receive.recv_string() #拉取吸收 print(data) socket_sender.send_string(data) #再将推送出去"""123456789"""#拉取端import zmqcontext=zmq.Context()socket=context.socket(zmq.PULL) #设置socket类型PULL拉取消息socket.bind("tcp://:5558") #绑定IP和端口去拉取消息while True: message=socket.recv_string() print(message)"""123456789"""
