• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import datetime
6import logging
7import os
8import random
9import time
10
11
12from autotest_lib.client.common_lib import base_job, global_config, log
13from autotest_lib.client.common_lib import time_utils
14
# Base delay, in seconds, between polls for finished jobs.  It is scaled by a
# random factor before use (see _JobResultWaiter._sleep).
_DEFAULT_POLL_INTERVAL_SECONDS = 30.0

# Read as a float from the 'hqe_maximum_abort_rate_float' key of the SCHEDULER
# section of the global config, with 0.5 passed as the default.
# NOTE(review): presumably the maximum tolerated fraction of aborted host
# queue entries -- confirm against the users of this constant.
HQE_MAXIMUM_ABORT_RATE_FLOAT = global_config.global_config.get_config_value(
            'SCHEDULER', 'hqe_maximum_abort_rate_float', type=float,
            default=0.5)
20
21
def view_is_relevant(view):
    """
    Tell whether a test view is worth examining.

    A view is considered irrelevant when its 'test_name' begins with
    'CLIENT_JOB'; every other view is relevant.

    @param view: a detailed test 'view' from the TKO DB; must provide a
                 'test_name' key.
    @return True if this is a test result worth looking at further.
    """
    test_name = view['test_name']
    is_client_job = test_name.startswith('CLIENT_JOB')
    return not is_client_job
30
31
def view_is_for_suite_job(view):
    """
    Tell whether a test view belongs to the suite job itself.

    The match is exact: the view's 'test_name' must be 'SERVER_JOB'.

    @param view: a detailed test 'view' from the TKO DB; must provide a
                 'test_name' key.
    @return True if this is the view of a suite job.
    """
    suite_job_name = 'SERVER_JOB'
    return view['test_name'] == suite_job_name
40
41
def view_is_for_infrastructure_fail(view):
    """
    Tell whether a test view stems from an infrastructure failure.

    Any view whose 'test_name' ends with 'SERVER_JOB' qualifies, which
    includes both the bare name and prefixed names.

    @param view: a detailed test 'view' from the TKO DB; must provide a
                 'test_name' key.
    @return True if this view indicates an infrastructure-side issue during
                 a test.
    """
    test_name = view['test_name']
    return test_name.endswith('SERVER_JOB')
51
52
def is_for_infrastructure_fail(status):
    """
    Tell whether a Status stems from an infrastructure failure.

    The status is wrapped in a minimal view dict and judged by
    view_is_for_infrastructure_fail, so both functions share one rule.

    @param status: the Status object to look at; only its test_name is read.
    @return True if this Status indicates an infrastructure-side issue during
                 a test.
    """
    equivalent_view = dict(test_name=status.test_name)
    return view_is_for_infrastructure_fail(equivalent_view)
62
63
def _abort_jobs_if_timedout(afe, jobs, start_time, timeout_mins):
    """
    Abort every job in |jobs| once the timeout has elapsed.

    The deadline is start_time + timeout_mins minutes.  While the current
    UTC time is still before the deadline nothing happens; from the deadline
    onwards an 'abort_host_queue_entries' RPC is issued for each job.

    @param afe: an instance of AFE as defined in server/frontend.py.
    @param jobs: an iterable of Running frontend.Jobs
    @param start_time: datetime the timeout is measured from.  It is compared
                       against datetime.datetime.utcnow().
    @param timeout_mins: Time in minutes to wait before aborting the jobs we
                         are waiting on.

    @returns True if there was a timeout, False if not.
    """
    now = datetime.datetime.utcnow()
    deadline = start_time + datetime.timedelta(minutes=timeout_mins)
    timed_out = not now < deadline
    if timed_out:
        for job in jobs:
            logging.debug(
                    'Job: %s has timed out after %s minutes. Aborting job.',
                    job.id, timeout_mins)
            afe.run('abort_host_queue_entries', job=job.id)
    return timed_out
