1#!/usr/bin/python 2# 3# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7"""Script to archive old Autotest results to Google Storage. 8 9Uses gsutil to archive files to the configured Google Storage bucket. 10Upon successful copy, the local results directory is deleted. 11""" 12 13import abc 14try: 15 import cachetools 16except ImportError: 17 cachetools = None 18import datetime 19import errno 20import glob 21import gzip 22import logging 23import logging.handlers 24import os 25import re 26import shutil 27import stat 28import subprocess 29import sys 30import tarfile 31import tempfile 32import time 33 34from optparse import OptionParser 35 36import common 37from autotest_lib.client.common_lib import file_utils 38from autotest_lib.client.common_lib import global_config 39from autotest_lib.client.common_lib import utils 40from autotest_lib.site_utils import job_directories 41# For unittest, the cloud_console.proto is not compiled yet. 42try: 43 from autotest_lib.site_utils import cloud_console_client 44except ImportError: 45 cloud_console_client = None 46from autotest_lib.tko import models 47from autotest_lib.utils import labellib 48from autotest_lib.utils import gslib 49from chromite.lib import timeout_util 50 51# Autotest requires the psutil module from site-packages, so it must be imported 52# after "import common". 53try: 54 # Does not exist, nor is needed, on moblab. 55 import psutil 56except ImportError: 57 psutil = None 58 59from chromite.lib import parallel 60try: 61 from chromite.lib import metrics 62 from chromite.lib import ts_mon_config 63except ImportError: 64 metrics = utils.metrics_mock 65 ts_mon_config = utils.metrics_mock 66 67 68GS_OFFLOADING_ENABLED = global_config.global_config.get_config_value( 69 'CROS', 'gs_offloading_enabled', type=bool, default=True) 70 71# Nice setting for process, the higher the number the lower the priority. 72NICENESS = 10 73 74# Maximum number of seconds to allow for offloading a single 75# directory. 76OFFLOAD_TIMEOUT_SECS = 60 * 60 77 78# Sleep time per loop. 79SLEEP_TIME_SECS = 5 80 81# Minimum number of seconds between e-mail reports. 82REPORT_INTERVAL_SECS = 60 * 60 83 84# Location of Autotest results on disk. 85RESULTS_DIR = '/usr/local/autotest/results' 86FAILED_OFFLOADS_FILE = os.path.join(RESULTS_DIR, 'FAILED_OFFLOADS') 87 88FAILED_OFFLOADS_FILE_HEADER = ''' 89This is the list of gs_offloader failed jobs. 90Last offloader attempt at %s failed to offload %d files. 91Check http://go/cros-triage-gsoffloader to triage the issue 92 93 94First failure Count Directory name 95=================== ====== ============================== 96''' 97# --+----1----+---- ----+ ----+----1----+----2----+----3 98 99FAILED_OFFLOADS_LINE_FORMAT = '%19s %5d %-1s\n' 100FAILED_OFFLOADS_TIME_FORMAT = '%Y-%m-%d %H:%M:%S' 101 102USE_RSYNC_ENABLED = global_config.global_config.get_config_value( 103 'CROS', 'gs_offloader_use_rsync', type=bool, default=False) 104 105LIMIT_FILE_COUNT = global_config.global_config.get_config_value( 106 'CROS', 'gs_offloader_limit_file_count', type=bool, default=False) 107 108# Use multiprocessing for gsutil uploading. 109GS_OFFLOADER_MULTIPROCESSING = global_config.global_config.get_config_value( 110 'CROS', 'gs_offloader_multiprocessing', type=bool, default=False) 111 112D = '[0-9][0-9]' 113TIMESTAMP_PATTERN = '%s%s.%s.%s_%s.%s.%s' % (D, D, D, D, D, D, D) 114CTS_RESULT_PATTERN = 'testResult.xml' 115CTS_V2_RESULT_PATTERN = 'test_result.xml' 116# Google Storage bucket URI to store results in. 117DEFAULT_CTS_RESULTS_GSURI = global_config.global_config.get_config_value( 118 'CROS', 'cts_results_server', default='') 119DEFAULT_CTS_APFE_GSURI = global_config.global_config.get_config_value( 120 'CROS', 'cts_apfe_server', default='') 121DEFAULT_CTS_DELTA_RESULTS_GSURI = global_config.global_config.get_config_value( 122 'CROS', 'ctsdelta_results_server', default='') 123DEFAULT_CTS_DELTA_APFE_GSURI = global_config.global_config.get_config_value( 124 'CROS', 'ctsdelta_apfe_server', default='') 125DEFAULT_CTS_BVT_APFE_GSURI = global_config.global_config.get_config_value( 126 'CROS', 'ctsbvt_apfe_server', default='') 127 128# metadata type 129GS_OFFLOADER_SUCCESS_TYPE = 'gs_offloader_success' 130GS_OFFLOADER_FAILURE_TYPE = 'gs_offloader_failure' 131 132# Autotest test to collect list of CTS tests 133TEST_LIST_COLLECTOR = 'tradefed-run-collect-tests-only' 134 135def _get_metrics_fields(dir_entry): 136 """Get metrics fields for the given test result directory, including board 137 and milestone. 138 139 @param dir_entry: Directory entry to offload. 140 @return A dictionary for the metrics data to be uploaded. 141 """ 142 fields = {'board': 'unknown', 143 'milestone': 'unknown'} 144 if dir_entry: 145 # There could be multiple hosts in the job directory, use the first one 146 # available. 147 for host in glob.glob(os.path.join(dir_entry, '*')): 148 try: 149 keyval = models.test.parse_job_keyval(host) 150 except ValueError: 151 continue 152 build = keyval.get('build') 153 if build: 154 try: 155 cros_version = labellib.parse_cros_version(build) 156 fields['board'] = cros_version.board 157 fields['milestone'] = cros_version.milestone 158 break 159 except ValueError: 160 # Ignore version parsing error so it won't crash 161 # gs_offloader. 162 pass 163 164 return fields; 165 166 167def _get_cmd_list(multiprocessing, dir_entry, gs_path): 168 """Return the command to offload a specified directory. 169 170 @param multiprocessing: True to turn on -m option for gsutil. 171 @param dir_entry: Directory entry/path that which we need a cmd_list 172 to offload. 173 @param gs_path: Location in google storage where we will 174 offload the directory. 175 176 @return A command list to be executed by Popen. 177 """ 178 cmd = ['gsutil'] 179 if multiprocessing: 180 cmd.append('-m') 181 if USE_RSYNC_ENABLED: 182 cmd.append('rsync') 183 target = os.path.join(gs_path, os.path.basename(dir_entry)) 184 else: 185 cmd.append('cp') 186 target = gs_path 187 cmd += ['-eR', dir_entry, target] 188 return cmd 189 190 191def sanitize_dir(dirpath): 192 """Sanitize directory for gs upload. 193 194 Symlinks and FIFOS are converted to regular files to fix bugs. 195 196 @param dirpath: Directory entry to be sanitized. 197 """ 198 if not os.path.exists(dirpath): 199 return 200 _escape_rename(dirpath) 201 _escape_rename_dir_contents(dirpath) 202 _sanitize_fifos(dirpath) 203 _sanitize_symlinks(dirpath) 204 205 206def _escape_rename_dir_contents(dirpath): 207 """Recursively rename directory to escape filenames for gs upload. 208 209 @param dirpath: Directory path string. 210 """ 211 for filename in os.listdir(dirpath): 212 path = os.path.join(dirpath, filename) 213 _escape_rename(path) 214 for filename in os.listdir(dirpath): 215 path = os.path.join(dirpath, filename) 216 if os.path.isdir(path): 217 _escape_rename_dir_contents(path) 218 219 220def _escape_rename(path): 221 """Rename file to escape filenames for gs upload. 222 223 @param path: File path string. 224 """ 225 dirpath, filename = os.path.split(path) 226 sanitized_filename = gslib.escape(filename) 227 sanitized_path = os.path.join(dirpath, sanitized_filename) 228 os.rename(path, sanitized_path) 229 230 231def _sanitize_fifos(dirpath): 232 """Convert fifos to regular files (fixes crbug.com/684122). 233 234 @param dirpath: Directory path string. 235 """ 236 for root, _, files in os.walk(dirpath): 237 for filename in files: 238 path = os.path.join(root, filename) 239 file_stat = os.lstat(path) 240 if stat.S_ISFIFO(file_stat.st_mode): 241 _replace_fifo_with_file(path) 242 243 244def _replace_fifo_with_file(path): 245 """Replace a fifo with a normal file. 246 247 @param path: Fifo path string. 248 """ 249 logging.debug('Removing fifo %s', path) 250 os.remove(path) 251 logging.debug('Creating fifo marker %s', path) 252 with open(path, 'w') as f: 253 f.write('<FIFO>') 254 255 256def _sanitize_symlinks(dirpath): 257 """Convert Symlinks to regular files (fixes crbug.com/692788). 258 259 @param dirpath: Directory path string. 260 """ 261 for root, _, files in os.walk(dirpath): 262 for filename in files: 263 path = os.path.join(root, filename) 264 file_stat = os.lstat(path) 265 if stat.S_ISLNK(file_stat.st_mode): 266 _replace_symlink_with_file(path) 267 268 269def _replace_symlink_with_file(path): 270 """Replace a symlink with a normal file. 271 272 @param path: Symlink path string. 273 """ 274 target = os.readlink(path) 275 logging.debug('Removing symlink %s', path) 276 os.remove(path) 277 logging.debug('Creating symlink marker %s', path) 278 with open(path, 'w') as f: 279 f.write('<symlink to %s>' % target) 280 281 282# Maximum number of files in the folder. 283_MAX_FILE_COUNT = 3000 284_FOLDERS_NEVER_ZIP = ['debug', 'ssp_logs', 'autoupdate_logs'] 285 286 287def _get_zippable_folders(dir_entry): 288 folders_list = [] 289 for folder in os.listdir(dir_entry): 290 folder_path = os.path.join(dir_entry, folder) 291 if (not os.path.isfile(folder_path) and 292 not folder in _FOLDERS_NEVER_ZIP): 293 folders_list.append(folder_path) 294 return folders_list 295 296 297def limit_file_count(dir_entry): 298 """Limit the number of files in given directory. 299 300 The method checks the total number of files in the given directory. 301 If the number is greater than _MAX_FILE_COUNT, the method will 302 compress each folder in the given directory, except folders in 303 _FOLDERS_NEVER_ZIP. 304 305 @param dir_entry: Directory entry to be checked. 306 """ 307 try: 308 count = _count_files(dir_entry) 309 except ValueError: 310 logging.warning('Fail to get the file count in folder %s.', dir_entry) 311 return 312 if count < _MAX_FILE_COUNT: 313 return 314 315 # For test job, zip folders in a second level, e.g. 123-debug/host1. 316 # This is to allow autoserv debug folder still be accessible. 317 # For special task, it does not need to dig one level deeper. 318 is_special_task = re.match(job_directories.SPECIAL_TASK_PATTERN, 319 dir_entry) 320 321 folders = _get_zippable_folders(dir_entry) 322 if not is_special_task: 323 subfolders = [] 324 for folder in folders: 325 subfolders.extend(_get_zippable_folders(folder)) 326 folders = subfolders 327 328 for folder in folders: 329 _make_into_tarball(folder) 330 331 332def _count_files(dirpath): 333 """Count the number of files in a directory recursively. 334 335 @param dirpath: Directory path string. 336 """ 337 return sum(len(files) for _path, _dirs, files in os.walk(dirpath)) 338 339 340def _make_into_tarball(dirpath): 341 """Make directory into tarball. 342 343 @param dirpath: Directory path string. 344 """ 345 tarpath = '%s.tgz' % dirpath 346 with tarfile.open(tarpath, 'w:gz') as tar: 347 tar.add(dirpath, arcname=os.path.basename(dirpath)) 348 shutil.rmtree(dirpath) 349 350 351def correct_results_folder_permission(dir_entry): 352 """Make sure the results folder has the right permission settings. 353 354 For tests running with server-side packaging, the results folder has 355 the owner of root. This must be changed to the user running the 356 autoserv process, so parsing job can access the results folder. 357 358 @param dir_entry: Path to the results folder. 359 """ 360 if not dir_entry: 361 return 362 363 logging.info('Trying to correct file permission of %s.', dir_entry) 364 try: 365 owner = '%s:%s' % (os.getuid(), os.getgid()) 366 subprocess.check_call( 367 ['sudo', '-n', 'chown', '-R', owner, dir_entry]) 368 subprocess.check_call(['chmod', '-R', 'u+r', dir_entry]) 369 subprocess.check_call( 370 ['find', dir_entry, '-type', 'd', 371 '-exec', 'chmod', 'u+x', '{}', ';']) 372 except subprocess.CalledProcessError as e: 373 logging.error('Failed to modify permission for %s: %s', 374 dir_entry, e) 375 376 377def _upload_cts_testresult(dir_entry, multiprocessing): 378 """Upload test results to separate gs buckets. 379 380 Upload testResult.xml.gz/test_result.xml.gz file to cts_results_bucket. 381 Upload timestamp.zip to cts_apfe_bucket. 382 383 @param dir_entry: Path to the results folder. 384 @param multiprocessing: True to turn on -m option for gsutil. 385 """ 386 for host in glob.glob(os.path.join(dir_entry, '*')): 387 cts_path = os.path.join(host, 'cheets_CTS.*', 'results', '*', 388 TIMESTAMP_PATTERN) 389 cts_v2_path = os.path.join(host, 'cheets_CTS_*', 'results', '*', 390 TIMESTAMP_PATTERN) 391 gts_v2_path = os.path.join(host, 'cheets_GTS*', 'results', '*', 392 TIMESTAMP_PATTERN) 393 for result_path, result_pattern in [(cts_path, CTS_RESULT_PATTERN), 394 (cts_v2_path, CTS_V2_RESULT_PATTERN), 395 (gts_v2_path, CTS_V2_RESULT_PATTERN)]: 396 for path in glob.glob(result_path): 397 try: 398 # CTS results from bvt-arc suites need to be only uploaded 399 # to APFE from its designated gs bucket for early EDI 400 # entries in APFE. These results need to copied only into 401 # APFE bucket. Copying to results bucket is not required. 402 if 'bvt-arc' in path: 403 _upload_files(host, path, result_pattern, 404 multiprocessing, 405 None, 406 DEFAULT_CTS_BVT_APFE_GSURI) 407 return 408 # Non-bvt CTS results need to be uploaded to standard gs 409 # buckets. 410 _upload_files(host, path, result_pattern, 411 multiprocessing, 412 DEFAULT_CTS_RESULTS_GSURI, 413 DEFAULT_CTS_APFE_GSURI) 414 # TODO(rohitbm): make better comparison using regex. 415 # plan_follower CTS results go to plan_follower specific 416 # gs buckets apart from standard gs buckets. 417 if 'plan_follower' in path: 418 _upload_files(host, path, result_pattern, 419 multiprocessing, 420 DEFAULT_CTS_DELTA_RESULTS_GSURI, 421 DEFAULT_CTS_DELTA_APFE_GSURI) 422 except Exception as e: 423 logging.error('ERROR uploading test results %s to GS: %s', 424 path, e) 425 426 427def _is_valid_result(build, result_pattern, suite): 428 """Check if the result should be uploaded to CTS/GTS buckets. 429 430 @param build: Builder name. 431 @param result_pattern: XML result file pattern. 432 @param suite: Test suite name. 433 434 @returns: Bool flag indicating whether a valid result. 435 """ 436 if build is None or suite is None: 437 return False 438 439 # Not valid if it's not a release build. 440 if not re.match(r'(?!trybot-).*-release/.*', build): 441 return False 442 443 # Not valid if it's cts result but not 'arc-cts*' or 'test_that_wrapper' 444 # suite. 445 result_patterns = [CTS_RESULT_PATTERN, CTS_V2_RESULT_PATTERN] 446 if result_pattern in result_patterns and not ( 447 suite.startswith('arc-cts') or 448 suite.startswith('arc-gts') or 449 suite.startswith('bvt-arc') or 450 suite.startswith('test_that_wrapper')): 451 return False 452 453 return True 454 455 456def _is_test_collector(package): 457 """Returns true if the test run is just to collect list of CTS tests. 458 459 @param package: Autotest package name. e.g. cheets_CTS_N.CtsGraphicsTestCase 460 461 @return Bool flag indicating a test package is CTS list generator or not. 462 """ 463 return TEST_LIST_COLLECTOR in package 464 465 466def _upload_files(host, path, result_pattern, multiprocessing, 467 result_gs_bucket, apfe_gs_bucket): 468 keyval = models.test.parse_job_keyval(host) 469 build = keyval.get('build') 470 suite = keyval.get('suite') 471 472 if not _is_valid_result(build, result_pattern, suite): 473 # No need to upload current folder, return. 474 return 475 476 parent_job_id = str(keyval['parent_job_id']) 477 478 folders = path.split(os.sep) 479 job_id = folders[-6] 480 package = folders[-4] 481 timestamp = folders[-1] 482 483 # Results produced by CTS test list collector are dummy results. 484 # They don't need to be copied to APFE bucket which is mainly being used for 485 # CTS APFE submission. 486 if not _is_test_collector(package): 487 # Path: bucket/build/parent_job_id/cheets_CTS.*/job_id_timestamp/ 488 # or bucket/build/parent_job_id/cheets_GTS.*/job_id_timestamp/ 489 cts_apfe_gs_path = os.path.join( 490 apfe_gs_bucket, build, parent_job_id, 491 package, job_id + '_' + timestamp) + '/' 492 493 for zip_file in glob.glob(os.path.join('%s.zip' % path)): 494 utils.run(' '.join(_get_cmd_list( 495 multiprocessing, zip_file, cts_apfe_gs_path))) 496 logging.debug('Upload %s to %s ', zip_file, cts_apfe_gs_path) 497 else: 498 logging.debug('%s is a CTS Test collector Autotest test run.', package) 499 logging.debug('Skipping CTS results upload to APFE gs:// bucket.') 500 501 if result_gs_bucket: 502 # Path: bucket/cheets_CTS.*/job_id_timestamp/ 503 # or bucket/cheets_GTS.*/job_id_timestamp/ 504 test_result_gs_path = os.path.join( 505 result_gs_bucket, package, job_id + '_' + timestamp) + '/' 506 507 for test_result_file in glob.glob(os.path.join(path, result_pattern)): 508 # gzip test_result_file(testResult.xml/test_result.xml) 509 510 test_result_file_gz = '%s.gz' % test_result_file 511 with open(test_result_file, 'r') as f_in, ( 512 gzip.open(test_result_file_gz, 'w')) as f_out: 513 shutil.copyfileobj(f_in, f_out) 514 utils.run(' '.join(_get_cmd_list( 515 multiprocessing, test_result_file_gz, test_result_gs_path))) 516 logging.debug('Zip and upload %s to %s', 517 test_result_file_gz, test_result_gs_path) 518 # Remove test_result_file_gz(testResult.xml.gz/test_result.xml.gz) 519 os.remove(test_result_file_gz) 520 521 522def _emit_gs_returncode_metric(returncode): 523 """Increment the gs_returncode counter based on |returncode|.""" 524 m_gs_returncode = 'chromeos/autotest/gs_offloader/gs_returncode' 525 rcode = int(returncode) 526 if rcode < 0 or rcode > 255: 527 rcode = -1 528 metrics.Counter(m_gs_returncode).increment(fields={'return_code': rcode}) 529 530 531def _handle_dir_os_error(dir_entry, fix_permission=False): 532 """Try to fix the result directory's permission issue if needed. 533 534 @param dir_entry: Directory entry to offload. 535 @param fix_permission: True to change the directory's owner to the same one 536 running gs_offloader. 537 """ 538 if fix_permission: 539 correct_results_folder_permission(dir_entry) 540 m_permission_error = ('chromeos/autotest/errors/gs_offloader/' 541 'wrong_permissions_count') 542 metrics_fields = _get_metrics_fields(dir_entry) 543 metrics.Counter(m_permission_error).increment(fields=metrics_fields) 544 545 546class BaseGSOffloader(object): 547 548 """Google Storage offloader interface.""" 549 550 __metaclass__ = abc.ABCMeta 551 552 def offload(self, dir_entry, dest_path, job_complete_time): 553 """Safely offload a directory entry to Google Storage. 554 555 This method is responsible for copying the contents of 556 `dir_entry` to Google storage at `dest_path`. 557 558 When successful, the method must delete all of `dir_entry`. 559 On failure, `dir_entry` should be left undisturbed, in order 560 to allow for retry. 561 562 Errors are conveyed simply and solely by two methods: 563 * At the time of failure, write enough information to the log 564 to allow later debug, if necessary. 565 * Don't delete the content. 566 567 In order to guarantee robustness, this method must not raise any 568 exceptions. 569 570 @param dir_entry: Directory entry to offload. 571 @param dest_path: Location in google storage where we will 572 offload the directory. 573 @param job_complete_time: The complete time of the job from the AFE 574 database. 575 """ 576 try: 577 self._full_offload(dir_entry, dest_path, job_complete_time) 578 except Exception as e: 579 logging.debug('Exception in offload for %s', dir_entry) 580 logging.debug('Ignoring this error: %s', str(e)) 581 582 @abc.abstractmethod 583 def _full_offload(self, dir_entry, dest_path, job_complete_time): 584 """Offload a directory entry to Google Storage. 585 586 This method implements the actual offload behavior of its 587 subclass. To guarantee effective debug, this method should 588 catch all exceptions, and perform any reasonable diagnosis 589 or other handling. 590 591 @param dir_entry: Directory entry to offload. 592 @param dest_path: Location in google storage where we will 593 offload the directory. 594 @param job_complete_time: The complete time of the job from the AFE 595 database. 596 """ 597 598 599class GSOffloader(BaseGSOffloader): 600 """Google Storage Offloader.""" 601 602 def __init__(self, gs_uri, multiprocessing, delete_age, 603 console_client=None): 604 """Returns the offload directory function for the given gs_uri 605 606 @param gs_uri: Google storage bucket uri to offload to. 607 @param multiprocessing: True to turn on -m option for gsutil. 608 @param console_client: The cloud console client. If None, 609 cloud console APIs are not called. 610 """ 611 self._gs_uri = gs_uri 612 self._multiprocessing = multiprocessing 613 self._delete_age = delete_age 614 self._console_client = console_client 615 616 @metrics.SecondsTimerDecorator( 617 'chromeos/autotest/gs_offloader/job_offload_duration') 618 def _full_offload(self, dir_entry, dest_path, job_complete_time): 619 """Offload the specified directory entry to Google storage. 620 621 @param dir_entry: Directory entry to offload. 622 @param dest_path: Location in google storage where we will 623 offload the directory. 624 @param job_complete_time: The complete time of the job from the AFE 625 database. 626 """ 627 with tempfile.TemporaryFile('w+') as stdout_file, \ 628 tempfile.TemporaryFile('w+') as stderr_file: 629 try: 630 try: 631 self._try_offload(dir_entry, dest_path, stdout_file, 632 stderr_file) 633 except OSError as e: 634 # Correct file permission error of the directory, then raise 635 # the exception so gs_offloader can retry later. 636 _handle_dir_os_error(dir_entry, e.errno==errno.EACCES) 637 # Try again after the permission issue is fixed. 638 self._try_offload(dir_entry, dest_path, stdout_file, 639 stderr_file) 640 except _OffloadError as e: 641 metrics_fields = _get_metrics_fields(dir_entry) 642 m_any_error = 'chromeos/autotest/errors/gs_offloader/any_error' 643 metrics.Counter(m_any_error).increment(fields=metrics_fields) 644 645 # Rewind the log files for stdout and stderr and log 646 # their contents. 647 stdout_file.seek(0) 648 stderr_file.seek(0) 649 stderr_content = stderr_file.read() 650 logging.warning('Error occurred when offloading %s:', dir_entry) 651 logging.warning('Stdout:\n%s \nStderr:\n%s', stdout_file.read(), 652 stderr_content) 653 654 # Some result files may have wrong file permission. Try 655 # to correct such error so later try can success. 656 # TODO(dshi): The code is added to correct result files 657 # with wrong file permission caused by bug 511778. After 658 # this code is pushed to lab and run for a while to 659 # clean up these files, following code and function 660 # correct_results_folder_permission can be deleted. 661 if 'CommandException: Error opening file' in stderr_content: 662 correct_results_folder_permission(dir_entry) 663 else: 664 self._prune(dir_entry, job_complete_time) 665 666 def _try_offload(self, dir_entry, dest_path, 667 stdout_file, stderr_file): 668 """Offload the specified directory entry to Google storage. 669 670 @param dir_entry: Directory entry to offload. 671 @param dest_path: Location in google storage where we will 672 offload the directory. 673 @param job_complete_time: The complete time of the job from the AFE 674 database. 675 @param stdout_file: Log file. 676 @param stderr_file: Log file. 677 """ 678 if _is_uploaded(dir_entry): 679 return 680 start_time = time.time() 681 metrics_fields = _get_metrics_fields(dir_entry) 682 error_obj = _OffloadError(start_time) 683 try: 684 sanitize_dir(dir_entry) 685 if DEFAULT_CTS_RESULTS_GSURI: 686 _upload_cts_testresult(dir_entry, self._multiprocessing) 687 688 if LIMIT_FILE_COUNT: 689 limit_file_count(dir_entry) 690 691 process = None 692 with timeout_util.Timeout(OFFLOAD_TIMEOUT_SECS): 693 gs_path = '%s%s' % (self._gs_uri, dest_path) 694 cmd = _get_cmd_list(self._multiprocessing, dir_entry, gs_path) 695 logging.debug('Attempting an offload command %s', cmd) 696 process = subprocess.Popen( 697 cmd, stdout=stdout_file, stderr=stderr_file) 698 process.wait() 699 logging.debug('Offload command %s completed.', cmd) 700 701 _emit_gs_returncode_metric(process.returncode) 702 if process.returncode != 0: 703 raise error_obj 704 _emit_offload_metrics(dir_entry) 705 706 if self._console_client: 707 gcs_uri = os.path.join(gs_path, 708 os.path.basename(dir_entry)) 709 if not self._console_client.send_test_job_offloaded_message( 710 gcs_uri): 711 raise error_obj 712 713 _mark_uploaded(dir_entry) 714 except timeout_util.TimeoutError: 715 m_timeout = 'chromeos/autotest/errors/gs_offloader/timed_out_count' 716 metrics.Counter(m_timeout).increment(fields=metrics_fields) 717 # If we finished the call to Popen(), we may need to 718 # terminate the child process. We don't bother calling 719 # process.poll(); that inherently races because the child 720 # can die any time it wants. 721 if process: 722 try: 723 process.terminate() 724 except OSError: 725 # We don't expect any error other than "No such 726 # process". 727 pass 728 logging.error('Offloading %s timed out after waiting %d ' 729 'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS) 730 raise error_obj 731 732 def _prune(self, dir_entry, job_complete_time): 733 """Prune directory if it is uploaded and expired. 734 735 @param dir_entry: Directory entry to offload. 736 @param job_complete_time: The complete time of the job from the AFE 737 database. 738 """ 739 if not (_is_uploaded(dir_entry) 740 and job_directories.is_job_expired(self._delete_age, 741 job_complete_time)): 742 return 743 try: 744 logging.debug('Pruning uploaded directory %s', dir_entry) 745 shutil.rmtree(dir_entry) 746 job_timestamp_cache.delete(dir_entry) 747 except OSError as e: 748 # The wrong file permission can lead call `shutil.rmtree(dir_entry)` 749 # to raise OSError with message 'Permission denied'. Details can be 750 # found in crbug.com/536151 751 _handle_dir_os_error(dir_entry, e.errno==errno.EACCES) 752 # Try again after the permission issue is fixed. 753 shutil.rmtree(dir_entry) 754 755 756class _OffloadError(Exception): 757 """Google Storage offload failed.""" 758 759 def __init__(self, start_time): 760 super(_OffloadError, self).__init__(start_time) 761 self.start_time = start_time 762 763 764 765class FakeGSOffloader(BaseGSOffloader): 766 767 """Fake Google Storage Offloader that only deletes directories.""" 768 769 def _full_offload(self, dir_entry, dest_path, job_complete_time): 770 """Pretend to offload a directory and delete it. 771 772 @param dir_entry: Directory entry to offload. 773 @param dest_path: Location in google storage where we will 774 offload the directory. 775 @param job_complete_time: The complete time of the job from the AFE 776 database. 777 """ 778 shutil.rmtree(dir_entry) 779 780 781class OptionalMemoryCache(object): 782 """Implements memory cache if cachetools module can be loaded. 783 784 If the platform has cachetools available then the cache will 785 be created, otherwise the get calls will always act as if there 786 was a cache miss and the set/delete will be no-ops. 787 """ 788 cache = None 789 790 def setup(self, age_to_delete): 791 """Set up a TTL cache size based on how long the job will be handled. 792 793 Autotest jobs are handled by gs_offloader until they are deleted from 794 local storage, base the cache size on how long that is. 795 796 @param age_to_delete: Number of days after which items in the cache 797 should expire. 798 """ 799 if cachetools: 800 # Min cache is 1000 items for 10 mins. If the age to delete is 0 801 # days you still want a short / small cache. 802 # 2000 items is a good approximation for the max number of jobs a 803 # moblab # can produce in a day, lab offloads immediatly so 804 # the number of carried jobs should be very small in the normal 805 # case. 806 ttl = max(age_to_delete * 24 * 60 * 60, 600) 807 maxsize = max(age_to_delete * 2000, 1000) 808 job_timestamp_cache.cache = cachetools.TTLCache(maxsize=maxsize, 809 ttl=ttl) 810 811 def get(self, key): 812 """If we have a cache try to retrieve from it.""" 813 if self.cache is not None: 814 result = self.cache.get(key) 815 return result 816 return None 817 818 def add(self, key, value): 819 """If we have a cache try to store key/value.""" 820 if self.cache is not None: 821 self.cache[key] = value 822 823 def delete(self, key): 824 """If we have a cache try to remove a key.""" 825 if self.cache is not None: 826 return self.cache.delete(key) 827 828 829job_timestamp_cache = OptionalMemoryCache() 830 831 832def _cached_get_timestamp_if_finished(job): 833 """Retrieve a job finished timestamp from cache or AFE. 834 @param job _JobDirectory instance to retrieve 835 finished timestamp of.. 836 837 @returns: None if the job is not finished, or the 838 last job finished time recorded by Autotest. 839 """ 840 job_timestamp = job_timestamp_cache.get(job.dirname) 841 if not job_timestamp: 842 job_timestamp = job.get_timestamp_if_finished() 843 if job_timestamp: 844 job_timestamp_cache.add(job.dirname, job_timestamp) 845 return job_timestamp 846 847 848def _is_expired(job, age_limit): 849 """Return whether job directory is expired for uploading 850 851 @param job: _JobDirectory instance. 852 @param age_limit: Minimum age in days at which a job may be offloaded. 853 """ 854 job_timestamp = _cached_get_timestamp_if_finished(job) 855 if not job_timestamp: 856 return False 857 return job_directories.is_job_expired(age_limit, job_timestamp) 858 859 860def _emit_offload_metrics(dirpath): 861 """Emit gs offload metrics. 862 863 @param dirpath: Offloaded directory path. 864 """ 865 dir_size = file_utils.get_directory_size_kibibytes(dirpath) 866 metrics_fields = _get_metrics_fields(dirpath) 867 868 m_offload_count = ( 869 'chromeos/autotest/gs_offloader/jobs_offloaded') 870 metrics.Counter(m_offload_count).increment( 871 fields=metrics_fields) 872 m_offload_size = ('chromeos/autotest/gs_offloader/' 873 'kilobytes_transferred') 874 metrics.Counter(m_offload_size).increment_by( 875 dir_size, fields=metrics_fields) 876 877 878def _is_uploaded(dirpath): 879 """Return whether directory has been uploaded. 880 881 @param dirpath: Directory path string. 882 """ 883 return os.path.isfile(_get_uploaded_marker_file(dirpath)) 884 885 886def _mark_uploaded(dirpath): 887 """Mark directory as uploaded. 888 889 @param dirpath: Directory path string. 890 """ 891 logging.debug('Creating uploaded marker for directory %s', dirpath) 892 with open(_get_uploaded_marker_file(dirpath), 'a'): 893 pass 894 895 896def _get_uploaded_marker_file(dirpath): 897 """Return path to upload marker file for directory. 898 899 @param dirpath: Directory path string. 900 """ 901 return '%s/.GS_UPLOADED' % (dirpath,) 902 903 904def _format_job_for_failure_reporting(job): 905 """Formats a _JobDirectory for reporting / logging. 906 907 @param job: The _JobDirectory to format. 908 """ 909 d = datetime.datetime.fromtimestamp(job.first_offload_start) 910 data = (d.strftime(FAILED_OFFLOADS_TIME_FORMAT), 911 job.offload_count, 912 job.dirname) 913 return FAILED_OFFLOADS_LINE_FORMAT % data 914 915 916def wait_for_gs_write_access(gs_uri): 917 """Verify and wait until we have write access to Google Storage. 918 919 @param gs_uri: The Google Storage URI we are trying to offload to. 920 """ 921 # TODO (sbasi) Try to use the gsutil command to check write access. 922 # Ensure we have write access to gs_uri. 923 dummy_file = tempfile.NamedTemporaryFile() 924 test_cmd = _get_cmd_list(False, dummy_file.name, gs_uri) 925 while True: 926 logging.debug('Checking for write access with dummy file %s', 927 dummy_file.name) 928 try: 929 subprocess.check_call(test_cmd) 930 subprocess.check_call( 931 ['gsutil', 'rm', 932 os.path.join(gs_uri, 933 os.path.basename(dummy_file.name))]) 934 break 935 except subprocess.CalledProcessError: 936 t = 120 937 logging.debug('Unable to offload dummy file to %s, sleeping for %s ' 938 'seconds.', gs_uri, t) 939 time.sleep(t) 940 logging.debug('Dummy file write check to gs succeeded.') 941 942 943class Offloader(object): 944 """State of the offload process. 945 946 Contains the following member fields: 947 * _gs_offloader: _BaseGSOffloader to use to offload a job directory. 948 * _jobdir_classes: List of classes of job directory to be 949 offloaded. 950 * _processes: Maximum number of outstanding offload processes 951 to allow during an offload cycle. 952 * _age_limit: Minimum age in days at which a job may be 953 offloaded. 954 * _open_jobs: a dictionary mapping directory paths to Job 955 objects. 956 """ 957 958 def __init__(self, options): 959 self._upload_age_limit = options.age_to_upload 960 self._delete_age_limit = options.age_to_delete 961 if options.delete_only: 962 self._gs_offloader = FakeGSOffloader() 963 else: 964 self.gs_uri = utils.get_offload_gsuri() 965 logging.debug('Offloading to: %s', self.gs_uri) 966 multiprocessing = False 967 if options.multiprocessing: 968 multiprocessing = True 969 elif options.multiprocessing is None: 970 multiprocessing = GS_OFFLOADER_MULTIPROCESSING 971 logging.info( 972 'Offloader multiprocessing is set to:%r', multiprocessing) 973 console_client = None 974 if (cloud_console_client and 975 cloud_console_client.is_cloud_notification_enabled()): 976 console_client = cloud_console_client.PubSubBasedClient() 977 self._gs_offloader = GSOffloader( 978 self.gs_uri, multiprocessing, self._delete_age_limit, 979 console_client) 980 classlist = [ 981 job_directories.SwarmingJobDirectory, 982 ] 983 if options.process_hosts_only or options.process_all: 984 classlist.append(job_directories.SpecialJobDirectory) 985 if not options.process_hosts_only: 986 classlist.append(job_directories.RegularJobDirectory) 987 self._jobdir_classes = classlist 988 assert self._jobdir_classes 989 self._processes = options.parallelism 990 self._open_jobs = {} 991 self._pusub_topic = None 992 self._offload_count_limit = 3 993 994 995 def _add_new_jobs(self): 996 """Find new job directories that need offloading. 997 998 Go through the file system looking for valid job directories 999 that are currently not in `self._open_jobs`, and add them in. 1000 1001 """ 1002 new_job_count = 0 1003 for cls in self._jobdir_classes: 1004 for resultsdir in cls.get_job_directories(): 1005 if resultsdir in self._open_jobs: 1006 continue 1007 self._open_jobs[resultsdir] = cls(resultsdir) 1008 new_job_count += 1 1009 logging.debug('Start of offload cycle - found %d new jobs', 1010 new_job_count) 1011 1012 1013 def _remove_offloaded_jobs(self): 1014 """Removed offloaded jobs from `self._open_jobs`.""" 1015 removed_job_count = 0 1016 for jobkey, job in self._open_jobs.items(): 1017 if ( 1018 not os.path.exists(job.dirname) 1019 or _is_uploaded(job.dirname)): 1020 del self._open_jobs[jobkey] 1021 removed_job_count += 1 1022 logging.debug('End of offload cycle - cleared %d jobs, ' 1023 'carrying %d open jobs', 1024 removed_job_count, len(self._open_jobs)) 1025 1026 1027 def _report_failed_jobs(self): 1028 """Report status after attempting offload. 1029 1030 This function processes all jobs in `self._open_jobs`, assuming 1031 an attempt has just been made to offload all of them. 1032 1033 If any jobs have reportable errors, and we haven't generated 1034 an e-mail report in the last `REPORT_INTERVAL_SECS` seconds, 1035 send new e-mail describing the failures. 1036 1037 """ 1038 failed_jobs = [j for j in self._open_jobs.values() if 1039 j.first_offload_start] 1040 self._report_failed_jobs_count(failed_jobs) 1041 self._log_failed_jobs_locally(failed_jobs) 1042 1043 1044 def offload_once(self): 1045 """Perform one offload cycle. 1046 1047 Find all job directories for new jobs that we haven't seen 1048 before. Then, attempt to offload the directories for any 1049 jobs that have finished running. Offload of multiple jobs 1050 is done in parallel, up to `self._processes` at a time. 1051 1052 After we've tried uploading all directories, go through the list 1053 checking the status of all uploaded directories. If necessary, 1054 report failures via e-mail. 1055 1056 """ 1057 self._add_new_jobs() 1058 self._report_current_jobs_count() 1059 with parallel.BackgroundTaskRunner( 1060 self._gs_offloader.offload, processes=self._processes) as queue: 1061 for job in self._open_jobs.values(): 1062 _enqueue_offload(job, queue, self._upload_age_limit) 1063 self._give_up_on_jobs_over_limit() 1064 self._remove_offloaded_jobs() 1065 self._report_failed_jobs() 1066 1067 1068 def _give_up_on_jobs_over_limit(self): 1069 """Give up on jobs that have gone over the offload limit. 1070 1071 We mark them as uploaded as we won't try to offload them any more. 1072 """ 1073 for job in self._open_jobs.values(): 1074 if job.offload_count >= self._offload_count_limit: 1075 _mark_uploaded(job.dirname) 1076 1077 1078 def _log_failed_jobs_locally(self, failed_jobs, 1079 log_file=FAILED_OFFLOADS_FILE): 1080 """Updates a local file listing all the failed jobs. 1081 1082 The dropped file can be used by the developers to list jobs that we have 1083 failed to upload. 1084 1085 @param failed_jobs: A list of failed _JobDirectory objects. 1086 @param log_file: The file to log the failed jobs to. 1087 """ 1088 now = datetime.datetime.now() 1089 now_str = now.strftime(FAILED_OFFLOADS_TIME_FORMAT) 1090 formatted_jobs = [_format_job_for_failure_reporting(job) 1091 for job in failed_jobs] 1092 formatted_jobs.sort() 1093 1094 with open(log_file, 'w') as logfile: 1095 logfile.write(FAILED_OFFLOADS_FILE_HEADER % 1096 (now_str, len(failed_jobs))) 1097 logfile.writelines(formatted_jobs) 1098 1099 1100 def _report_current_jobs_count(self): 1101 """Report the number of outstanding jobs to monarch.""" 1102 metrics.Gauge('chromeos/autotest/gs_offloader/current_jobs_count').set( 1103 len(self._open_jobs)) 1104 1105 1106 def _report_failed_jobs_count(self, failed_jobs): 1107 """Report the number of outstanding failed offload jobs to monarch. 1108 1109 @param: List of failed jobs. 1110 """ 1111 metrics.Gauge('chromeos/autotest/gs_offloader/failed_jobs_count').set( 1112 len(failed_jobs)) 1113 1114 1115def _enqueue_offload(job, queue, age_limit): 1116 """Enqueue the job for offload, if it's eligible. 1117 1118 The job is eligible for offloading if the database has marked 1119 it finished, and the job is older than the `age_limit` 1120 parameter. 1121 1122 If the job is eligible, offload processing is requested by 1123 passing the `queue` parameter's `put()` method a sequence with 1124 the job's `dirname` attribute and its directory name. 1125 1126 @param job _JobDirectory instance to offload. 1127 @param queue If the job should be offloaded, put the offload 1128 parameters into this queue for processing. 1129 @param age_limit Minimum age for a job to be offloaded. A value 1130 of 0 means that the job will be offloaded as 1131 soon as it is finished. 1132 1133 """ 1134 if not job.offload_count: 1135 if not _is_expired(job, age_limit): 1136 return 1137 job.first_offload_start = time.time() 1138 job.offload_count += 1 1139 if job.process_gs_instructions(): 1140 timestamp = _cached_get_timestamp_if_finished(job) 1141 queue.put([job.dirname, os.path.dirname(job.dirname), timestamp]) 1142 1143 1144def parse_options(): 1145 """Parse the args passed into gs_offloader.""" 1146 defaults = 'Defaults:\n Destination: %s\n Results Path: %s' % ( 1147 utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR) 1148 usage = 'usage: %prog [options]\n' + defaults 1149 parser = OptionParser(usage) 1150 parser.add_option('-a', '--all', dest='process_all', 1151 action='store_true', 1152 help='Offload all files in the results directory.') 1153 parser.add_option('-s', '--hosts', dest='process_hosts_only', 1154 action='store_true', 1155 help='Offload only the special tasks result files ' 1156 'located in the results/hosts subdirectory') 1157 parser.add_option('-p', '--parallelism', dest='parallelism', 1158 type='int', default=1, 1159 help='Number of parallel workers to use.') 1160 parser.add_option('-o', '--delete_only', dest='delete_only', 1161 action='store_true', 1162 help='GS Offloader will only the delete the ' 1163 'directories and will not offload them to google ' 1164 'storage. NOTE: If global_config variable ' 1165 'CROS.gs_offloading_enabled is False, --delete_only ' 1166 'is automatically True.', 1167 default=not GS_OFFLOADING_ENABLED) 1168 parser.add_option('-d', '--days_old', dest='days_old', 1169 help='Minimum job age in days before a result can be ' 1170 'offloaded.', type='int', default=0) 1171 parser.add_option('-l', '--log_size', dest='log_size', 1172 help='Limit the offloader logs to a specified ' 1173 'number of Mega Bytes.', type='int', default=0) 1174 parser.add_option('-m', dest='multiprocessing', action='store_true', 1175 help='Turn on -m option for gsutil. If not set, the ' 1176 'global config setting gs_offloader_multiprocessing ' 1177 'under CROS section is applied.') 1178 parser.add_option('-i', '--offload_once', dest='offload_once', 1179 action='store_true', 1180 help='Upload all available results and then exit.') 1181 parser.add_option('-y', '--normal_priority', dest='normal_priority', 1182 action='store_true', 1183 help='Upload using normal process priority.') 1184 parser.add_option('-u', '--age_to_upload', dest='age_to_upload', 1185 help='Minimum job age in days before a result can be ' 1186 'offloaded, but not removed from local storage', 1187 type='int', default=None) 1188 parser.add_option('-n', '--age_to_delete', dest='age_to_delete', 1189 help='Minimum job age in days before a result can be ' 1190 'removed from local storage', 1191 type='int', default=None) 1192 parser.add_option( 1193 '--metrics-file', 1194 help='If provided, drop metrics to this local file instead of ' 1195 'reporting to ts_mon', 1196 type=str, 1197 default=None, 1198 ) 1199 parser.add_option('-t', '--enable_timestamp_cache', 1200 dest='enable_timestamp_cache', 1201 action='store_true', 1202 help='Cache the finished timestamps from AFE.') 1203 1204 options = parser.parse_args()[0] 1205 if options.process_all and options.process_hosts_only: 1206 parser.print_help() 1207 print ('Cannot process all files and only the hosts ' 1208 'subdirectory. Please remove an argument.') 1209 sys.exit(1) 1210 1211 if options.days_old and (options.age_to_upload or options.age_to_delete): 1212 parser.print_help() 1213 print('Use the days_old option or the age_to_* options but not both') 1214 sys.exit(1) 1215 1216 if options.age_to_upload == None: 1217 options.age_to_upload = options.days_old 1218 if options.age_to_delete == None: 1219 options.age_to_delete = options.days_old 1220 1221 return options 1222 1223 1224def main(): 1225 """Main method of gs_offloader.""" 1226 options = parse_options() 1227 1228 if options.process_all: 1229 offloader_type = 'all' 1230 elif options.process_hosts_only: 1231 offloader_type = 'hosts' 1232 else: 1233 offloader_type = 'jobs' 1234 1235 _setup_logging(options, offloader_type) 1236 1237 if options.enable_timestamp_cache: 1238 # Extend the cache expiry time by another 1% so the timstamps 1239 # are available as the results are purged. 1240 job_timestamp_cache.setup(options.age_to_delete * 1.01) 1241 1242 # Nice our process (carried to subprocesses) so we don't overload 1243 # the system. 1244 if not options.normal_priority: 1245 logging.debug('Set process to nice value: %d', NICENESS) 1246 os.nice(NICENESS) 1247 if psutil: 1248 proc = psutil.Process() 1249 logging.debug('Set process to ionice IDLE') 1250 proc.ionice(psutil.IOPRIO_CLASS_IDLE) 1251 1252 # os.listdir returns relative paths, so change to where we need to 1253 # be to avoid an os.path.join on each loop. 1254 logging.debug('Offloading Autotest results in %s', RESULTS_DIR) 1255 os.chdir(RESULTS_DIR) 1256 1257 service_name = 'gs_offloader(%s)' % offloader_type 1258 with ts_mon_config.SetupTsMonGlobalState(service_name, indirect=True, 1259 short_lived=False, 1260 debug_file=options.metrics_file): 1261 with metrics.SuccessCounter('chromeos/autotest/gs_offloader/exit'): 1262 offloader = Offloader(options) 1263 if not options.delete_only: 1264 wait_for_gs_write_access(offloader.gs_uri) 1265 while True: 1266 offloader.offload_once() 1267 if options.offload_once: 1268 break 1269 time.sleep(SLEEP_TIME_SECS) 1270 1271 1272_LOG_LOCATION = '/usr/local/autotest/logs/' 1273_LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt' 1274_LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S' 1275_LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' 1276 1277 1278def _setup_logging(options, offloader_type): 1279 """Set up logging. 1280 1281 @param options: Parsed options. 1282 @param offloader_type: Type of offloader action as string. 1283 """ 1284 log_filename = _get_log_filename(options, offloader_type) 1285 log_formatter = logging.Formatter(_LOGGING_FORMAT) 1286 # Replace the default logging handler with a RotatingFileHandler. If 1287 # options.log_size is 0, the file size will not be limited. Keeps 1288 # one backup just in case. 1289 handler = logging.handlers.RotatingFileHandler( 1290 log_filename, maxBytes=1024 * options.log_size, backupCount=1) 1291 handler.setFormatter(log_formatter) 1292 logger = logging.getLogger() 1293 logger.setLevel(logging.DEBUG) 1294 logger.addHandler(handler) 1295 1296 1297def _get_log_filename(options, offloader_type): 1298 """Get log filename. 1299 1300 @param options: Parsed options. 1301 @param offloader_type: Type of offloader action as string. 1302 """ 1303 if options.log_size > 0: 1304 log_timestamp = '' 1305 else: 1306 log_timestamp = time.strftime(_LOG_TIMESTAMP_FORMAT) 1307 log_basename = _LOG_FILENAME_FORMAT % (offloader_type, log_timestamp) 1308 return os.path.join(_LOG_LOCATION, log_basename) 1309 1310 1311if __name__ == '__main__': 1312 main() 1313