#!/usr/bin/python
#
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.


8"""Script to calculate timing stats for suites.
9
10This script measures nine stats for a suite run.
111. Net suite runtime.
122. Suite scheduling overhead.
133. Average scheduling overhead.
144. Average Queuing time.
155. Average Resetting time.
166. Average provisioning time.
177. Average Running time.
188. Average Parsing time.
199. Average Gathering time.
20
When cron_mode is enabled, this script sends all stats except the first one
(net suite runtime) to Graphite, because the first one is already being
sent to Graphite by Autotest online.

Net suite runtime is the end-to-end time for a suite, from the beginning
to the end.
It is stored in the "duration" field of the "suite_runtime" type in
elasticsearch (ES).

Suite scheduling overhead is defined as the average of DUT overheads.
A suite is composed of one or more jobs, and those jobs run on one or
more of the available DUTs.
A DUT overhead is defined by:
    DUT_i overhead = sum(net time for job_k - runtime for job_k
                         - runtime for special tasks of job_k)
    where job_k ranges over the jobs run on DUT_i.
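
For example (with made-up numbers), if DUT_i ran job_1 and job_2:
    net time for job_1 = 600s, runtime = 450s, special tasks = 100s
    net time for job_2 = 300s, runtime = 280s, special tasks = 15s
    DUT_i overhead = (600 - 450 - 100) + (300 - 280 - 15) = 55s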

Net time for a job is the time from job_queued_time to hqe_finished_time.
job_queued_time is stored in the "queued_time" column of the "tko_jobs"
table.
hqe_finished_time is stored in the "finished_on" column of the
"afe_host_queue_entries" table.
We do not use "job_finished_time" of "tko_jobs", as job_finished_time is
recorded before gathering/parsing/archiving.
We do not use the hqe started time ("started_on" of
"afe_host_queue_entries"), as it does not account for the lag from when a
host is assigned to the job until the scheduler sees the assignment.
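
In short:
    net time for job_k = finished_on (from "afe_host_queue_entries")
                         - queued_time (from "tko_jobs")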

Runtime for job_k is the sum of durations for the records of
"job_time_breakdown" type in ES that have "Queued" or "Running" status.
A job may have multiple "Queued" records when the job's test failed and
was retried.
We take into account only the last "Queued" record.

Runtime for special tasks of job_k is the sum of durations for the records
of "job_time_breakdown" type in ES that have "Resetting", "Provisioning",
"Gathering", or "Parsing" status.
We take into account only the records whose timestamp is larger than
the timestamp of the last "Queued" record.
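
For example (an illustrative retry scenario), given these
"job_time_breakdown" records for a job, in chronological order:
    Queued, Resetting, Running (first attempt),
    Queued, Resetting, Running, Parsing (retry)
only the second "Queued" record and the records after it are counted:
the "Queued" and "Running" durations toward the job's runtime, and the
"Resetting" and "Parsing" durations toward its special task runtime.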
59"""

import argparse
from datetime import datetime
from datetime import timedelta

import common
from autotest_lib.client.common_lib import host_queue_entry_states
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib.cros.graphite import autotest_es
from autotest_lib.frontend import setup_django_environment
from autotest_lib.frontend.afe import models
from autotest_lib.frontend.tko import models as tko_models
from autotest_lib.server import utils
from autotest_lib.site_utils import job_overhead


_options = None

_hqes = host_queue_entry_states.Status
_states = [
        _hqes.QUEUED, _hqes.RESETTING, _hqes.PROVISIONING,
        _hqes.RUNNING, _hqes.GATHERING, _hqes.PARSING]


def mean(l):
    """
    Calculates the arithmetic mean of the numbers in a list.

    @param l: A list of numbers.
    @return: The arithmetic mean if the list is not empty.
             Otherwise, zero.
    """
    return float(sum(l)) / len(l) if l else 0


def print_verbose(string, *args):
    """Prints a formatted string only when the --verbose option is set.

    @param string: A format string.
    @param args: Arguments for the format string.
    """
    if _options.verbose:
        print(string % args)


def get_nontask_runtime(job_id, dut, job_info_dict):
    """
    Get sum of durations for "Queued", "Running", "Parsing", and "Gathering"
    status records.
    job_info_dict will be modified in this function to store the duration
    for each status.

    @param job_id: The job id of interest.
    @param dut: Hostname of a DUT that the job ran on.
    @param job_info_dict: Dictionary that has information for jobs.
    @return: Tuple of the sum of durations and the timestamp of the last
             "Queued" record.
    """
    results = autotest_es.query(
            fields_returned=['status', 'duration', 'time_recorded'],
            equality_constraints=[('_type', 'job_time_breakdown'),
                                  ('job_id', job_id),
                                  ('hostname', dut)],
            sort_specs=[{'time_recorded': 'desc'}])

    total = 0
    last_queued_timestamp = 0
    # There could be multiple "Queued" records. Sum the durations of the
    # records from the last "Queued" record onward (inclusive), exploiting
    # the fact that "results" is ordered by descending time_recorded.
    for hit in results.hits:
        job_info_dict[job_id][hit['status']] = float(hit['duration'])
        total += float(hit['duration'])
        if hit['status'] == 'Queued':
            # Because of the descending order, the first "Queued" hit seen
            # here is the last "Queued" record.
            last_queued_timestamp = float(hit['time_recorded'])
            break
    return (total, last_queued_timestamp)


def get_tasks_runtime(task_list, dut, t_start, job_id, job_info_dict):
    """
    Get sum of durations for special tasks.
    job_info_dict will be modified in this function to store the duration
    for each special task.

    @param task_list: List of task ids.
    @param dut: Hostname of a DUT that the tasks ran on.
    @param t_start: Beginning timestamp.
    @param job_id: The job id that is related to the tasks.
                   This is used only for debugging purposes.
    @param job_info_dict: Dictionary that has information for jobs.
    @return: Sum of durations of the tasks.
    """
    t_start_epoch = time_utils.to_epoch_time(t_start)
    results = autotest_es.query(
            fields_returned=['status', 'task_id', 'duration'],
            equality_constraints=[('_type', 'job_time_breakdown'),
                                  ('hostname', dut)],
            range_constraints=[('time_recorded', t_start_epoch, None)],
            batch_constraints=[('task_id', task_list)])
    total = 0
    for hit in results.hits:
        total += float(hit['duration'])
        job_info_dict[job_id][hit['status']] = float(hit['duration'])
        print_verbose('Task %s for Job %s took %s',
                      hit['task_id'], job_id, hit['duration'])
    return total


def get_job_runtime(job_id, dut, job_info_dict):
    """
    Get sum of durations for the entries that are related to a job.
    job_info_dict will be modified in this function.

    @param job_id: The job id of interest.
    @param dut: Hostname of a DUT that the job ran on.
    @param job_info_dict: Dictionary that has information for jobs.
    @return: Total duration taken by a job.
    """
    total, t_last_queued = get_nontask_runtime(job_id, dut, job_info_dict)
    print_verbose('Job %s took %f, last Queued: %s',
                  job_id, total, t_last_queued)
    total += get_tasks_runtime(
            list(job_info_dict[job_id]['tasks']), dut, t_last_queued,
            job_id, job_info_dict)
    return total


def get_dut_overhead(dut, jobs, job_info_dict):
    """
    Calculates the scheduling overhead of a DUT.

    The scheduling overhead of a DUT is defined by the sum of the scheduling
    overheads for the jobs that ran on the DUT.
    The scheduling overhead for a job is defined by the difference between
    the job's net runtime and its real runtime.
    job_info_dict will be modified in this function.

    @param dut: Hostname of a DUT.
    @param jobs: The list of jobs that ran on the DUT.
    @param job_info_dict: Dictionary that has information for jobs.
    @return: Scheduling overhead of a DUT as a floating point value,
             in seconds.
    """
    overheads = []
    for job_id in jobs:
        (t_start, t_end) = job_info_dict[job_id]['timestamps']
        runtime = get_job_runtime(job_id, dut, job_info_dict)
        overheads.append(t_end - t_start - runtime)
        print_verbose('Job: %s, Net runtime: %f, Real runtime: %f, '
                      'Overhead: %f', job_id, t_end - t_start, runtime,
                      t_end - t_start - runtime)
    return sum(overheads)


def get_child_jobs_info(suite_job_id, num_child_jobs, sanity_check):
    """
    Gets information about the child jobs of a suite.

    @param suite_job_id: Job id of a suite.
    @param num_child_jobs: Number of child jobs of the suite.
    @param sanity_check: Do a sanity check if True.
    @return: A tuple of (dictionary, list). The dictionary maps a DUT's
             hostname to the list of jobs that ran on that DUT. The list
             contains all job ids of the suite.
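             For example (an illustrative return value):
                 ({'dut_a': [10, 11], 'dut_b': [12]}, [10, 11, 12])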
226    """
    results = autotest_es.query(
            fields_returned=['job_id', 'hostname'],
            equality_constraints=[('_type', 'host_history'),
                                  ('parent_job_id', suite_job_id),
                                  ('status', 'Running')])

    dut_jobs_dict = {}
    job_filter = set()
    for hit in results.hits:
        job_id = hit['job_id']
        dut = hit['hostname']
        if job_id in job_filter:
            continue
        job_list = dut_jobs_dict.setdefault(dut, [])
        job_list.append(job_id)
        job_filter.add(job_id)

    if sanity_check and len(job_filter) != num_child_jobs:
        print('WARNING: Number of child jobs of suite %d does not match: '
              '%d != %d' % (suite_job_id, len(job_filter), num_child_jobs))
    return dut_jobs_dict, list(job_filter)


def get_job_timestamps(job_list, job_info_dict):
    """
    Get the beginning time and ending time for each job.

    The beginning time of a job is the "queued_time" of the "tko_jobs" table.
    The ending time of a job is the "finished_on" of the
    "afe_host_queue_entries" table.
    job_info_dict will be modified in this function to store the timestamps.

    @param job_list: List of job ids.
    @param job_info_dict: Dictionary in which the timestamps for each job
                          will be stored.
    """
    tko = tko_models.Job.objects.filter(afe_job_id__in=job_list)
    hqe = models.HostQueueEntry.objects.filter(job_id__in=job_list)
    job_start = {}
    for t in tko:
        job_start[t.afe_job_id] = time_utils.to_epoch_time(t.queued_time)
    job_end = {}
    for h in hqe:
        job_end[h.job_id] = time_utils.to_epoch_time(h.finished_on)

    for job_id in job_list:
        info_dict = job_info_dict.setdefault(job_id, {})
        info_dict.setdefault('timestamps', (job_start[job_id], job_end[job_id]))


def get_job_tasks(job_list, job_info_dict):
    """
    Get the task ids for each job.
    job_info_dict will be modified in this function to store the task ids.

    @param job_list: List of job ids.
    @param job_info_dict: Dictionary in which the task ids for each job
                          will be stored.
    """
    results = autotest_es.query(
            fields_returned=['job_id', 'task_id'],
            equality_constraints=[('_type', 'host_history')],
            batch_constraints=[('job_id', job_list)])
    for hit in results.hits:
        if 'task_id' in hit:
            info_dict = job_info_dict.setdefault(hit['job_id'], {})
            task_set = info_dict.setdefault('tasks', set())
            task_set.add(hit['task_id'])


def get_scheduling_overhead(suite_job_id, num_child_jobs, sanity_check=True):
    """
    Calculates the scheduling overhead of a suite.

    The scheduling overhead is defined as the average of the DUT overheads
    for the DUTs that the child jobs of the suite ran on.

    @param suite_job_id: Job id of a suite.
    @param num_child_jobs: Number of child jobs of the suite.
    @param sanity_check: Do a sanity check if True.
    @return: Dictionary storing stats.
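             For example (illustrative values; states without samples
             default to 0):
                 {'Queued': 300.5, 'Resetting': 120.1, 'Provisioning': 0,
                  'Running': 1800.9, 'Gathering': 30.2, 'Parsing': 10.3,
                  'suite_overhead': 35.1, 'overhead': 12.3, 'num_duts': 4}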
305    """
    dut_jobs_dict, job_list = get_child_jobs_info(
            suite_job_id, num_child_jobs, sanity_check)
    job_info_dict = {}
    get_job_timestamps(job_list, job_info_dict)
    get_job_tasks(job_list, job_info_dict)

    dut_overheads = []
    avg_overhead = 0
    for dut, jobs in dut_jobs_dict.iteritems():
        print_verbose('Dut: %s, Jobs: %s', dut, jobs)
        overhead = get_dut_overhead(dut, jobs, job_info_dict)
        avg_overhead += overhead
        print_verbose('Dut overhead: %f', overhead)
        dut_overheads.append(overhead)

    if job_list:
        avg_overhead = avg_overhead / len(job_list)

    state_samples_dict = {}
    for info in job_info_dict.itervalues():
        for state in _states:
            if state in info:
                samples = state_samples_dict.setdefault(state, [])
                samples.append(info[state])

    # States that have no samples default to zero.
    result = {state: mean(state_samples_dict[state])
              if state in state_samples_dict else 0
              for state in _states}
    result['suite_overhead'] = mean(dut_overheads)
    result['overhead'] = avg_overhead
    result['num_duts'] = len(dut_jobs_dict)
    return result


def print_suite_stats(suite_stats):
    """Prints out statistics for a suite to standard output."""
    print('suite_overhead: %(suite_overhead)f, overhead: %(overhead)f,' %
          suite_stats),
    for state in _states:
        if state in suite_stats:
            print('%s: %f,' % (state, suite_stats[state])),
    print('num_duts: %(num_duts)d' % suite_stats)


def analyze_suites(start_time, end_time):
    """
    Calculates timing stats (i.e., suite runtime, scheduling overhead)
    for the suites that finished within the given timestamps.

    @param start_time: Beginning timestamp.
    @param end_time: Ending timestamp.
    """
    print('Analyzing suites from %s to %s...' % (
          time_utils.epoch_time_to_date_string(start_time),
          time_utils.epoch_time_to_date_string(end_time)))

    if _options.bvtonly:
        batch_constraints = [
                ('suite_name', ['bvt-inline', 'bvt-cq', 'bvt-perbuild'])]
    else:
        batch_constraints = []

    start_time_epoch = time_utils.to_epoch_time(start_time)
    end_time_epoch = time_utils.to_epoch_time(end_time)
    results = autotest_es.query(
            fields_returned=['suite_name', 'suite_job_id', 'board', 'build',
                             'num_child_jobs', 'duration'],
            equality_constraints=[('_type', job_overhead.SUITE_RUNTIME_KEY)],
            range_constraints=[('time_recorded', start_time_epoch,
                                end_time_epoch)],
            sort_specs=[{'time_recorded': 'asc'}],
            batch_constraints=batch_constraints)
    print('Found %d suites' % (results.total))

    for hit in results.hits:
        suite_job_id = hit['suite_job_id']

        try:
            suite_name = hit['suite_name']
            num_child_jobs = int(hit['num_child_jobs'])
            suite_runtime = float(hit['duration'])

            print('Suite: %s (%s), Board: %s, Build: %s, Num child jobs: %d' % (
                    suite_name, suite_job_id, hit['board'], hit['build'],
                    num_child_jobs))

            suite_stats = get_scheduling_overhead(suite_job_id, num_child_jobs)
            print('Suite: %s (%s) runtime: %f,' % (
                    suite_name, suite_job_id, suite_runtime)),
            print_suite_stats(suite_stats)

        except Exception as e:
            print('ERROR: Exception raised while processing suite %s' % (
                    suite_job_id))
            print(e)


def analyze_suite(suite_job_id):
    """
    Calculates and prints timing stats for a single suite.

    @param suite_job_id: Job id of a suite.
    """
    suite_stats = get_scheduling_overhead(suite_job_id, 0, False)
    print('Suite (%s)' % suite_job_id),
    print_suite_stats(suite_stats)


def main():
    """Main entry point that parses options and runs the analysis."""
    parser = argparse.ArgumentParser(
            formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', dest='cron_mode', action='store_true',
                        help=('Run in cron mode. Cron mode '
                              'sends calculated stat data to Graphite.'),
                        default=False)
    parser.add_argument('-s', type=int, dest='span',
                        help=('Number of past hours over which stats should '
                              'be collected.'),
                        default=1)
    parser.add_argument('--bvtonly', dest='bvtonly', action='store_true',
                        help=('Gets bvt suites only (i.e., bvt-inline, '
                              'bvt-cq, bvt-perbuild).'),
                        default=False)
    parser.add_argument('--suite', type=int, dest='suite_job_id',
                        help=('Job id of a suite.'))
    parser.add_argument('--verbose', dest='verbose', action='store_true',
                        help=('Prints out more info if True.'),
                        default=False)
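
    # Example invocations (the script name is elided here; the suite job id
    # below is illustrative):
    #   <this script> -s 24 --bvtonly --verbose
    #   <this script> --suite 12345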
    global _options
    _options = parser.parse_args()

    if _options.suite_job_id:
        analyze_suite(_options.suite_job_id)
    else:
        end_time = time_utils.to_epoch_time(datetime.now())
        start_time = end_time - timedelta(hours=_options.span).total_seconds()
        analyze_suites(start_time, end_time)


if __name__ == '__main__':
    main()