# Copyright 2019 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Utilities to summarize TKO results reported by tests in the suite."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import collections
import contextlib

try:
    import mysql.connector
except ImportError:
    # Only main() opens a connection itself; every other function operates on
    # the connection object handed in by the caller. Keep the module
    # importable without the driver and let main() report the problem.
    mysql = None


class Error(Exception):
    """Error detected in this script."""


# Row corresponds to a single row of tko_test_view_2 table in AFE DB, but
# contains only a subset of the columns in the table.
Row = collections.namedtuple(
    'Row',
    'name, status, reason'
)


def get(conn, task_ids):
    """Get tko_test_view_2 Row()s for given skylab task_ids.

    The connection is always closed before returning, whether or not the
    queries succeed. Tasks for which no TKO job is found are absent from the
    result.

    @param conn: A MySQL connection to TKO.
    @param task_ids: list of Skylab task request IDs to collect test views for.

    @return: {task_id: [Row(...)...]}
    """
    try:
        task_job_ids = _get_job_idxs_from_tko(conn, task_ids)
        job_task_ids = {v: k for k, v in task_job_ids.items()}
        # Hand over a real list: the IDs are used as query parameters.
        job_rows = _get_rows_from_tko(conn, list(job_task_ids))
        return {job_task_ids[k]: v for k, v in job_rows.items()}
    finally:
        conn.close()


def filter_failed(rows):
    """Filter down given list of test_views Row() to failed tests.

    @param rows: Iterable of Row().

    @return: List of the Row()s whose status is one of _BAD_STATUSES, in the
             original order.
    """
    return [r for r in rows if r.status in _BAD_STATUSES]


def main():
    """Entry-point to use this script standalone.

    @raises Error: if no --task-id was given, or if the mysql.connector
                   package is not available.
    """
    parser = argparse.ArgumentParser(
        description='Summarize TKO results for a Skylab task')
    parser.add_argument(
        '--task-id',
        action='append',
        help='Swarming request ID for the skylab task (may be repeated)',
    )
    parser.add_argument(
        '--host',
        required=True,
        help='TKO host IP',
    )
    parser.add_argument(
        '--port',
        type=int,
        default=3306,
        help='TKO port',
    )
    parser.add_argument(
        '--user',
        required=True,
        help='TKO MySQL user',
    )
    parser.add_argument(
        '--password',
        required=True,
        help='TKO MySQL password',
    )
    args = parser.parse_args()
    if not args.task_id:
        raise Error('Must request at least one --task-id')
    if mysql is None:
        raise Error('mysql.connector is required to connect to TKO')

    conn = mysql.connector.connect(
        host=args.host,
        port=args.port,
        user=args.user,
        password=args.password,
        database='chromeos_autotest_db',
    )
    # get() closes the connection for us.
    views = get(conn, args.task_id)
    for task_id, rows in views.items():
        print('Task ID: %s' % task_id)
        for row in filter_failed(rows):
            print(' %s in status %s' % (row.name, row.status))
            print(' reason: %s' % (row.reason,))
        print('')


_BAD_STATUSES = {
    'ABORT',
    'ERROR',
    'FAIL',
}


def _get_rows_from_tko(conn, tko_job_ids):
    """Get a list of Row() for the given TKO job IDs.

    Status indices that are missing from tko_status are reported as
    'UNKNOWN'.

    @param conn: A MySQL connection.
    @param tko_job_ids: Iterable of tko_job_ids to get Row()s for.

    @return: {tko_job_id: [Row]}
    """
    job_ids = list(tko_job_ids)
    if not job_ids:
        # "IN ()" is not valid SQL, and there is nothing to look up anyway.
        return {}

    job_rows = collections.defaultdict(list)
    statuses = _get_status_map(conn)

    _GET_TKO_TEST_VIEW_2 = """
        SELECT job_idx, test_name, status_idx, reason
        FROM tko_test_view_2
        WHERE invalid = 0 AND job_idx IN (%s)
    """
    # Only the placeholders are interpolated here; the values themselves are
    # passed separately to execute().
    q = _GET_TKO_TEST_VIEW_2 % ', '.join(['%s'] * len(job_ids))
    with _cursor(conn) as cursor:
        cursor.execute(q, job_ids)
        for job_idx, name, s_idx, reason in cursor.fetchall():
            job_rows[job_idx].append(
                Row(name, statuses.get(s_idx, 'UNKNOWN'), reason))
    return dict(job_rows)


