
End daemon processes with multiprocessing module
I include an example usage of multiprocessing below. This is a process pool model. It is not as simple as it might be, but it is relatively close in structure to the code I'm actually using. It also uses SQLAlchemy, sorry.

My question is: I currently have a relatively long-running Python script which executes a number of functions that each look like the code below, so the parent process is the same in all cases. In other words, multiple pools are created by one Python script. (I don't have to do it this way, I suppose, but the alternative is to use something like os.system and subprocess.) The problem is that these processes hang around and hold on to memory. The docs say these daemon processes are supposed to stick around till the parent process exits, but what if the parent process then goes on to create another pool of processes and doesn't exit immediately?

Calling terminate() works, but this doesn't seem terribly polite. Is there a good way to ask the processes to terminate nicely? That is: clean up after yourself and go away now, I need to start up the next pool.

I also tried calling join() on the processes. According to the documentation this means wait for the processes to terminate. What if they don't plan to terminate? What actually happens is that the process hangs.

Thanks in advance.

Regards, Faheem.

import multiprocessing, time

class Worker(multiprocessing.Process):
    """Process executing tasks from a given tasks queue"""
    def __init__(self, queue, num):
        multiprocessing.Process.__init__(self)
        self.num = num
        self.queue = queue
        self.daemon = True  # daemon workers are killed when the parent process exits

    def run(self):
        import traceback
        while True:
            func, args, kargs = self.queue.get()
            try:
                print "trying %s with args %s" % (func.__name__, args)
                func(*args, **kargs)
            except:
                traceback.print_exc()
            self.queue.task_done()

class ProcessPool:
    """Pool of processes consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.queue = multiprocessing.JoinableQueue()
        self.workerlist = []
        self.num = num_threads
        for i in range(num_threads):
            self.workerlist.append(Worker(self.queue, i))

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.queue.put((func, args, kargs))

    def start(self):
        for w in self.workerlist:
            w.start()

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.queue.join()
        for worker in self.workerlist:
            print worker.__dict__
            #worker.terminate()  # <--- terminate used here
            worker.join()        # <--- join used here

start = time.time()

from sqlalchemy import *
from sqlalchemy.orm import *

dbuser = ''
password = ''
dbname = ''
dbstring = "postgres://%s:%s@localhost:5432/%s" % (dbuser, password, dbname)
db = create_engine(dbstring, echo=True)
m = MetaData(db)

def make_foo(i):
    t1 = Table('foo%s' % i, m, Column('a', Integer, primary_key=True))

conn = db.connect()
for i in range(10):
    conn.execute("DROP TABLE IF EXISTS foo%s" % i)
conn.close()

for i in range(10):
    make_foo(i)

m.create_all()

def do(i, dbstring):
    dbstring = "postgres://%s:%s@localhost:5432/%s" % (dbuser, password, dbname)
    db = create_engine(dbstring, echo=True)
    Session = scoped_session(sessionmaker())
    Session.configure(bind=db)
    Session.execute("ALTER TABLE foo%s SET ( autovacuum_enabled = false );" % i)
    Session.execute("ALTER TABLE foo%s SET ( autovacuum_enabled = true );" % i)
    Session.commit()

pool = ProcessPool(5)
for i in range(10):
    pool.add_task(do, i, dbstring)
pool.start()
pool.wait_completion()
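For reference, the kind of "terminate nicely" being asked about is commonly handled with a sentinel ("poison pill") task: the pool puts one None per worker on the queue, and each worker exits its loop when it sees one, after which join() on the workers returns promptly. Below is a minimal sketch along those lines; it mirrors the Worker/ProcessPool shape above, but the sentinel handling is illustrative, not the poster's original code.

import multiprocessing
import traceback

class Worker(multiprocessing.Process):
    """Worker that stops cleanly when it receives a None sentinel."""
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            task = self.queue.get()
            if task is None:            # sentinel: finish up and exit
                self.queue.task_done()
                break
            func, args, kargs = task
            try:
                func(*args, **kargs)
            except Exception:
                traceback.print_exc()
            self.queue.task_done()      # exactly one task_done() per get()

class ProcessPool:
    """Pool that shuts its workers down via sentinels instead of terminate()."""
    def __init__(self, num_workers):
        self.queue = multiprocessing.JoinableQueue()
        self.workers = [Worker(self.queue) for _ in range(num_workers)]

    def add_task(self, func, *args, **kargs):
        self.queue.put((func, args, kargs))

    def start(self):
        for w in self.workers:
            w.start()

    def wait_completion(self):
        for _ in self.workers:
            self.queue.put(None)        # one sentinel per worker
        self.queue.join()               # all real tasks and sentinels processed
        for w in self.workers:
            w.join()                    # workers have left their run() loops

With this arrangement wait_completion() can be called before creating the next pool, and the workers no longer need to be daemonic, since they exit on their own once the sentinels are consumed.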
 
