Push and Pull sockets let you distribute messages to multiple workers, arranged in a pipeline. A Push socket will distribute sent messages to its Pull clients evenly. This is equivalent to producer/consumer model but the results computed by consumer are not sent upstream but downstream to another pull/consumer socket.
Data always flows down the pipeline, and each stage of the pipeline is connected to at least one node. When a pipeline stage is connected to multiple nodes data is load-balanced among all connected nodes
# producer.py # Producers are created with ZMQ.PUSH socket types. Producer is bound to well known port to which consumers can connect too. import time import zmq def producer(): context = zmq.Context() zmq_socket = context.socket(zmq.PUSH) zmq_socket.bind("tcp://") # Start your result manager and workers before you start your producers for num in xrange(20000): work_message = { ‘num‘ : num } zmq_socket.send_json(work_message) producer()
# consumer.py # Consumer are created with ZMQ.PULL socket types to pull requests from producer and uses a push socket to connect and push result to result collector. import time import zmq import random def consumer(): consumer_id = random.randrange(1,10005) print "I am consumer #%s" % (consumer_id) context = zmq.Context() # recieve work consumer_receiver = context.socket(zmq.PULL) consumer_receiver.connect("tcp://") # send work consumer_sender = context.socket(zmq.PUSH) consumer_sender.connect("tcp://") while True: work = consumer_receiver.recv_json() data = work[‘num‘] result = { ‘consumer‘ : consumer_id, ‘num‘ : data} if data%2 == 0: consumer_sender.send_json(result) consumer()
# resultcollector.py # result collector are created with ZMQ.PULL socket type and act as consumer of results from intermediate consumers. They also are bound to well known port so that intermedia# te consumer can connect to it. import time import zmq import pprint def result_collector(): context = zmq.Context() results_receiver = context.socket(zmq.PULL) results_receiver.bind("tcp://") collecter_data = {} for x in xrange(1000): result = results_receiver.recv_json() if collecter_data.has_key(result[‘consumer‘]): collecter_data[result[‘consumer‘]] = collecter_data[result[‘consumer‘]] + 1 else: collecter_data[result[‘consumer‘]] = 1 if x == 999: pprint.pprint(collecter_data) result_collector()
# running it: python resultcollector.py python consumer.py python consumer.py python producer.py # Results shows the distribution of transmitted result to result collector: (D:\anaconda) C:\Users\admin\Desktop\opt>python resultcollector.py {5892: 1000} (D:\anaconda) C:\Users\admin\Desktop\opt>python resultcollector.py {1223: 1000} (D:\anaconda) C:\Users\admin\Desktop\opt>python resultcollector.py {5892: 1000} # consumer-1 (D:\anaconda) C:\Users\admin\Desktop\opt>python consumer.py I am consumer #5892 # consumer-2 (D:\anaconda) C:\Users\admin\Desktop\opt>python consumer.py I am consumer #1223 # producer (D:\anaconda) C:\Users\admin\Desktop\opt>python producer.py