"""A simple script to backfill tko_task_references table with throttling.""" from __future__ import absolute_import from __future__ import division from __future__ import print_function import argparse import collections import contextlib import logging import time import MySQLdb class BackfillException(Exception): pass def _parse_args(): parser = argparse.ArgumentParser( description=__doc__) parser.add_argument('--host', required=True, help='mysql server host') parser.add_argument('--user', required=True, help='mysql server user') parser.add_argument('--password', required=True, help='mysql server password') parser.add_argument('--dryrun', action='store_true', default=False) parser.add_argument( '--num-iterations', default=None, type=int, help='If set, total number of iterations. Default is no limit.', ) parser.add_argument( '--batch-size', default=1000, help='Number of tko_jobs rows to read in one iteration', ) parser.add_argument( '--sleep-seconds', type=int, default=1, help='Time to sleep between iterations', ) args = parser.parse_args() if args.dryrun: if not args.num_iterations: logging.info('DRYRUN: Limiting to 5 iterations in dryrun mode.') args.num_iterations = 5 return args @contextlib.contextmanager def _mysql_connection(args): conn = MySQLdb.connect(user=args.user, host=args.host, passwd=args.password) with _mysql_cursor(conn) as c: c.execute('USE chromeos_autotest_db;') try: yield conn finally: conn.close() @contextlib.contextmanager def _autocommit(conn): try: yield conn except: conn.rollback() else: conn.commit() @contextlib.contextmanager def _mysql_cursor(conn): c = conn.cursor() try: yield c finally: c.close() def _latest_unfilled_job_idx(conn): with _mysql_cursor(conn) as c: c.execute(""" SELECT tko_job_idx FROM tko_task_references ORDER BY tko_job_idx LIMIT 1 ;""") r = c.fetchall() if r: return str(long(r[0][0]) - 1) logging.debug('tko_task_references is empty.' ' Grabbing the latest tko_job_idx to fill.') with _mysql_cursor(conn) as c: c.execute(""" SELECT job_idx FROM tko_jobs ORDER BY job_idx DESC LIMIT 1 ;""") r = c.fetchall() if r: return r[0][0] return None _TKOTaskReference = collections.namedtuple( '_TKOTaskReference', ['tko_job_idx', 'task_reference', 'parent_task_reference'], ) _SQL_SELECT_TASK_REFERENCES = """ SELECT job_idx, afe_job_id, afe_parent_job_id FROM tko_jobs WHERE job_idx <= %(latest_job_idx)s ORDER BY job_idx DESC LIMIT %(batch_size)s ;""" _SQL_INSERT_TASK_REFERENCES = """ INSERT INTO tko_task_references(reference_type, tko_job_idx, task_id, parent_task_id) VALUES %(values)s ;""" _SQL_SELECT_TASK_REFERENCE = """ SELECT tko_job_idx FROM tko_task_references WHERE tko_job_idx = %(tko_job_idx)s ;""" def _compute_task_references(conn, latest_job_idx, batch_size): with _mysql_cursor(conn) as c: sql = _SQL_SELECT_TASK_REFERENCES % { 'latest_job_idx': latest_job_idx, 'batch_size': batch_size, } c.execute(sql) rs = c.fetchall() if rs is None: return [] return [_TKOTaskReference(r[0], r[1], r[2]) for r in rs] def _insert_task_references(conn, task_references, dryrun): values = ', '.join([ '("afe", %s, "%s", "%s")' % (tr.tko_job_idx, tr.task_reference, tr.parent_task_reference) for tr in task_references ]) sql = _SQL_INSERT_TASK_REFERENCES % {'values': values} if dryrun: if len(sql) < 200: sql_log = sql else: sql_log = '%s... 
[SNIP] ...%s' % (sql[:150], sql[-49:]) logging.debug('Would have run: %s', sql_log) with _autocommit(conn) as conn: with _mysql_cursor(conn) as c: c.execute(sql) def _verify_task_references(conn, task_references): # Just verify that the last one was inserted. if not task_references: return tko_job_idx = task_references[-1].tko_job_idx sql = _SQL_SELECT_TASK_REFERENCE % {'tko_job_idx': tko_job_idx} with _mysql_cursor(conn) as c: c.execute(sql) r = c.fetchall() if not r or r[0][0] != tko_job_idx: raise BackfillException( 'Failed to insert task reference for tko_job_id %s' % tko_job_idx) def _next_job_idx(task_references): return str(long(task_references[-1].tko_job_idx) - 1) def main(): logging.basicConfig(level=logging.DEBUG) args = _parse_args() with _mysql_connection(args) as conn: tko_job_idx = _latest_unfilled_job_idx(conn) if tko_job_idx is None: raise BackfillException('Failed to get last unfilled tko_job_idx') logging.info('First tko_job_idx to fill: %s', tko_job_idx) while True: logging.info('####################################') logging.info('Start backfilling from tko_job_idx: %s', tko_job_idx) task_references = () with _mysql_connection(args) as conn: task_references = _compute_task_references( conn, tko_job_idx, args.batch_size) if not task_references: logging.info('No more unfilled task references. All done!') break logging.info( 'Inserting %d task references. tko_job_ids: %d...%d', len(task_references), task_references[0].tko_job_idx, task_references[-1].tko_job_idx, ) with _mysql_connection(args) as conn: _insert_task_references(conn, task_references, args.dryrun) if not args.dryrun: with _mysql_connection(args) as conn: _verify_task_references(conn, task_references) tko_job_idx = _next_job_idx(task_references) if args.num_iterations is not None: args.num_iterations -= 1 if args.num_iterations <= 0: break logging.info('%d more iterations left', args.num_iterations) logging.info('Iteration done. Sleeping for %d seconds', args.sleep_seconds) time.sleep(args.sleep_seconds) if __name__ == '__main__': main()
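# Example invocation (a sketch only: the script filename and the host/user/
# password values below are assumptions, not defined by this file; the flags
# themselves come from _parse_args above):
#
#   python backfill_tko_task_references.py \
#       --host localhost \
#       --user chromeos-user \
#       --password secret \
#       --batch-size 500 \
#       --sleep-seconds 2 \
#       --dryrun
#
# With --dryrun the script only logs the INSERT statements it would run and,
# unless --num-iterations is given, stops after five iterations.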