def _get_job_idxs_from_tko(conn, task_ids):
    """Get tko_job_idx for given task_ids.

    Task execution reports the run ID to TKO, but Skylab clients only knows the
    request ID of the created task.

    Swarming executes a task with increasing run IDs, retrying on bot failure.
    If a task is retried after the point where TKO results are reported, this
    function returns the TKO job_idx corresponding to the last completed
    attempt.

    Tasks without any TKO reference are omitted from the result.

    @param conn: MySQL connection to TKO.
    @param task_ids: List of task request IDs to get TKO job IDs for.

    @return {task_id: job_id}
    """
    task_runs = {}
    all_run_ids = []
    for task_id in task_ids:
        run_ids = _run_ids_for_request(task_id)
        all_run_ids.extend(run_ids)
        # Candidates are limited to this task's own run IDs, latest attempt
        # first, so a task can never pick up another task's TKO job.
        task_runs[task_id] = list(reversed(run_ids))

    # One query for all tasks.
    run_job_idxs = _get_job_idxs_for_run_ids(conn, all_run_ids)

    task_job_idxs = {}
    for task_id, run_ids in task_runs.items():
        for run_id in run_ids:
            if run_id in run_job_idxs:
                task_job_idxs[task_id] = run_job_idxs[run_id]
                break
    return task_job_idxs


def _get_job_idxs_for_run_ids(conn, run_ids):
    """Get tko_job_idx for a given task run_ids.

    @param conn: MySQL connection to TKO.
    @param run_ids: List of task run IDs to get TKO job IDs for.

    @return {run_id: job_id}

    @raises Error: if a run ID has more than one TKO reference.
    """
    run_ids = list(run_ids)
    if not run_ids:
        # "IN ()" is not valid SQL, and there is nothing to look up anyway.
        return {}

    _GET_TKO_JOB_Q = """
        SELECT task_id, tko_job_idx
        FROM tko_task_references
        WHERE reference_type = "skylab" AND task_id IN (%s)
    """
    q = _GET_TKO_JOB_Q % ', '.join(['%s'] * len(run_ids))

    job_idxs = {}
    with _cursor(conn) as cursor:
        cursor.execute(q, run_ids)
        for run_id, tko_job_idx in cursor.fetchall():
            if run_id in job_idxs:
                raise Error('task run ID %s has multiple tko references' %
                            (run_id,))
            job_idxs[run_id] = tko_job_idx
    return job_idxs


def _get_status_map(conn):
    """Get the mapping of TKO status index to status word.

    @param conn: MySQL connection to TKO.

    @return {status_idx: word}
    """
    statuses = {}
    with _cursor(conn) as cursor:
        cursor.execute('SELECT status_idx, word FROM tko_status')
        for idx, word in cursor.fetchall():
            statuses[idx] = word
    return statuses


def _run_ids_for_request(request_id):
    """Return Swarming run IDs for a given request ID, in ascending order.

    The run IDs are formed by replacing the last character of the request ID
    with '1' and '2'; no further attempts are generated.
    """
    prefix = request_id[:-1]
    return [prefix + i for i in ('1', '2')]


@contextlib.contextmanager
def _cursor(conn):
    """Yield a cursor on conn, closing it when the with-block exits."""
    c = conn.cursor()
    try:
        yield c
    finally:
        c.close()


if __name__ == '__main__':
    main()