# Copyright 2010 Google Inc. All Rights Reserved.
#
"""Thread that sets up, runs and reports on a single automation job."""

import logging
import os.path
import threading

from automation.common import command as cmd
from automation.common import job
from automation.common import logger
from automation.common.command_executer import LoggingCommandExecuter
from automation.common.command_executer import CommandTerminator


class JobExecuter(threading.Thread):
    """Runs one job in its own thread and notifies listeners when it ends.

    The thread prepares the job's work directory, satisfies its folder
    dependencies, launches the job command, copies the results back and
    finally calls ``NotifyJobComplete(job)`` on every listener.
    """

    def __init__(self, job_to_execute, machines, listeners):
        """Initializes the executer.

        Args:
          job_to_execute: The job to run. Stored as ``self.job``.
          machines: Non-empty collection of machines; assigned to
            ``job.machines`` when the thread starts running.
          listeners: Iterable of objects exposing ``NotifyJobComplete(job)``.
        """
        threading.Thread.__init__(self)

        # NOTE(review): stripped under ``python -O``; kept as an assert so the
        # exception type observed by existing callers does not change.
        assert machines

        self.job = job_to_execute
        self.listeners = listeners
        self.machines = machines

        # Set Thread name.
        self.name = '%s-%s' % (self.__class__.__name__, self.job.id)

        self._logger = logging.getLogger(self.__class__.__name__)
        self._executer = LoggingCommandExecuter(self.job.dry_run)
        self._terminator = CommandTerminator()

    def _RunRemotely(self, command, fail_msg, command_timeout=1 * 60 * 60):
        """Runs ``command`` on the job's primary machine.

        Args:
          command: Command to hand to the executer.
          fail_msg: Message used for the JobFailure raised on failure.
          command_timeout: Passed through to the executer unchanged
            (presumably seconds -- confirm against the executer).

        Raises:
          job.JobFailure: If the executer reports a non-zero exit code.
        """
        exit_code = self._executer.RunCommand(
            command,
            self.job.primary_machine.hostname,
            self.job.primary_machine.username,
            command_terminator=self._terminator,
            command_timeout=command_timeout)
        if exit_code:
            raise job.JobFailure(fail_msg, exit_code)

    def _RunLocally(self, command, fail_msg, command_timeout=1 * 60 * 60):
        """Runs ``command`` through the executer without a target machine.

        Args:
          command: Command to hand to the executer.
          fail_msg: Message used for the JobFailure raised on failure.
          command_timeout: Passed through to the executer unchanged
            (presumably seconds -- confirm against the executer).

        Raises:
          job.JobFailure: If the executer reports a non-zero exit code.
        """
        exit_code = self._executer.RunCommand(
            command,
            command_terminator=self._terminator,
            command_timeout=command_timeout)
        if exit_code:
            raise job.JobFailure(fail_msg, exit_code)

    def Kill(self):
        """Asks the shared terminator to terminate the running command."""
        self._terminator.Terminate()

    def CleanUpWorkDir(self):
        """Removes the job's work directory on the primary machine.

        Raises:
          job.JobFailure: If the removal command fails.
        """
        self._logger.debug('Cleaning up %r work directory.', self.job)
        self._RunRemotely(cmd.RmTree(self.job.work_dir),
                          'Cleanup workdir failed.')

    def CleanUpHomeDir(self):
        """Removes the job's home directory using a local command.

        Raises:
          job.JobFailure: If the removal command fails.
        """
        self._logger.debug('Cleaning up %r home directory.', self.job)
        self._RunLocally(cmd.RmTree(self.job.home_dir),
                         'Cleanup homedir failed.')

    def _PrepareRuntimeEnvironment(self):
        """Creates the work, logs and results directories and opens the log.

        Raises:
          job.JobFailure: If the directories cannot be created.
        """
        self._RunRemotely(
            cmd.MakeDir(self.job.work_dir, self.job.logs_dir,
                        self.job.results_dir),
            'Creating new job directory failed.')

        # The log directory is ready, so we can prepare to log command's
        # output.
        self._executer.OpenLog(os.path.join(self.job.logs_dir,
                                            self.job.log_filename_prefix))

    def _SatisfyFolderDependencies(self):
        """Makes every folder this job depends on available in its work dir.

        A read-only dependency that lives on the same primary machine is
        symlinked; anything else is copied from the producing machine.

        Raises:
          job.JobFailure: If a symlink or copy command fails.
        """
        for dependency in self.job.folder_dependencies:
            to_folder = os.path.join(self.job.work_dir, dependency.dest)
            from_folder = os.path.join(dependency.job.work_dir,
                                       dependency.src)
            from_machine = dependency.job.primary_machine

            same_machine = from_machine == self.job.primary_machine
            if same_machine and dependency.read_only:
                # No need to make a copy, just symlink it
                self._RunRemotely(
                    cmd.MakeSymlink(from_folder, to_folder),
                    'Failed to create symlink to required directory.')
            else:
                self._RunRemotely(
                    cmd.RemoteCopyFrom(from_machine.hostname,
                                       from_folder,
                                       to_folder,
                                       username=from_machine.username),
                    'Failed to copy required files.')

    def _LaunchJobCommand(self):
        """Runs the job's command in its work directory, honoring its timeout.

        Raises:
          job.JobFailure: If the command exits with a non-zero code.
        """
        command = self.job.GetCommand()

        # Source ~/.bashrc first, then run the wrapped job command.
        self._RunRemotely(
            '%s; %s' % ('PS1=. TERM=linux source ~/.bashrc',
                        cmd.Wrapper(command, cwd=self.job.work_dir)),
            "Command failed to execute: '%s'." % command,
            self.job.timeout)

    def _CopyJobResults(self):
        """Copy test results back to directory."""
        self._RunLocally(
            cmd.RemoteCopyFrom(self.job.primary_machine.hostname,
                               self.job.results_dir,
                               self.job.home_dir,
                               username=self.job.primary_machine.username),
            'Failed to copy results.')

    def run(self):
        """Executes the job and always reports completion to the listeners.

        The job status is advanced through SETUP, COPYING and RUNNING and ends
        as SUCCEEDED or FAILED. A JobFailure is logged and absorbed. Any other
        exception is logged, marks the job as failed and is re-raised, but the
        log is still closed and the listeners are still notified first.
        """
        self.job.status = job.STATUS_SETUP
        self.job.machines = self.machines
        self._logger.debug('Executing %r on %r in directory %s.', self.job,
                           self.job.primary_machine.hostname,
                           self.job.work_dir)

        try:
            self.CleanUpWorkDir()

            self._PrepareRuntimeEnvironment()

            self.job.status = job.STATUS_COPYING

            self._SatisfyFolderDependencies()

            self.job.status = job.STATUS_RUNNING

            self._LaunchJobCommand()
            self._CopyJobResults()

            # If we get here, the job succeeded.
            self.job.status = job.STATUS_SUCCEEDED
        except job.JobFailure as ex:
            self._logger.error('Job failed. Exit code %s. %s',
                               ex.exit_code, ex)
            if self._terminator.IsTerminated():
                self._logger.info('%r was killed', self.job)

            self.job.status = job.STATUS_FAILED
        except Exception:
            # Unexpected error: do not leave the job in an in-progress state.
            # Re-raise so the failure stays as visible as it was before.
            self._logger.exception('Unexpected error while executing %r.',
                                   self.job)
            self.job.status = job.STATUS_FAILED
            raise
        finally:
            # Runs on every exit path so the log is closed and nobody waits
            # forever for a completion notification.
            self._executer.CloseLog()

            for listener in self.listeners:
                listener.NotifyJobComplete(self.job)