# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import contextlib import logging import time from multiprocessing import pool import base_event, board_enumerator, build_event import task, timed_event import common from autotest_lib.client.common_lib.cros.graphite import autotest_stats from autotest_lib.server import utils POOL_SIZE = 32 _timer = autotest_stats.Timer('suite_scheduler') class Driver(object): """Implements the main loop of the suite_scheduler. @var EVENT_CLASSES: list of the event classes Driver supports. @var _LOOP_INTERVAL_SECONDS: seconds to wait between loop iterations. @var _scheduler: a DedupingScheduler, used to schedule jobs with the AFE. @var _enumerator: a BoardEnumerator, used to list plaforms known to the AFE @var _events: dict of BaseEvents to be handled each time through main loop. """ EVENT_CLASSES = [timed_event.Nightly, timed_event.Weekly, build_event.NewBuild] _LOOP_INTERVAL_SECONDS = 5 * 60 def __init__(self, scheduler, enumerator, is_sanity=False): """Constructor @param scheduler: an instance of deduping_scheduler.DedupingScheduler. @param enumerator: an instance of board_enumerator.BoardEnumerator. @param is_sanity: Set to True if the driver is created for sanity check. Default is set to False. """ self._scheduler = scheduler self._enumerator = enumerator task.TotMilestoneManager.is_sanity = is_sanity def RereadAndReprocessConfig(self, config, mv): """Re-read config, re-populate self._events and recreate task lists. @param config: an instance of ForgivingConfigParser. @param mv: an instance of ManifestVersions. """ config.reread() new_events = self._CreateEventsWithTasks(config, mv) for keyword, event in self._events.iteritems(): event.Merge(new_events[keyword]) def SetUpEventsAndTasks(self, config, mv): """Populate self._events and create task lists from config. @param config: an instance of ForgivingConfigParser. @param mv: an instance of ManifestVersions. """ self._events = self._CreateEventsWithTasks(config, mv) def _CreateEventsWithTasks(self, config, mv): """Create task lists from config, and assign to newly-minted events. Calling multiple times should start afresh each time. @param config: an instance of ForgivingConfigParser. @param mv: an instance of ManifestVersions. """ events = {} for klass in self.EVENT_CLASSES: events[klass.KEYWORD] = klass.CreateFromConfig(config, mv) tasks = self.TasksFromConfig(config) for keyword, task_list in tasks.iteritems(): if keyword in events: events[keyword].tasks = task_list else: logging.warning('%s, is an unknown keyword.', keyword) return events def TasksFromConfig(self, config): """Generate a dict of {event_keyword: [tasks]} mappings from |config|. For each section in |config| that encodes a Task, instantiate a Task object. Determine the event that Task is supposed to run_on and append the object to a list associated with the appropriate event keyword. Return a dictionary of these keyword: list of task mappings. @param config: a ForgivingConfigParser containing tasks to be parsed. @return dict of {event_keyword: [tasks]} mappings. @raise MalformedConfigEntry on a task parsing error. """ tasks = {} for section in config.sections(): if not base_event.HonoredSection(section): try: keyword, new_task = task.Task.CreateFromConfigSection( config, section) except task.MalformedConfigEntry as e: logging.warning('%s is malformed: %s', section, e) continue tasks.setdefault(keyword, []).append(new_task) return tasks def RunForever(self, config, mv): """Main loop of the scheduler. Runs til the process is killed. @param config: an instance of ForgivingConfigParser. @param mv: an instance of manifest_versions.ManifestVersions. """ for event in self._events.itervalues(): event.Prepare() while True: try: self.HandleEventsOnce(mv) except board_enumerator.EnumeratorException as e: logging.warning('Failed to enumerate boards: %r', e) with _timer.get_client('manifest_versions_update'): mv.Update() with _timer.get_client('tot_milestone_manager_refresh'): task.TotMilestoneManager().refresh() time.sleep(self._LOOP_INTERVAL_SECONDS) self.RereadAndReprocessConfig(config, mv) @staticmethod def HandleBoard(inputs): """Handle event based on given inputs. @param inputs: A dictionary of the arguments needed to handle an event. Keys include: scheduler: a DedupingScheduler, used to schedule jobs with the AFE. event: An event object to be handled. board: Name of the board. """ scheduler = inputs['scheduler'] event = inputs['event'] board = inputs['board'] logging.info('Handling %s event for board %s', event.keyword, board) branch_builds = event.GetBranchBuildsForBoard(board) event.Handle(scheduler, branch_builds, board) logging.info('Finished handling %s event for board %s', event.keyword, board) @_timer.decorate def HandleEventsOnce(self, mv): """One turn through the loop. Separated out for unit testing. @param mv: an instance of manifest_versions.ManifestVersions. @raise EnumeratorException if we can't enumerate any supported boards. """ boards = self._enumerator.Enumerate() logging.info('%d boards currently in the lab: %r', len(boards), boards) thread_pool = pool.ThreadPool(POOL_SIZE) with contextlib.closing(thread_pool): for e in self._events.itervalues(): if not e.ShouldHandle(): continue logging.info('Handling %s event for %d boards', e.keyword, len(boards)) args = [] for board in boards: args.append({'scheduler': self._scheduler, 'event': e, 'board': board}) thread_pool.map(self.HandleBoard, args) logging.info('Finished handling %s event for %d boards', e.keyword, len(boards)) e.UpdateCriteria() def ForceEventsOnceForBuild(self, keywords, build_name): """Force events with provided keywords to happen, with given build. @param keywords: iterable of event keywords to force @param build_name: instead of looking up builds to test, test this one. """ board, type, milestone, manifest = utils.ParseBuildName(build_name) branch_builds = {task.PickBranchName(type, milestone): [build_name]} logging.info('Testing build R%s-%s on %s', milestone, manifest, board) for e in self._events.itervalues(): if e.keyword in keywords: e.Handle(self._scheduler, branch_builds, board, force=True)