1#!/usr/bin/python2 2# 3# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7"""Script to archive old Autotest results to Google Storage. 8 9Uses gsutil to archive files to the configured Google Storage bucket. 10Upon successful copy, the local results directory is deleted. 11""" 12 13import abc 14try: 15 import cachetools 16except ImportError: 17 cachetools = None 18import datetime 19import errno 20import glob 21import gzip 22import logging 23import logging.handlers 24import os 25import re 26import shutil 27import stat 28import subprocess 29import sys 30import tarfile 31import tempfile 32import time 33import urllib 34 35from optparse import OptionParser 36 37import common 38from autotest_lib.client.common_lib import file_utils 39from autotest_lib.client.common_lib import global_config 40from autotest_lib.client.common_lib import utils 41from autotest_lib.site_utils import job_directories 42# For unittest, the cloud_console.proto is not compiled yet. 43try: 44 from autotest_lib.site_utils import cloud_console_client 45except ImportError: 46 cloud_console_client = None 47from autotest_lib.tko import models 48from autotest_lib.utils import labellib 49from autotest_lib.utils import gslib 50from autotest_lib.utils.side_effects import config_loader 51from chromite.lib import timeout_util 52 53# Autotest requires the psutil module from site-packages, so it must be imported 54# after "import common". 55try: 56 # Does not exist, nor is needed, on moblab. 57 import psutil 58except ImportError: 59 psutil = None 60 61from chromite.lib import parallel 62try: 63 from chromite.lib import metrics 64 from chromite.lib import ts_mon_config 65except ImportError: 66 metrics = utils.metrics_mock 67 ts_mon_config = utils.metrics_mock 68 69 70GS_OFFLOADING_ENABLED = global_config.global_config.get_config_value( 71 'CROS', 'gs_offloading_enabled', type=bool, default=True) 72 73# Nice setting for process, the higher the number the lower the priority. 74NICENESS = 10 75 76# Maximum number of seconds to allow for offloading a single 77# directory. 78OFFLOAD_TIMEOUT_SECS = 60 * 60 79 80# Sleep time per loop. 81SLEEP_TIME_SECS = 5 82 83# Minimum number of seconds between e-mail reports. 84REPORT_INTERVAL_SECS = 60 * 60 85 86# Location of Autotest results on disk. 87RESULTS_DIR = '/usr/local/autotest/results' 88FAILED_OFFLOADS_FILE = os.path.join(RESULTS_DIR, 'FAILED_OFFLOADS') 89 90FAILED_OFFLOADS_FILE_HEADER = ''' 91This is the list of gs_offloader failed jobs. 92Last offloader attempt at %s failed to offload %d files. 93Check http://go/cros-triage-gsoffloader to triage the issue 94 95 96First failure Count Directory name 97=================== ====== ============================== 98''' 99# --+----1----+---- ----+ ----+----1----+----2----+----3 100 101FAILED_OFFLOADS_LINE_FORMAT = '%19s %5d %-1s\n' 102FAILED_OFFLOADS_TIME_FORMAT = '%Y-%m-%d %H:%M:%S' 103 104USE_RSYNC_ENABLED = global_config.global_config.get_config_value( 105 'CROS', 'gs_offloader_use_rsync', type=bool, default=False) 106 107LIMIT_FILE_COUNT = global_config.global_config.get_config_value( 108 'CROS', 'gs_offloader_limit_file_count', type=bool, default=False) 109 110# Use multiprocessing for gsutil uploading. 111GS_OFFLOADER_MULTIPROCESSING = global_config.global_config.get_config_value( 112 'CROS', 'gs_offloader_multiprocessing', type=bool, default=False) 113 114D = '[0-9][0-9]' 115TIMESTAMP_PATTERN = '%s%s.%s.%s_%s.%s.%s' % (D, D, D, D, D, D, D) 116CTS_RESULT_PATTERN = 'testResult.xml' 117CTS_COMPRESSED_RESULT_PATTERN = 'testResult.xml.tgz' 118CTS_V2_RESULT_PATTERN = 'test_result.xml' 119CTS_V2_COMPRESSED_RESULT_PATTERN = 'test_result.xml.tgz' 120 121CTS_COMPRESSED_RESULT_TYPES = { 122 CTS_COMPRESSED_RESULT_PATTERN: CTS_RESULT_PATTERN, 123 CTS_V2_COMPRESSED_RESULT_PATTERN: CTS_V2_RESULT_PATTERN} 124 125# Google Storage bucket URI to store results in. 126DEFAULT_CTS_RESULTS_GSURI = global_config.global_config.get_config_value( 127 'CROS', 'cts_results_server', default='') 128DEFAULT_CTS_APFE_GSURI = global_config.global_config.get_config_value( 129 'CROS', 'cts_apfe_server', default='') 130DEFAULT_CTS_DELTA_RESULTS_GSURI = global_config.global_config.get_config_value( 131 'CROS', 'ctsdelta_results_server', default='') 132DEFAULT_CTS_DELTA_APFE_GSURI = global_config.global_config.get_config_value( 133 'CROS', 'ctsdelta_apfe_server', default='') 134DEFAULT_CTS_BVT_APFE_GSURI = global_config.global_config.get_config_value( 135 'CROS', 'ctsbvt_apfe_server', default='') 136 137# metadata type 138GS_OFFLOADER_SUCCESS_TYPE = 'gs_offloader_success' 139GS_OFFLOADER_FAILURE_TYPE = 'gs_offloader_failure' 140 141# Autotest test to collect list of CTS tests 142TEST_LIST_COLLECTOR = 'tradefed-run-collect-tests-only' 143 144def _get_metrics_fields(dir_entry): 145 """Get metrics fields for the given test result directory, including board 146 and milestone. 147 148 @param dir_entry: Directory entry to offload. 149 @return A dictionary for the metrics data to be uploaded. 150 """ 151 fields = {'board': 'unknown', 152 'milestone': 'unknown'} 153 if dir_entry: 154 # There could be multiple hosts in the job directory, use the first one 155 # available. 156 for host in glob.glob(os.path.join(dir_entry, '*')): 157 try: 158 keyval = models.test.parse_job_keyval(host) 159 except ValueError: 160 continue 161 build = keyval.get('build') 162 if build: 163 try: 164 cros_version = labellib.parse_cros_version(build) 165 fields['board'] = cros_version.board 166 fields['milestone'] = cros_version.milestone 167 break 168 except ValueError: 169 # Ignore version parsing error so it won't crash 170 # gs_offloader. 171 pass 172 173 return fields 174 175 176def _get_cmd_list(multiprocessing, dir_entry, gs_path): 177 """Return the command to offload a specified directory. 178 179 @param multiprocessing: True to turn on -m option for gsutil. 180 @param dir_entry: Directory entry/path that which we need a cmd_list 181 to offload. 182 @param gs_path: Location in google storage where we will 183 offload the directory. 184 185 @return A command list to be executed by Popen. 186 """ 187 cmd = ['gsutil'] 188 if multiprocessing: 189 cmd.append('-m') 190 if USE_RSYNC_ENABLED: 191 cmd.append('rsync') 192 target = os.path.join(gs_path, os.path.basename(dir_entry)) 193 else: 194 cmd.append('cp') 195 target = gs_path 196 cmd += ['-eR', dir_entry, target] 197 return cmd 198 199 200def _get_finish_cmd_list(gs_path): 201 """Returns a command to remotely mark a given gs path as finished. 202 203 @param gs_path: Location in google storage where the offload directory 204 should be marked as finished. 205 206 @return A command list to be executed by Popen. 207 """ 208 target = os.path.join(gs_path, '.finished_offload') 209 return [ 210 'gsutil', 211 'cp', 212 '/dev/null', 213 target, 214 ] 215 216 217def sanitize_dir(dirpath): 218 """Sanitize directory for gs upload. 219 220 Symlinks and FIFOS are converted to regular files to fix bugs. 221 222 @param dirpath: Directory entry to be sanitized. 223 """ 224 if not os.path.exists(dirpath): 225 return 226 _escape_rename(dirpath) 227 _escape_rename_dir_contents(dirpath) 228 _sanitize_fifos(dirpath) 229 _sanitize_symlinks(dirpath) 230 231 232def _escape_rename_dir_contents(dirpath): 233 """Recursively rename directory to escape filenames for gs upload. 234 235 @param dirpath: Directory path string. 236 """ 237 for filename in os.listdir(dirpath): 238 path = os.path.join(dirpath, filename) 239 _escape_rename(path) 240 for filename in os.listdir(dirpath): 241 path = os.path.join(dirpath, filename) 242 if os.path.isdir(path): 243 _escape_rename_dir_contents(path) 244 245 246def _escape_rename(path): 247 """Rename file to escape filenames for gs upload. 248 249 @param path: File path string. 250 """ 251 dirpath, filename = os.path.split(path) 252 sanitized_filename = gslib.escape(filename) 253 sanitized_path = os.path.join(dirpath, sanitized_filename) 254 os.rename(path, sanitized_path) 255 256 257def _sanitize_fifos(dirpath): 258 """Convert fifos to regular files (fixes crbug.com/684122). 259 260 @param dirpath: Directory path string. 261 """ 262 for root, _, files in os.walk(dirpath): 263 for filename in files: 264 path = os.path.join(root, filename) 265 file_stat = os.lstat(path) 266 if stat.S_ISFIFO(file_stat.st_mode): 267 _replace_fifo_with_file(path) 268 269 270def _replace_fifo_with_file(path): 271 """Replace a fifo with a normal file. 272 273 @param path: Fifo path string. 274 """ 275 logging.debug('Removing fifo %s', path) 276 os.remove(path) 277 logging.debug('Creating fifo marker %s', path) 278 with open(path, 'w') as f: 279 f.write('<FIFO>') 280 281 282def _sanitize_symlinks(dirpath): 283 """Convert Symlinks to regular files (fixes crbug.com/692788). 284 285 @param dirpath: Directory path string. 286 """ 287 for root, _, files in os.walk(dirpath): 288 for filename in files: 289 path = os.path.join(root, filename) 290 file_stat = os.lstat(path) 291 if stat.S_ISLNK(file_stat.st_mode): 292 _replace_symlink_with_file(path) 293 294 295def _replace_symlink_with_file(path): 296 """Replace a symlink with a normal file. 297 298 @param path: Symlink path string. 299 """ 300 target = os.readlink(path) 301 logging.debug('Removing symlink %s', path) 302 os.remove(path) 303 logging.debug('Creating symlink marker %s', path) 304 with open(path, 'w') as f: 305 f.write('<symlink to %s>' % target) 306 307 308# Maximum number of files in the folder. 309_MAX_FILE_COUNT = 3000 310_FOLDERS_NEVER_ZIP = ['debug', 'ssp_logs', 'autoupdate_logs'] 311 312 313def _get_zippable_folders(dir_entry): 314 folders_list = [] 315 for folder in os.listdir(dir_entry): 316 folder_path = os.path.join(dir_entry, folder) 317 if (not os.path.isfile(folder_path) and 318 not folder in _FOLDERS_NEVER_ZIP): 319 folders_list.append(folder_path) 320 return folders_list 321 322 323def limit_file_count(dir_entry): 324 """Limit the number of files in given directory. 325 326 The method checks the total number of files in the given directory. 327 If the number is greater than _MAX_FILE_COUNT, the method will 328 compress each folder in the given directory, except folders in 329 _FOLDERS_NEVER_ZIP. 330 331 @param dir_entry: Directory entry to be checked. 332 """ 333 try: 334 count = _count_files(dir_entry) 335 except ValueError: 336 logging.warning('Fail to get the file count in folder %s.', dir_entry) 337 return 338 if count < _MAX_FILE_COUNT: 339 return 340 341 # For test job, zip folders in a second level, e.g. 123-debug/host1. 342 # This is to allow autoserv debug folder still be accessible. 343 # For special task, it does not need to dig one level deeper. 344 is_special_task = re.match(job_directories.SPECIAL_TASK_PATTERN, 345 dir_entry) 346 347 folders = _get_zippable_folders(dir_entry) 348 if not is_special_task: 349 subfolders = [] 350 for folder in folders: 351 subfolders.extend(_get_zippable_folders(folder)) 352 folders = subfolders 353 354 for folder in folders: 355 _make_into_tarball(folder) 356 357 358def _count_files(dirpath): 359 """Count the number of files in a directory recursively. 360 361 @param dirpath: Directory path string. 362 """ 363 return sum(len(files) for _path, _dirs, files in os.walk(dirpath)) 364 365 366def _make_into_tarball(dirpath): 367 """Make directory into tarball. 368 369 @param dirpath: Directory path string. 370 """ 371 tarpath = '%s.tgz' % dirpath 372 with tarfile.open(tarpath, 'w:gz') as tar: 373 tar.add(dirpath, arcname=os.path.basename(dirpath)) 374 shutil.rmtree(dirpath) 375 376 377def correct_results_folder_permission(dir_entry): 378 """Make sure the results folder has the right permission settings. 379 380 For tests running with server-side packaging, the results folder has 381 the owner of root. This must be changed to the user running the 382 autoserv process, so parsing job can access the results folder. 383 384 @param dir_entry: Path to the results folder. 385 """ 386 if not dir_entry: 387 return 388 389 logging.info('Trying to correct file permission of %s.', dir_entry) 390 try: 391 owner = '%s:%s' % (os.getuid(), os.getgid()) 392 subprocess.check_call( 393 ['sudo', '-n', 'chown', '-R', owner, dir_entry]) 394 subprocess.check_call(['chmod', '-R', 'u+rw', dir_entry]) 395 subprocess.check_call( 396 ['find', dir_entry, '-type', 'd', 397 '-exec', 'chmod', 'u+x', '{}', ';']) 398 except subprocess.CalledProcessError as e: 399 logging.error('Failed to modify permission for %s: %s', 400 dir_entry, e) 401 402 403def _upload_cts_testresult(dir_entry, multiprocessing): 404 """Upload test results to separate gs buckets. 405 406 Upload testResult.xml.gz/test_result.xml.gz file to cts_results_bucket. 407 Upload timestamp.zip to cts_apfe_bucket. 408 409 @param dir_entry: Path to the results folder. 410 @param multiprocessing: True to turn on -m option for gsutil. 411 """ 412 for host in glob.glob(os.path.join(dir_entry, '*')): 413 cts_path = os.path.join(host, 'cheets_CTS.*', 'results', '*', 414 TIMESTAMP_PATTERN) 415 cts_v2_path = os.path.join(host, 'cheets_CTS_*', 'results', '*', 416 TIMESTAMP_PATTERN) 417 gts_v2_path = os.path.join(host, 'cheets_GTS*', 'results', '*', 418 TIMESTAMP_PATTERN) 419 for result_path, result_pattern in [(cts_path, CTS_RESULT_PATTERN), 420 (cts_path, CTS_COMPRESSED_RESULT_PATTERN), 421 (cts_v2_path, CTS_V2_RESULT_PATTERN), 422 (cts_v2_path, CTS_V2_COMPRESSED_RESULT_PATTERN), 423 (gts_v2_path, CTS_V2_RESULT_PATTERN)]: 424 for path in glob.glob(result_path): 425 try: 426 # CTS results from bvt-arc suites need to be only uploaded 427 # to APFE from its designated gs bucket for early EDI 428 # entries in APFE. These results need to copied only into 429 # APFE bucket. Copying to results bucket is not required. 430 if 'bvt-arc' in path: 431 _upload_files(host, path, result_pattern, 432 multiprocessing, 433 None, 434 DEFAULT_CTS_BVT_APFE_GSURI) 435 return 436 # Non-bvt CTS results need to be uploaded to standard gs 437 # buckets. 438 _upload_files(host, path, result_pattern, 439 multiprocessing, 440 DEFAULT_CTS_RESULTS_GSURI, 441 DEFAULT_CTS_APFE_GSURI) 442 # TODO(rohitbm): make better comparison using regex. 443 # plan_follower CTS results go to plan_follower specific 444 # gs buckets apart from standard gs buckets. 445 if 'plan_follower' in path: 446 _upload_files(host, path, result_pattern, 447 multiprocessing, 448 DEFAULT_CTS_DELTA_RESULTS_GSURI, 449 DEFAULT_CTS_DELTA_APFE_GSURI) 450 except Exception as e: 451 logging.error('ERROR uploading test results %s to GS: %s', 452 path, e) 453 454 455def _is_valid_result(build, result_pattern, suite): 456 """Check if the result should be uploaded to CTS/GTS buckets. 457 458 @param build: Builder name. 459 @param result_pattern: XML result file pattern. 460 @param suite: Test suite name. 461 462 @returns: Bool flag indicating whether a valid result. 463 """ 464 if build is None or suite is None: 465 return False 466 467 # Not valid if it's not a release build. 468 if not re.match(r'(?!trybot-).*-release/.*', build): 469 return False 470 471 # Not valid if it's cts result but not 'arc-cts*' or 'test_that_wrapper' 472 # suite. 473 result_patterns = [CTS_RESULT_PATTERN, CTS_V2_RESULT_PATTERN] 474 if result_pattern in result_patterns and not ( 475 suite.startswith('arc-cts') or 476 suite.startswith('arc-gts') or 477 suite.startswith('bvt-arc') or 478 suite.startswith('cros_test_platform') or 479 suite.startswith('test_that_wrapper')): 480 return False 481 482 return True 483 484 485def _is_test_collector(package): 486 """Returns true if the test run is just to collect list of CTS tests. 487 488 @param package: Autotest package name. e.g. cheets_CTS_N.CtsGraphicsTestCase 489 490 @return Bool flag indicating a test package is CTS list generator or not. 491 """ 492 return TEST_LIST_COLLECTOR in package 493 494 495def _get_swarming_req_dir(path): 496 """ 497 Returns the parent directory of |path|, if |path| is a swarming task result. 498 499 @param path: Full path to the result of a task. 500 e.g. /tmp/results/swarming-44466815c4bc951/1 501 502 @return string of the parent dir or None if not a swarming task. 503 """ 504 m_parent = re.match( 505 '(?P<parent_dir>.*/swarming-[0-9a-fA-F]*0)/[1-9a-fA-F]$', path) 506 if m_parent: 507 return m_parent.group('parent_dir') 508 return None 509 510 511def _parse_cts_job_results_file_path(path): 512 """Parse CTS file paths an extract required information from them.""" 513 514 # Autotest paths look like: 515 # /317739475-chromeos-test/chromeos4-row9-rack11-host22/ 516 # cheets_CTS.android.dpi/results/cts-results/2016.04.28_01.41.44 517 518 # Swarming paths look like: 519 # /swarming-458e3a3a7fc6f210/1/autoserv_test/ 520 # cheets_CTS.android.dpi/results/cts-results/2016.04.28_01.41.44 521 522 folders = path.split(os.sep) 523 if 'swarming' in folders[1]: 524 # Swarming job and attempt combined 525 job_id = "%s-%s" % (folders[-7], folders[-6]) 526 else: 527 job_id = folders[-6] 528 529 cts_package = folders[-4] 530 timestamp = folders[-1] 531 532 return job_id, cts_package, timestamp 533 534 535def _upload_files(host, path, result_pattern, multiprocessing, 536 result_gs_bucket, apfe_gs_bucket): 537 keyval = models.test.parse_job_keyval(host) 538 build = keyval.get('build') 539 suite = keyval.get('suite') 540 541 host_keyval = models.test.parse_host_keyval(host, keyval.get('hostname')) 542 labels = urllib.unquote(host_keyval.get('labels')) 543 try: 544 host_model_name = re.search(r'model:(\w+)', labels).group(1) 545 except AttributeError: 546 logging.error('Model: name attribute is missing in %s/host_keyval/%s.', 547 host, keyval.get('hostname')) 548 return 549 550 if not _is_valid_result(build, result_pattern, suite): 551 # No need to upload current folder, return. 552 return 553 554 parent_job_id = str(keyval['parent_job_id']) 555 556 job_id, package, timestamp = _parse_cts_job_results_file_path(path) 557 558 # Results produced by CTS test list collector are dummy results. 559 # They don't need to be copied to APFE bucket which is mainly being used for 560 # CTS APFE submission. 561 if not _is_test_collector(package): 562 # Path: bucket/build/parent_job_id/cheets_CTS.*/job_id_timestamp/ 563 # or bucket/build/parent_job_id/cheets_GTS.*/job_id_timestamp/ 564 index = build.find('-release') 565 build_with_model_name = '' 566 if index == -1: 567 logging.info('Not a release build.' 568 'Non release build results can be skipped from offloading') 569 return 570 571 # CTS v2 pipeline requires device info in 'board.model' format. 572 # e.g. coral.robo360-release, eve.eve-release 573 build_with_model_name = (build[:index] + '.' + host_model_name + 574 build[index:]) 575 576 cts_apfe_gs_path = os.path.join( 577 apfe_gs_bucket, build_with_model_name, parent_job_id, 578 package, job_id + '_' + timestamp) + '/' 579 580 for zip_file in glob.glob(os.path.join('%s.zip' % path)): 581 utils.run(' '.join(_get_cmd_list( 582 multiprocessing, zip_file, cts_apfe_gs_path))) 583 logging.debug('Upload %s to %s ', zip_file, cts_apfe_gs_path) 584 else: 585 logging.debug('%s is a CTS Test collector Autotest test run.', package) 586 logging.debug('Skipping CTS results upload to APFE gs:// bucket.') 587 588 if result_gs_bucket: 589 # Path: bucket/cheets_CTS.*/job_id_timestamp/ 590 # or bucket/cheets_GTS.*/job_id_timestamp/ 591 test_result_gs_path = os.path.join( 592 result_gs_bucket, package, job_id + '_' + timestamp) + '/' 593 594 for test_result_file in glob.glob(os.path.join(path, result_pattern)): 595 # gzip test_result_file(testResult.xml/test_result.xml) 596 597 test_result_tgz_file = '' 598 if test_result_file.endswith('tgz'): 599 # Extract .xml file from tgz file for better handling in the 600 # CTS dashboard pipeline. 601 # TODO(rohitbm): work with infra team to produce .gz file so 602 # tgz to gz middle conversion is not needed. 603 try: 604 with tarfile.open(test_result_file, 'r:gz') as tar_file: 605 tar_file.extract( 606 CTS_COMPRESSED_RESULT_TYPES[result_pattern]) 607 test_result_tgz_file = test_result_file 608 test_result_file = os.path.join(path, 609 CTS_COMPRESSED_RESULT_TYPES[result_pattern]) 610 except tarfile.ReadError as error: 611 logging.debug(error) 612 except KeyError as error: 613 logging.debug(error) 614 615 test_result_file_gz = '%s.gz' % test_result_file 616 with open(test_result_file, 'r') as f_in, ( 617 gzip.open(test_result_file_gz, 'w')) as f_out: 618 shutil.copyfileobj(f_in, f_out) 619 utils.run(' '.join(_get_cmd_list( 620 multiprocessing, test_result_file_gz, test_result_gs_path))) 621 logging.debug('Zip and upload %s to %s', 622 test_result_file_gz, test_result_gs_path) 623 # Remove test_result_file_gz(testResult.xml.gz/test_result.xml.gz) 624 os.remove(test_result_file_gz) 625 # Remove extracted test_result.xml file. 626 if test_result_tgz_file: 627 os.remove(test_result_file) 628 629 630def _emit_gs_returncode_metric(returncode): 631 """Increment the gs_returncode counter based on |returncode|.""" 632 m_gs_returncode = 'chromeos/autotest/gs_offloader/gs_returncode' 633 rcode = int(returncode) 634 if rcode < 0 or rcode > 255: 635 rcode = -1 636 metrics.Counter(m_gs_returncode).increment(fields={'return_code': rcode}) 637 638 639def _handle_dir_os_error(dir_entry, fix_permission=False): 640 """Try to fix the result directory's permission issue if needed. 641 642 @param dir_entry: Directory entry to offload. 643 @param fix_permission: True to change the directory's owner to the same one 644 running gs_offloader. 645 """ 646 if fix_permission: 647 correct_results_folder_permission(dir_entry) 648 m_permission_error = ('chromeos/autotest/errors/gs_offloader/' 649 'wrong_permissions_count') 650 metrics_fields = _get_metrics_fields(dir_entry) 651 metrics.Counter(m_permission_error).increment(fields=metrics_fields) 652 653 654class BaseGSOffloader(object): 655 656 """Google Storage offloader interface.""" 657 658 __metaclass__ = abc.ABCMeta 659 660 def offload(self, dir_entry, dest_path, job_complete_time): 661 """Safely offload a directory entry to Google Storage. 662 663 This method is responsible for copying the contents of 664 `dir_entry` to Google storage at `dest_path`. 665 666 When successful, the method must delete all of `dir_entry`. 667 On failure, `dir_entry` should be left undisturbed, in order 668 to allow for retry. 669 670 Errors are conveyed simply and solely by two methods: 671 * At the time of failure, write enough information to the log 672 to allow later debug, if necessary. 673 * Don't delete the content. 674 675 In order to guarantee robustness, this method must not raise any 676 exceptions. 677 678 @param dir_entry: Directory entry to offload. 679 @param dest_path: Location in google storage where we will 680 offload the directory. 681 @param job_complete_time: The complete time of the job from the AFE 682 database. 683 """ 684 try: 685 self._full_offload(dir_entry, dest_path, job_complete_time) 686 except Exception as e: 687 logging.debug('Exception in offload for %s', dir_entry) 688 logging.debug('Ignoring this error: %s', str(e)) 689 690 @abc.abstractmethod 691 def _full_offload(self, dir_entry, dest_path, job_complete_time): 692 """Offload a directory entry to Google Storage. 693 694 This method implements the actual offload behavior of its 695 subclass. To guarantee effective debug, this method should 696 catch all exceptions, and perform any reasonable diagnosis 697 or other handling. 698 699 @param dir_entry: Directory entry to offload. 700 @param dest_path: Location in google storage where we will 701 offload the directory. 702 @param job_complete_time: The complete time of the job from the AFE 703 database. 704 """ 705 706 707class GSOffloader(BaseGSOffloader): 708 """Google Storage Offloader.""" 709 710 def __init__(self, gs_uri, multiprocessing, delete_age, 711 console_client=None): 712 """Returns the offload directory function for the given gs_uri 713 714 @param gs_uri: Google storage bucket uri to offload to. 715 @param multiprocessing: True to turn on -m option for gsutil. 716 @param console_client: The cloud console client. If None, 717 cloud console APIs are not called. 718 """ 719 self._gs_uri = gs_uri 720 self._multiprocessing = multiprocessing 721 self._delete_age = delete_age 722 self._console_client = console_client 723 724 @metrics.SecondsTimerDecorator( 725 'chromeos/autotest/gs_offloader/job_offload_duration') 726 def _full_offload(self, dir_entry, dest_path, job_complete_time): 727 """Offload the specified directory entry to Google storage. 728 729 @param dir_entry: Directory entry to offload. 730 @param dest_path: Location in google storage where we will 731 offload the directory. 732 @param job_complete_time: The complete time of the job from the AFE 733 database. 734 """ 735 with tempfile.TemporaryFile('w+') as stdout_file, \ 736 tempfile.TemporaryFile('w+') as stderr_file: 737 try: 738 try: 739 self._try_offload(dir_entry, dest_path, stdout_file, 740 stderr_file) 741 except OSError as e: 742 # Correct file permission error of the directory, then raise 743 # the exception so gs_offloader can retry later. 744 _handle_dir_os_error(dir_entry, e.errno==errno.EACCES) 745 # Try again after the permission issue is fixed. 746 self._try_offload(dir_entry, dest_path, stdout_file, 747 stderr_file) 748 except _OffloadError as e: 749 metrics_fields = _get_metrics_fields(dir_entry) 750 m_any_error = 'chromeos/autotest/errors/gs_offloader/any_error' 751 metrics.Counter(m_any_error).increment(fields=metrics_fields) 752 753 # Rewind the log files for stdout and stderr and log 754 # their contents. 755 stdout_file.seek(0) 756 stderr_file.seek(0) 757 stderr_content = stderr_file.read() 758 logging.warning('Error occurred when offloading %s:', dir_entry) 759 logging.warning('Stdout:\n%s \nStderr:\n%s', stdout_file.read(), 760 stderr_content) 761 762 # Some result files may have wrong file permission. Try 763 # to correct such error so later try can success. 764 # TODO(dshi): The code is added to correct result files 765 # with wrong file permission caused by bug 511778. After 766 # this code is pushed to lab and run for a while to 767 # clean up these files, following code and function 768 # correct_results_folder_permission can be deleted. 769 if 'CommandException: Error opening file' in stderr_content: 770 correct_results_folder_permission(dir_entry) 771 else: 772 self._prune(dir_entry, job_complete_time) 773 swarming_req_dir = _get_swarming_req_dir(dir_entry) 774 if swarming_req_dir: 775 self._prune_swarming_req_dir(swarming_req_dir) 776 777 778 def _try_offload(self, dir_entry, dest_path, 779 stdout_file, stderr_file): 780 """Offload the specified directory entry to Google storage. 781 782 @param dir_entry: Directory entry to offload. 783 @param dest_path: Location in google storage where we will 784 offload the directory. 785 @param job_complete_time: The complete time of the job from the AFE 786 database. 787 @param stdout_file: Log file. 788 @param stderr_file: Log file. 789 """ 790 if _is_uploaded(dir_entry): 791 return 792 start_time = time.time() 793 metrics_fields = _get_metrics_fields(dir_entry) 794 error_obj = _OffloadError(start_time) 795 config = config_loader.load(dir_entry) 796 cts_enabled = True 797 if config: 798 # TODO(linxinan): use credential file assigned by the side_effect 799 # config. 800 if not config.cts.enabled: 801 cts_enabled = config.cts.enabled 802 if config.google_storage.bucket: 803 gs_prefix = ('' if config.google_storage.bucket.startswith('gs://') 804 else 'gs://') 805 self._gs_uri = gs_prefix + config.google_storage.bucket 806 else: 807 # For now, the absence of config does not block gs_offloader 808 # from uploading files via default credential. 809 logging.debug('Failed to load the side effects config in %s.', 810 dir_entry) 811 try: 812 sanitize_dir(dir_entry) 813 if DEFAULT_CTS_RESULTS_GSURI and cts_enabled: 814 _upload_cts_testresult(dir_entry, self._multiprocessing) 815 816 if LIMIT_FILE_COUNT: 817 limit_file_count(dir_entry) 818 819 process = None 820 with timeout_util.Timeout(OFFLOAD_TIMEOUT_SECS): 821 gs_path = '%s%s' % (self._gs_uri, dest_path) 822 cmd = _get_cmd_list(self._multiprocessing, dir_entry, gs_path) 823 logging.debug('Attempting an offload command %s', cmd) 824 process = subprocess.Popen( 825 cmd, stdout=stdout_file, stderr=stderr_file) 826 process.wait() 827 logging.debug('Offload command %s completed; ' 828 'marking offload complete.', cmd) 829 _mark_upload_finished(gs_path, stdout_file, stderr_file) 830 831 _emit_gs_returncode_metric(process.returncode) 832 if process.returncode != 0: 833 raise error_obj 834 _emit_offload_metrics(dir_entry) 835 836 if self._console_client: 837 gcs_uri = os.path.join(gs_path, 838 os.path.basename(dir_entry)) 839 if not self._console_client.send_test_job_offloaded_message( 840 gcs_uri): 841 raise error_obj 842 843 _mark_uploaded(dir_entry) 844 except timeout_util.TimeoutError: 845 m_timeout = 'chromeos/autotest/errors/gs_offloader/timed_out_count' 846 metrics.Counter(m_timeout).increment(fields=metrics_fields) 847 # If we finished the call to Popen(), we may need to 848 # terminate the child process. We don't bother calling 849 # process.poll(); that inherently races because the child 850 # can die any time it wants. 851 if process: 852 try: 853 process.terminate() 854 except OSError: 855 # We don't expect any error other than "No such 856 # process". 857 pass 858 logging.error('Offloading %s timed out after waiting %d ' 859 'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS) 860 raise error_obj 861 862 def _prune(self, dir_entry, job_complete_time): 863 """Prune directory if it is uploaded and expired. 864 865 @param dir_entry: Directory entry to offload. 866 @param job_complete_time: The complete time of the job from the AFE 867 database. 868 """ 869 if not (_is_uploaded(dir_entry) 870 and job_directories.is_job_expired(self._delete_age, 871 job_complete_time)): 872 return 873 try: 874 logging.debug('Pruning uploaded directory %s', dir_entry) 875 shutil.rmtree(dir_entry) 876 job_timestamp_cache.delete(dir_entry) 877 except OSError as e: 878 # The wrong file permission can lead call `shutil.rmtree(dir_entry)` 879 # to raise OSError with message 'Permission denied'. Details can be 880 # found in crbug.com/536151 881 _handle_dir_os_error(dir_entry, e.errno==errno.EACCES) 882 # Try again after the permission issue is fixed. 883 shutil.rmtree(dir_entry) 884 885 def _prune_swarming_req_dir(self, swarming_req_dir): 886 """Prune swarming request directory, if it is empty. 887 888 @param swarming_req_dir: Directory entry of a swarming request. 889 """ 890 try: 891 logging.debug('Pruning swarming request directory %s', 892 swarming_req_dir) 893 os.rmdir(swarming_req_dir) 894 except OSError as e: 895 # Do nothing and leave this directory to next attempt to remove. 896 logging.debug('Failed to prune swarming request directory %s', 897 swarming_req_dir) 898 899 900class _OffloadError(Exception): 901 """Google Storage offload failed.""" 902 903 def __init__(self, start_time): 904 super(_OffloadError, self).__init__(start_time) 905 self.start_time = start_time 906 907 908 909class FakeGSOffloader(BaseGSOffloader): 910 911 """Fake Google Storage Offloader that only deletes directories.""" 912 913 def _full_offload(self, dir_entry, dest_path, job_complete_time): 914 """Pretend to offload a directory and delete it. 915 916 @param dir_entry: Directory entry to offload. 917 @param dest_path: Location in google storage where we will 918 offload the directory. 919 @param job_complete_time: The complete time of the job from the AFE 920 database. 921 """ 922 shutil.rmtree(dir_entry) 923 924 925class OptionalMemoryCache(object): 926 """Implements memory cache if cachetools module can be loaded. 927 928 If the platform has cachetools available then the cache will 929 be created, otherwise the get calls will always act as if there 930 was a cache miss and the set/delete will be no-ops. 931 """ 932 cache = None 933 934 def setup(self, age_to_delete): 935 """Set up a TTL cache size based on how long the job will be handled. 936 937 Autotest jobs are handled by gs_offloader until they are deleted from 938 local storage, base the cache size on how long that is. 939 940 @param age_to_delete: Number of days after which items in the cache 941 should expire. 942 """ 943 if cachetools: 944 # Min cache is 1000 items for 10 mins. If the age to delete is 0 945 # days you still want a short / small cache. 946 # 2000 items is a good approximation for the max number of jobs a 947 # moblab # can produce in a day, lab offloads immediatly so 948 # the number of carried jobs should be very small in the normal 949 # case. 950 ttl = max(age_to_delete * 24 * 60 * 60, 600) 951 maxsize = max(age_to_delete * 2000, 1000) 952 job_timestamp_cache.cache = cachetools.TTLCache(maxsize=maxsize, 953 ttl=ttl) 954 955 def get(self, key): 956 """If we have a cache try to retrieve from it.""" 957 if self.cache is not None: 958 result = self.cache.get(key) 959 return result 960 return None 961 962 def add(self, key, value): 963 """If we have a cache try to store key/value.""" 964 if self.cache is not None: 965 self.cache[key] = value 966 967 def delete(self, key): 968 """If we have a cache try to remove a key.""" 969 if self.cache is not None: 970 return self.cache.delete(key) 971 972 973job_timestamp_cache = OptionalMemoryCache() 974 975 976def _cached_get_timestamp_if_finished(job): 977 """Retrieve a job finished timestamp from cache or AFE. 978 @param job _JobDirectory instance to retrieve 979 finished timestamp of.. 980 981 @returns: None if the job is not finished, or the 982 last job finished time recorded by Autotest. 983 """ 984 job_timestamp = job_timestamp_cache.get(job.dirname) 985 if not job_timestamp: 986 job_timestamp = job.get_timestamp_if_finished() 987 if job_timestamp: 988 job_timestamp_cache.add(job.dirname, job_timestamp) 989 return job_timestamp 990 991 992def _is_expired(job, age_limit): 993 """Return whether job directory is expired for uploading 994 995 @param job: _JobDirectory instance. 996 @param age_limit: Minimum age in days at which a job may be offloaded. 997 """ 998 job_timestamp = _cached_get_timestamp_if_finished(job) 999 if not job_timestamp: 1000 return False 1001 return job_directories.is_job_expired(age_limit, job_timestamp) 1002 1003 1004def _emit_offload_metrics(dirpath): 1005 """Emit gs offload metrics. 1006 1007 @param dirpath: Offloaded directory path. 1008 """ 1009 dir_size = file_utils.get_directory_size_kibibytes(dirpath) 1010 metrics_fields = _get_metrics_fields(dirpath) 1011 1012 m_offload_count = ( 1013 'chromeos/autotest/gs_offloader/jobs_offloaded') 1014 metrics.Counter(m_offload_count).increment( 1015 fields=metrics_fields) 1016 m_offload_size = ('chromeos/autotest/gs_offloader/' 1017 'kilobytes_transferred') 1018 metrics.Counter(m_offload_size).increment_by( 1019 dir_size, fields=metrics_fields) 1020 1021 1022def _is_uploaded(dirpath): 1023 """Return whether directory has been uploaded. 1024 1025 @param dirpath: Directory path string. 1026 """ 1027 return os.path.isfile(_get_uploaded_marker_file(dirpath)) 1028 1029 1030def _mark_uploaded(dirpath): 1031 """Mark directory as uploaded. 1032 1033 @param dirpath: Directory path string. 1034 """ 1035 logging.debug('Creating uploaded marker for directory %s', dirpath) 1036 with open(_get_uploaded_marker_file(dirpath), 'a'): 1037 pass 1038 1039 1040def _mark_upload_finished(gs_path, stdout_file, stderr_file): 1041 """Mark a given gs_path upload as finished (remotely). 1042 1043 @param gs_path: gs:// url of the remote directory that is finished 1044 upload. 1045 """ 1046 cmd = _get_finish_cmd_list(gs_path) 1047 process = subprocess.Popen(cmd, stdout=stdout_file, stderr=stderr_file) 1048 process.wait() 1049 logging.debug('Finished marking as complete %s', cmd) 1050 1051 1052def _get_uploaded_marker_file(dirpath): 1053 """Return path to upload marker file for directory. 1054 1055 @param dirpath: Directory path string. 1056 """ 1057 return '%s/.GS_UPLOADED' % (dirpath,) 1058 1059 1060def _format_job_for_failure_reporting(job): 1061 """Formats a _JobDirectory for reporting / logging. 1062 1063 @param job: The _JobDirectory to format. 1064 """ 1065 d = datetime.datetime.fromtimestamp(job.first_offload_start) 1066 data = (d.strftime(FAILED_OFFLOADS_TIME_FORMAT), 1067 job.offload_count, 1068 job.dirname) 1069 return FAILED_OFFLOADS_LINE_FORMAT % data 1070 1071 1072def wait_for_gs_write_access(gs_uri): 1073 """Verify and wait until we have write access to Google Storage. 1074 1075 @param gs_uri: The Google Storage URI we are trying to offload to. 1076 """ 1077 # TODO (sbasi) Try to use the gsutil command to check write access. 1078 # Ensure we have write access to gs_uri. 1079 dummy_file = tempfile.NamedTemporaryFile() 1080 test_cmd = _get_cmd_list(False, dummy_file.name, gs_uri) 1081 while True: 1082 logging.debug('Checking for write access with dummy file %s', 1083 dummy_file.name) 1084 try: 1085 subprocess.check_call(test_cmd) 1086 subprocess.check_call( 1087 ['gsutil', 'rm', 1088 os.path.join(gs_uri, 1089 os.path.basename(dummy_file.name))]) 1090 break 1091 except subprocess.CalledProcessError: 1092 t = 120 1093 logging.debug('Unable to offload dummy file to %s, sleeping for %s ' 1094 'seconds.', gs_uri, t) 1095 time.sleep(t) 1096 logging.debug('Dummy file write check to gs succeeded.') 1097 1098 1099class Offloader(object): 1100 """State of the offload process. 1101 1102 Contains the following member fields: 1103 * _gs_offloader: _BaseGSOffloader to use to offload a job directory. 1104 * _jobdir_classes: List of classes of job directory to be 1105 offloaded. 1106 * _processes: Maximum number of outstanding offload processes 1107 to allow during an offload cycle. 1108 * _age_limit: Minimum age in days at which a job may be 1109 offloaded. 1110 * _open_jobs: a dictionary mapping directory paths to Job 1111 objects. 1112 """ 1113 1114 def __init__(self, options): 1115 self._upload_age_limit = options.age_to_upload 1116 self._delete_age_limit = options.age_to_delete 1117 if options.delete_only: 1118 self._gs_offloader = FakeGSOffloader() 1119 else: 1120 self.gs_uri = utils.get_offload_gsuri() 1121 logging.debug('Offloading to: %s', self.gs_uri) 1122 multiprocessing = False 1123 if options.multiprocessing: 1124 multiprocessing = True 1125 elif options.multiprocessing is None: 1126 multiprocessing = GS_OFFLOADER_MULTIPROCESSING 1127 logging.info( 1128 'Offloader multiprocessing is set to:%r', multiprocessing) 1129 console_client = None 1130 if (cloud_console_client and 1131 cloud_console_client.is_cloud_notification_enabled()): 1132 console_client = cloud_console_client.PubSubBasedClient() 1133 self._gs_offloader = GSOffloader( 1134 self.gs_uri, multiprocessing, self._delete_age_limit, 1135 console_client) 1136 classlist = [ 1137 job_directories.SwarmingJobDirectory, 1138 ] 1139 if options.process_hosts_only or options.process_all: 1140 classlist.append(job_directories.SpecialJobDirectory) 1141 if not options.process_hosts_only: 1142 classlist.append(job_directories.RegularJobDirectory) 1143 self._jobdir_classes = classlist 1144 assert self._jobdir_classes 1145 self._processes = options.parallelism 1146 self._open_jobs = {} 1147 self._pusub_topic = None 1148 self._offload_count_limit = 3 1149 1150 1151 def _add_new_jobs(self): 1152 """Find new job directories that need offloading. 1153 1154 Go through the file system looking for valid job directories 1155 that are currently not in `self._open_jobs`, and add them in. 1156 1157 """ 1158 new_job_count = 0 1159 for cls in self._jobdir_classes: 1160 for resultsdir in cls.get_job_directories(): 1161 if resultsdir in self._open_jobs: 1162 continue 1163 self._open_jobs[resultsdir] = cls(resultsdir) 1164 new_job_count += 1 1165 logging.debug('Start of offload cycle - found %d new jobs', 1166 new_job_count) 1167 1168 1169 def _remove_offloaded_jobs(self): 1170 """Removed offloaded jobs from `self._open_jobs`.""" 1171 removed_job_count = 0 1172 for jobkey, job in self._open_jobs.items(): 1173 if ( 1174 not os.path.exists(job.dirname) 1175 or _is_uploaded(job.dirname)): 1176 del self._open_jobs[jobkey] 1177 removed_job_count += 1 1178 logging.debug('End of offload cycle - cleared %d jobs, ' 1179 'carrying %d open jobs', 1180 removed_job_count, len(self._open_jobs)) 1181 1182 1183 def _report_failed_jobs(self): 1184 """Report status after attempting offload. 1185 1186 This function processes all jobs in `self._open_jobs`, assuming 1187 an attempt has just been made to offload all of them. 1188 1189 If any jobs have reportable errors, and we haven't generated 1190 an e-mail report in the last `REPORT_INTERVAL_SECS` seconds, 1191 send new e-mail describing the failures. 1192 1193 """ 1194 failed_jobs = [j for j in self._open_jobs.values() if 1195 j.first_offload_start] 1196 self._report_failed_jobs_count(failed_jobs) 1197 self._log_failed_jobs_locally(failed_jobs) 1198 1199 1200 def offload_once(self): 1201 """Perform one offload cycle. 1202 1203 Find all job directories for new jobs that we haven't seen 1204 before. Then, attempt to offload the directories for any 1205 jobs that have finished running. Offload of multiple jobs 1206 is done in parallel, up to `self._processes` at a time. 1207 1208 After we've tried uploading all directories, go through the list 1209 checking the status of all uploaded directories. If necessary, 1210 report failures via e-mail. 1211 1212 """ 1213 self._add_new_jobs() 1214 self._report_current_jobs_count() 1215 with parallel.BackgroundTaskRunner( 1216 self._gs_offloader.offload, processes=self._processes) as queue: 1217 for job in self._open_jobs.values(): 1218 _enqueue_offload(job, queue, self._upload_age_limit) 1219 self._give_up_on_jobs_over_limit() 1220 self._remove_offloaded_jobs() 1221 self._report_failed_jobs() 1222 1223 1224 def _give_up_on_jobs_over_limit(self): 1225 """Give up on jobs that have gone over the offload limit. 1226 1227 We mark them as uploaded as we won't try to offload them any more. 1228 """ 1229 for job in self._open_jobs.values(): 1230 if job.offload_count >= self._offload_count_limit: 1231 _mark_uploaded(job.dirname) 1232 1233 1234 def _log_failed_jobs_locally(self, failed_jobs, 1235 log_file=FAILED_OFFLOADS_FILE): 1236 """Updates a local file listing all the failed jobs. 1237 1238 The dropped file can be used by the developers to list jobs that we have 1239 failed to upload. 1240 1241 @param failed_jobs: A list of failed _JobDirectory objects. 1242 @param log_file: The file to log the failed jobs to. 1243 """ 1244 now = datetime.datetime.now() 1245 now_str = now.strftime(FAILED_OFFLOADS_TIME_FORMAT) 1246 formatted_jobs = [_format_job_for_failure_reporting(job) 1247 for job in failed_jobs] 1248 formatted_jobs.sort() 1249 1250 with open(log_file, 'w') as logfile: 1251 logfile.write(FAILED_OFFLOADS_FILE_HEADER % 1252 (now_str, len(failed_jobs))) 1253 logfile.writelines(formatted_jobs) 1254 1255 1256 def _report_current_jobs_count(self): 1257 """Report the number of outstanding jobs to monarch.""" 1258 metrics.Gauge('chromeos/autotest/gs_offloader/current_jobs_count').set( 1259 len(self._open_jobs)) 1260 1261 1262 def _report_failed_jobs_count(self, failed_jobs): 1263 """Report the number of outstanding failed offload jobs to monarch. 1264 1265 @param: List of failed jobs. 1266 """ 1267 metrics.Gauge('chromeos/autotest/gs_offloader/failed_jobs_count').set( 1268 len(failed_jobs)) 1269 1270 1271def _enqueue_offload(job, queue, age_limit): 1272 """Enqueue the job for offload, if it's eligible. 1273 1274 The job is eligible for offloading if the database has marked 1275 it finished, and the job is older than the `age_limit` 1276 parameter. 1277 1278 If the job is eligible, offload processing is requested by 1279 passing the `queue` parameter's `put()` method a sequence with 1280 the job's `dirname` attribute and its directory name. 1281 1282 @param job _JobDirectory instance to offload. 1283 @param queue If the job should be offloaded, put the offload 1284 parameters into this queue for processing. 1285 @param age_limit Minimum age for a job to be offloaded. A value 1286 of 0 means that the job will be offloaded as 1287 soon as it is finished. 1288 1289 """ 1290 if not job.offload_count: 1291 if not _is_expired(job, age_limit): 1292 return 1293 job.first_offload_start = time.time() 1294 job.offload_count += 1 1295 if job.process_gs_instructions(): 1296 timestamp = _cached_get_timestamp_if_finished(job) 1297 queue.put([job.dirname, os.path.dirname(job.dirname), timestamp]) 1298 1299 1300def parse_options(): 1301 """Parse the args passed into gs_offloader.""" 1302 defaults = 'Defaults:\n Destination: %s\n Results Path: %s' % ( 1303 utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR) 1304 usage = 'usage: %prog [options]\n' + defaults 1305 parser = OptionParser(usage) 1306 parser.add_option('-a', '--all', dest='process_all', 1307 action='store_true', 1308 help='Offload all files in the results directory.') 1309 parser.add_option('-s', '--hosts', dest='process_hosts_only', 1310 action='store_true', 1311 help='Offload only the special tasks result files ' 1312 'located in the results/hosts subdirectory') 1313 parser.add_option('-p', '--parallelism', dest='parallelism', 1314 type='int', default=1, 1315 help='Number of parallel workers to use.') 1316 parser.add_option('-o', '--delete_only', dest='delete_only', 1317 action='store_true', 1318 help='GS Offloader will only the delete the ' 1319 'directories and will not offload them to google ' 1320 'storage. NOTE: If global_config variable ' 1321 'CROS.gs_offloading_enabled is False, --delete_only ' 1322 'is automatically True.', 1323 default=not GS_OFFLOADING_ENABLED) 1324 parser.add_option('-d', '--days_old', dest='days_old', 1325 help='Minimum job age in days before a result can be ' 1326 'offloaded.', type='int', default=0) 1327 parser.add_option('-l', '--log_size', dest='log_size', 1328 help='Limit the offloader logs to a specified ' 1329 'number of Mega Bytes.', type='int', default=0) 1330 parser.add_option('-m', dest='multiprocessing', action='store_true', 1331 help='Turn on -m option for gsutil. If not set, the ' 1332 'global config setting gs_offloader_multiprocessing ' 1333 'under CROS section is applied.') 1334 parser.add_option('-i', '--offload_once', dest='offload_once', 1335 action='store_true', 1336 help='Upload all available results and then exit.') 1337 parser.add_option('-y', '--normal_priority', dest='normal_priority', 1338 action='store_true', 1339 help='Upload using normal process priority.') 1340 parser.add_option('-u', '--age_to_upload', dest='age_to_upload', 1341 help='Minimum job age in days before a result can be ' 1342 'offloaded, but not removed from local storage', 1343 type='int', default=None) 1344 parser.add_option('-n', '--age_to_delete', dest='age_to_delete', 1345 help='Minimum job age in days before a result can be ' 1346 'removed from local storage', 1347 type='int', default=None) 1348 parser.add_option( 1349 '--metrics-file', 1350 help='If provided, drop metrics to this local file instead of ' 1351 'reporting to ts_mon', 1352 type=str, 1353 default=None, 1354 ) 1355 parser.add_option('-t', '--enable_timestamp_cache', 1356 dest='enable_timestamp_cache', 1357 action='store_true', 1358 help='Cache the finished timestamps from AFE.') 1359 1360 options = parser.parse_args()[0] 1361 if options.process_all and options.process_hosts_only: 1362 parser.print_help() 1363 print ('Cannot process all files and only the hosts ' 1364 'subdirectory. Please remove an argument.') 1365 sys.exit(1) 1366 1367 if options.days_old and (options.age_to_upload or options.age_to_delete): 1368 parser.print_help() 1369 print('Use the days_old option or the age_to_* options but not both') 1370 sys.exit(1) 1371 1372 if options.age_to_upload == None: 1373 options.age_to_upload = options.days_old 1374 if options.age_to_delete == None: 1375 options.age_to_delete = options.days_old 1376 1377 return options 1378 1379 1380def main(): 1381 """Main method of gs_offloader.""" 1382 options = parse_options() 1383 1384 if options.process_all: 1385 offloader_type = 'all' 1386 elif options.process_hosts_only: 1387 offloader_type = 'hosts' 1388 else: 1389 offloader_type = 'jobs' 1390 1391 _setup_logging(options, offloader_type) 1392 1393 if options.enable_timestamp_cache: 1394 # Extend the cache expiry time by another 1% so the timstamps 1395 # are available as the results are purged. 1396 job_timestamp_cache.setup(options.age_to_delete * 1.01) 1397 1398 # Nice our process (carried to subprocesses) so we don't overload 1399 # the system. 1400 if not options.normal_priority: 1401 logging.debug('Set process to nice value: %d', NICENESS) 1402 os.nice(NICENESS) 1403 if psutil: 1404 proc = psutil.Process() 1405 logging.debug('Set process to ionice IDLE') 1406 proc.ionice(psutil.IOPRIO_CLASS_IDLE) 1407 1408 # os.listdir returns relative paths, so change to where we need to 1409 # be to avoid an os.path.join on each loop. 1410 logging.debug('Offloading Autotest results in %s', RESULTS_DIR) 1411 os.chdir(RESULTS_DIR) 1412 1413 service_name = 'gs_offloader(%s)' % offloader_type 1414 with ts_mon_config.SetupTsMonGlobalState(service_name, indirect=True, 1415 short_lived=False, 1416 debug_file=options.metrics_file): 1417 with metrics.SuccessCounter('chromeos/autotest/gs_offloader/exit'): 1418 offloader = Offloader(options) 1419 if not options.delete_only: 1420 wait_for_gs_write_access(offloader.gs_uri) 1421 while True: 1422 offloader.offload_once() 1423 if options.offload_once: 1424 break 1425 time.sleep(SLEEP_TIME_SECS) 1426 1427 1428_LOG_LOCATION = '/usr/local/autotest/logs/' 1429_LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt' 1430_LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S' 1431_LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' 1432 1433 1434def _setup_logging(options, offloader_type): 1435 """Set up logging. 1436 1437 @param options: Parsed options. 1438 @param offloader_type: Type of offloader action as string. 1439 """ 1440 log_filename = _get_log_filename(options, offloader_type) 1441 log_formatter = logging.Formatter(_LOGGING_FORMAT) 1442 # Replace the default logging handler with a RotatingFileHandler. If 1443 # options.log_size is 0, the file size will not be limited. Keeps 1444 # one backup just in case. 1445 handler = logging.handlers.RotatingFileHandler( 1446 log_filename, maxBytes=1024 * options.log_size, backupCount=1) 1447 handler.setFormatter(log_formatter) 1448 logger = logging.getLogger() 1449 logger.setLevel(logging.DEBUG) 1450 logger.addHandler(handler) 1451 1452 1453def _get_log_filename(options, offloader_type): 1454 """Get log filename. 1455 1456 @param options: Parsed options. 1457 @param offloader_type: Type of offloader action as string. 1458 """ 1459 if options.log_size > 0: 1460 log_timestamp = '' 1461 else: 1462 log_timestamp = time.strftime(_LOG_TIMESTAMP_FORMAT) 1463 log_basename = _LOG_FILENAME_FORMAT % (offloader_type, log_timestamp) 1464 return os.path.join(_LOG_LOCATION, log_basename) 1465 1466 1467if __name__ == '__main__': 1468 main() 1469