import abc
import datetime
import glob
import json
import os
import re
import shutil
import time

import common
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers


_AFE = frontend_wrappers.RetryingAFE()

# Raw strings so that regex escapes such as `\d` are not interpreted (and
# warned about) as Python string escapes.
SPECIAL_TASK_PATTERN = r'.*/hosts/[^/]+/(\d+)-[^/]+'
JOB_PATTERN = r'.*/(\d+)-[^/]+'


def _is_job_expired(age_limit, timestamp):
    """Check whether a job timestamp is older than an age limit.

    @param age_limit: Minimum age, measured in days.  If the value is
                      not positive, the job is always expired.
    @param timestamp: Timestamp of the job whose age we are checking.
                      The format must match time_utils.TIME_FMT.

    @returns True iff the job is old enough to be expired.
    """
    if age_limit <= 0:
        return True
    job_time = time_utils.time_string_to_datetime(timestamp)
    expiration = job_time + datetime.timedelta(days=age_limit)
    return datetime.datetime.now() >= expiration


def get_job_id_or_task_id(result_dir):
    """Extract job id or special task id from result_dir

    @param result_dir: path to the result dir.
            For test job:
            /usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6
            The hostname at the end is optional.
            For special task:
            /usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup

    @returns: integer representing the job id or task id. Returns None if fail
        to parse job or task id from the result_dir.
    """
    if not result_dir:
        return None
    result_dir = os.path.abspath(result_dir)
    # Result folder for job running inside container has only job id.
    ssp_job_pattern = r'.*/(\d+)$'
    # Try to get the job ID from the last pattern of number-text. This avoids
    # issue with path like 123-results/456-debug_user, in which 456 is the real
    # job ID.
    m_job = re.findall(JOB_PATTERN, result_dir)
    if m_job:
        return int(m_job[-1])
    m_special_task = re.match(SPECIAL_TASK_PATTERN, result_dir)
    if m_special_task:
        return int(m_special_task.group(1))
    # A bare numeric directory name is only accepted as a job id when
    # running inside a container.
    m_ssp_job_pattern = re.match(ssp_job_pattern, result_dir)
    if m_ssp_job_pattern and utils.is_in_container():
        return int(m_ssp_job_pattern.group(1))
    return None


