1#!/usr/bin/python 2# 3# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7"""Script to archive old Autotest results to Google Storage. 8 9Uses gsutil to archive files to the configured Google Storage bucket. 10Upon successful copy, the local results directory is deleted. 11""" 12 13import datetime 14import errno 15import logging 16import logging.handlers 17import os 18import re 19import shutil 20import signal 21import socket 22import subprocess 23import sys 24import tempfile 25import time 26 27from optparse import OptionParser 28 29import common 30from autotest_lib.client.common_lib import error 31from autotest_lib.client.common_lib import utils 32from autotest_lib.site_utils import job_directories 33 34try: 35 # Does not exist, nor is needed, on moblab. 36 import psutil 37except ImportError: 38 psutil = None 39 40import job_directories 41from autotest_lib.client.common_lib import global_config 42from autotest_lib.client.common_lib.cros.graphite import autotest_stats 43from autotest_lib.scheduler import email_manager 44from chromite.lib import parallel 45 46 47GS_OFFLOADING_ENABLED = global_config.global_config.get_config_value( 48 'CROS', 'gs_offloading_enabled', type=bool, default=True) 49 50STATS_KEY = 'gs_offloader.%s' % socket.gethostname().replace('.', '_') 51METADATA_TYPE = 'result_dir_size' 52 53timer = autotest_stats.Timer(STATS_KEY) 54 55# Nice setting for process, the higher the number the lower the priority. 56NICENESS = 10 57 58# Maximum number of seconds to allow for offloading a single 59# directory. 60OFFLOAD_TIMEOUT_SECS = 60 * 60 61 62# Sleep time per loop. 63SLEEP_TIME_SECS = 5 64 65# Minimum number of seconds between e-mail reports. 66REPORT_INTERVAL_SECS = 60 * 60 67 68# Location of Autotest results on disk. 69RESULTS_DIR = '/usr/local/autotest/results' 70 71# Hosts sub-directory that contains cleanup, verify and repair jobs. 72HOSTS_SUB_DIR = 'hosts' 73 74LOG_LOCATION = '/usr/local/autotest/logs/' 75LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt' 76LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S' 77LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' 78 79# pylint: disable=E1120 80NOTIFY_ADDRESS = global_config.global_config.get_config_value( 81 'SCHEDULER', 'notify_email', default='') 82 83ERROR_EMAIL_HELPER_URL = 'http://go/cros-triage-gsoffloader' 84ERROR_EMAIL_SUBJECT_FORMAT = 'GS Offloader notifications from %s' 85ERROR_EMAIL_REPORT_FORMAT = '''\ 86gs_offloader is failing to offload results directories. 87 88Check %s to triage the issue. 89 90First failure Count Directory name 91=================== ====== ============================== 92''' % ERROR_EMAIL_HELPER_URL 93# --+----1----+---- ----+ ----+----1----+----2----+----3 94 95ERROR_EMAIL_DIRECTORY_FORMAT = '%19s %5d %-1s\n' 96ERROR_EMAIL_TIME_FORMAT = '%Y-%m-%d %H:%M:%S' 97 98USE_RSYNC_ENABLED = global_config.global_config.get_config_value( 99 'CROS', 'gs_offloader_use_rsync', type=bool, default=False) 100 101# According to https://cloud.google.com/storage/docs/bucket-naming#objectnames 102INVALID_GS_CHARS = ['[', ']', '*', '?', '#'] 103INVALID_GS_CHAR_RANGE = [(0x00, 0x1F), (0x7F, 0x84), (0x86, 0xFF)] 104 105# Maximum number of files in the folder. 106MAX_FILE_COUNT = 500 107FOLDERS_NEVER_ZIP = ['debug', 'ssp_logs'] 108LIMIT_FILE_COUNT = global_config.global_config.get_config_value( 109 'CROS', 'gs_offloader_limit_file_count', type=bool, default=False) 110 111 112class TimeoutException(Exception): 113 """Exception raised by the timeout_handler.""" 114 pass 115 116 117def timeout_handler(_signum, _frame): 118 """Handler for SIGALRM when the offloading process times out. 119 120 @param _signum: Signal number of the signal that was just caught. 121 14 for SIGALRM. 122 @param _frame: Current stack frame. 123 124 @raise TimeoutException: Automatically raises so that the time out 125 is caught by the try/except surrounding the 126 Popen call. 127 """ 128 raise TimeoutException('Process Timed Out') 129 130 131def get_cmd_list(multiprocessing, dir_entry, gs_path): 132 """Return the command to offload a specified directory. 133 134 @param multiprocessing: True to turn on -m option for gsutil. 135 @param dir_entry: Directory entry/path that which we need a cmd_list 136 to offload. 137 @param gs_path: Location in google storage where we will 138 offload the directory. 139 140 @return A command list to be executed by Popen. 141 """ 142 cmd = ['gsutil'] 143 if multiprocessing: 144 cmd.append('-m') 145 if USE_RSYNC_ENABLED: 146 cmd.append('rsync') 147 target = os.path.join(gs_path, os.path.basename(dir_entry)) 148 else: 149 cmd.append('cp') 150 target = gs_path 151 cmd += ['-eR', dir_entry, target] 152 return cmd 153 154 155def get_directory_size_kibibytes_cmd_list(directory): 156 """Returns command to get a directory's total size.""" 157 # Having this in its own method makes it easier to mock in 158 # unittests. 159 return ['du', '-sk', directory] 160 161 162def get_directory_size_kibibytes(directory): 163 """Calculate the total size of a directory with all its contents. 164 165 @param directory: Path to the directory 166 167 @return Size of the directory in kibibytes. 168 """ 169 cmd = get_directory_size_kibibytes_cmd_list(directory) 170 process = subprocess.Popen(cmd, 171 stdout=subprocess.PIPE, 172 stderr=subprocess.PIPE) 173 stdout_data, stderr_data = process.communicate() 174 175 if process.returncode != 0: 176 # This function is used for statistics only, if it fails, 177 # nothing else should crash. 178 logging.warning('Getting size of %s failed. Stderr:', directory) 179 logging.warning(stderr_data) 180 return 0 181 182 return int(stdout_data.split('\t', 1)[0]) 183 184 185def get_sanitized_name(name): 186 """Get a string with all invalid characters in the name being replaced. 187 188 @param name: Name to be processed. 189 190 @return A string with all invalid characters in the name being 191 replaced. 192 """ 193 match_pattern = ''.join([re.escape(c) for c in INVALID_GS_CHARS]) 194 match_pattern += ''.join([r'\x%02x-\x%02x' % (r[0], r[1]) 195 for r in INVALID_GS_CHAR_RANGE]) 196 invalid = re.compile('[%s]' % match_pattern) 197 return invalid.sub(lambda x: '%%%02x' % ord(x.group(0)), name) 198 199 200def sanitize_dir(dir_entry): 201 """Replace all invalid characters in folder and file names with valid ones. 202 203 @param dir_entry: Directory entry to be sanitized. 204 """ 205 if not os.path.exists(dir_entry): 206 return 207 renames = [] 208 for root, dirs, files in os.walk(dir_entry): 209 sanitized_root = get_sanitized_name(root) 210 for name in dirs + files: 211 sanitized_name = get_sanitized_name(name) 212 if name != sanitized_name: 213 orig_path = os.path.join(sanitized_root, name) 214 rename_path = os.path.join(sanitized_root, 215 sanitized_name) 216 renames.append((orig_path, rename_path)) 217 for src, dest in renames: 218 logging.warn('Invalid character found. Renaming %s to %s.', 219 src, dest) 220 shutil.move(src, dest) 221 222 223def _get_zippable_folders(dir_entry): 224 folders_list = [] 225 for folder in os.listdir(dir_entry): 226 folder_path = os.path.join(dir_entry, folder) 227 if (not os.path.isfile(folder_path) and 228 not folder in FOLDERS_NEVER_ZIP): 229 folders_list.append(folder_path) 230 return folders_list 231 232 233def limit_file_count(dir_entry): 234 """Limit the number of files in given directory. 235 236 The method checks the total number of files in the given directory. 237 If the number is greater than MAX_FILE_COUNT, the method will 238 compress each folder in the given directory, except folders in 239 FOLDERS_NEVER_ZIP. 240 241 @param dir_entry: Directory entry to be checked. 242 """ 243 count = utils.run('find "%s" | wc -l' % dir_entry, 244 ignore_status=True).stdout.strip() 245 try: 246 count = int(count) 247 except ValueError, TypeError: 248 logging.warn('Fail to get the file count in folder %s.', 249 dir_entry) 250 return 251 if count < MAX_FILE_COUNT: 252 return 253 254 # For test job, zip folders in a second level, e.g. 123-debug/host1. 255 # This is to allow autoserv debug folder still be accessible. 256 # For special task, it does not need to dig one level deeper. 257 is_special_task = re.match(job_directories.SPECIAL_TASK_PATTERN, 258 dir_entry) 259 260 folders = _get_zippable_folders(dir_entry) 261 if not is_special_task: 262 subfolders = [] 263 for folder in folders: 264 subfolders.extend(_get_zippable_folders(folder)) 265 folders = subfolders 266 267 for folder in folders: 268 try: 269 zip_name = '%s.tgz' % folder 270 utils.run('tar -cz -C "%s" -f "%s" "%s"' % 271 (os.path.dirname(folder), zip_name, 272 os.path.basename(folder))) 273 except error.CmdError as e: 274 logging.error('Fail to compress folder %s. Error: %s', 275 folder, e) 276 continue 277 shutil.rmtree(folder) 278 279 280def correct_results_folder_permission(dir_entry): 281 """Make sure the results folder has the right permission settings. 282 283 For tests running with server-side packaging, the results folder has 284 the owner of root. This must be changed to the user running the 285 autoserv process, so parsing job can access the results folder. 286 287 @param dir_entry: Path to the results folder. 288 """ 289 if not dir_entry: 290 return 291 try: 292 subprocess.check_call( 293 ['sudo', '-n', 'chown', '-R', str(os.getuid()), dir_entry]) 294 subprocess.check_call( 295 ['sudo', '-n', 'chgrp', '-R', str(os.getgid()), dir_entry]) 296 except subprocess.CalledProcessError as e: 297 logging.error('Failed to modify permission for %s: %s', 298 dir_entry, e) 299 300 301def get_offload_dir_func(gs_uri, multiprocessing): 302 """Returns the offload directory function for the given gs_uri 303 304 @param gs_uri: Google storage bucket uri to offload to. 305 @param multiprocessing: True to turn on -m option for gsutil. 306 307 @return offload_dir function to perform the offload. 308 """ 309 @timer.decorate 310 def offload_dir(dir_entry, dest_path): 311 """Offload the specified directory entry to Google storage. 312 313 @param dir_entry: Directory entry to offload. 314 @param dest_path: Location in google storage where we will 315 offload the directory. 316 317 """ 318 try: 319 counter = autotest_stats.Counter(STATS_KEY) 320 counter.increment('jobs_offload_started') 321 322 sanitize_dir(dir_entry) 323 324 if LIMIT_FILE_COUNT: 325 limit_file_count(dir_entry) 326 327 error = False 328 stdout_file = tempfile.TemporaryFile('w+') 329 stderr_file = tempfile.TemporaryFile('w+') 330 process = None 331 signal.alarm(OFFLOAD_TIMEOUT_SECS) 332 gs_path = '%s%s' % (gs_uri, dest_path) 333 process = subprocess.Popen( 334 get_cmd_list(multiprocessing, dir_entry, gs_path), 335 stdout=stdout_file, stderr=stderr_file) 336 process.wait() 337 signal.alarm(0) 338 339 if process.returncode == 0: 340 dir_size = get_directory_size_kibibytes(dir_entry) 341 342 counter.increment('kibibytes_transferred_total', 343 dir_size) 344 metadata = { 345 '_type': METADATA_TYPE, 346 'size_KB': dir_size, 347 'result_dir': dir_entry, 348 'drone': socket.gethostname().replace('.', '_') 349 } 350 autotest_stats.Gauge(STATS_KEY, metadata=metadata).send( 351 'kibibytes_transferred', dir_size) 352 counter.increment('jobs_offloaded') 353 shutil.rmtree(dir_entry) 354 else: 355 error = True 356 except TimeoutException: 357 # If we finished the call to Popen(), we may need to 358 # terminate the child process. We don't bother calling 359 # process.poll(); that inherently races because the child 360 # can die any time it wants. 361 if process: 362 try: 363 process.terminate() 364 except OSError: 365 # We don't expect any error other than "No such 366 # process". 367 pass 368 logging.error('Offloading %s timed out after waiting %d ' 369 'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS) 370 error = True 371 except OSError as e: 372 # The wrong file permission can lead call 373 # `shutil.rmtree(dir_entry)` to raise OSError with message 374 # 'Permission denied'. Details can be found in 375 # crbug.com/536151 376 if e.errno == errno.EACCES: 377 logging.warn('Try to correct file permission of %s.', dir_entry) 378 correct_results_folder_permission(dir_entry) 379 finally: 380 signal.alarm(0) 381 if error: 382 # Rewind the log files for stdout and stderr and log 383 # their contents. 384 stdout_file.seek(0) 385 stderr_file.seek(0) 386 stderr_content = stderr_file.read() 387 logging.error('Error occurred when offloading %s:', 388 dir_entry) 389 logging.error('Stdout:\n%s \nStderr:\n%s', 390 stdout_file.read(), stderr_content) 391 # Some result files may have wrong file permission. Try 392 # to correct such error so later try can success. 393 # TODO(dshi): The code is added to correct result files 394 # with wrong file permission caused by bug 511778. After 395 # this code is pushed to lab and run for a while to 396 # clean up these files, following code and function 397 # correct_results_folder_permission can be deleted. 398 if 'CommandException: Error opening file' in stderr_content: 399 logging.warn('Try to correct file permission of %s.', 400 dir_entry) 401 correct_results_folder_permission(dir_entry) 402 stdout_file.close() 403 stderr_file.close() 404 return offload_dir 405 406 407def delete_files(dir_entry, dest_path): 408 """Simply deletes the dir_entry from the filesystem. 409 410 Uses same arguments as offload_dir so that it can be used in replace 411 of it on systems that only want to delete files instead of 412 offloading them. 413 414 @param dir_entry: Directory entry to offload. 415 @param dest_path: NOT USED. 416 """ 417 shutil.rmtree(dir_entry) 418 419 420def report_offload_failures(joblist): 421 """Generate e-mail notification for failed offloads. 422 423 The e-mail report will include data from all jobs in `joblist`. 424 425 @param joblist List of jobs to be reported in the message. 426 """ 427 def _format_job(job): 428 d = datetime.datetime.fromtimestamp(job.get_failure_time()) 429 data = (d.strftime(ERROR_EMAIL_TIME_FORMAT), 430 job.get_failure_count(), 431 job.get_job_directory()) 432 return ERROR_EMAIL_DIRECTORY_FORMAT % data 433 joblines = [_format_job(job) for job in joblist] 434 joblines.sort() 435 email_subject = ERROR_EMAIL_SUBJECT_FORMAT % socket.gethostname() 436 email_message = ERROR_EMAIL_REPORT_FORMAT + ''.join(joblines) 437 email_manager.manager.send_email(NOTIFY_ADDRESS, email_subject, 438 email_message) 439 440 441def wait_for_gs_write_access(gs_uri): 442 """Verify and wait until we have write access to Google Storage. 443 444 @param gs_uri: The Google Storage URI we are trying to offload to. 445 """ 446 # TODO (sbasi) Try to use the gsutil command to check write access. 447 # Ensure we have write access to gs_uri. 448 dummy_file = tempfile.NamedTemporaryFile() 449 test_cmd = get_cmd_list(False, dummy_file.name, gs_uri) 450 while True: 451 try: 452 subprocess.check_call(test_cmd) 453 subprocess.check_call( 454 ['gsutil', 'rm', 455 os.path.join(gs_uri, 456 os.path.basename(dummy_file.name))]) 457 break 458 except subprocess.CalledProcessError: 459 logging.debug('Unable to offload to %s, sleeping.', gs_uri) 460 time.sleep(120) 461 462 463class Offloader(object): 464 """State of the offload process. 465 466 Contains the following member fields: 467 * _offload_func: Function to call for each attempt to offload 468 a job directory. 469 * _jobdir_classes: List of classes of job directory to be 470 offloaded. 471 * _processes: Maximum number of outstanding offload processes 472 to allow during an offload cycle. 473 * _age_limit: Minimum age in days at which a job may be 474 offloaded. 475 * _open_jobs: a dictionary mapping directory paths to Job 476 objects. 477 * _next_report_time: Earliest time that we should send e-mail 478 if there are failures to be reported. 479 """ 480 481 def __init__(self, options): 482 if options.delete_only: 483 self._offload_func = delete_files 484 else: 485 self.gs_uri = utils.get_offload_gsuri() 486 logging.debug('Offloading to: %s', self.gs_uri) 487 self._offload_func = get_offload_dir_func( 488 self.gs_uri, options.multiprocessing) 489 classlist = [] 490 if options.process_hosts_only or options.process_all: 491 classlist.append(job_directories.SpecialJobDirectory) 492 if not options.process_hosts_only: 493 classlist.append(job_directories.RegularJobDirectory) 494 self._jobdir_classes = classlist 495 assert self._jobdir_classes 496 self._processes = options.parallelism 497 self._age_limit = options.days_old 498 self._open_jobs = {} 499 self._next_report_time = time.time() 500 501 502 def _add_new_jobs(self): 503 """Find new job directories that need offloading. 504 505 Go through the file system looking for valid job directories 506 that are currently not in `self._open_jobs`, and add them in. 507 508 """ 509 new_job_count = 0 510 for cls in self._jobdir_classes: 511 for resultsdir in cls.get_job_directories(): 512 if resultsdir in self._open_jobs: 513 continue 514 self._open_jobs[resultsdir] = cls(resultsdir) 515 new_job_count += 1 516 logging.debug('Start of offload cycle - found %d new jobs', 517 new_job_count) 518 519 520 def _remove_offloaded_jobs(self): 521 """Removed offloaded jobs from `self._open_jobs`.""" 522 removed_job_count = 0 523 for jobkey, job in self._open_jobs.items(): 524 if job.is_offloaded(): 525 del self._open_jobs[jobkey] 526 removed_job_count += 1 527 logging.debug('End of offload cycle - cleared %d new jobs, ' 528 'carrying %d open jobs', 529 removed_job_count, len(self._open_jobs)) 530 531 532 def _have_reportable_errors(self): 533 """Return whether any jobs need reporting via e-mail. 534 535 @return True if there are reportable jobs in `self._open_jobs`, 536 or False otherwise. 537 """ 538 for job in self._open_jobs.values(): 539 if job.is_reportable(): 540 return True 541 return False 542 543 544 def _update_offload_results(self): 545 """Check and report status after attempting offload. 546 547 This function processes all jobs in `self._open_jobs`, assuming 548 an attempt has just been made to offload all of them. 549 550 Any jobs that have been successfully offloaded are removed. 551 552 If any jobs have reportable errors, and we haven't generated 553 an e-mail report in the last `REPORT_INTERVAL_SECS` seconds, 554 send new e-mail describing the failures. 555 556 """ 557 self._remove_offloaded_jobs() 558 if self._have_reportable_errors(): 559 # N.B. We include all jobs that have failed at least once, 560 # which may include jobs that aren't otherwise reportable. 561 failed_jobs = [j for j in self._open_jobs.values() 562 if j.get_failure_time()] 563 logging.debug('Currently there are %d jobs with offload ' 564 'failures', len(failed_jobs)) 565 if time.time() >= self._next_report_time: 566 logging.debug('Reporting failures by e-mail') 567 report_offload_failures(failed_jobs) 568 self._next_report_time = ( 569 time.time() + REPORT_INTERVAL_SECS) 570 571 572 def offload_once(self): 573 """Perform one offload cycle. 574 575 Find all job directories for new jobs that we haven't seen 576 before. Then, attempt to offload the directories for any 577 jobs that have finished running. Offload of multiple jobs 578 is done in parallel, up to `self._processes` at a time. 579 580 After we've tried uploading all directories, go through the list 581 checking the status of all uploaded directories. If necessary, 582 report failures via e-mail. 583 584 """ 585 self._add_new_jobs() 586 with parallel.BackgroundTaskRunner( 587 self._offload_func, processes=self._processes) as queue: 588 for job in self._open_jobs.values(): 589 job.enqueue_offload(queue, self._age_limit) 590 self._update_offload_results() 591 592 593def parse_options(): 594 """Parse the args passed into gs_offloader.""" 595 defaults = 'Defaults:\n Destination: %s\n Results Path: %s' % ( 596 utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR) 597 usage = 'usage: %prog [options]\n' + defaults 598 parser = OptionParser(usage) 599 parser.add_option('-a', '--all', dest='process_all', 600 action='store_true', 601 help='Offload all files in the results directory.') 602 parser.add_option('-s', '--hosts', dest='process_hosts_only', 603 action='store_true', 604 help='Offload only the special tasks result files ' 605 'located in the results/hosts subdirectory') 606 parser.add_option('-p', '--parallelism', dest='parallelism', 607 type='int', default=1, 608 help='Number of parallel workers to use.') 609 parser.add_option('-o', '--delete_only', dest='delete_only', 610 action='store_true', 611 help='GS Offloader will only the delete the ' 612 'directories and will not offload them to google ' 613 'storage. NOTE: If global_config variable ' 614 'CROS.gs_offloading_enabled is False, --delete_only ' 615 'is automatically True.', 616 default=not GS_OFFLOADING_ENABLED) 617 parser.add_option('-d', '--days_old', dest='days_old', 618 help='Minimum job age in days before a result can be ' 619 'offloaded.', type='int', default=0) 620 parser.add_option('-l', '--log_size', dest='log_size', 621 help='Limit the offloader logs to a specified ' 622 'number of Mega Bytes.', type='int', default=0) 623 parser.add_option('-m', dest='multiprocessing', action='store_true', 624 help='Turn on -m option for gsutil.', 625 default=False) 626 options = parser.parse_args()[0] 627 if options.process_all and options.process_hosts_only: 628 parser.print_help() 629 print ('Cannot process all files and only the hosts ' 630 'subdirectory. Please remove an argument.') 631 sys.exit(1) 632 return options 633 634 635def main(): 636 """Main method of gs_offloader.""" 637 options = parse_options() 638 639 if options.process_all: 640 offloader_type = 'all' 641 elif options.process_hosts_only: 642 offloader_type = 'hosts' 643 else: 644 offloader_type = 'jobs' 645 646 log_timestamp = time.strftime(LOG_TIMESTAMP_FORMAT) 647 if options.log_size > 0: 648 log_timestamp = '' 649 log_basename = LOG_FILENAME_FORMAT % (offloader_type, log_timestamp) 650 log_filename = os.path.join(LOG_LOCATION, log_basename) 651 log_formatter = logging.Formatter(LOGGING_FORMAT) 652 # Replace the default logging handler with a RotatingFileHandler. If 653 # options.log_size is 0, the file size will not be limited. Keeps 654 # one backup just in case. 655 handler = logging.handlers.RotatingFileHandler( 656 log_filename, maxBytes=1024 * options.log_size, backupCount=1) 657 handler.setFormatter(log_formatter) 658 logger = logging.getLogger() 659 logger.setLevel(logging.DEBUG) 660 logger.addHandler(handler) 661 662 # Nice our process (carried to subprocesses) so we don't overload 663 # the system. 664 logging.debug('Set process to nice value: %d', NICENESS) 665 os.nice(NICENESS) 666 if psutil: 667 proc = psutil.Process() 668 logging.debug('Set process to ionice IDLE') 669 proc.ionice(psutil.IOPRIO_CLASS_IDLE) 670 671 # os.listdir returns relative paths, so change to where we need to 672 # be to avoid an os.path.join on each loop. 673 logging.debug('Offloading Autotest results in %s', RESULTS_DIR) 674 os.chdir(RESULTS_DIR) 675 676 signal.signal(signal.SIGALRM, timeout_handler) 677 678 offloader = Offloader(options) 679 if not options.delete_only: 680 wait_for_gs_write_access(offloader.gs_uri) 681 while True: 682 offloader.offload_once() 683 time.sleep(SLEEP_TIME_SECS) 684 685 686if __name__ == '__main__': 687 main() 688