# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Thread library for drone management. This library contains a threaded task queue capable of starting, monitoring and syncing threads across remote and localhost drones asynchronously. It also contains a wrapper for standard python threads that records exceptions so they can be re-raised in the thread manager. The api exposed by the threaded task queue is as follows: 1. worker: The staticmethod executed by all worker threads. 2. execute: Takes a list of drones and invokes a worker thread per drone. This method assumes that all drones have a queue of pending calls for execution. 3. wait_on_drones: Waits for all worker threads started by execute to finish and raises any exceptions as a consolidated DroneTaskQueueException. 4. get_results: Returns the results of all threads as a dictionary keyed on the drones. """ import collections import Queue import threading import logging import common from autotest_lib.client.common_lib.cros.graphite import autotest_stats from autotest_lib.scheduler import drone_task_queue class ExceptionRememberingThread(threading.Thread): """A wrapper around regular python threads that records exceptions.""" def run(self): """Wrapper around the thread's run method.""" try: with autotest_stats.Timer(self.name): super(ExceptionRememberingThread, self).run() except Exception as self.err: logging.error('%s raised an exception that will be re-raised by ' 'the thread pool manager.', self.getName()) else: self.err = None class PersistentTimer(object): """A class to handle timers across local scopes.""" def __init__(self, name): """Initialize a persistent timer. @param name: The name/key to insert timings under. """ self.name = name self.timer = None def start(self): """Create and start a new timer.""" self.timer = autotest_stats.Timer(self.name) self.timer.start() def stop(self): """Stop a previously started timer.""" try: self.timer.stop() except (AssertionError, AttributeError) as e: logging.info('Stopping timer %s failed: %s', self.name, e) finally: self.timer = None class ThreadedTaskQueue(drone_task_queue.DroneTaskQueue): """Threaded implementation of a drone task queue.""" result = collections.namedtuple('task', ['drone', 'results']) def __init__(self, name='thread_queue'): self.results_queue = Queue.Queue() self.drone_threads = {} self.name = name # The persistent timer is used to measure net time spent # refreshing all drones across 'execute' and 'get_results'. self.timer = PersistentTimer(self.name) @staticmethod def worker(drone, results_queue): """Worker for task execution. Execute calls queued against the given drone and place the return value in results_queue. @param drone: A drone with calls to execute. @param results_queue: A queue, into which the worker places ThreadedTaskQueue.result from the drone calls. """ logging.info('(Worker.%s) starting.', drone.hostname) results_queue.put(ThreadedTaskQueue.result( drone, drone.execute_queued_calls())) logging.info('(Worker.%s) finished.', drone.hostname) def wait_on_drones(self): """Wait on all threads that are currently refreshing a drone. @raises DroneTaskQueueException: Consolidated exception for all drone thread exceptions. """ if not self.drone_threads: return # TODO: Make this process more resilient. We can: # 1. Timeout the join. # 2. Kick out the exception/timeout drone. # 3. Selectively retry exceptions. # For now, it is compliant with the single threaded drone manager which # will raise all drone_utility, ssh and drone_manager exceptions. drone_exceptions = [] for drone, thread in self.drone_threads.iteritems(): tname = thread.getName() logging.info('(Task Queue) Waiting for %s', tname) thread.join() if thread.err: drone_exceptions.append((drone, thread.err)) logging.info('(Task Queue) All threads have returned, clearing map.') self.drone_threads = {} if not drone_exceptions: return exception_msg = '' for drone, err in drone_exceptions: exception_msg += ('Drone %s raised Exception %s\n' % (drone.hostname, err)) raise drone_task_queue.DroneTaskQueueException(exception_msg) def get_results(self): """Get a results dictionary keyed on the drones. This method synchronously waits till all drone threads have returned before checking for results. It is meant to be invoked in conjunction with the 'execute' method, which creates a thread per drone. @return: A dictionary of return values from the drones. """ self.wait_on_drones() self.timer.stop() results = {} while not self.results_queue.empty(): drone_results = self.results_queue.get() if drone_results.drone in results: raise drone_task_queue.DroneTaskQueueException( 'Task queue has recorded results for drone %s: %s' % (drone_results.drone, results)) results[drone_results.drone] = drone_results.results return results def execute(self, drones, wait=True): """Invoke a thread per drone, to execute drone_utility in parallel. @param drones: A list of drones with calls to execute. @param wait: If True, this method will only return when all the drones have returned the result of their respective invocations of drone_utility. The results_queue and drone_threads will be cleared. If False, the caller must clear both the queue and the map before the next invocation of 'execute', by calling 'get_results'. @return: A dictionary keyed on the drones, containing a list of return values from the execution of drone_utility. @raises DroneManagerError: If the results queue or drone map isn't empty at the time of invocation. """ if not self.results_queue.empty(): raise drone_task_queue.DroneTaskQueueException( 'Cannot clobber results queue: %s, it should be cleared ' 'through get_results.' % self.results_queue) if self.drone_threads: raise drone_task_queue.DroneTaskQueueException( 'Cannot clobber thread map: %s, it should be cleared ' 'through wait_on_drones' % self.drone_threads) self.timer.start() for drone in drones: if not drone.get_calls(): continue worker_thread = ExceptionRememberingThread( target=ThreadedTaskQueue.worker, args=(drone, self.results_queue)) # None of these threads are allowed to survive past the tick they # were spawned in, and the scheduler won't die mid-tick, so none # of the threads need to be daemons. However, if the scheduler does # die unexpectedly we can just forsake the daemon threads. self.drone_threads[drone] = worker_thread # The name is only used for debugging worker_thread.setName('%s.%s' % (self.name, drone.hostname.replace('.', '_'))) worker_thread.daemon = True worker_thread.start() return self.get_results() if wait else None