#!/usr/bin/python
#
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Script to archive old Autotest results to Google Storage.

Uses gsutil to archive files to the configured Google Storage bucket.
Upon successful copy, the local results directory is deleted.
"""

import abc
import datetime
import errno
import glob
import gzip
import logging
import logging.handlers
import os
import re
import shutil
import socket
import stat
import subprocess
import sys
import tarfile
import tempfile
import time

from optparse import OptionParser

import common
from autotest_lib.client.common_lib import file_utils
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.site_utils import job_directories
from autotest_lib.site_utils import cloud_console_client
from autotest_lib.tko import models
from autotest_lib.utils import labellib
from autotest_lib.utils import gslib
from chromite.lib import timeout_util

# Autotest requires the psutil module from site-packages, so it must be
# imported after "import common".
try:
    # Does not exist, nor is needed, on moblab.
    import psutil
except ImportError:
    psutil = None

from chromite.lib import parallel
try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    from autotest_lib import site_utils
    metrics = site_utils.metrics_mock
    ts_mon_config = site_utils.metrics_mock


GS_OFFLOADING_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloading_enabled', type=bool, default=True)

# Niceness setting for the process; the higher the number, the lower the
# priority.
NICENESS = 10

# Maximum number of seconds to allow for offloading a single
# directory.
OFFLOAD_TIMEOUT_SECS = 60 * 60

# Sleep time per loop.
SLEEP_TIME_SECS = 5

# Minimum number of seconds between e-mail reports.
REPORT_INTERVAL_SECS = 60 * 60

# Location of Autotest results on disk.
RESULTS_DIR = '/usr/local/autotest/results'
FAILED_OFFLOADS_FILE = os.path.join(RESULTS_DIR, 'FAILED_OFFLOADS')

# Hosts sub-directory that contains cleanup, verify and repair jobs.
HOSTS_SUB_DIR = 'hosts'

FAILED_OFFLOADS_FILE_HEADER = '''
This is the list of gs_offloader failed jobs.
Last offloader attempt at %s failed to offload %d files.
Check http://go/cros-triage-gsoffloader to triage the issue


First failure Count Directory name
=================== ====== ==============================
'''
# --+----1----+---- ----+ ----+----1----+----2----+----3

FAILED_OFFLOADS_LINE_FORMAT = '%19s %5d %-1s\n'
FAILED_OFFLOADS_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

USE_RSYNC_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_use_rsync', type=bool, default=False)

LIMIT_FILE_COUNT = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_limit_file_count', type=bool, default=False)

# Use multiprocessing for gsutil uploading.
GS_OFFLOADER_MULTIPROCESSING = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_multiprocessing', type=bool, default=False)

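# The [CROS] settings read above come from the Autotest global/shadow config.
# As a rough, illustrative sketch (values are examples, not defaults), a
# deployment that enables rsync-based, multiprocess offload might carry:
#
#   [CROS]
#   gs_offloading_enabled: True
#   gs_offloader_use_rsync: True
#   gs_offloader_limit_file_count: True
#   gs_offloader_multiprocessing: True
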
D = '[0-9][0-9]'
TIMESTAMP_PATTERN = '%s%s.%s.%s_%s.%s.%s' % (D, D, D, D, D, D, D)
CTS_RESULT_PATTERN = 'testResult.xml'
CTS_V2_RESULT_PATTERN = 'test_result.xml'
# Google Storage bucket URI to store results in.
DEFAULT_CTS_RESULTS_GSURI = global_config.global_config.get_config_value(
        'CROS', 'cts_results_server', default='')
DEFAULT_CTS_APFE_GSURI = global_config.global_config.get_config_value(
        'CROS', 'cts_apfe_server', default='')

# Metadata type.
GS_OFFLOADER_SUCCESS_TYPE = 'gs_offloader_success'
GS_OFFLOADER_FAILURE_TYPE = 'gs_offloader_failure'


def _get_metrics_fields(dir_entry):
    """Get metrics fields for the given test result directory, including
    board and milestone.

    @param dir_entry: Directory entry to offload.
    @return A dictionary for the metrics data to be uploaded.
    """
    fields = {'board': 'unknown',
              'milestone': 'unknown'}
    if dir_entry:
        # There could be multiple hosts in the job directory; use the first
        # one available.
        for host in glob.glob(os.path.join(dir_entry, '*')):
            try:
                keyval = models.test.parse_job_keyval(host)
            except ValueError:
                continue
            build = keyval.get('build')
            if build:
                try:
                    cros_version = labellib.parse_cros_version(build)
                    fields['board'] = cros_version.board
                    fields['milestone'] = cros_version.milestone
                    break
                except ValueError:
                    # Ignore version parsing errors so they won't crash
                    # gs_offloader.
                    pass

    return fields


def _get_es_metadata(dir_entry):
    """Get ES metadata for the given test result directory.

    @param dir_entry: Directory entry to offload.
    @return A dictionary for the metadata to be uploaded.
    """
    fields = _get_metrics_fields(dir_entry)
    fields['hostname'] = socket.gethostname()
    # Include more data about the test job in metadata.
    if dir_entry:
        fields['dir_entry'] = dir_entry
        fields['job_id'] = job_directories.get_job_id_or_task_id(dir_entry)

    return fields


def _get_cmd_list(multiprocessing, dir_entry, gs_path):
    """Return the command to offload a specified directory.

    @param multiprocessing: True to turn on -m option for gsutil.
    @param dir_entry: Directory entry/path for which we need a cmd_list
                      to offload.
    @param gs_path: Location in google storage where we will
                    offload the directory.

    @return A command list to be executed by Popen.
    """
    cmd = ['gsutil']
    if multiprocessing:
        cmd.append('-m')
    if USE_RSYNC_ENABLED:
        cmd.append('rsync')
        target = os.path.join(gs_path, os.path.basename(dir_entry))
    else:
        cmd.append('cp')
        target = gs_path
    cmd += ['-eR', dir_entry, target]
    return cmd

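# Sketch of the command lists built by _get_cmd_list() above (the bucket and
# job directory names are invented for illustration):
#
#   _get_cmd_list(True, '123-fakejob', 'gs://example-bucket/')
#       -> ['gsutil', '-m', 'cp', '-eR', '123-fakejob', 'gs://example-bucket/']
#
# With CROS.gs_offloader_use_rsync enabled, 'cp' becomes 'rsync' and the
# target gains the directory's basename:
#
#       -> ['gsutil', '-m', 'rsync', '-eR', '123-fakejob',
#           'gs://example-bucket/123-fakejob']
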
def sanitize_dir(dirpath):
    """Sanitize directory for gs upload.

    Symlinks and FIFOs are converted to regular files to work around the
    bugs referenced below (crbug.com/684122, crbug.com/692788).

    @param dirpath: Directory entry to be sanitized.
    """
    if not os.path.exists(dirpath):
        return
    _escape_rename(dirpath)
    _escape_rename_dir_contents(dirpath)
    _sanitize_fifos(dirpath)
    _sanitize_symlinks(dirpath)


def _escape_rename_dir_contents(dirpath):
    """Recursively rename directory contents to escape filenames for gs upload.

    @param dirpath: Directory path string.
    """
    for filename in os.listdir(dirpath):
        path = os.path.join(dirpath, filename)
        _escape_rename(path)
    for filename in os.listdir(dirpath):
        path = os.path.join(dirpath, filename)
        if os.path.isdir(path):
            _escape_rename_dir_contents(path)


def _escape_rename(path):
    """Rename a file to escape its filename for gs upload.

    @param path: File path string.
    """
    dirpath, filename = os.path.split(path)
    sanitized_filename = gslib.escape(filename)
    sanitized_path = os.path.join(dirpath, sanitized_filename)
    os.rename(path, sanitized_path)


def _sanitize_fifos(dirpath):
    """Convert FIFOs to regular files (fixes crbug.com/684122).

    @param dirpath: Directory path string.
    """
    for root, _, files in os.walk(dirpath):
        for filename in files:
            path = os.path.join(root, filename)
            file_stat = os.lstat(path)
            if stat.S_ISFIFO(file_stat.st_mode):
                _replace_fifo_with_file(path)


def _replace_fifo_with_file(path):
    """Replace a FIFO with a normal file.

    @param path: FIFO path string.
    """
    logging.debug('Removing fifo %s', path)
    os.remove(path)
    logging.debug('Creating marker %s', path)
    with open(path, 'w') as f:
        f.write('<FIFO>')


def _sanitize_symlinks(dirpath):
    """Convert symlinks to regular files (fixes crbug.com/692788).

    @param dirpath: Directory path string.
    """
    for root, _, files in os.walk(dirpath):
        for filename in files:
            path = os.path.join(root, filename)
            file_stat = os.lstat(path)
            if stat.S_ISLNK(file_stat.st_mode):
                _replace_symlink_with_file(path)


def _replace_symlink_with_file(path):
    """Replace a symlink with a normal file.

    @param path: Symlink path string.
    """
    target = os.readlink(path)
    logging.debug('Removing symlink %s', path)
    os.remove(path)
    logging.debug('Creating marker %s', path)
    with open(path, 'w') as f:
        f.write('<symlink to %s>' % target)


# Maximum number of files in the folder.
_MAX_FILE_COUNT = 500
_FOLDERS_NEVER_ZIP = ['debug', 'ssp_logs', 'autoupdate_logs']


def _get_zippable_folders(dir_entry):
    """Return the subdirectories of dir_entry that are eligible for zipping.

    @param dir_entry: Directory entry to be checked.
    """
    folders_list = []
    for folder in os.listdir(dir_entry):
        folder_path = os.path.join(dir_entry, folder)
        if (not os.path.isfile(folder_path) and
                folder not in _FOLDERS_NEVER_ZIP):
            folders_list.append(folder_path)
    return folders_list


def limit_file_count(dir_entry):
    """Limit the number of files in the given directory.

    The method checks the total number of files in the given directory.
    If the number is greater than _MAX_FILE_COUNT, the method will
    compress each folder in the given directory, except folders in
    _FOLDERS_NEVER_ZIP.

    @param dir_entry: Directory entry to be checked.
    """
    try:
        count = _count_files(dir_entry)
    except ValueError:
        logging.warning('Failed to get the file count in folder %s.',
                        dir_entry)
        return
    if count < _MAX_FILE_COUNT:
        return

    # For test jobs, zip folders at the second level, e.g. under
    # 123-debug/host1, so that the autoserv debug folder remains directly
    # accessible.  Special tasks do not need to dig one level deeper.
    is_special_task = re.match(job_directories.SPECIAL_TASK_PATTERN,
                               dir_entry)

    folders = _get_zippable_folders(dir_entry)
    if not is_special_task:
        subfolders = []
        for folder in folders:
            subfolders.extend(_get_zippable_folders(folder))
        folders = subfolders

    for folder in folders:
        _make_into_tarball(folder)

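# Rough illustration of what limit_file_count() above does to an oversized
# regular test job directory (all names are invented):
#
#   123-fakejob/host1/sysinfo/...  ->  123-fakejob/host1/sysinfo.tgz
#   123-fakejob/host1/debug/       ->  left as-is (listed in _FOLDERS_NEVER_ZIP)
#
# Special task directories are handled one level higher, i.e. their top-level
# folders are compressed directly.
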
def _count_files(dirpath):
    """Count the number of files in a directory recursively.

    @param dirpath: Directory path string.
    @return Total number of files under the directory.
    """
    return sum(len(files) for _path, _dirs, files in os.walk(dirpath))


def _make_into_tarball(dirpath):
    """Make a directory into a tarball and remove the original directory.

    @param dirpath: Directory path string.
    """
    tarpath = '%s.tgz' % dirpath
    with tarfile.open(tarpath, 'w:gz') as tar:
        tar.add(dirpath, arcname=os.path.basename(dirpath))
    shutil.rmtree(dirpath)


def correct_results_folder_permission(dir_entry):
    """Make sure the results folder has the right permission settings.

    For tests running with server-side packaging, the results folder is
    owned by root.  Ownership must be changed to the user running the
    autoserv process, so the parsing job can access the results folder.

    @param dir_entry: Path to the results folder.
    """
    if not dir_entry:
        return

    logging.info('Trying to correct file permission of %s.', dir_entry)
    try:
        subprocess.check_call(
                ['sudo', '-n', 'chown', '-R', str(os.getuid()), dir_entry])
        subprocess.check_call(
                ['sudo', '-n', 'chgrp', '-R', str(os.getgid()), dir_entry])
    except subprocess.CalledProcessError as e:
        logging.error('Failed to modify permission for %s: %s',
                      dir_entry, e)


def _upload_cts_testresult(dir_entry, multiprocessing):
    """Upload test results to separate gs buckets.

    Upload testResult.xml.gz/test_result.xml.gz file to cts_results_bucket.
    Upload timestamp.zip to cts_apfe_bucket.

    @param dir_entry: Path to the results folder.
    @param multiprocessing: True to turn on -m option for gsutil.
    """
    for host in glob.glob(os.path.join(dir_entry, '*')):
        cts_path = os.path.join(host, 'cheets_CTS.*', 'results', '*',
                                TIMESTAMP_PATTERN)
        cts_v2_path = os.path.join(host, 'cheets_CTS_*', 'results', '*',
                                   TIMESTAMP_PATTERN)
        gts_v2_path = os.path.join(host, 'cheets_GTS.*', 'results', '*',
                                   TIMESTAMP_PATTERN)
        for result_path, result_pattern in [
                (cts_path, CTS_RESULT_PATTERN),
                (cts_v2_path, CTS_V2_RESULT_PATTERN),
                (gts_v2_path, CTS_V2_RESULT_PATTERN)]:
            for path in glob.glob(result_path):
                try:
                    _upload_files(host, path, result_pattern, multiprocessing)
                except Exception as e:
                    logging.error('ERROR uploading test results %s to GS: %s',
                                  path, e)

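# For reference, the globs in _upload_cts_testresult() above are meant to
# match result trees of roughly this shape (host, suite and timestamp names
# are invented for illustration):
#
#   <dir_entry>/<host>/cheets_CTS.<suite>/results/android-cts/
#       2017.01.02_03.04.05/testResult.xml
#
# The timestamp directory matches TIMESTAMP_PATTERN, the XML file matches
# CTS_RESULT_PATTERN or CTS_V2_RESULT_PATTERN, and a sibling
# 2017.01.02_03.04.05.zip archive is what gets copied to the APFE bucket.
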
def _is_valid_result(build, result_pattern, suite):
    """Check if the result should be uploaded to CTS/GTS buckets.

    @param build: Builder name.
    @param result_pattern: XML result file pattern.
    @param suite: Test suite name.

    @returns: True if the result should be uploaded, False otherwise.
    """
    if build is None or suite is None:
        return False

    # Not valid if it's not a release build.
    if not re.match(r'(?!trybot-).*-release/.*', build):
        return False

    # Not valid if it's a CTS/GTS result but the suite is not an 'arc-cts*',
    # 'arc-gts*' or 'test_that_wrapper' suite.
    result_patterns = [CTS_RESULT_PATTERN, CTS_V2_RESULT_PATTERN]
    if result_pattern in result_patterns and not (
            suite.startswith('arc-cts') or suite.startswith('arc-gts') or
            suite.startswith('test_that_wrapper')):
        return False

    return True

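# For reference, the release-build check in _is_valid_result() above accepts
# builder strings shaped like 'lumpy-release/R27-3773.0.0' (an invented
# example) and rejects trybot builds (anything starting with 'trybot-') as
# well as non-release builders.
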
def _upload_files(host, path, result_pattern, multiprocessing):
    """Upload CTS/GTS results for a single timestamped results directory.

    Uploads the <timestamp>.zip archive to the APFE bucket and the gzipped
    XML result file to the CTS results bucket.

    @param host: Path to a host results directory under the job directory.
    @param path: Path to the timestamped CTS/GTS results directory.
    @param result_pattern: XML result file pattern.
    @param multiprocessing: True to turn on -m option for gsutil.
    """
    keyval = models.test.parse_job_keyval(host)
    build = keyval.get('build')
    suite = keyval.get('suite')

    if not _is_valid_result(build, result_pattern, suite):
        # No need to upload the current folder; return.
        return

    parent_job_id = str(keyval['parent_job_id'])

    folders = path.split(os.sep)
    job_id = folders[-6]
    package = folders[-4]
    timestamp = folders[-1]

    # Path: bucket/build/parent_job_id/cheets_CTS.*/job_id_timestamp/
    # or bucket/build/parent_job_id/cheets_GTS.*/job_id_timestamp/
    cts_apfe_gs_path = os.path.join(
            DEFAULT_CTS_APFE_GSURI, build, parent_job_id,
            package, job_id + '_' + timestamp) + '/'

    # Path: bucket/cheets_CTS.*/job_id_timestamp/
    # or bucket/cheets_GTS.*/job_id_timestamp/
    test_result_gs_path = os.path.join(
            DEFAULT_CTS_RESULTS_GSURI, package,
            job_id + '_' + timestamp) + '/'

    for zip_file in glob.glob('%s.zip' % path):
        utils.run(' '.join(_get_cmd_list(
                multiprocessing, zip_file, cts_apfe_gs_path)))
        logging.debug('Upload %s to %s ', zip_file, cts_apfe_gs_path)

    for test_result_file in glob.glob(os.path.join(path, result_pattern)):
        # gzip test_result_file (testResult.xml/test_result.xml).
        test_result_file_gz = '%s.gz' % test_result_file
        with open(test_result_file, 'r') as f_in, (
                gzip.open(test_result_file_gz, 'w')) as f_out:
            shutil.copyfileobj(f_in, f_out)
        utils.run(' '.join(_get_cmd_list(
                multiprocessing, test_result_file_gz, test_result_gs_path)))
        logging.debug('Zip and upload %s to %s',
                      test_result_file_gz, test_result_gs_path)
        # Remove test_result_file_gz (testResult.xml.gz/test_result.xml.gz).
        os.remove(test_result_file_gz)


def _emit_gs_returncode_metric(returncode):
    """Increment the gs_returncode counter based on |returncode|."""
    m_gs_returncode = 'chromeos/autotest/gs_offloader/gs_returncode'
    rcode = int(returncode)
    if rcode < 0 or rcode > 255:
        rcode = -1
    metrics.Counter(m_gs_returncode).increment(fields={'return_code': rcode})


class BaseGSOffloader(object):

    """Google Storage offloader interface."""

    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def offload(self, dir_entry, dest_path, job_complete_time):
        """Offload a directory entry to Google Storage.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """


class GSOffloader(BaseGSOffloader):
    """Google Storage Offloader."""

    def __init__(self, gs_uri, multiprocessing, delete_age,
                 console_client=None):
        """Initialize the offloader for the given gs_uri.

        @param gs_uri: Google storage bucket uri to offload to.
        @param multiprocessing: True to turn on -m option for gsutil.
        @param delete_age: Minimum age in days at which an offloaded job
                           directory may be deleted.
        @param console_client: The cloud console client.  If None,
                               cloud console APIs are not called.
        """
        self._gs_uri = gs_uri
        self._multiprocessing = multiprocessing
        self._delete_age = delete_age
        self._console_client = console_client

    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/gs_offloader/job_offload_duration')
    def offload(self, dir_entry, dest_path, job_complete_time):
        """Offload the specified directory entry to Google storage.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        with tempfile.TemporaryFile('w+') as stdout_file, \
             tempfile.TemporaryFile('w+') as stderr_file:
            try:
                self._offload(dir_entry, dest_path, stdout_file, stderr_file)
            except _OffloadError as e:
                metrics_fields = _get_metrics_fields(dir_entry)
                m_any_error = 'chromeos/autotest/errors/gs_offloader/any_error'
                metrics.Counter(m_any_error).increment(fields=metrics_fields)

                # Rewind the log files for stdout and stderr and log
                # their contents.
                stdout_file.seek(0)
                stderr_file.seek(0)
                stderr_content = stderr_file.read()
                logging.warning('Error occurred when offloading %s:',
                                dir_entry)
                logging.warning('Stdout:\n%s \nStderr:\n%s',
                                stdout_file.read(), stderr_content)

                # Some result files may have the wrong file permissions.  Try
                # to correct such errors so that a later attempt can succeed.
                # TODO(dshi): The code is added to correct result files
                # with wrong file permission caused by bug 511778.  After
                # this code is pushed to lab and run for a while to
                # clean up these files, following code and function
                # correct_results_folder_permission can be deleted.
                if 'CommandException: Error opening file' in stderr_content:
                    correct_results_folder_permission(dir_entry)
            else:
                self._prune(dir_entry, job_complete_time)

    def _offload(self, dir_entry, dest_path,
                 stdout_file, stderr_file):
        """Offload the specified directory entry to Google storage.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param stdout_file: Log file.
        @param stderr_file: Log file.
        """
        if _is_uploaded(dir_entry):
            return
        start_time = time.time()
        metrics_fields = _get_metrics_fields(dir_entry)
        es_metadata = _get_es_metadata(dir_entry)
        error_obj = _OffloadError(start_time, es_metadata)
        try:
            sanitize_dir(dir_entry)
            if DEFAULT_CTS_RESULTS_GSURI:
                _upload_cts_testresult(dir_entry, self._multiprocessing)

            if LIMIT_FILE_COUNT:
                limit_file_count(dir_entry)
            es_metadata['size_kb'] = (
                    file_utils.get_directory_size_kibibytes(dir_entry))

            process = None
            with timeout_util.Timeout(OFFLOAD_TIMEOUT_SECS):
                gs_path = '%s%s' % (self._gs_uri, dest_path)
                process = subprocess.Popen(
                        _get_cmd_list(self._multiprocessing, dir_entry,
                                      gs_path),
                        stdout=stdout_file, stderr=stderr_file)
                process.wait()

            _emit_gs_returncode_metric(process.returncode)
            if process.returncode != 0:
                raise error_obj
            _emit_offload_metrics(dir_entry)

            if self._console_client:
                gcs_uri = os.path.join(gs_path,
                                       os.path.basename(dir_entry))
                if not self._console_client.send_test_job_offloaded_message(
                        gcs_uri):
                    raise error_obj

            _mark_uploaded(dir_entry)
        except timeout_util.TimeoutError:
            m_timeout = 'chromeos/autotest/errors/gs_offloader/timed_out_count'
            metrics.Counter(m_timeout).increment(fields=metrics_fields)
            # If we finished the call to Popen(), we may need to
            # terminate the child process.  We don't bother calling
            # process.poll(); that inherently races because the child
            # can die any time it wants.
            if process:
                try:
                    process.terminate()
                except OSError:
                    # We don't expect any error other than "No such
                    # process".
                    pass
            logging.error('Offloading %s timed out after waiting %d '
                          'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS)
            raise error_obj

    def _prune(self, dir_entry, job_complete_time):
        """Prune the directory if it has been uploaded and has expired.

        @param dir_entry: Directory entry to offload.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        if not (_is_uploaded(dir_entry)
                and job_directories.is_job_expired(self._delete_age,
                                                   job_complete_time)):
            return
        try:
            shutil.rmtree(dir_entry)
        except OSError as e:
            # Wrong file permissions can cause the call to
            # `shutil.rmtree(dir_entry)` to raise OSError with the message
            # 'Permission denied'.  Details can be found in crbug.com/536151.
            if e.errno == errno.EACCES:
                correct_results_folder_permission(dir_entry)
            m_permission_error = ('chromeos/autotest/errors/gs_offloader/'
                                  'wrong_permissions_count')
            metrics_fields = _get_metrics_fields(dir_entry)
            metrics.Counter(m_permission_error).increment(
                    fields=metrics_fields)


class _OffloadError(Exception):
    """Google Storage offload failed."""

    def __init__(self, start_time, es_metadata):
        super(_OffloadError, self).__init__(start_time, es_metadata)
        self.start_time = start_time
        self.es_metadata = es_metadata

class FakeGSOffloader(BaseGSOffloader):

    """Fake Google Storage Offloader that only deletes directories."""

    def offload(self, dir_entry, dest_path, job_complete_time):
        """Pretend to offload a directory and delete it.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        shutil.rmtree(dir_entry)


def _is_expired(job, age_limit):
    """Return whether job directory is expired for uploading.

    @param job: _JobDirectory instance.
    @param age_limit: Minimum age in days at which a job may be offloaded.
    """
    job_timestamp = job.get_timestamp_if_finished()
    if not job_timestamp:
        return False
    return job_directories.is_job_expired(age_limit, job_timestamp)


def _emit_offload_metrics(dirpath):
    """Emit gs offload metrics.

    @param dirpath: Offloaded directory path.
    """
    dir_size = file_utils.get_directory_size_kibibytes(dirpath)
    metrics_fields = _get_metrics_fields(dirpath)

    m_offload_count = (
            'chromeos/autotest/gs_offloader/jobs_offloaded')
    metrics.Counter(m_offload_count).increment(
            fields=metrics_fields)
    m_offload_size = ('chromeos/autotest/gs_offloader/'
                      'kilobytes_transferred')
    metrics.Counter(m_offload_size).increment_by(
            dir_size, fields=metrics_fields)


def _is_uploaded(dirpath):
    """Return whether directory has been uploaded.

    @param dirpath: Directory path string.
    """
    return os.path.isfile(_get_uploaded_marker_file(dirpath))


def _mark_uploaded(dirpath):
    """Mark directory as uploaded.

    @param dirpath: Directory path string.
    """
    with open(_get_uploaded_marker_file(dirpath), 'a'):
        pass


def _get_uploaded_marker_file(dirpath):
    """Return path to upload marker file for directory.

    @param dirpath: Directory path string.
    """
    return '%s/.GS_UPLOADED' % (dirpath,)


def _format_job_for_failure_reporting(job):
    """Formats a _JobDirectory for reporting / logging.

    @param job: The _JobDirectory to format.
    """
    d = datetime.datetime.fromtimestamp(job.first_offload_start)
    data = (d.strftime(FAILED_OFFLOADS_TIME_FORMAT),
            job.offload_count,
            job.dirname)
    return FAILED_OFFLOADS_LINE_FORMAT % data

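# For reference, a line produced by _format_job_for_failure_reporting() above
# renders roughly as follows (timestamp, count and directory are invented):
#
#   2017-01-02 03:04:05     3 123-fakejob
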
def wait_for_gs_write_access(gs_uri):
    """Verify and wait until we have write access to Google Storage.

    @param gs_uri: The Google Storage URI we are trying to offload to.
    """
    # TODO (sbasi) Try to use the gsutil command to check write access.
    # Ensure we have write access to gs_uri.
    dummy_file = tempfile.NamedTemporaryFile()
    test_cmd = _get_cmd_list(False, dummy_file.name, gs_uri)
    while True:
        try:
            subprocess.check_call(test_cmd)
            subprocess.check_call(
                    ['gsutil', 'rm',
                     os.path.join(gs_uri,
                                  os.path.basename(dummy_file.name))])
            break
        except subprocess.CalledProcessError:
            logging.debug('Unable to offload to %s, sleeping.', gs_uri)
            time.sleep(120)


class Offloader(object):
    """State of the offload process.

    Contains the following member fields:
      * _gs_offloader: BaseGSOffloader to use to offload a job directory.
      * _jobdir_classes: List of classes of job directory to be
        offloaded.
      * _processes: Maximum number of outstanding offload processes
        to allow during an offload cycle.
      * _upload_age_limit: Minimum age in days at which a job may be
        offloaded.
      * _delete_age_limit: Minimum age in days at which an offloaded job
        may be deleted from local storage.
      * _open_jobs: a dictionary mapping directory paths to Job
        objects.
    """

    def __init__(self, options):
        self._upload_age_limit = options.age_to_upload
        self._delete_age_limit = options.age_to_delete
        if options.delete_only:
            self._gs_offloader = FakeGSOffloader()
        else:
            self.gs_uri = utils.get_offload_gsuri()
            logging.debug('Offloading to: %s', self.gs_uri)
            multiprocessing = False
            if options.multiprocessing:
                multiprocessing = True
            elif options.multiprocessing is None:
                multiprocessing = GS_OFFLOADER_MULTIPROCESSING
            logging.info(
                    'Offloader multiprocessing is set to: %r', multiprocessing)
            console_client = None
            if cloud_console_client.is_cloud_notification_enabled():
                console_client = cloud_console_client.PubSubBasedClient()
            self._gs_offloader = GSOffloader(
                    self.gs_uri, multiprocessing, self._delete_age_limit,
                    console_client)
        classlist = []
        if options.process_hosts_only or options.process_all:
            classlist.append(job_directories.SpecialJobDirectory)
        if not options.process_hosts_only:
            classlist.append(job_directories.RegularJobDirectory)
        self._jobdir_classes = classlist
        assert self._jobdir_classes
        self._processes = options.parallelism
        self._open_jobs = {}
        self._pusub_topic = None
        self._offload_count_limit = 3


    def _add_new_jobs(self):
        """Find new job directories that need offloading.

        Go through the file system looking for valid job directories
        that are currently not in `self._open_jobs`, and add them in.

        """
        new_job_count = 0
        for cls in self._jobdir_classes:
            for resultsdir in cls.get_job_directories():
                if (
                        resultsdir in self._open_jobs
                        or _is_uploaded(resultsdir)):
                    continue
                self._open_jobs[resultsdir] = cls(resultsdir)
                new_job_count += 1
        logging.debug('Start of offload cycle - found %d new jobs',
                      new_job_count)


    def _remove_offloaded_jobs(self):
        """Remove offloaded jobs from `self._open_jobs`."""
        removed_job_count = 0
        for jobkey, job in self._open_jobs.items():
            if (
                    not os.path.exists(job.dirname)
                    or _is_uploaded(job.dirname)):
                del self._open_jobs[jobkey]
                removed_job_count += 1
        logging.debug('End of offload cycle - cleared %d jobs, '
                      'carrying %d open jobs',
                      removed_job_count, len(self._open_jobs))


    def _report_failed_jobs(self):
        """Report status after attempting offload.

        This function processes all jobs in `self._open_jobs`, assuming
        an attempt has just been made to offload all of them.

        If any jobs have reportable errors, and we haven't generated
        an e-mail report in the last `REPORT_INTERVAL_SECS` seconds,
        send a new e-mail describing the failures.

        """
        failed_jobs = [j for j in self._open_jobs.values() if
                       j.first_offload_start]
        self._report_failed_jobs_count(failed_jobs)
        self._log_failed_jobs_locally(failed_jobs)

    def offload_once(self):
        """Perform one offload cycle.

        Find all job directories for new jobs that we haven't seen
        before.  Then, attempt to offload the directories for any
        jobs that have finished running.  Offload of multiple jobs
        is done in parallel, up to `self._processes` at a time.

        After we've tried uploading all directories, go through the list
        checking the status of all uploaded directories.  If necessary,
        report failures via e-mail.

        """
        self._add_new_jobs()
        self._report_current_jobs_count()
        with parallel.BackgroundTaskRunner(
                self._gs_offloader.offload,
                processes=self._processes) as queue:
            for job in self._open_jobs.values():
                _enqueue_offload(job, queue, self._upload_age_limit)
        self._give_up_on_jobs_over_limit()
        self._remove_offloaded_jobs()
        self._report_failed_jobs()


    def _give_up_on_jobs_over_limit(self):
        """Give up on jobs that have gone over the offload limit.

        We mark them as uploaded as we won't try to offload them any more.
        """
        for job in self._open_jobs.values():
            if job.offload_count >= self._offload_count_limit:
                _mark_uploaded(job.dirname)


    def _log_failed_jobs_locally(self, failed_jobs,
                                 log_file=FAILED_OFFLOADS_FILE):
        """Update a local file listing all the failed jobs.

        The resulting file can be used by developers to see which jobs we
        have failed to upload.

        @param failed_jobs: A list of failed _JobDirectory objects.
        @param log_file: The file to log the failed jobs to.
        """
        now = datetime.datetime.now()
        now_str = now.strftime(FAILED_OFFLOADS_TIME_FORMAT)
        formatted_jobs = [_format_job_for_failure_reporting(job)
                          for job in failed_jobs]
        formatted_jobs.sort()

        with open(log_file, 'w') as logfile:
            logfile.write(FAILED_OFFLOADS_FILE_HEADER %
                          (now_str, len(failed_jobs)))
            logfile.writelines(formatted_jobs)


    def _report_current_jobs_count(self):
        """Report the number of outstanding jobs to monarch."""
        metrics.Gauge('chromeos/autotest/gs_offloader/current_jobs_count').set(
                len(self._open_jobs))


    def _report_failed_jobs_count(self, failed_jobs):
        """Report the number of outstanding failed offload jobs to monarch.

        @param failed_jobs: List of failed jobs.
        """
        metrics.Gauge('chromeos/autotest/gs_offloader/failed_jobs_count').set(
                len(failed_jobs))

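# _enqueue_offload() below feeds the BackgroundTaskRunner used in
# Offloader.offload_once() with the positional arguments expected by
# BaseGSOffloader.offload(); conceptually each queued item is
#
#   [job.dirname, os.path.dirname(job.dirname), job.get_timestamp_if_finished()]
#
# i.e. (dir_entry, dest_path, job_complete_time).
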
def _enqueue_offload(job, queue, age_limit):
    """Enqueue the job for offload, if it's eligible.

    The job is eligible for offloading if the database has marked
    it finished, and the job is older than the `age_limit`
    parameter.

    If the job is eligible, offload processing is requested by
    passing the `queue` parameter's `put()` method a sequence with
    the job's `dirname` attribute, the dirname's parent directory,
    and the job's completion timestamp.

    @param job       _JobDirectory instance to offload.
    @param queue     If the job should be offloaded, put the offload
                     parameters into this queue for processing.
    @param age_limit Minimum age for a job to be offloaded.  A value
                     of 0 means that the job will be offloaded as
                     soon as it is finished.

    """
    if not job.offload_count:
        if not _is_expired(job, age_limit):
            return
        job.first_offload_start = time.time()
    job.offload_count += 1
    if job.process_gs_instructions():
        timestamp = job.get_timestamp_if_finished()
        queue.put([job.dirname, os.path.dirname(job.dirname), timestamp])


def parse_options():
    """Parse the args passed into gs_offloader."""
    defaults = 'Defaults:\n Destination: %s\n Results Path: %s' % (
            utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR)
    usage = 'usage: %prog [options]\n' + defaults
    parser = OptionParser(usage)
    parser.add_option('-a', '--all', dest='process_all',
                      action='store_true',
                      help='Offload all files in the results directory.')
    parser.add_option('-s', '--hosts', dest='process_hosts_only',
                      action='store_true',
                      help='Offload only the special tasks result files '
                           'located in the results/hosts subdirectory')
    parser.add_option('-p', '--parallelism', dest='parallelism',
                      type='int', default=1,
                      help='Number of parallel workers to use.')
    parser.add_option('-o', '--delete_only', dest='delete_only',
                      action='store_true',
                      help='GS Offloader will only delete the '
                           'directories and will not offload them to google '
                           'storage. NOTE: If global_config variable '
                           'CROS.gs_offloading_enabled is False, '
                           '--delete_only is automatically True.',
                      default=not GS_OFFLOADING_ENABLED)
    parser.add_option('-d', '--days_old', dest='days_old',
                      help='Minimum job age in days before a result can be '
                           'offloaded.', type='int', default=0)
    parser.add_option('-l', '--log_size', dest='log_size',
                      help='Limit the offloader logs to a specified '
                           'number of megabytes.', type='int', default=0)
    parser.add_option('-m', dest='multiprocessing', action='store_true',
                      help='Turn on -m option for gsutil. If not set, the '
                           'global config setting gs_offloader_multiprocessing '
                           'under CROS section is applied.')
    parser.add_option('-i', '--offload_once', dest='offload_once',
                      action='store_true',
                      help='Upload all available results and then exit.')
    parser.add_option('-y', '--normal_priority', dest='normal_priority',
                      action='store_true',
                      help='Upload using normal process priority.')
    parser.add_option('-u', '--age_to_upload', dest='age_to_upload',
                      help='Minimum job age in days before a result can be '
                           'offloaded, but not removed from local storage',
                      type='int', default=None)
    parser.add_option('-n', '--age_to_delete', dest='age_to_delete',
                      help='Minimum job age in days before a result can be '
                           'removed from local storage',
                      type='int', default=None)

    options = parser.parse_args()[0]
    if options.process_all and options.process_hosts_only:
        parser.print_help()
        print ('Cannot process all files and only the hosts '
               'subdirectory. Please remove an argument.')
        sys.exit(1)

    if options.days_old and (options.age_to_upload or options.age_to_delete):
        parser.print_help()
        print('Use the days_old option or the age_to_* options but not both')
        sys.exit(1)

    if options.age_to_upload is None:
        options.age_to_upload = options.days_old
    if options.age_to_delete is None:
        options.age_to_delete = options.days_old

    return options


def main():
    """Main method of gs_offloader."""
    options = parse_options()

    if options.process_all:
        offloader_type = 'all'
    elif options.process_hosts_only:
        offloader_type = 'hosts'
    else:
        offloader_type = 'jobs'

    _setup_logging(options, offloader_type)

    # Nice our process (carried to subprocesses) so we don't overload
    # the system.
    if not options.normal_priority:
        logging.debug('Set process to nice value: %d', NICENESS)
        os.nice(NICENESS)
        if psutil:
            proc = psutil.Process()
            logging.debug('Set process to ionice IDLE')
            proc.ionice(psutil.IOPRIO_CLASS_IDLE)

    # os.listdir returns relative paths, so change to where we need to
    # be to avoid an os.path.join on each loop.
    logging.debug('Offloading Autotest results in %s', RESULTS_DIR)
    os.chdir(RESULTS_DIR)

    service_name = 'gs_offloader(%s)' % offloader_type
    with ts_mon_config.SetupTsMonGlobalState(service_name, indirect=True,
                                             short_lived=False):
        offloader = Offloader(options)
        if not options.delete_only:
            wait_for_gs_write_access(offloader.gs_uri)
        while True:
            offloader.offload_once()
            if options.offload_once:
                break
            time.sleep(SLEEP_TIME_SECS)


_LOG_LOCATION = '/usr/local/autotest/logs/'
_LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt'
_LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S'
_LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'


def _setup_logging(options, offloader_type):
    """Set up logging.

    @param options: Parsed options.
    @param offloader_type: Type of offloader action as string.
    """
    log_filename = _get_log_filename(options, offloader_type)
    log_formatter = logging.Formatter(_LOGGING_FORMAT)
    # Replace the default logging handler with a RotatingFileHandler.  If
    # options.log_size is 0, the file size will not be limited.  Keep
    # one backup just in case.
    handler = logging.handlers.RotatingFileHandler(
            log_filename, maxBytes=1024 * options.log_size, backupCount=1)
    handler.setFormatter(log_formatter)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)


def _get_log_filename(options, offloader_type):
    """Get log filename.

    @param options: Parsed options.
    @param offloader_type: Type of offloader action as string.
    """
    if options.log_size > 0:
        log_timestamp = ''
    else:
        log_timestamp = time.strftime(_LOG_TIMESTAMP_FORMAT)
    log_basename = _LOG_FILENAME_FORMAT % (offloader_type, log_timestamp)
    return os.path.join(_LOG_LOCATION, log_basename)


if __name__ == '__main__':
    main()
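
# Illustrative invocations (flag values are examples only; parse_options()
# above defines the authoritative option set):
#
#   ./gs_offloader.py                     # offload finished test job results
#   ./gs_offloader.py -s -p 4             # offload only results/hosts special
#                                         # task results, 4 parallel workers
#   ./gs_offloader.py --delete_only -d 7  # delete local results older than
#                                         # 7 days without uploading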