<p>A multiprocessing <code>JoinableQueue</code> created with a maximum size limits the backlog that can build up while waiting for child processes to consume tasks: once the queue is full, <code>put()</code> blocks until a consumer removes an item. I'm going to assume you're reading work in from a file, and that the file is too large to easily hold in memory all at once.</p>
<p>The following is my attempt at a solution that should limit memory usage. In this example I'm processing a newline-terminated series of dates, converting them into a standard format, and writing them back out to a new file. Lines that match none of the known formats are skipped, and the workers and the writer each stop when they receive a <code>None</code> sentinel.</p>
<p>I'm by no means an expert with the multiprocessing module, so should anyone see a mistake or a better way to do it, I would like to hear it.</p>
<pre><code>from multiprocessing import Process, Queue, JoinableQueue
import time

date_formats = [
    "%Y%m", "%Y-%m-%d", "%y-%m-%d", "%y%m%d", "%Y%m%d", "%m/%d/%Y",
    "%m/%d/%y", "%m/%d/%Y %H:%M", "%m%d%y", "%m%d%Y", "%B, %d %Y",
    "%B, %d %y", "%d %B, %Y", "%d %B, %y", "%B %d %Y", "%B %d %y",
    "%B %d, %Y", "%B %d, %y", "%b %d %Y", "%b %d, %Y", "%b %d %y",
    "%b %d, %y", "%d-%b-%y", "%Y-%m-%d %H:%M:%S",
]

def convert_date(date):
    """Return the date as YYYY-MM-DD, or None if no format matches."""
    date = date.strip()
    for dateformat in date_formats:
        try:
            converted = time.strptime(date, dateformat)
            return time.strftime("%Y-%m-%d", converted)
        except ValueError:
            continue
    return None

def writer(result_queue):
    # Write results until the None sentinel arrives.
    with open("iso_dates.out", "w") as f:
        while True:
            date = result_queue.get()
            if date is None:
                break
            f.write(date + "\n")

def worker(work_queue, result_queue):
    while True:
        date = work_queue.get()
        if date is None:  # sentinel: no more work
            work_queue.task_done()
            break
        converted = convert_date(date)
        if converted is not None:  # skip lines that match no format
            result_queue.put(converted)
        work_queue.task_done()

if __name__ == "__main__":
    work_queue = JoinableQueue(512)  # allow no more than 512 items on queue
    result_queue = Queue()

    writer_proc = Process(target=writer, args=(result_queue,))
    writer_proc.start()

    worker_procs = 2
    workers = []
    for i in range(worker_procs):
        p = Process(target=worker, args=(work_queue, result_queue))
        p.start()
        workers.append(p)

    with open("dates.out") as dates:
        for date in dates:
            work_queue.put(date)  # blocks while the queue is full

    for i in range(worker_procs):
        work_queue.put(None)  # one sentinel per worker

    work_queue.join()  # wait until every item has been marked done
    for p in workers:
        p.join()  # workers flush their results before exiting

    result_queue.put(None)  # tell the writer to finish
    writer_proc.join()
</code></pre>
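<p>To see the blocking behaviour on its own, here is a minimal single-process sketch. The function name <code>demo_backpressure</code> and the tiny <code>maxsize</code> are my own choices for illustration, not part of the solution above. It uses <code>put_nowait()</code>, which raises <code>queue.Full</code> at the point where a plain <code>put()</code> would block.</p>

```python
import queue  # only needed for the queue.Full exception
from multiprocessing import JoinableQueue


def demo_backpressure(maxsize=2):
    """Show that a bounded queue refuses new items once it holds maxsize."""
    q = JoinableQueue(maxsize)
    events = []

    # Fill the queue to capacity; these puts return immediately.
    for i in range(maxsize):
        q.put(i)
    events.append("filled")

    # One more item does not fit. put() would block here; put_nowait()
    # raises queue.Full instead, which makes the limit visible.
    try:
        q.put_nowait("extra")
        events.append("extra accepted")
    except queue.Full:
        events.append("full")

    # Consuming one item frees a slot, so the next put succeeds.
    q.get(timeout=5)
    q.task_done()
    q.put_nowait("extra")
    events.append("extra accepted after get")

    # Drain the remaining items so join() can return and the
    # queue's background thread can shut down cleanly.
    for _ in range(maxsize):
        q.get(timeout=5)
        q.task_done()
    q.join()
    q.close()
    q.join_thread()
    return events


if __name__ == "__main__":
    print(demo_backpressure())
```

<p>In the file-processing case the producer uses the blocking <code>put()</code>, so instead of an exception the read loop simply pauses until a worker takes an item, which is what should keep memory usage bounded.</p>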
 