85
86
def _collate_aborted(current_value, entry):
    """
    Accumulator step for reduce(): has any HostQueueEntry been aborted?

    Meant to be reduce()d over the HostQueueEntries of a job.  Once the
    accumulator is truthy it is passed through unchanged; otherwise the
    result is the entry's 'aborted' value, or False when the entry has no
    'aborted' key.

    Ex:
      entries = AFE.run('get_host_queue_entries', job=job.id)
      reduce(_collate_aborted, entries, False)

    @param current_value: the current accumulator (a boolean).
    @param entry: the current entry under consideration; must support
                  membership tests and key lookup.
    @return current_value if it is truthy, else entry['aborted'] if that key
            exists, else False.
    """
    if current_value:
        return current_value
    if 'aborted' not in entry:
        return False
    return entry['aborted']
105
106
def _status_for_test(status):
    """
    Tell whether a status belongs to an actual test.

    Statuses whose test_name begins with 'SERVER_JOB' or 'CLIENT_JOB' are
    job-level bookkeeping rather than tests.

    @param status: frontend.TestStatus object to look at.
    @return True if this is a test result worth looking at further.
    """
    job_level_prefixes = ('SERVER_JOB', 'CLIENT_JOB')
    return not status.test_name.startswith(job_level_prefixes)
116
117
class _JobResultWaiter(object):
    """Class for waiting on job results.

    Job ids are collected with add_job()/add_jobs(); wait_for_results() then
    polls the AFE until every collected job has finished, yielding the test
    statuses of each job as it completes.
    """

    def __init__(self, afe, tko):
        """Instantiate class

        @param afe: an instance of AFE as defined in server/frontend.py.
        @param tko: an instance of TKO as defined in server/frontend.py.
        """
        self._afe = afe
        self._tko = tko
        # Ids of the jobs whose results have not been yielded yet.
        self._job_ids = set()

    def add_job(self, job):
        """Add job to wait on.

        @param job: Job object to get results from, as defined in
                    server/frontend.py
        """
        self.add_jobs((job,))

    def add_jobs(self, jobs):
        """Add jobs to wait on.

        Only the ids are remembered, so adding a job twice has no extra
        effect.

        @param jobs: Iterable of Job object to get results from, as defined in
                     server/frontend.py
        """
        self._job_ids.update(job.id for job in jobs)

    def wait_for_results(self):
        """Wait for jobs to finish and return their results.

        The returned generator blocks until all jobs have finished,
        naturally.  Jobs added while the generator is suspended are picked
        up by a later poll.

        @yields an iterator of Statuses, one per test.
        """
        while self._job_ids:
            for job in self._get_finished_jobs():
                for result in _yield_job_results(self._afe, self._tko, job):
                    yield result
                self._job_ids.remove(job.id)
            # Only back off when there is still something to wait for;
            # sleeping after the last job was reported would just delay the
            # end of the generator by a full poll interval.
            if self._job_ids:
                self._sleep()

    def _get_finished_jobs(self):
        """Ask the AFE which of the outstanding jobs have finished."""
        # This is an RPC call which serializes to JSON, so we can't pass
        # in sets.
        return self._afe.get_jobs(id__in=list(self._job_ids), finished=True)

    def _sleep(self):
        """Sleep between polls, for 0.5x to 1.5x the default interval."""
        # The random factor spreads out the polls of concurrent waiters.
        time.sleep(_DEFAULT_POLL_INTERVAL_SECONDS * (random.random() + 0.5))
