1# Copyright 2010 Google Inc. All Rights Reserved. 2# 3 4import logging 5import os 6import re 7import threading 8 9from automation.common import job 10from automation.common import logger 11from automation.server.job_executer import JobExecuter 12 13 14class IdProducerPolicy(object): 15 """Produces series of unique integer IDs. 16 17 Example: 18 id_producer = IdProducerPolicy() 19 id_a = id_producer.GetNextId() 20 id_b = id_producer.GetNextId() 21 assert id_a != id_b 22 """ 23 24 def __init__(self): 25 self._counter = 1 26 27 def Initialize(self, home_prefix, home_pattern): 28 """Find first available ID based on a directory listing. 29 30 Args: 31 home_prefix: A directory to be traversed. 32 home_pattern: A regexp describing all files/directories that will be 33 considered. The regexp must contain exactly one match group with name 34 "id", which must match an integer number. 35 36 Example: 37 id_producer.Initialize(JOBDIR_PREFIX, 'job-(?P<id>\d+)') 38 """ 39 harvested_ids = [] 40 41 if os.path.isdir(home_prefix): 42 for filename in os.listdir(home_prefix): 43 path = os.path.join(home_prefix, filename) 44 45 if os.path.isdir(path): 46 match = re.match(home_pattern, filename) 47 48 if match: 49 harvested_ids.append(int(match.group('id'))) 50 51 self._counter = max(harvested_ids or [0]) + 1 52 53 def GetNextId(self): 54 """Calculates another ID considered to be unique.""" 55 new_id = self._counter 56 self._counter += 1 57 return new_id 58 59 60class JobManager(threading.Thread): 61 62 def __init__(self, machine_manager): 63 threading.Thread.__init__(self, name=self.__class__.__name__) 64 self.all_jobs = [] 65 self.ready_jobs = [] 66 self.job_executer_mapping = {} 67 68 self.machine_manager = machine_manager 69 70 self._lock = threading.Lock() 71 self._jobs_available = threading.Condition(self._lock) 72 self._exit_request = False 73 74 self.listeners = [] 75 self.listeners.append(self) 76 77 self._id_producer = IdProducerPolicy() 78 self._id_producer.Initialize(job.Job.WORKDIR_PREFIX, 'job-(?P<id>\d+)') 79 80 self._logger = logging.getLogger(self.__class__.__name__) 81 82 def StartJobManager(self): 83 self._logger.info('Starting...') 84 85 with self._lock: 86 self.start() 87 self._jobs_available.notifyAll() 88 89 def StopJobManager(self): 90 self._logger.info('Shutdown request received.') 91 92 with self._lock: 93 for job_ in self.all_jobs: 94 self._KillJob(job_.id) 95 96 # Signal to die 97 self._exit_request = True 98 self._jobs_available.notifyAll() 99 100 # Wait for all job threads to finish 101 for executer in self.job_executer_mapping.values(): 102 executer.join() 103 104 def KillJob(self, job_id): 105 """Kill a job by id. 106 107 Does not block until the job is completed. 108 """ 109 with self._lock: 110 self._KillJob(job_id) 111 112 def GetJob(self, job_id): 113 for job_ in self.all_jobs: 114 if job_.id == job_id: 115 return job_ 116 return None 117 118 def _KillJob(self, job_id): 119 self._logger.info('Killing [Job: %d].', job_id) 120 121 if job_id in self.job_executer_mapping: 122 self.job_executer_mapping[job_id].Kill() 123 for job_ in self.ready_jobs: 124 if job_.id == job_id: 125 self.ready_jobs.remove(job_) 126 break 127 128 def AddJob(self, job_): 129 with self._lock: 130 job_.id = self._id_producer.GetNextId() 131 132 self.all_jobs.append(job_) 133 # Only queue a job as ready if it has no dependencies 134 if job_.is_ready: 135 self.ready_jobs.append(job_) 136 137 self._jobs_available.notifyAll() 138 139 return job_.id 140 141 def CleanUpJob(self, job_): 142 with self._lock: 143 if job_.id in self.job_executer_mapping: 144 self.job_executer_mapping[job_.id].CleanUpWorkDir() 145 del self.job_executer_mapping[job_.id] 146 # TODO(raymes): remove job from self.all_jobs 147 148 def NotifyJobComplete(self, job_): 149 self.machine_manager.ReturnMachines(job_.machines) 150 151 with self._lock: 152 self._logger.debug('Handling %r completion event.', job_) 153 154 if job_.status == job.STATUS_SUCCEEDED: 155 for succ in job_.successors: 156 if succ.is_ready: 157 if succ not in self.ready_jobs: 158 self.ready_jobs.append(succ) 159 160 self._jobs_available.notifyAll() 161 162 def AddListener(self, listener): 163 self.listeners.append(listener) 164 165 @logger.HandleUncaughtExceptions 166 def run(self): 167 self._logger.info('Started.') 168 169 while not self._exit_request: 170 with self._lock: 171 # Get the next ready job, block if there are none 172 self._jobs_available.wait() 173 174 while self.ready_jobs: 175 ready_job = self.ready_jobs.pop() 176 177 required_machines = ready_job.machine_dependencies 178 for pred in ready_job.predecessors: 179 required_machines[0].AddPreferredMachine( 180 pred.primary_machine.hostname) 181 182 machines = self.machine_manager.GetMachines(required_machines) 183 if not machines: 184 # If we can't get the necessary machines right now, simply wait 185 # for some jobs to complete 186 self.ready_jobs.insert(0, ready_job) 187 break 188 else: 189 # Mark as executing 190 executer = JobExecuter(ready_job, machines, self.listeners) 191 executer.start() 192 self.job_executer_mapping[ready_job.id] = executer 193 194 self._logger.info('Stopped.') 195