1#!/usr/bin/python 2# 3# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7 8"""Script to calculate timing stats for suites. 9 10This script measures nine stats for a suite run. 111. Net suite runtime. 122. Suite scheduling overhead. 133. Average scheduling overhead. 144. Average Queuing time. 155. Average Resetting time. 166. Average provisioning time. 177. Average Running time. 188. Average Parsing time. 199. Average Gathering time. 20 21When the cron_mode is enabled, this script throws all stats but the first one 22(Net suite runtime) to Graphite because the first one is already 23being sent to Graphite by Autotest online. 24 25Net suite runtime is end-to-end time for a suite from the beginning 26to the end. 27It is stored in a field, "duration", of a type, "suite_runtime" in 28elasticsearch (ES). 29 30Suite scheduling overhead is defined by the average of DUT overheads. 31Suite is composed of one or more jobs, and those jobs are run on 32one or more DUTs that are available. 33A DUT overhead is defined by: 34 DUT_i overhead = sum(net time for job_k - runtime for job_k 35 - runtime for special tasks of job_k) 36 Job_k are the jobs run on DUT_i. 37 38Net time for a job is the time from job_queued_time to hqe_finished_time. 39job_queued_time is stored in the "queued_time" column of "tko_jobs" table. 40hqe_finished_time is stored in the "finished_on" of "afe_host_queue_entries" 41table. 42We do not use "job_finished_time" of "tko_jobs" as job_finished_time is 43recorded before gathering/parsing/archiving. 44We do not use hqe started time ("started_on" of "afe_host_queue_entries"), 45as it does not account for the lag from a host is assigned to the job till 46the scheduler sees the assignment. 47 48Runtime for job_k is the sum of durations for the records of 49"job_time_breakdown" type in ES that have "Queued" or "Running" status. 50It is possible that a job has multiple "Queued" records when the job's test 51failed and tried again. 52We take into account only the last "Queued" record. 53 54Runtime for special tasks of job_k is the sum of durations for the records 55of "job_time_breakdown" type in ES that have "Resetting", "Provisioning", 56"Gathering", or "Parsing" status. 57We take into account only the records whose timestamp is larger than 58the timestamp of the last "Queued" record. 59""" 60 61import argparse 62from datetime import datetime 63from datetime import timedelta 64 65import common 66from autotest_lib.client.common_lib import host_queue_entry_states 67from autotest_lib.client.common_lib import time_utils 68from autotest_lib.client.common_lib.cros.graphite import autotest_es 69from autotest_lib.frontend import setup_django_environment 70from autotest_lib.frontend.afe import models 71from autotest_lib.frontend.tko import models as tko_models 72from autotest_lib.server import utils 73from autotest_lib.site_utils import job_overhead 74 75 76_options = None 77 78_hqes = host_queue_entry_states.Status 79_states = [ 80 _hqes.QUEUED, _hqes.RESETTING, _hqes.PROVISIONING, 81 _hqes.RUNNING, _hqes.GATHERING, _hqes.PARSING] 82 83 84def mean(l): 85 """ 86 Calculates an Arithmetic Mean for the numbers in a list. 87 88 @param l: A list of numbers. 89 @return: Arithmetic mean if the list is not empty. 90 Otherwise, returns zero. 91 """ 92 return float(sum(l)) / len(l) if l else 0 93 94 95def print_verbose(string, *args): 96 if _options.verbose: 97 print(string % args) 98 99 100def get_nontask_runtime(job_id, dut, job_info_dict): 101 """ 102 Get sum of durations for "Queued", "Running", "Parsing", and "Gathering" 103 status records. 104 job_info_dict will be modified in this function to store the duration 105 for each status. 106 107 @param job_id: The job id of interest. 108 @param dut: Hostname of a DUT that the job ran on. 109 @param job_info_dict: Dictionary that has information for jobs. 110 @return: Tuple of sum of durations and the timestamp for the last 111 Queued record. 112 """ 113 results = autotest_es.query( 114 fields_returned=['status', 'duration', 'time_recorded'], 115 equality_constraints=[('_type', 'job_time_breakdown'), 116 ('job_id', job_id), 117 ('hostname', dut)], 118 sort_specs=[{'time_recorded': 'desc'}]) 119 120 sum = 0 121 last_queued_timestamp = 0 122 # There could be multiple "Queued" records. 123 # Get sum of durations for the records after the last "Queued" records 124 # (including the last "Queued" record). 125 # Exploits the fact that "results" are ordered in the descending order 126 # of time_recorded. 127 for hit in results.hits: 128 job_info_dict[job_id][hit['status']] = float(hit['duration']) 129 if hit['status'] == 'Queued': 130 # The first Queued record is the last one because of the descending 131 # order of "results". 132 last_queued_timestamp = float(hit['time_recorded']) 133 sum += float(hit['duration']) 134 break 135 else: 136 sum += float(hit['duration']) 137 return (sum, last_queued_timestamp) 138 139 140def get_tasks_runtime(task_list, dut, t_start, job_id, job_info_dict): 141 """ 142 Get sum of durations for special tasks. 143 job_info_dict will be modified in this function to store the duration 144 for each special task. 145 146 @param task_list: List of task id. 147 @param dut: Hostname of a DUT that the tasks ran on. 148 @param t_start: Beginning timestamp. 149 @param job_id: The job id that is related to the tasks. 150 This is used only for debugging purpose. 151 @param job_info_dict: Dictionary that has information for jobs. 152 @return: Sum of durations of the tasks. 153 """ 154 t_start_epoch = time_utils.to_epoch_time(t_start) 155 results = autotest_es.query( 156 fields_returned=['status', 'task_id', 'duration'], 157 equality_constraints=[('_type', 'job_time_breakdown'), 158 ('hostname', dut)], 159 range_constraints=[('time_recorded', t_start_epoch, None)], 160 batch_constraints=[('task_id', task_list)]) 161 sum = 0 162 for hit in results.hits: 163 sum += float(hit['duration']) 164 job_info_dict[job_id][hit['status']] = float(hit['duration']) 165 print_verbose('Task %s for Job %s took %s', 166 hit['task_id'], job_id, hit['duration']) 167 return sum 168 169 170def get_job_runtime(job_id, dut, job_info_dict): 171 """ 172 Get sum of durations for the entries that are related to a job. 173 job_info_dict will be modified in this function. 174 175 @param job_id: The job id of interest. 176 @param dut: Hostname of a DUT that the job ran on. 177 @param job_info_dict: Dictionary that has information for jobs. 178 @return: Total duration taken by a job. 179 """ 180 sum, t_last_queued = get_nontask_runtime(job_id, dut, job_info_dict) 181 print_verbose('Job %s took %f, last Queued: %s', 182 job_id, sum, t_last_queued) 183 sum += get_tasks_runtime( 184 list(job_info_dict[job_id]['tasks']), dut, t_last_queued, 185 job_id, job_info_dict) 186 return sum 187 188 189def get_dut_overhead(dut, jobs, job_info_dict): 190 """ 191 Calculates the scheduling overhead of a DUT. 192 193 The scheduling overhead of a DUT is defined by the sum of scheduling 194 overheads for the jobs that ran on the DUT. 195 The scheduling overhead for a job is defined by the difference 196 of net job runtime and real job runtime. 197 job_info_dict will be modified in this function. 198 199 @param dut: Hostname of a DUT. 200 @param jobs: The list of jobs that ran on the DUT. 201 @param job_info_dict: Dictionary that has information for jobs. 202 @return: Scheduling overhead of a DUT in a floating point value. 203 The unit is a second. 204 """ 205 overheads = [] 206 for job_id in jobs: 207 (t_start, t_end) = job_info_dict[job_id]['timestamps'] 208 runtime = get_job_runtime(job_id, dut, job_info_dict) 209 overheads.append(t_end - t_start - runtime) 210 print_verbose('Job: %s, Net runtime: %f, Real runtime: %f, ' 211 'Overhead: %f', job_id, t_end - t_start, runtime, 212 t_end - t_start - runtime) 213 return sum(overheads) 214 215 216def get_child_jobs_info(suite_job_id, num_child_jobs, sanity_check): 217 """ 218 Gets information about child jobs of a suite. 219 220 @param suite_job_id: Job id of a suite. 221 @param num_child_jobs: Number of child jobs of the suite. 222 @param sanity_check: Do sanity check if True. 223 @return: A tuple of (dictionary, list). For dictionary, the key is 224 a DUT's hostname and the value is a list of jobs that ran on 225 the DUT. List is the list of all jobs of the suite. 226 """ 227 results = autotest_es.query( 228 fields_returned=['job_id', 'hostname'], 229 equality_constraints=[('_type', 'host_history'), 230 ('parent_job_id', suite_job_id), 231 ('status', 'Running'),]) 232 233 dut_jobs_dict = {} 234 job_filter = set() 235 for hit in results.hits: 236 job_id = hit['job_id'] 237 dut = hit['hostname'] 238 if job_id in job_filter: 239 continue 240 job_list = dut_jobs_dict.setdefault(dut, []) 241 job_list.append(job_id) 242 job_filter.add(job_id) 243 244 if sanity_check and len(job_filter) != num_child_jobs: 245 print('WARNING: Mismatch number of child jobs of a suite (%d): ' 246 '%d != %d' % (suite_job_id, len(job_filter), num_child_jobs)) 247 return dut_jobs_dict, list(job_filter) 248 249 250def get_job_timestamps(job_list, job_info_dict): 251 """ 252 Get beginning time and ending time for each job. 253 254 The beginning time of a job is "queued_time" of "tko_jobs" table. 255 The ending time of a job is "finished_on" of "afe_host_queue_entries" table. 256 job_info_dict will be modified in this function to store the timestamps. 257 258 @param job_list: List of job ids 259 @param job_info_dict: Dictionary that timestamps for each job will be stored 260 """ 261 tko = tko_models.Job.objects.filter(afe_job_id__in=job_list) 262 hqe = models.HostQueueEntry.objects.filter(job_id__in=job_list) 263 job_start = {} 264 for t in tko: 265 job_start[t.afe_job_id] = time_utils.to_epoch_time(t.queued_time) 266 job_end = {} 267 for h in hqe: 268 job_end[h.job_id] = time_utils.to_epoch_time(h.finished_on) 269 270 for job_id in job_list: 271 info_dict = job_info_dict.setdefault(job_id, {}) 272 info_dict.setdefault('timestamps', (job_start[job_id], job_end[job_id])) 273 274 275def get_job_tasks(job_list, job_info_dict): 276 """ 277 Get task ids for each job. 278 job_info_dict will be modified in this function to store the task ids. 279 280 @param job_list: List of job ids 281 @param job_info_dict: Dictionary that task ids for each job will be stored. 282 """ 283 results = autotest_es.query( 284 fields_returned=['job_id', 'task_id'], 285 equality_constraints=[('_type', 'host_history')], 286 batch_constraints=[('job_id', job_list)]) 287 for hit in results.hits: 288 if 'task_id' in hit: 289 info_dict = job_info_dict.setdefault(hit['job_id'], {}) 290 task_set = info_dict.setdefault('tasks', set()) 291 task_set.add(hit['task_id']) 292 293 294def get_scheduling_overhead(suite_job_id, num_child_jobs, sanity_check=True): 295 """ 296 Calculates a scheduling overhead. 297 298 A scheduling overhead is defined by the average of DUT overheads 299 for the DUTs that the child jobs of a suite ran on. 300 301 @param suite_job_id: Job id of a suite. 302 @param num_child_jobs: Number of child jobs of the suite. 303 @param sanity_check: Do sanity check if True. 304 @return: Dictionary storing stats. 305 """ 306 dut_jobs_dict, job_list = get_child_jobs_info( 307 suite_job_id, num_child_jobs, sanity_check) 308 job_info_dict = {} 309 get_job_timestamps(job_list, job_info_dict) 310 get_job_tasks(job_list, job_info_dict) 311 312 dut_overheads = [] 313 avg_overhead = 0 314 for dut, jobs in dut_jobs_dict.iteritems(): 315 print_verbose('Dut: %s, Jobs: %s', dut, jobs) 316 overhead = get_dut_overhead(dut, jobs, job_info_dict) 317 avg_overhead += overhead 318 print_verbose('Dut overhead: %f', overhead) 319 dut_overheads.append(overhead) 320 321 if job_list: 322 avg_overhead = avg_overhead / len(job_list) 323 324 state_samples_dict = {} 325 for info in job_info_dict.itervalues(): 326 for state in _states: 327 if state in info: 328 samples = state_samples_dict.setdefault(state, []) 329 samples.append(info[state]) 330 331 if state_samples_dict: 332 result = {state: mean(state_samples_dict[state]) 333 if state in state_samples_dict else 0 334 for state in _states} 335 result['suite_overhead'] = mean(dut_overheads) 336 result['overhead'] = avg_overhead 337 result['num_duts'] = len(dut_jobs_dict) 338 return result 339 340 341def print_suite_stats(suite_stats): 342 """Prints out statistics for a suite to standard output.""" 343 print('suite_overhead: %(suite_overhead)f, overhead: %(overhead)f,' % 344 suite_stats), 345 for state in _states: 346 if state in suite_stats: 347 print('%s: %f,' % (state, suite_stats[state])), 348 print('num_duts: %(num_duts)d' % suite_stats) 349 350 351def analyze_suites(start_time, end_time): 352 """ 353 Calculates timing stats (i.e., suite runtime, scheduling overhead) 354 for the suites that finished within the timestamps given by parameters. 355 356 @param start_time: Beginning timestamp. 357 @param end_time: Ending timestamp. 358 """ 359 print('Analyzing suites from %s to %s...' % ( 360 time_utils.epoch_time_to_date_string(start_time), 361 time_utils.epoch_time_to_date_string(end_time))) 362 363 if _options.bvtonly: 364 batch_constraints = [ 365 ('suite_name', ['bvt-inline', 'bvt-cq', 'bvt-perbuild'])] 366 else: 367 batch_constraints = [] 368 369 start_time_epoch = time_utils.to_epoch_time(start_time) 370 end_time_epoch = time_utils.to_epoch_time(end_time) 371 results = autotest_es.query( 372 fields_returned=['suite_name', 'suite_job_id', 'board', 'build', 373 'num_child_jobs', 'duration'], 374 equality_constraints=[('_type', job_overhead.SUITE_RUNTIME_KEY),], 375 range_constraints=[('time_recorded', start_time_epoch, 376 end_time_epoch)], 377 sort_specs=[{'time_recorded': 'asc'}], 378 batch_constraints=batch_constraints) 379 print('Found %d suites' % (results.total)) 380 381 for hit in results.hits: 382 suite_job_id = hit['suite_job_id'] 383 384 try: 385 suite_name = hit['suite_name'] 386 num_child_jobs = int(hit['num_child_jobs']) 387 suite_runtime = float(hit['duration']) 388 389 print('Suite: %s (%s), Board: %s, Build: %s, Num child jobs: %d' % ( 390 suite_name, suite_job_id, hit['board'], hit['build'], 391 num_child_jobs)) 392 393 suite_stats = get_scheduling_overhead(suite_job_id, num_child_jobs) 394 print('Suite: %s (%s) runtime: %f,' % ( 395 suite_name, suite_job_id, suite_runtime)), 396 print_suite_stats(suite_stats) 397 398 except Exception as e: 399 print('ERROR: Exception is raised while processing suite %s' % ( 400 suite_job_id)) 401 print e 402 403 404def analyze_suite(suite_job_id): 405 suite_stats = get_scheduling_overhead(suite_job_id, 0, False) 406 print('Suite (%s)' % suite_job_id), 407 print_suite_stats(suite_stats) 408 409 410def main(): 411 """main script.""" 412 parser = argparse.ArgumentParser( 413 formatter_class=argparse.ArgumentDefaultsHelpFormatter) 414 parser.add_argument('-c', dest='cron_mode', action='store_true', 415 help=('Run in a cron mode. Cron mode ' 416 'sends calculated stat data to Graphite.'), 417 default=False) 418 parser.add_argument('-s', type=int, dest='span', 419 help=('Number of hours that stats should be ' 420 'collected.'), 421 default=1) 422 parser.add_argument('--bvtonly', dest='bvtonly', action='store_true', 423 help=('Gets bvt suites only (i.e., bvt-inline,' 424 'bvt-cq, bvt-perbuild).'), 425 default=False) 426 parser.add_argument('--suite', type=int, dest='suite_job_id', 427 help=('Job id of a suite.')) 428 parser.add_argument('--verbose', dest='verbose', action='store_true', 429 help=('Prints out more info if True.'), 430 default=False) 431 global _options 432 _options = parser.parse_args() 433 434 if _options.suite_job_id: 435 analyze_suite(_options.suite_job_id) 436 else: 437 end_time = time_utils.to_epoch_time(datetime.now()) 438 start_time = end_time - timedelta(hours=_options.span).total_seconds() 439 analyze_suites(start_time, end_time) 440 441 442if __name__ == '__main__': 443 main() 444