169
170
def _yield_job_results(afe, tko, job):
    """
    Generate the results of a single job, one Status object per test.

    When the TKO DB holds no statuses for the job, a lone 'ABORT' Status
    named after the job is produced.  Otherwise every real test status is
    reported, and non-GOOD SERVER_JOB/CLIENT_JOB statuses are reported only
    if no real test failed.

    @param afe: an instance of AFE as defined in server/frontend.py.
    @param tko: an instance of TKO as defined in server/frontend.py.
    @param job: Job object to get results from, as defined in
                server/frontend.py
    @yields an iterator of Statuses, one per test.
    """
    entries = afe.run('get_host_queue_entries', job=job.id)

    # The lookup goes through the tko_test_view_2 table by job id, matching
    # results stored under a similar job_tag (of the form
    # job_id-owner/host).  A job aborted in the middle of a test usually
    # still has its job_tag and valid logs in the results directory.  A job
    # aborted before it could create the job_tag yields no rows at all.
    # Non-empty results are frontend.TestStatus objects populated from the
    # db query.
    statuses = tko.get_job_test_statuses_from_db(job.id)
    if not statuses:
        yield Status('ABORT', job.name)

    # SERVER and CLIENT job failures only matter when no test has failed.
    contains_test_failure = False
    for candidate in statuses:
        if _status_for_test(candidate) and candidate.status != 'GOOD':
            contains_test_failure = True
            break

    for test_status in statuses:
        # The TKO parser identifies a test run uniquely by
        # (test_name, subdir).  In a dynamic suite each emitted status
        # therefore needs a subdir that keeps (test_name, subdir) unique in
        # the suite job's status log.
        # Non-test statuses (i.e. SERVER_JOB, CLIENT_JOB) use the 'job_tag'
        # from tko_test_view_2, e.g. '1246-owner/172.22.33.44'.
        # Normal test statuses use 'job_tag/subdir', e.g.
        # '1246-owner/172.22.33.44/my_DummyTest.tag.subdir_tag'.
        if _status_for_test(test_status):
            reported_name = test_status.test_name
            reported_subdir = os.path.join(test_status.job_tag,
                                           test_status.subdir)
        elif test_status.status != 'GOOD' and not contains_test_failure:
            reported_name = '%s_%s' % (entries[0]['job']['name'],
                                       test_status.test_name)
            reported_subdir = test_status.job_tag
        else:
            # A GOOD job-level status, or one eclipsed by a test failure.
            continue

        yield Status(test_status.status, reported_name, test_status.reason,
                     test_status.test_started_time,
                     test_status.test_finished_time,
                     job.id, job.owner, test_status.hostname, job.name,
                     subdir=reported_subdir)
226
227
def wait_for_child_results(afe, tko, parent_job_id):
    """
    Wait for results of all tests in jobs with given parent id.

    The child jobs are looked up once, when the generator first runs.  More
    jobs can be handed in later by calling send(new_jobs) on the generator;
    that send() call itself returns None.  One Status object per test is
    yielded as results become available; the polling cadence is whatever
    _JobResultWaiter.wait_for_results uses.

    @param afe: an instance of AFE as defined in server/frontend.py.
    @param tko: an instance of TKO as defined in server/frontend.py.
    @param parent_job_id: Parent job id for the jobs to wait on.
    @yields an iterator of Statuses, one per test.
    """
    result_waiter = _JobResultWaiter(afe, tko)
    child_jobs = afe.get_jobs(parent_job_id=parent_job_id)
    result_waiter.add_jobs(child_jobs)
    for status in result_waiter.wait_for_results():
        sent_jobs = (yield status)
        if not sent_jobs:
            continue
        result_waiter.add_jobs(sent_jobs)
        # Return nothing if 'send' is called
        yield None
249
250
def wait_for_results(afe, tko, jobs):
    """
    Wait for results of all tests in all jobs in |jobs|.

    More jobs can be handed in later by calling send(new_jobs) on the
    generator; that send() call itself returns None.  One Status object per
    test is yielded as results become available; the polling cadence is
    whatever _JobResultWaiter.wait_for_results uses.

    @param afe: an instance of AFE as defined in server/frontend.py.
    @param tko: an instance of TKO as defined in server/frontend.py.
    @param jobs: a list of Job objects, as defined in server/frontend.py.
    @yields an iterator of Statuses, one per test.
    """
    result_waiter = _JobResultWaiter(afe, tko)
    result_waiter.add_jobs(jobs)
    for status in result_waiter.wait_for_results():
        sent_jobs = (yield status)
        if not sent_jobs:
            continue
        result_waiter.add_jobs(sent_jobs)
        # Return nothing if 'send' is called
        yield None
