#pylint: disable-msg=C0111

"""
Postjob task.

Postjob tasks are responsible for setting the final status of the HQE
and Host, and scheduling additional special agents such as cleanup,
if necessary.
"""

import os

from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models, model_attributes
from autotest_lib.scheduler import agent_task, drones, drone_manager
from autotest_lib.scheduler import email_manager, pidfile_monitor
from autotest_lib.scheduler import scheduler_config
from autotest_lib.server import autoserv_utils

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')


class PostJobTask(agent_task.AgentTask):
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        self._autoserv_monitor = pidfile_monitor.PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        # Do we need testing_mode?
        return self._generate_command(
                self._drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state:\n' +
                        '\n'.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]


class SelfThrottledPostJobTask(PostJobTask):
    """
    PostJobTask that maintains its own process limit.

    We throttle tasks like parsing because we don't want them to
    hold up tests. At the same time we don't want to build up a
    backlog of work that will take forever to parse.
    """
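    # Note: the throttling state below is kept in class attributes and is
    # only touched through classmethods, so the count of running processes
    # is shared across all instances of a given subclass. A subclass only
    # needs to override _max_processes() (plus the usual _generate_command()
    # and _pidfile_name() hooks); FinalReparseTask below is a concrete
    # example.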
    _num_running_processes = 0
    # Last known limit of max processes, used to check whether
    # the max processes config has been changed.
    _last_known_max_processes = 0
    # Whether to send an email notification when the process limit is hit.
    _notification_on = True
    # Once the process limit is hit, an email is sent. To prevent spam,
    # another email is not sent until the process count drops below the
    # following fraction of the limit.
    REVIVE_NOTIFICATION_THRESHOLD = 0.80

    @classmethod
    def _gauge_metrics(cls):
        """Report the number of running processes to Monarch."""
        m = metrics.Gauge('chromeos/autotest/scheduler/postjob_tasks')
        m.set(cls._num_running_processes, fields={'task_name': cls.__name__})


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1
        cls._gauge_metrics()


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1
        cls._gauge_metrics()


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # override tick to keep trying to start until the process count goes
        # down and we can, at which point we revert to default behavior
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    @classmethod
    def _notify_process_limit_hit(cls):
        """Send an email to notify that the process limit has been hit."""
        if cls._notification_on:
            subject = '%s: hitting max process limit.' % cls.__name__
            message = ('Running processes/Max processes: %d/%d'
                       % (cls._num_running_processes, cls._max_processes()))
            email_manager.manager.enqueue_notify_email(subject, message)
            cls._notification_on = False


    @classmethod
    def _reset_notification_switch_if_necessary(cls):
        """Reset _notification_on if necessary.

        Set _notification_on to True in the following cases:
        1) The max processes limit in the configuration has changed;
        2) _notification_on is False and the number of running processes
           has dropped below the level defined by
           REVIVE_NOTIFICATION_THRESHOLD.
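
        For example, if _max_processes() returned 5, notification would be
        re-enabled once fewer than 5 * 0.80 = 4 processes were running
        (i.e. 3 or fewer).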
        """
        if cls._last_known_max_processes != cls._max_processes():
            cls._notification_on = True
            cls._last_known_max_processes = cls._max_processes()
            return
        percentage = float(cls._num_running_processes) / cls._max_processes()
        if (not cls._notification_on and
            percentage < cls.REVIVE_NOTIFICATION_THRESHOLD):
            cls._notification_on = True


    def _try_starting_process(self):
        self._reset_notification_switch_if_necessary()
        if not self._can_run_new_process():
            self._notify_process_limit_hit()
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()


class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    * setting the final status of the host, directly or through a cleanup
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
                queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    # TODO: Refactor into autoserv_utils. crbug.com/243090
    def _generate_command(self, results_dir):
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [autoserv_utils.autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        final_success, num_tests_failed = self._get_monitor_info()
        reset_after_failure = (
                not self._job.run_reset and (
                    not final_success or num_tests_failed > 0))
        self._reboot_hosts(final_success, num_tests_failed, reset_after_failure)
        if reset_after_failure:
            m = metrics.Counter('chromeos/autotest/scheduler/postjob_tasks/'
                                'reset_after_failure')
            m.increment(fields={'autoserv_process_success': final_success,
                                'num_tests_failed': num_tests_failed > 0})
            self._reset_after_failure()


    def _get_monitor_info(self):
        """Read monitor info from the pidfile.

        @returns: a tuple of
            final_success: whether autoserv finished successfully;
            num_tests_failed: how many tests failed while it ran.
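
            For example, a clean autoserv run with no test failures yields
            (True, 0); when no autoserv process can be found at all, the
            result is (False, 0).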
        """
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0

        return final_success, num_tests_failed


    def _reboot_hosts(self, final_success, num_tests_failed,
                      reset_after_failure):
        """Reboot hosts by scheduling a CLEANUP task on each host if needed.

        @param final_success: whether autoserv exited successfully.
        @param num_tests_failed: how many tests failed in total.
        @param reset_after_failure: whether a RESET task will be scheduled
                                    later.
        """
        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0)
                or (num_tests_failed > 0 and not reset_after_failure))

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def _reset_after_failure(self):
        """Queue a RESET special task for each host when the job fails.

        The current HQE is not passed to the RESET task.
        """
        for queue_entry in self.queue_entries:
            models.SpecialTask.objects.create(
                    host=models.Host.objects.get(id=queue_entry.host.id),
                    task=models.SpecialTask.Task.RESET,
                    requested_by=self._job.owner_model())


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)


class FinalReparseTask(SelfThrottledPostJobTask):
    """Task responsible for running the final parse of a job's results."""

    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--detach', '--write-pidfile',
                '--record-duration', '--suite-report', '-l', '2', '-r', '-o',
                results_dir]


    @property
    def num_processes(self):
        return 0 # don't include parser processes in accounting


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._set_all_statuses(self._final_status())
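

# Illustrative sketch only (nothing below is used by the scheduler): a new
# throttled post-job task would subclass SelfThrottledPostJobTask and
# override the same hooks FinalReparseTask does. 'ExampleArchiveTask',
# '_example_archiver_path' and the '.archiver_execute' pidfile name are
# hypothetical placeholders, and a real task would define its own process
# limit rather than reusing max_parse_processes.
#
#     _example_archiver_path = '/path/to/some/archiver'  # placeholder
#
#     class ExampleArchiveTask(SelfThrottledPostJobTask):
#         def _generate_command(self, results_dir):
#             return [_example_archiver_path, '-r', results_dir]
#
#         def _pidfile_name(self):
#             return '.archiver_execute'
#
#         @classmethod
#         def _max_processes(cls):
#             return scheduler_config.config.max_parse_processes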