Python multithreading without a queue working with large data sets
I am running through a CSV file of about 800k rows. I need a threading solution that runs through each row and spawns 32 threads at a time into a worker. I want to do this without a queue, because the current Python threading solution with a queue is eating up a lot of memory.

Basically, I want to read a CSV row and hand it to a worker thread, with only 32 threads running at a time.

This is the current script. It appears that it is reading the entire CSV file into the queue and doing a queue.join(). Is it correct that it is loading the entire CSV into a queue and then spawning the threads?

```python
from __future__ import print_function, division  # print(..., end='') and float division on Python 2

import csv
import subprocess
import threading
import time
import urllib
import Queue

# docRoot, csvFile, lock, stats and snippets are defined elsewhere in the full script.

queue = Queue.Queue()

def worker():
    while True:
        task = queue.get()
        try:
            subprocess.call(['php {docRoot}/cli.php -u "api/email/ses" -r "{task}"'.format(
                docRoot=docRoot, task=task)], shell=True)
        except:
            pass
        with lock:
            stats['done'] += 1
            if int(time.time()) != stats.get('now'):
                stats.update(
                    now=int(time.time()),
                    percent=(stats.get('done') / stats.get('total')) * 100,
                    ps=(stats.get('done') / (time.time() - stats.get('start')))
                )
                print("\r {percent:.1f}% [{progress:24}] {persec:.3f}/s ({done}/{total}) ETA {eta:<12}".format(
                    percent=stats.get('percent'),
                    progress=('=' * int((23 * stats.get('percent')) / 100)) + '>',
                    persec=stats.get('ps'),
                    done=int(stats.get('done')),
                    total=stats.get('total'),
                    eta=snippets.duration.time(int((stats.get('total') - stats.get('done')) / stats.get('ps')))
                ), end='')
        queue.task_done()

for i in range(32):
    workers = threading.Thread(target=worker)
    workers.daemon = True
    workers.start()

with open(csvFile, 'rb') as fh:
    try:
        dialect = csv.Sniffer().sniff(fh.readline(), [',', ';'])
        fh.seek(0)
        reader = csv.reader(fh, dialect)
        headers = reader.next()
    except csv.Error as e:
        print("\rERROR[CSV] {error}\n".format(error=e))
    else:
        while True:
            try:
                data = reader.next()
            except csv.Error as e:
                print("\rERROR[CSV] - Line {line}: {error}\n".format(
                    line=reader.line_num, error=e))
            except StopIteration:
                break
            else:
                stats['total'] += 1
                row = dict(zip(headers, data))
                queue.put(urllib.urlencode(dict(row, campaign=row.get('Campaign'))))

queue.join()
```
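The memory growth comes from the unbounded `Queue.Queue()`: the reading loop above can push all 800k encoded rows into the queue long before the 32 workers drain it. Bounding the queue keeps the queue but fixes the memory problem, because `put()` blocks once the queue is full until a worker takes a row. Below is a minimal sketch of that approach (Python 2, to match the script above; `handle_row` and `data.csv` are placeholder names, not from the original script):

```python
from __future__ import print_function

import csv
import threading
import Queue

NUM_WORKERS = 32

# Bounding the queue is the key change: once `maxsize` rows are buffered,
# put() blocks, so the reader can never run far ahead of the workers and
# the whole file is never held in memory at once.
queue = Queue.Queue(maxsize=NUM_WORKERS * 2)

def handle_row(row):
    # Placeholder for the real per-row work, e.g. the subprocess.call
    # from the question.
    print(row)

def worker():
    while True:
        row = queue.get()
        try:
            handle_row(row)
        finally:
            queue.task_done()

for _ in range(NUM_WORKERS):
    t = threading.Thread(target=worker)
    t.daemon = True
    t.start()

with open('data.csv', 'rb') as fh:  # placeholder path
    reader = csv.reader(fh)
    headers = next(reader)
    for data in reader:
        queue.put(dict(zip(headers, data)))  # blocks while the queue is full

queue.join()  # wait until every buffered row has been processed
```

With `maxsize` set, at most `NUM_WORKERS * 2` buffered rows plus the 32 in-flight tasks exist at any moment; the producer loop is throttled to the workers' pace instead of buffering the whole file.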