# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.


"""Thread library for drone management.

This library contains a threaded task queue capable of starting, monitoring
and syncing threads across remote and localhost drones asynchronously. It also
contains a wrapper for standard python threads that records exceptions so they
can be re-raised in the thread manager. The api exposed by the threaded task
queue is as follows:
    1. worker: The staticmethod executed by all worker threads.
    2. execute: Takes a list of drones and invokes a worker thread per drone.
        This method assumes that all drones have a queue of pending calls
        for execution.
    3. wait_on_drones: Waits for all worker threads started by execute to finish
        and raises any exceptions as a consolidated DroneTaskQueueException.
    4. get_results: Returns the results of all threads as a dictionary keyed
        on the drones.
"""

import collections
import Queue
import threading
import logging

import common
from autotest_lib.scheduler import drone_task_queue


class ExceptionRememberingThread(threading.Thread):
    """A wrapper around regular python threads that records exceptions.

    Once run() has completed, the `err` attribute holds the exception raised
    by the thread's target, or None if the target returned normally. The
    attribute does not exist before run() completes.
    """

    def run(self):
        """Run the thread's target, remembering any exception it raises.

        The exception is not propagated out of the thread; it is stored on
        `self.err` so whoever joins the thread can inspect and re-raise it.
        """
        try:
            super(ExceptionRememberingThread, self).run()
        except Exception as e:
            self.err = e
            # logging.exception records the traceback, which is only available
            # here; the manager re-raises with a flattened message.
            logging.exception('%s raised an exception that will be re-raised '
                              'by the thread pool manager.', self.getName())
        else:
            self.err = None


class ThreadedTaskQueue(drone_task_queue.DroneTaskQueue):
    """Threaded implementation of a drone task queue."""

    # A (drone, results) pair placed on the results queue by each worker.
    result = collections.namedtuple('task', ['drone', 'results'])

    def __init__(self, name='thread_queue'):
        """Create an empty task queue.

        @param name: Prefix used when naming worker threads; only used for
                debugging.
        """
        # Results produced by worker threads, drained by get_results.
        self.results_queue = Queue.Queue()
        # Map of drone -> worker thread for the threads started by execute.
        self.drone_threads = {}
        self.name = name


    @staticmethod
    def worker(drone, results_queue):
        """Worker for task execution.

        Execute calls queued against the given drone and place the return value
        in results_queue.

        @param drone: A drone with calls to execute.
        @param results_queue: A queue, into which the worker places
                ThreadedTaskQueue.result from the drone calls.
        """
        logging.info('(Worker.%s) starting.', drone.hostname)
        results_queue.put(ThreadedTaskQueue.result(
                drone, drone.execute_queued_calls()))
        logging.info('(Worker.%s) finished.', drone.hostname)


    def wait_on_drones(self):
        """Wait on all threads that are currently refreshing a drone.

        Joins every thread in the drone/thread map and then clears the map,
        whether or not any of the threads recorded an exception.

        @raises DroneTaskQueueException: Consolidated exception for all
                drone thread exceptions.
        """
        if not self.drone_threads:
            return
        # TODO: Make this process more resilient. We can:
        # 1. Timeout the join.
        # 2. Kick out the exception/timeout drone.
        # 3. Selectively retry exceptions.
        # For now, it is compliant with the single threaded drone manager which
        # will raise all drone_utility, ssh and drone_manager exceptions.
        drone_exceptions = []
        for drone, thread in self.drone_threads.iteritems():
            logging.info('(Task Queue) Waiting for %s', thread.getName())
            thread.join()
            # Compare against None rather than relying on truthiness, so an
            # exception type that evaluates as false is still reported.
            if thread.err is not None:
                drone_exceptions.append((drone, thread.err))
        logging.info('(Task Queue) All threads have returned, clearing map.')
        self.drone_threads = {}
        if not drone_exceptions:
            return
        exception_msg = ''.join(
                'Drone %s raised Exception %s\n' % (drone.hostname, err)
                for drone, err in drone_exceptions)
        raise drone_task_queue.DroneTaskQueueException(exception_msg)


    def get_results(self):
        """Get a results dictionary keyed on the drones.

        This method synchronously waits till all drone threads have returned
        before checking for results. It is meant to be invoked in conjunction
        with the 'execute' method, which creates a thread per drone.

        @return: A dictionary of return values from the drones.

        @raises DroneTaskQueueException: If any drone thread raised an
                exception (propagated from wait_on_drones), or if the results
                queue holds more than one result for the same drone.
        """
        self.wait_on_drones()
        results = {}
        # All workers have been joined by now, so nothing else is adding to
        # the queue while it is drained.
        while not self.results_queue.empty():
            drone_results = self.results_queue.get()
            if drone_results.drone in results:
                raise drone_task_queue.DroneTaskQueueException(
                        'Task queue has recorded results for drone %s: %s' %
                        (drone_results.drone, results))
            results[drone_results.drone] = drone_results.results
        return results


    def execute(self, drones, wait=True):
        """Invoke a thread per drone, to execute drone_utility in parallel.

        Drones without any pending calls are skipped.

        @param drones: A list of drones with calls to execute.
        @param wait: If True, this method will only return when all the drones
                have returned the result of their respective invocations of
                drone_utility. The results_queue and drone_threads will be
                cleared. If False, the caller must clear both the queue and
                the map before the next invocation of 'execute', by calling
                'get_results'.

        @return: If wait is True, a dictionary keyed on the drones, containing
                a list of return values from the execution of drone_utility.
                If wait is False, None.

        @raises DroneTaskQueueException: If the results queue or drone map
                isn't empty at the time of invocation.
        """
        if not self.results_queue.empty():
            raise drone_task_queue.DroneTaskQueueException(
                    'Cannot clobber results queue: %s, it should be cleared '
                    'through get_results.' % self.results_queue)
        if self.drone_threads:
            raise drone_task_queue.DroneTaskQueueException(
                    'Cannot clobber thread map: %s, it should be cleared '
                    'through wait_on_drones' % self.drone_threads)
        for drone in drones:
            if not drone.get_calls():
                continue
            worker_thread = ExceptionRememberingThread(
                    target=ThreadedTaskQueue.worker,
                    args=(drone, self.results_queue))
            # The name is only used for debugging
            worker_thread.setName('%s.%s' %
                                  (self.name, drone.hostname.replace('.', '_')))
            # None of these threads are allowed to survive past the tick they
            # were spawned in, and the scheduler won't die mid-tick, so none
            # of the threads need to be daemons. However, if the scheduler does
            # die unexpectedly we can just forsake the daemon threads.
            worker_thread.daemon = True
            worker_thread.start()
            # Register the thread only once it has actually started, so that
            # wait_on_drones never tries to join a thread that failed to start.
            self.drone_threads[drone] = worker_thread
        return self.get_results() if wait else None