Note that additional explanatory text is shown on larger screens.

plurals
  1. PO
    primarykey
    data
    text
    <p><strong>Edit1: this is not quite correct...give me a few moments to get it right...</strong></p> <p>I think you may be invoking the Client class the wrong way. I'm not an expert on this, but I think your client should be subclassed from Process and then run using the .start() method. So, define your Client class like this:</p> <pre><code>class Client(Process): def __init__(self, port_push, port_sub): (...) # your class init code here...make sure indentation is correct </code></pre> <p>Then at the end where you run the servers, create an instance of your Client class and start it like so:</p> <pre><code>client_class = Client(port_push, port_sub) client_class.start() </code></pre> <p><strong>Edit2: Here's an edited version of fccoelho's code that works for me.</strong></p> <p>The biggest problem appears to be that the ZMQ initialization needs to be done in the <code>__call__</code> method, not in <code>__init__</code>. I suspect this is due to how memory is allocated in multiprocessing: the <code>__init__</code> method runs in the parent process, while the <code>__call__</code> method runs in the child process with a separate memory space. Apparently ZMQ doesn't like this. I've also added some wait times to prevent the client from connecting to the server before the server is ready, and to prevent the server from sending messages before the client subscribes. I'm also using 127.0.0.1 instead of localhost (my computer doesn't like localhost for some reason). 
Also removed the annoying print messages around the poll call in the client, and fixed the indentation problem where the client checks the poll results on the pubsub socket.</p> <pre><code>import zmq import time import sys import random from multiprocessing import Process def server_push(port="5556"): context = zmq.Context() socket = context.socket(zmq.PUSH) socket.bind("tcp://127.0.0.1:%s" % port) print "Running server on port: ", port time.sleep(1.0) # serves only 5 request and dies for reqnum in range(10): if reqnum &lt; 6: socket.send("Continue") else: socket.send("Exit") print 'Push server sent "Exit" signal' break time.sleep(0.4) def server_pub(port="5558"): context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://127.0.0.1:%s" % port) socket.setsockopt(zmq.HWM, 1000) publisher_id = random.randrange(0,9999) print "Running server on port: ", port time.sleep(1.0) # serves only 5 request and dies for reqnum in range(10): # Wait for next request from client topic = random.randrange(8,10) messagedata = "server#%s" % publisher_id print "%s %s" % (topic, messagedata) socket.send("%d %s" % (topic, messagedata)) time.sleep(0.4) class Client: def __init__(self,port_push, port_sub): self.port_push = port_push self.port_sub = port_sub # Initialize poll set def __call__(self): time.sleep(0.5) print 'hello from class client!' 
context = zmq.Context() self.socket_pull = context.socket(zmq.PULL) self.socket_pull.connect ("tcp://127.0.0.1:%s" % self.port_push) print "Connected to server with port %s" % self.port_push self.socket_sub = context.socket(zmq.SUB) self.socket_sub.connect ("tcp://127.0.0.1:%s" % self.port_sub) self.socket_sub.setsockopt(zmq.SUBSCRIBE, "9") print "Connected to publisher with port %s" % self.port_sub poller = zmq.Poller() poller.register(self.socket_pull, zmq.POLLIN) poller.register(self.socket_sub, zmq.POLLIN) # Work on requests from both server and publisher should_continue = True print "listening" while should_continue: # print "hello" socks = dict(poller.poll()) # print poller if self.socket_pull in socks and socks[self.socket_pull] == zmq.POLLIN: message = self.socket_pull.recv() print "Recieved control command: %s" % message if message == "Exit": print "Recieved exit command, client will stop recieving messages" should_continue = False if self.socket_sub in socks and socks[self.socket_sub] == zmq.POLLIN: string = self.socket_sub.recv() topic, messagedata = string.split() print "Processing ... ", topic, messagedata def client(port_push, port_sub): print 'hello from function client!' 
context = zmq.Context() socket_pull = context.socket(zmq.PULL) socket_pull.connect ("tcp://127.0.0.1:%s" % port_push) print "Connected to server with port %s" % port_push socket_sub = context.socket(zmq.SUB) socket_sub.connect ("tcp://127.0.0.1:%s" % port_sub) socket_sub.setsockopt(zmq.SUBSCRIBE, "9") print "Connected to publisher with port %s" % port_sub # Initialize poll set poller = zmq.Poller() poller.register(socket_pull, zmq.POLLIN) poller.register(socket_sub, zmq.POLLIN) # Work on requests from both server and publisher should_continue = True while should_continue: socks = dict(poller.poll(1000)) if socket_pull in socks and socks[socket_pull] == zmq.POLLIN: message = socket_pull.recv() print "Recieved control command: %s" % message if message == "Exit": print "Recieved exit command, client will stop recieving messages" should_continue = False if socket_sub in socks and socks[socket_sub] == zmq.POLLIN: string = socket_sub.recv() topic, messagedata = string.split() print "Processing ... 
", topic, messagedata if __name__ == "__main__": # Now we can run a few servers server_push_port = "5556" server_pub_port = "5558" Process(target=server_push, args=(server_push_port,)).start() Process(target=server_pub, args=(server_pub_port,)).start() # Process(target=client,args=(server_push_port,server_pub_port)).start() Process(target=Client(server_push_port,server_pub_port)).start() </code></pre> <p>Finally, here's a cleaner implementation of multi-process pubsub that's very bare-bones, but demonstrates things more clearly:</p> <pre><code>import zmq from multiprocessing import Process import time class ServerPubSub(Process): def __init__(self, port, n): Process.__init__(self) self.port = port self.n = n def run(self): self.context = zmq.Context() self.pub = self.context.socket(zmq.PUB) self.pub.bind('tcp://127.0.0.1:%d' % self.port) self.pub.setsockopt(zmq.HWM, 1000) time.sleep(1) end = False for i in range(self.n): print 'SRV: sending message %d' % i self.pub.send('Message %d' % i) print 'SRV: message %d sent' % i time.sleep(0.2) self.pub.close() class ClientPubSub(Process): def __init__(self, port, n): Process.__init__(self) self.port = port self.n = n def run(self): self.context = zmq.Context() self.sub = self.context.socket(zmq.SUB) self.sub.connect('tcp://127.0.0.1:%d' % self.port) self.sub.setsockopt(zmq.SUBSCRIBE, '') self.poller = zmq.Poller() self.poller.register(self.sub, zmq.POLLIN) end = False count = 0 while count &lt; self.n: ready = dict(self.poller.poll(0)) if self.sub in ready and ready[self.sub] == zmq.POLLIN: msg = self.sub.recv() print 'CLI: received message "%s"' % msg count += 1 self.sub.close() if __name__ == "__main__": port = 5000 n = 10 server = ServerPubSub(port, n) client = ClientPubSub(port, n) server.start() client.start() server.join() client.join() </code></pre>
    singulars
    1. This table or related slice is empty.
    plurals
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. This table or related slice is empty.
    1. VO
      singulars
      1. This table or related slice is empty.
    2. VO
      singulars
      1. This table or related slice is empty.
 

Querying!

 
Guidance

SQuiL has stopped working due to an internal error.

If you are curious, you may find further information in the browser console, which is accessible through the developer tools (F12).

Reload