272
273
class Status(object):
    """
    A class representing a test result.

    Stores all pertinent info about a test result and, given a callable
    to use, can record start, result, and end info appropriately.

    @var _status: status code, e.g. 'INFO', 'FAIL', etc.
    @var _test_name: the name of the test whose result this is.
    @var _reason: message explaining failure, if any.
    @var _begin_timestamp: when test started (int, in seconds since the epoch).
    @var _end_timestamp: when test finished (int, in seconds since the epoch).
    @var _id: the ID of the job that generated this Status.
    @var _owner: the owner of the job that generated this Status.
    @var _hostname: the host the test that generated this Status ran on.
    @var _job_name: the name of the job that generated this Status.
    @var _subdir: the result directory recorded with every log entry.
    @var _test_executed: True if a start time was supplied at construction.

    @var STATUS_MAP: a dict mapping host queue entry status strings to canonical
                     status codes; e.g. 'Aborted' -> 'ABORT'
    """
    _status = None
    _test_name = None
    _reason = None
    _begin_timestamp = None
    _end_timestamp = None

    # Queued status can occur if the try job just aborted due to not completing
    # reimaging for all machines. The Queued corresponds to an 'ABORT'.
    STATUS_MAP = {'Failed': 'FAIL', 'Aborted': 'ABORT', 'Completed': 'GOOD',
                  'Queued': 'ABORT'}

    class sle(base_job.status_log_entry):
        """
        Thin wrapper around status_log_entry that supports stringification.
        """
        def __str__(self):
            return self.render()

        def __repr__(self):
            return self.render()


    def __init__(self, status, test_name, reason='', begin_time_str=None,
                 end_time_str=None, job_id=None, owner=None, hostname=None,
                 job_name='', subdir=None):
        """
        Constructor

        @param status: status code, e.g. 'INFO', 'FAIL', etc.
        @param test_name: the name of the test whose result this is.
        @param reason: message explaining failure, if any; Optional.
        @param begin_time_str: when test started (in time_utils.TIME_FMT);
                               now() if empty, None or 'None'.
        @param end_time_str: when test finished (in time_utils.TIME_FMT);
                             now() if empty, None or 'None'.
        @param job_id: the ID of the job that generated this Status.
        @param owner: the owner of the job that generated this Status.
        @param hostname: The name of the host the test that generated this
                         result ran on.
        @param job_name: The job name; Contains the test name with/without the
                         experimental prefix, the tag and the build.
        @param subdir: The result directory of the test. It will be recorded
                       as the subdir in the status.log file.
        """
        self._status = status
        self._test_name = test_name
        self._reason = reason
        self._id = job_id
        self._owner = owner
        self._hostname = hostname
        self._job_name = job_name
        self._subdir = subdir
        # Autoserv drops a keyval of the started time which eventually makes its
        # way here.  Therefore, if we have a starting time, we may assume that
        # the test reached Running and actually began execution on a drone.
        self._test_executed = self._has_time(begin_time_str)

        self._begin_timestamp = self._timestamp_or_now(begin_time_str)
        self._end_timestamp = self._timestamp_or_now(end_time_str)


    @staticmethod
    def _has_time(time_str):
        """
        Tell whether |time_str| carries an actual time.

        @param time_str: a time string, or an empty value, or 'None'.
        @return True unless time_str is falsy or the literal string 'None';
                always a real boolean.
        """
        return bool(time_str and time_str != 'None')


    @classmethod
    def _timestamp_or_now(cls, time_str):
        """
        Convert |time_str| to seconds since the epoch, defaulting to now.

        @param time_str: a time in time_utils.TIME_FMT, or an empty value, or
                         'None'.
        @return int seconds since the epoch; the parsed time is interpreted
                by time.mktime, the fallback is time.time().
        """
        if not cls._has_time(time_str):
            return int(time.time())
        parsed = datetime.datetime.strptime(time_str, time_utils.TIME_FMT)
        return int(time.mktime(parsed.timetuple()))


    def is_good(self):
        """ Returns true if status is good. """
        return self._status == 'GOOD'


    def is_warn(self):
        """ Returns true if status is warn. """
        return self._status == 'WARN'


    def is_testna(self):
        """ Returns true if status is TEST_NA """
        return self._status == 'TEST_NA'


    def is_worse_than(self, candidate):
        """
        Return whether |self| represents a "worse" failure than |candidate|.

        "Worse" is defined the same as it is for log message purposes in
        common_lib/log.py.  We also consider status with a specific error
        message to represent a "worse" failure than one without.

        @param candidate: a Status instance to compare to this one.
        @return True if |self| is "worse" than |candidate|.
        """
        if self._status != candidate._status:
            # A status that appears earlier in log.job_statuses is the worse
            # one.
            return (log.job_statuses.index(self._status) <
                    log.job_statuses.index(candidate._status))
        # Same status: having a reason breaks the tie.
        return bool(self._reason and not candidate._reason)


    def _record(self, record_entry, status_code, reason, timestamp):
        """
        Build one log entry for this test and hand it to |record_entry|.

        @param record_entry: a callable to use for logging; called as
                             record_entry(entry, log_in_subdir=False).
        @param status_code: first argument for the log entry.
        @param reason: message for the log entry.
        @param timestamp: timestamp for the log entry.
        """
        log_entry = Status.sle(status_code, self._subdir, self._test_name,
                               reason, None, timestamp)
        record_entry(log_entry, log_in_subdir=False)


    def record_start(self, record_entry):
        """
        Use record_entry to log message about start of test.

        @param record_entry: a callable to use for logging.
               prototype:
                   record_entry(base_job.status_log_entry,
                                log_in_subdir=False)
        """
        self._record(record_entry, 'START', '', self._begin_timestamp)


    def record_result(self, record_entry):
        """
        Use record_entry to log message about result of test.

        @param record_entry: a callable to use for logging.
               prototype:
                   record_entry(base_job.status_log_entry,
                                log_in_subdir=False)
        """
        self._record(record_entry, self._status, self._reason,
                     self._end_timestamp)


    def record_end(self, record_entry):
        """
        Use record_entry to log message about end of test.

        @param record_entry: a callable to use for logging.
               prototype:
                   record_entry(base_job.status_log_entry,
                                log_in_subdir=False)
        """
        self._record(record_entry, 'END %s' % self._status, '',
                     self._end_timestamp)


    def record_all(self, record_entry):
        """
        Use record_entry to log all messages about test results.

        The start, result and end entries are logged in that order.

        @param record_entry: a callable to use for logging.
               prototype:
                   record_entry(base_job.status_log_entry,
                                log_in_subdir=False)
        """
        self.record_start(record_entry)
        self.record_result(record_entry)
        self.record_end(record_entry)


    def override_status(self, override):
        """
        Override the _status field of this Status.

        @param override: value with which to override _status.
        """
        self._status = override


    @property
    def test_name(self):
        """ Name of the test this status corresponds to. """
        return self._test_name


    @test_name.setter
    def test_name(self, value):
        """
        Test name setter.

        @param value: The test name.
        """
        self._test_name = value


    @property
    def id(self):
        """ Id of the job that corresponds to this status. """
        return self._id


    @property
    def owner(self):
        """ Owner of the job that corresponds to this status. """
        return self._owner


    @property
    def hostname(self):
        """ Host the job corresponding to this status ran on. """
        return self._hostname


    @property
    def reason(self):
        """ Reason the job corresponding to this status failed. """
        return self._reason


    @property
    def test_executed(self):
        """ If the test reached running an autoserv instance or not. """
        return self._test_executed

    @property
    def subdir(self):
        """Subdir of test this status corresponds to."""
        return self._subdir
511