class _JobDirectory(object):
    """State associated with a job to be offloaded.

    The full life-cycle of a job (including failure events that
    normally don't occur) looks like this:
     1. The job's results directory is discovered by
        `get_job_directories()`, and a job instance is created for it.
     2. Calls to `enqueue_offload()` have no effect so long as the job
        isn't complete in the database and the job isn't expired
        according to the `age_limit` parameter.
     3. Eventually, the job is both finished and expired.  The next
        call to `enqueue_offload()` requests the first attempt to
        offload the directory to GS.  Offload is attempted, but fails
        to complete (e.g. because of a GS problem).
     4. After the first failed offload `is_offloaded()` is false,
        but `is_reportable()` is also false, so the failure is not
        reported.
     5. Another call to `enqueue_offload()` again tries to offload the
        directory, and again fails.
     6. After a second failure, `is_offloaded()` is false and
        `is_reportable()` is true, so the failure generates an e-mail
        notification.
     7. Finally, an offload succeeds, and the directory no longer
        exists.  Now `is_offloaded()` is true, so the job instance is
        deleted, and future failures will not mention this directory
        any more.

    Only steps 1. and 7. are guaranteed to occur.  The others depend
    on the timing of calls to `enqueue_offload()`, and on the
    reliability of the actual offload process.
    """

    # Python 2 style metaclass declaration; it is what makes the
    # @abc.abstractmethod below enforceable on that interpreter.
    __metaclass__ = abc.ABCMeta

    GLOB_PATTERN = None   # must be redefined in subclass

    def __init__(self, resultsdir):
        self._dirname = resultsdir
        self._id = get_job_id_or_task_id(resultsdir)
        # Number of times an offload has been requested for this job.
        self._offload_count = 0
        # time.time() value recorded when the first offload was requested.
        self._first_offload_start = 0

    @classmethod
    def get_job_directories(cls):
        """Return a list of directories of jobs that need offloading."""
        return [d for d in glob.glob(cls.GLOB_PATTERN) if os.path.isdir(d)]

    @abc.abstractmethod
    def get_timestamp_if_finished(self):
        """Return this job's timestamp from the database.

        If the database has not marked the job as finished, return
        `None`.  Otherwise, return a timestamp for the job.  The
        timestamp is to be used to determine expiration in
        `_is_job_expired()`.

        @return Return `None` if the job is still running; otherwise
                return a string with a timestamp in the appropriate
                format.
        """
        raise NotImplementedError("_JobDirectory.get_timestamp_if_finished")

    def enqueue_offload(self, queue, age_limit):
        """Enqueue the job for offload, if it's eligible.

        The job is eligible for offloading if the database has marked
        it finished, and the job is older than the `age_limit`
        parameter.

        If the job is eligible, offload processing is requested by
        passing the `queue` parameter's `put()` method a two-element
        list: the job's results directory (`_dirname`) and that
        directory's parent directory name.

        @param queue     If the job should be offloaded, put the offload
                         parameters into this queue for processing.
        @param age_limit Minimum age for a job to be offloaded.  A value
                         of 0 means that the job will be offloaded as
                         soon as it is finished.
        """
        if not self._offload_count:
            # Eligibility is only checked until the first offload
            # request; afterwards every call is a retry.
            timestamp = self.get_timestamp_if_finished()
            if not timestamp:
                return
            if not _is_job_expired(age_limit, timestamp):
                return
            self._first_offload_start = time.time()
        self._offload_count += 1
        if self.process_gs_instructions():
            queue.put([self._dirname, os.path.dirname(self._dirname)])

    def is_offloaded(self):
        """Return whether this job has been successfully offloaded.

        A job counts as offloaded once its results directory no longer
        exists.
        """
        return not os.path.exists(self._dirname)

    def is_reportable(self):
        """Return whether this job has a reportable failure.

        True once offload has been requested more than once.
        """
        return self._offload_count > 1

    def get_failure_time(self):
        """Return the time of the first offload failure.

        This is the `time.time()` value recorded when the first
        offload was requested, or 0 if none has been requested yet.
        """
        return self._first_offload_start

    def get_failure_count(self):
        """Return the number of times this job has failed to offload."""
        return self._offload_count

    def get_job_directory(self):
        """Return the name of this job's results directory."""
        return self._dirname

    def process_gs_instructions(self):
        """Process any gs_offloader instructions for this special task.

        @returns True/False if there is anything left to offload.
        """
        # Default support is to still offload the directory.
        return True


class RegularJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for regular test jobs."""

    GLOB_PATTERN = '[0-9]*-*'

    def process_gs_instructions(self):
        """Process any gs_offloader instructions for this job.

        Each immediate subdirectory whose instructions file sets the
        no-offload flag is deleted.  If that leaves the job directory
        empty, the job directory itself is deleted as well.

        @returns True/False if there is anything left to offload.
        """
        # Go through the gs_offloader instructions file for each test in
        # this job.
        for path in glob.glob(os.path.join(
                self._dirname, '*', constants.GS_OFFLOADER_INSTRUCTIONS)):
            with open(path, 'r') as f:
                gs_off_instructions = json.load(f)
            if gs_off_instructions.get(constants.GS_OFFLOADER_NO_OFFLOAD):
                shutil.rmtree(os.path.dirname(path))

        # Finally check if there's anything left to offload.
        if not os.listdir(self._dirname):
            shutil.rmtree(self._dirname)
            return False
        return True

    def get_timestamp_if_finished(self):
        """Get the timestamp to use for finished jobs.

        @returns the latest hqe finished_on time. If the finished_on times
                 are null returns the job's created_on time. Returns None
                 if the job is not finished.
        """
        entry = _AFE.get_jobs(id=self._id, finished=True)
        if not entry:
            return None
        hqes = _AFE.get_host_queue_entries(finished_on__isnull=False,
                                           job_id=self._id)
        if not hqes:
            return entry[0].created_on
        # While most Jobs have 1 HQE, some can have multiple, so check them
        # all.
        return max(hqe.finished_on for hqe in hqes)


class SpecialJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for special (per-host) jobs."""

    GLOB_PATTERN = 'hosts/*/[0-9]*-*'

    def get_timestamp_if_finished(self):
        """Get the timestamp to use for finished special tasks.

        @returns the `time_finished` of the completed special task with
                 this id, or None if no completed task is found.
        """
        entry = _AFE.get_special_tasks(id=self._id, is_complete=True)
        return entry[0].time_finished if entry else None