In Python: child processes going defunct while others are not, unsure why

Edit: the answer was that the OS was killing the processes because I was consuming all of the memory.

I am spawning enough subprocesses to keep the load average 1:1 with cores. However, at some point within the first hour (this script could run for days), 3 of the processes go defunct:

```
tipu  14804  0.0  0.0  328776     428 pts/1  Sl  00:20   0:00  python run.py
tipu  14808 64.4 24.1 2163796 1848156 pts/1  Rl  00:20  44:41  python run.py
tipu  14809  8.2  0.0       0       0 pts/1  Z   00:20   5:43  [python] <defunct>
tipu  14810 60.3 24.3 2180308 1864664 pts/1  Rl  00:20  41:49  python run.py
tipu  14811 20.2  0.0       0       0 pts/1  Z   00:20  14:04  [python] <defunct>
tipu  14812 22.0  0.0       0       0 pts/1  Z   00:20  15:18  [python] <defunct>
tipu  15358  0.0  0.0  103292     872 pts/1  S+  01:30   0:00  grep python
```

I have no idea why this is happening. Attached are the master and the slave; I can attach the MySQL/PostgreSQL wrappers as well if needed. Any suggestions?

`slave.py`:

```python
from boto.s3.key import Key
import multiprocessing
import gzip
import os
from mysql_wrapper import MySQLWrap
from pgsql_wrapper import PGSQLWrap
import boto
import re


class Slave:
    CHUNKS = 250000

    BUCKET_NAME = "bucket"
    AWS_ACCESS_KEY = ""
    AWS_ACCESS_SECRET = ""
    KEY = Key(boto.connect_s3(AWS_ACCESS_KEY, AWS_ACCESS_SECRET).get_bucket(BUCKET_NAME))
    S3_ROOT = "redshift_data_imports"
    COLUMN_CACHE = {}
    DEFAULT_COLUMN_VALUES = {}

    def __init__(self, job_queue):
        self.log_handler = open("logs/%s" % str(multiprocessing.current_process().name), "a")
        self.mysql = MySQLWrap(self.log_handler)
        self.pg = PGSQLWrap(self.log_handler)
        self.job_queue = job_queue

    def do_work(self):
        self.log(str(os.getpid()))

        while True:
            #sample job in the abstract: mysql_db.table_with_date-iteration
            job = self.job_queue.get()

            #queue is empty
            if job is None:
                self.log_handler.close()
                self.pg.close()
                self.mysql.close()
                print("good bye and good day from %d" % (os.getpid()))
                self.job_queue.task_done()
                break

            #curtail iteration
            table = job.split('-')[0]

            #strip redshift table from job name
            redshift_table = re.sub(r"(_[1-9].*)", "", table.split(".")[1])

            iteration = int(job.split("-")[1])
            offset = (iteration - 1) * self.CHUNKS

            #columns redshift is expecting
            #bad tables will slip through and error out, so we catch it
            try:
                colnames = self.COLUMN_CACHE[redshift_table]
            except KeyError:
                self.job_queue.task_done()
                continue

            #mysql fields to use in SELECT statement
            fields = self.get_fields(table)

            #list subtraction determining which columns redshift has that mysql does not
            delta = list(set(colnames) - set(fields.keys()))

            #subtract columns that have a default value and so do not need padding
            if delta:
                delta = list(set(delta) - set(self.DEFAULT_COLUMN_VALUES[redshift_table]))

            #concatenate columns with padded \N
            select_fields = ",".join(fields.values()) + (",\\N" * len(delta))

            query = "SELECT %s FROM %s LIMIT %d, %d" % (select_fields, table, offset, self.CHUNKS)
            rows = self.mysql.execute(query)

            self.log("%s: %s\n" % (table, len(rows)))

            if not rows:
                self.job_queue.task_done()
                continue

            #if there is potentially more data, add it to the queue
            if len(rows) == self.CHUNKS:
                self.log("putting %s-%s" % (table, (iteration + 1)))
                self.job_queue.put("%s-%s" % (table, (iteration + 1)))

            #various characters need escaping
            clean_rows = []
            redshift_escape_chars = set(["\\", "|", "\t", "\r", "\n"])
            in_chars = ""

            for row in rows:
                new_row = []
                for value in row:
                    if value is not None:
                        in_chars = str(value)
                    else:
                        in_chars = ""

                    #escape any naughty characters
                    new_row.append("".join(["\\" + c if c in redshift_escape_chars else c for c in in_chars]))

                new_row = "\t".join(new_row)
                clean_rows.append(new_row)

            rows = ",".join(fields.keys() + delta)
            rows += "\n" + "\n".join(clean_rows)

            offset = offset + self.CHUNKS

            filename = "%s-%s.gz" % (table, iteration)
            self.move_file_to_s3(filename, rows)

            self.begin_data_import(job, redshift_table, ",".join(fields.keys() + delta))

            self.job_queue.task_done()

    def move_file_to_s3(self, uri, contents):
        tmp_file = "/dev/shm/%s" % str(os.getpid())

        self.KEY.key = "%s/%s" % (self.S3_ROOT, uri)
        self.log("key is %s" % self.KEY.key)

        f = gzip.open(tmp_file, "wb")
        f.write(contents)
        f.close()

        #local saving allows for debugging when copy commands fail
        #text_file = open("tsv/%s" % uri, "w")
        #text_file.write(contents)
        #text_file.close()

        self.KEY.set_contents_from_filename(tmp_file, replace=True)

    def get_fields(self, table):
        """
        Returns a dict used as: {"column_name": "altered_column_name"}
        Currently only the debug column gets altered
        """
        exclude_fields = ["_qproc_id", "_mob_id", "_gw_id", "_batch_id", "Field"]

        query = "show columns from %s" % (table)
        fields = self.mysql.execute(query)

        #key raw field, value mysql formatted field
        new_fields = {}

        #for field in fields:
        for field in [val[0] for val in fields]:
            if field in exclude_fields:
                continue

            old_field = field

            if "debug_mode" == field.strip():
                field = "IFNULL(debug_mode, 0)"

            new_fields[old_field] = field

        return new_fields

    def log(self, text):
        self.log_handler.write("\n%s" % text)

    def begin_data_import(self, table, redshift_table, fields):
        query = "copy %s (%s) from 's3://bucket/redshift_data_imports/%s' \
            credentials 'aws_access_key_id=%s;aws_secret_access_key=%s' delimiter '\\t' \
            gzip NULL AS '' COMPUPDATE ON ESCAPE IGNOREHEADER 1;" \
            % (redshift_table, fields, table, self.AWS_ACCESS_KEY, self.AWS_ACCESS_SECRET)

        self.pg.execute(query)
```

`master.py`:

```python
from slave import Slave as Slave
import multiprocessing
from mysql_wrapper import MySQLWrap as MySQLWrap
from pgsql_wrapper import PGSQLWrap as PGSQLWrap


class Master:
    SLAVE_COUNT = 5

    def __init__(self):
        self.mysql = MySQLWrap()
        self.pg = PGSQLWrap()

    def do_work(self, table):
        pass

    def get_table_listings(self):
        """Gathers a list of MySQL log tables needed to be imported"""
        query = 'show databases'
        result = self.mysql.execute(query)

        #turns list[tuple] into a flat list
        databases = list(sum(result, ()))

        #overriding during development
        databases = ['db1', 'db2', 'db3']

        exclude = ('mysql', 'Database', 'information_schema')
        scannable_tables = []

        for database in databases:
            if database in exclude:
                continue

            query = "show tables from %s" % database
            result = self.mysql.execute(query)

            #turns list[tuple] into a flat list
            tables = list(sum(result, ()))

            for table in tables:
                exclude = ("Tables_in_%s" % database, "(", "201303", "detailed", "ltv")

                #exclude any of the unfavorables
                if any(s in table for s in exclude):
                    continue

                scannable_tables.append("%s.%s-1" % (database, table))

        return scannable_tables

    def init(self):
        #fetch redshift columns once and cache them
        #get columns from redshift so we can pad the mysql column delta with nulls
        tables = ('table1', 'table2', 'table3')

        for table in tables:
            #cache columns
            query = "SELECT column_name FROM information_schema.columns WHERE \
                table_name = '%s'" % (table)
            result = self.pg.execute(query, async=False, ret=True)
            Slave.COLUMN_CACHE[table] = list(sum(result, ()))

            #cache default values
            query = "SELECT column_name FROM information_schema.columns WHERE \
                table_name = '%s' and column_default is not \
                null" % (table)
            result = self.pg.execute(query, async=False, ret=True)

            #turns list[tuple] into a flat list
            result = list(sum(result, ()))

            Slave.DEFAULT_COLUMN_VALUES[table] = result

    def run(self):
        self.init()

        job_queue = multiprocessing.JoinableQueue()

        tables = self.get_table_listings()
        for table in tables:
            job_queue.put(table)

        processes = []
        for i in range(Master.SLAVE_COUNT):
            process = multiprocessing.Process(target=slave_runner, args=(job_queue,))
            process.daemon = True
            process.start()
            processes.append(process)

        #blocks this process until queue reaches 0
        job_queue.join()

        #signal each child process to GTFO
        for i in range(Master.SLAVE_COUNT):
            job_queue.put(None)

        #blocks this process until queue reaches 0
        job_queue.join()
        job_queue.close()

        #do not end this process until child processes close out
        for process in processes:
            process.join()

        #toodles!
        print("this is master saying goodbye")


def slave_runner(queue):
    slave = Slave(queue)
    slave.do_work()
```
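For anyone who hits the same symptom: one way to confirm that a worker was killed by the kernel (the memory explanation above) rather than crashing on its own is to look at `multiprocessing.Process.exitcode` after the children have been joined at the end of `Master.run()`. A negative value means the child was terminated by that signal number, and the OOM killer sends SIGKILL. A rough sketch; `report_worker_exits` is an illustrative helper, not part of the code above:

```python
import signal


def report_worker_exits(processes):
    # Call this after the `for process in processes: process.join()` loop,
    # so every exitcode is already populated.
    for process in processes:
        code = process.exitcode
        if code == 0:
            print("%s exited cleanly" % process.name)
        elif code is not None and code < 0:
            # A negative exitcode means the child was terminated by that signal;
            # the OOM killer sends SIGKILL, so -9 is the usual fingerprint.
            suspect = " (SIGKILL, possibly the OOM killer)" if -code == signal.SIGKILL else ""
            print("%s was killed by signal %d%s" % (process.name, -code, suspect))
        else:
            print("%s exited with status %s" % (process.name, code))
```

The kernel also logs OOM kills, so something like `dmesg | grep -i kill` on the host will typically show which PIDs were sacrificed.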
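On the memory side itself: each worker holds the 250,000-row result set, the escaped copies in `clean_rows`, and then one more full copy when everything is joined into a single string before `move_file_to_s3()` gzips it. A small sketch of how that last copy could be avoided by streaming the lines into the gzip file instead (Python 2 style to match the code above; `write_chunk_to_gzip` is a hypothetical replacement for the string-building plus `move_file_to_s3()` step, not the original implementation):

```python
import gzip
import os


def write_chunk_to_gzip(header, clean_rows):
    # header is the comma-joined column list and clean_rows the escaped,
    # tab-joined lines built in Slave.do_work().
    tmp_file = "/dev/shm/%s" % str(os.getpid())

    f = gzip.open(tmp_file, "wb")
    try:
        f.write(header + "\n")
        for line in clean_rows:
            # One line at a time; no giant concatenated string is ever held.
            f.write(line + "\n")
    finally:
        f.close()

    return tmp_file
```

Lowering `CHUNKS` or fetching with a server-side cursor would shrink the per-worker footprint further; the sketch only removes the extra full copy made by the final string concatenation.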