import abc
import datetime
import glob
import json
import logging
import os
import re
import shutil

import common
from autotest_lib.client.common_lib import time_utils
from autotest_lib.client.common_lib import utils
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.server.cros.dynamic_suite import frontend_wrappers

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


SPECIAL_TASK_PATTERN = r'.*/hosts/[^/]+/(\d+)-[^/]+'


def is_job_expired(age_limit, timestamp):
    """Check whether a job timestamp is older than an age limit.

    @param age_limit: Minimum age, measured in days.  If the value is
                      not positive, the job is always expired.
    @param timestamp: Timestamp of the job whose age we are checking.
                      The format must match time_utils.TIME_FMT.

    @returns True iff the job is old enough to be expired.
    """
    if age_limit <= 0:
        return True
    job_time = time_utils.time_string_to_datetime(timestamp)
    expiration = job_time + datetime.timedelta(days=age_limit)
    return datetime.datetime.now() >= expiration


def get_job_id_or_task_id(result_dir):
    """Extract the job id or special task id from a result_dir.

    @param result_dir: Path to the result dir.
            For a test job:
            /usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6
            The hostname at the end is optional.
            For a special task:
            /usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup

    @returns: str representing the job id or task id.  Returns None if a
              job or task id cannot be parsed from the result_dir.
    """
    if not result_dir:
        return None
    result_dir = os.path.abspath(result_dir)
    # The result folder for a job running inside a container has only the
    # job id.
    ssp_job_pattern = r'.*/(\d+)$'
    # Try to get the job ID from the last number-text component of the path.
    # This avoids issues with paths like 123-results/456-debug_user, in which
    # 456 is the real job ID.
    m_job = re.findall(r'.*/(\d+)-[^/]+', result_dir)
    if m_job:
        return m_job[-1]
    m_special_task = re.match(SPECIAL_TASK_PATTERN, result_dir)
    if m_special_task:
        return m_special_task.group(1)
    m_ssp_job_pattern = re.match(ssp_job_pattern, result_dir)
    if m_ssp_job_pattern and utils.is_in_container():
        return m_ssp_job_pattern.group(1)
    return _get_swarming_run_id(result_dir)


def _get_swarming_run_id(path):
    """Extract the Swarming run_id for a Skylab task from the result path."""
    # Legacy swarming results are in directories like
    #   .../results/swarming-3e4391423c3a4311
    # In particular, the ending digit is never 0.
    m_legacy_path = re.match(r'.*/swarming-([0-9a-fA-F]*[1-9a-fA-F])$', path)
    if m_legacy_path:
        return m_legacy_path.group(1)
    # New style swarming results are in directories like
    #   .../results/swarming-3e4391423c3a4310/1
    # - Results are one directory deeper.
    # - The ending digit of the first directory is always 0.
    m_path = re.match(r'.*/swarming-([0-9a-fA-F]*)0/([1-9a-fA-F])$', path)
    if m_path:
        return m_path.group(1) + m_path.group(2)
    return None


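# Illustrative sketch only: the paths below are the hypothetical examples
# already used in the docstrings and comments above, not real results.
#
#     get_job_id_or_task_id(
#         '/usr/local/autotest/results/2032-chromeos-test/chromeos1-rack5-host6')
#     # -> '2032'
#     get_job_id_or_task_id(
#         '/usr/local/autotest/results/hosts/chromeos1-rack5-host6/1343-cleanup')
#     # -> '1343'
#     _get_swarming_run_id('.../results/swarming-3e4391423c3a4311')
#     # -> '3e4391423c3a4311'  (legacy layout)
#     _get_swarming_run_id('.../results/swarming-3e4391423c3a4310/1')
#     # -> '3e4391423c3a4311'  (new layout: the trailing 0 of the base run id
#     #    is replaced by the try number)

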
class _JobDirectory(object):
    """State associated with a job to be offloaded.

    The full life-cycle of a job (including failure events that
    normally don't occur) looks like this:
     1. The job's results directory is discovered by
        `get_job_directories()`, and a job instance is created for it.
     2. Calls to `offload()` have no effect so long as the job
        isn't complete in the database and the job isn't expired
        according to the `age_limit` parameter.
     3. Eventually, the job is both finished and expired.  The next
        call to `offload()` makes the first attempt to offload the
        directory to GS.  Offload is attempted, but fails to complete
        (e.g. because of a GS problem).
     4. Finally, a call to `offload()` succeeds, and the directory no
        longer exists.  Now `is_offloaded()` is true, so the job
        instance is deleted, and future failures will not mention this
        directory any more.

    Only steps 1. and 4. are guaranteed to occur.  The others depend
    on the timing of calls to `offload()`, and on the reliability of
    the actual offload process.

    """

    __metaclass__ = abc.ABCMeta

    GLOB_PATTERN = None  # must be redefined in subclass

    def __init__(self, resultsdir):
        self.dirname = resultsdir
        self._id = get_job_id_or_task_id(resultsdir)
        self.offload_count = 0
        self.first_offload_start = 0

    @classmethod
    def get_job_directories(cls):
        """Return a list of directories of jobs that need offloading."""
        return [d for d in glob.glob(cls.GLOB_PATTERN) if os.path.isdir(d)]

    @abc.abstractmethod
    def get_timestamp_if_finished(self):
        """Return this job's timestamp from the database.

        If the database has not marked the job as finished, return
        `None`.  Otherwise, return a timestamp for the job.  The
        timestamp is to be used to determine expiration in
        `is_job_expired()`.

        @return Return `None` if the job is still running; otherwise
                return a string with a timestamp in the appropriate
                format.
        """
        raise NotImplementedError("_JobDirectory.get_timestamp_if_finished")

    def process_gs_instructions(self):
        """Process any gs_offloader instructions for this special task.

        @returns True if there is anything left to offload, False otherwise.
        """
        # Default behavior is to still offload the directory.
        return True


NO_OFFLOAD_README = """These results have been deleted rather than offloaded.
This is the expected behavior for passing jobs from the Commit Queue."""


class RegularJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for regular test jobs."""

    GLOB_PATTERN = '[0-9]*-*'

    def process_gs_instructions(self):
        """Process any gs_offloader instructions for this job.

        @returns True if there is anything left to offload, False otherwise.
        """
        # Go through the gs_offloader instructions file for each test in
        # this job.
        for path in glob.glob(
                os.path.join(self.dirname, '*',
                             constants.GS_OFFLOADER_INSTRUCTIONS)):
            with open(path, 'r') as f:
                gs_off_instructions = json.load(f)
            if gs_off_instructions.get(constants.GS_OFFLOADER_NO_OFFLOAD):
                dirname = os.path.dirname(path)
                _remove_log_directory_contents(dirname)

        # Finally check whether there's anything left to offload.
        if os.path.exists(self.dirname) and not os.listdir(self.dirname):
            shutil.rmtree(self.dirname)
            return False
        return True

    def get_timestamp_if_finished(self):
        """Get the timestamp to use for finished jobs.

        @returns the latest hqe finished_on time.  If the finished_on
                 times are null, returns the job's created_on time.
        """
        entry = _cached_afe().get_jobs(id=self._id, finished=True)
        if not entry:
            return None
        hqes = _cached_afe().get_host_queue_entries(finished_on__isnull=False,
                                                    job_id=self._id)
        if not hqes:
            return entry[0].created_on
        # While most jobs have 1 HQE, some can have multiple, so check them
        # all.
        return max([hqe.finished_on for hqe in hqes])


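# A minimal sketch of how a caller might drive these classes, following the
# life-cycle described in the _JobDirectory docstring.  The `age_limit` value
# and the offload step are hypothetical; the real offload logic lives
# elsewhere in gs_offloader.
#
#     for d in RegularJobDirectory.get_job_directories():
#         job = RegularJobDirectory(d)
#         timestamp = job.get_timestamp_if_finished()
#         if timestamp and is_job_expired(age_limit=7, timestamp=timestamp):
#             if job.process_gs_instructions():
#                 pass  # attempt the offload to GS here

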
def _remove_log_directory_contents(dirpath):
    """Remove log directory contents.

    Leave a note explaining what has happened to the logs.

    @param dirpath: Path to log directory.
    """
    shutil.rmtree(dirpath)
    os.mkdir(dirpath)
    breadcrumb_name = os.path.join(dirpath, 'logs-removed-readme.txt')
    with open(breadcrumb_name, 'w') as f:
        f.write(NO_OFFLOAD_README)


class SpecialJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for special (per-host) jobs."""

    GLOB_PATTERN = 'hosts/*/[0-9]*-*'

    def __init__(self, resultsdir):
        super(SpecialJobDirectory, self).__init__(resultsdir)

    def get_timestamp_if_finished(self):
        entry = _cached_afe().get_special_tasks(id=self._id, is_complete=True)
        return entry[0].time_finished if entry else None


def _find_results_dir(dirname):
    """Return subdirectories of `dirname` that contain an offload marker."""
    subdirs = []
    for root, dirs, files in os.walk(dirname, topdown=True):
        for f in files:
            if f == _OFFLOAD_MARKER:
                subdirs.append(root)
    return subdirs


_OFFLOAD_MARKER = ".ready_for_offload"
_marker_parse_error_metric = metrics.Counter(
    'chromeos/autotest/gs_offloader/offload_marker_parse_errors',
    description='Errors parsing the offload marker file')


class SwarmingJobDirectory(_JobDirectory):
    """Subclass of _JobDirectory for Skylab swarming jobs."""

    @classmethod
    def get_job_directories(cls):
        """Return a list of directories of jobs that need offloading."""
        # Legacy swarming results are in directories like
        #   .../results/swarming-3e4391423c3a4311
        # In particular, the ending digit is never 0.
        jobdirs = [
            d for d in glob.glob('swarming-[0-9a-f]*[1-9a-f]')
            if os.path.isdir(d)
        ]
        # New style swarming results are in directories like
        #   .../results/swarming-3e4391423c3a4310/1
        # - Results are one directory deeper.
        # - The ending digit of the first directory is always 0.
        new_style_topdir = [
            d for d in glob.glob('swarming-[0-9a-f]*0') if os.path.isdir(d)
        ]
        # When there are multiple tests run in one test_runner build,
        # the results will be one level deeper with the test_id
        # as one further subdirectory.
        # Example: .../results/swarming-3e4391423c3a4310/1/test_id
        for topdir in new_style_topdir:
            for d in glob.glob('%s/[1-9a-f]*' % topdir):
                subdirs = _find_results_dir(d)
                jobdirs += subdirs

        return jobdirs

    def get_timestamp_if_finished(self):
        """Get the timestamp to use for finished jobs.

        @returns the timestamp recorded in this job's offload marker file,
                 converted to a date string.  Returns None if the marker
                 file is missing (the job is not ready for offload yet) or
                 its contents cannot be parsed.
        """
        marker_path = os.path.join(self.dirname, _OFFLOAD_MARKER)
        try:
            with open(marker_path) as f:
                ts_string = f.read().strip()
        except (IOError, OSError):
            # No readable marker file: the job is not ready for offload.
            return None
        try:
            ts = int(ts_string)
            return time_utils.epoch_time_to_date_string(ts)
        except ValueError as e:
            logging.debug('Error parsing %s for %s: %s', _OFFLOAD_MARKER,
                          self.dirname, e)
            _marker_parse_error_metric.increment()
            return None


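# Sketch of what the offload marker file is expected to contain.  The value
# below is made up; the code above only requires a single integer epoch time
# on one line.
#
#     $ cat .../results/swarming-3e4391423c3a4310/1/.ready_for_offload
#     1600000000
#
# get_timestamp_if_finished() converts that epoch value to a date string via
# time_utils.epoch_time_to_date_string().

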
_AFE = None
def _cached_afe():
    """Return a shared RetryingAFE instance, creating it on first use."""
    global _AFE
    if _AFE is None:
        _AFE = frontend_wrappers.RetryingAFE()
    return _AFE


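# Callers in this module go through _cached_afe() rather than constructing an
# AFE client per call.  Illustrative only, mirroring the calls made above
# (the ids are the hypothetical examples from the docstrings):
#
#     _cached_afe().get_jobs(id='2032', finished=True)
#     _cached_afe().get_special_tasks(id='1343', is_complete=True)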