#!/usr/bin/python
#
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Script to archive old Autotest results to Google Storage.

Uses gsutil to archive files to the configured Google Storage bucket.
Upon successful copy, the local results directory is deleted.
"""

import abc
try:
    import cachetools
except ImportError:
    cachetools = None
import datetime
import errno
import glob
import gzip
import logging
import logging.handlers
import os
import re
import shutil
import stat
import subprocess
import sys
import tarfile
import tempfile
import time

from optparse import OptionParser

import common
from autotest_lib.client.common_lib import file_utils
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.site_utils import job_directories
# For unittest, the cloud_console.proto is not compiled yet.
try:
    from autotest_lib.site_utils import cloud_console_client
except ImportError:
    cloud_console_client = None
from autotest_lib.tko import models
from autotest_lib.utils import labellib
from autotest_lib.utils import gslib
from chromite.lib import timeout_util

# Autotest requires the psutil module from site-packages, so it must be imported
# after "import common".
try:
    # Does not exist, nor is needed, on moblab.
    import psutil
except ImportError:
    psutil = None

from chromite.lib import parallel
try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    metrics = utils.metrics_mock
    ts_mon_config = utils.metrics_mock


GS_OFFLOADING_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloading_enabled', type=bool, default=True)

# Nice setting for process, the higher the number the lower the priority.
NICENESS = 10

# Maximum number of seconds to allow for offloading a single
# directory.
OFFLOAD_TIMEOUT_SECS = 60 * 60

# Sleep time per loop.
SLEEP_TIME_SECS = 5

# Minimum number of seconds between e-mail reports.
REPORT_INTERVAL_SECS = 60 * 60

# Location of Autotest results on disk.
RESULTS_DIR = '/usr/local/autotest/results'
FAILED_OFFLOADS_FILE = os.path.join(RESULTS_DIR, 'FAILED_OFFLOADS')

FAILED_OFFLOADS_FILE_HEADER = '''
This is the list of gs_offloader failed jobs.
Last offloader attempt at %s failed to offload %d files.
Check http://go/cros-triage-gsoffloader to triage the issue


First failure       Count   Directory name
=================== ======  ==============================
'''
# --+----1----+----  ----+  ----+----1----+----2----+----3

FAILED_OFFLOADS_LINE_FORMAT = '%19s  %5d  %-1s\n'
FAILED_OFFLOADS_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

USE_RSYNC_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_use_rsync', type=bool, default=False)

LIMIT_FILE_COUNT = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_limit_file_count', type=bool, default=False)

# Use multiprocessing for gsutil uploading.
GS_OFFLOADER_MULTIPROCESSING = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_multiprocessing', type=bool, default=False)

D = '[0-9][0-9]'
TIMESTAMP_PATTERN = '%s%s.%s.%s_%s.%s.%s' % (D, D, D, D, D, D, D)
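# Note: TIMESTAMP_PATTERN is a glob, not a regex; it expands to
# '[0-9][0-9][0-9][0-9].[0-9][0-9].[0-9][0-9]_[0-9][0-9].[0-9][0-9].[0-9][0-9]'
# and matches timestamped result directory names such as
# '2018.01.23_14.03.52' (illustrative example).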
CTS_RESULT_PATTERN = 'testResult.xml'
CTS_V2_RESULT_PATTERN = 'test_result.xml'
# Google Storage bucket URI to store results in.
DEFAULT_CTS_RESULTS_GSURI = global_config.global_config.get_config_value(
        'CROS', 'cts_results_server', default='')
DEFAULT_CTS_APFE_GSURI = global_config.global_config.get_config_value(
        'CROS', 'cts_apfe_server', default='')
DEFAULT_CTS_DELTA_RESULTS_GSURI = global_config.global_config.get_config_value(
        'CROS', 'ctsdelta_results_server', default='')
DEFAULT_CTS_DELTA_APFE_GSURI = global_config.global_config.get_config_value(
        'CROS', 'ctsdelta_apfe_server', default='')
DEFAULT_CTS_BVT_APFE_GSURI = global_config.global_config.get_config_value(
        'CROS', 'ctsbvt_apfe_server', default='')

# metadata type
GS_OFFLOADER_SUCCESS_TYPE = 'gs_offloader_success'
GS_OFFLOADER_FAILURE_TYPE = 'gs_offloader_failure'

# Autotest test to collect list of CTS tests
TEST_LIST_COLLECTOR = 'tradefed-run-collect-tests-only'

def _get_metrics_fields(dir_entry):
    """Get metrics fields for the given test result directory, including board
    and milestone.

    @param dir_entry: Directory entry to offload.
    @return A dictionary for the metrics data to be uploaded.
    """
    fields = {'board': 'unknown',
              'milestone': 'unknown'}
    if dir_entry:
        # There could be multiple hosts in the job directory, use the first one
        # available.
        for host in glob.glob(os.path.join(dir_entry, '*')):
            try:
                keyval = models.test.parse_job_keyval(host)
            except ValueError:
                continue
            build = keyval.get('build')
            if build:
                try:
                    cros_version = labellib.parse_cros_version(build)
                    fields['board'] = cros_version.board
                    fields['milestone'] = cros_version.milestone
                    break
                except ValueError:
                    # Ignore version parsing error so it won't crash
                    # gs_offloader.
                    pass

    return fields


def _get_cmd_list(multiprocessing, dir_entry, gs_path):
    """Return the command to offload a specified directory.

    @param multiprocessing: True to turn on -m option for gsutil.
    @param dir_entry: Directory entry/path that we need a cmd_list
                      to offload.
    @param gs_path: Location in google storage where we will
                    offload the directory.

    @return A command list to be executed by Popen.
    """
    cmd = ['gsutil']
    if multiprocessing:
        cmd.append('-m')
    if USE_RSYNC_ENABLED:
        cmd.append('rsync')
        target = os.path.join(gs_path, os.path.basename(dir_entry))
    else:
        cmd.append('cp')
        target = gs_path
    cmd += ['-eR', dir_entry, target]
    return cmd

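# Illustrative example (hypothetical paths): with rsync enabled and
# multiprocessing on, _get_cmd_list(True, '/results/123-debug', 'gs://bucket/')
# returns ['gsutil', '-m', 'rsync', '-eR', '/results/123-debug',
# 'gs://bucket/123-debug'].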

def sanitize_dir(dirpath):
    """Sanitize directory for gs upload.

    Symlinks and FIFOs are converted to regular files to fix
    crbug.com/684122 and crbug.com/692788.

    @param dirpath: Directory entry to be sanitized.
    """
    if not os.path.exists(dirpath):
        return
    _escape_rename(dirpath)
    _escape_rename_dir_contents(dirpath)
    _sanitize_fifos(dirpath)
    _sanitize_symlinks(dirpath)


def _escape_rename_dir_contents(dirpath):
    """Recursively rename directory to escape filenames for gs upload.

    @param dirpath: Directory path string.
    """
    for filename in os.listdir(dirpath):
        path = os.path.join(dirpath, filename)
        _escape_rename(path)
    for filename in os.listdir(dirpath):
        path = os.path.join(dirpath, filename)
        if os.path.isdir(path):
            _escape_rename_dir_contents(path)


def _escape_rename(path):
    """Rename file to escape filenames for gs upload.

    @param path: File path string.
    """
    dirpath, filename = os.path.split(path)
    sanitized_filename = gslib.escape(filename)
    sanitized_path = os.path.join(dirpath, sanitized_filename)
    os.rename(path, sanitized_path)


def _sanitize_fifos(dirpath):
    """Convert fifos to regular files (fixes crbug.com/684122).

    @param dirpath: Directory path string.
    """
    for root, _, files in os.walk(dirpath):
        for filename in files:
            path = os.path.join(root, filename)
            file_stat = os.lstat(path)
            if stat.S_ISFIFO(file_stat.st_mode):
                _replace_fifo_with_file(path)


def _replace_fifo_with_file(path):
    """Replace a fifo with a normal file.

    @param path: Fifo path string.
    """
    logging.debug('Removing fifo %s', path)
    os.remove(path)
    logging.debug('Creating fifo marker %s', path)
    with open(path, 'w') as f:
        f.write('<FIFO>')


def _sanitize_symlinks(dirpath):
    """Convert Symlinks to regular files (fixes crbug.com/692788).

    @param dirpath: Directory path string.
    """
    for root, _, files in os.walk(dirpath):
        for filename in files:
            path = os.path.join(root, filename)
            file_stat = os.lstat(path)
            if stat.S_ISLNK(file_stat.st_mode):
                _replace_symlink_with_file(path)


def _replace_symlink_with_file(path):
    """Replace a symlink with a normal file.

    @param path: Symlink path string.
    """
    target = os.readlink(path)
    logging.debug('Removing symlink %s', path)
    os.remove(path)
    logging.debug('Creating symlink marker %s', path)
    with open(path, 'w') as f:
        f.write('<symlink to %s>' % target)


# Maximum number of files in the folder.
_MAX_FILE_COUNT = 3000
_FOLDERS_NEVER_ZIP = ['debug', 'ssp_logs', 'autoupdate_logs']


def _get_zippable_folders(dir_entry):
    """Return the folders in `dir_entry` that may be compressed.

    Regular files and folders listed in _FOLDERS_NEVER_ZIP are skipped.

    @param dir_entry: Directory entry to be checked.
    """
    folders_list = []
    for folder in os.listdir(dir_entry):
        folder_path = os.path.join(dir_entry, folder)
        if (not os.path.isfile(folder_path) and
                folder not in _FOLDERS_NEVER_ZIP):
            folders_list.append(folder_path)
    return folders_list


def limit_file_count(dir_entry):
    """Limit the number of files in given directory.

    The method checks the total number of files in the given directory.
    If the number is greater than _MAX_FILE_COUNT, the method will
    compress each folder in the given directory, except folders in
    _FOLDERS_NEVER_ZIP.

    @param dir_entry: Directory entry to be checked.
    """
    try:
        count = _count_files(dir_entry)
    except ValueError:
        logging.warning('Failed to get the file count in folder %s.', dir_entry)
        return
    if count < _MAX_FILE_COUNT:
        return

    # For a test job, zip folders at the second level, e.g. 123-debug/host1,
    # so that the autoserv debug folder remains accessible.
    # For a special task, there is no need to dig one level deeper.
    is_special_task = re.match(job_directories.SPECIAL_TASK_PATTERN,
                               dir_entry)

    folders = _get_zippable_folders(dir_entry)
    if not is_special_task:
        subfolders = []
        for folder in folders:
            subfolders.extend(_get_zippable_folders(folder))
        folders = subfolders

    for folder in folders:
        _make_into_tarball(folder)


def _count_files(dirpath):
    """Count the number of files in a directory recursively.

    @param dirpath: Directory path string.
    """
    return sum(len(files) for _path, _dirs, files in os.walk(dirpath))


def _make_into_tarball(dirpath):
    """Make directory into tarball.

    @param dirpath: Directory path string.
    """
    tarpath = '%s.tgz' % dirpath
    with tarfile.open(tarpath, 'w:gz') as tar:
        tar.add(dirpath, arcname=os.path.basename(dirpath))
    shutil.rmtree(dirpath)


def correct_results_folder_permission(dir_entry):
    """Make sure the results folder has the right permission settings.

    For tests running with server-side packaging, the results folder is
    owned by root. This must be changed to the user running the
    autoserv process, so the parsing job can access the results folder.

    @param dir_entry: Path to the results folder.
    """
    if not dir_entry:
        return

    logging.info('Trying to correct file permission of %s.', dir_entry)
    try:
        owner = '%s:%s' % (os.getuid(), os.getgid())
        subprocess.check_call(
                ['sudo', '-n', 'chown', '-R', owner, dir_entry])
        subprocess.check_call(['chmod', '-R', 'u+r', dir_entry])
        subprocess.check_call(
                ['find', dir_entry, '-type', 'd',
                 '-exec', 'chmod', 'u+x', '{}', ';'])
    except subprocess.CalledProcessError as e:
        logging.error('Failed to modify permission for %s: %s',
                      dir_entry, e)


def _upload_cts_testresult(dir_entry, multiprocessing):
    """Upload test results to separate gs buckets.

    Upload testResult.xml.gz/test_result.xml.gz file to cts_results_bucket.
    Upload timestamp.zip to cts_apfe_bucket.

    @param dir_entry: Path to the results folder.
    @param multiprocessing: True to turn on -m option for gsutil.
    """
    for host in glob.glob(os.path.join(dir_entry, '*')):
        cts_path = os.path.join(host, 'cheets_CTS.*', 'results', '*',
                                TIMESTAMP_PATTERN)
        cts_v2_path = os.path.join(host, 'cheets_CTS_*', 'results', '*',
                                   TIMESTAMP_PATTERN)
        gts_v2_path = os.path.join(host, 'cheets_GTS*', 'results', '*',
                                   TIMESTAMP_PATTERN)
        for result_path, result_pattern in [(cts_path, CTS_RESULT_PATTERN),
                            (cts_v2_path, CTS_V2_RESULT_PATTERN),
                            (gts_v2_path, CTS_V2_RESULT_PATTERN)]:
            for path in glob.glob(result_path):
                try:
                    # CTS results from bvt-arc suites only need to be uploaded
                    # to APFE from their designated gs bucket, for early EDI
                    # entries in APFE. These results need to be copied only
                    # into the APFE bucket; copying to the results bucket is
                    # not required.
                    if 'bvt-arc' in path:
                        _upload_files(host, path, result_pattern,
                                      multiprocessing,
                                      None,
                                      DEFAULT_CTS_BVT_APFE_GSURI)
                        return
                    # Non-bvt CTS results need to be uploaded to standard gs
                    # buckets.
                    _upload_files(host, path, result_pattern,
                                  multiprocessing,
                                  DEFAULT_CTS_RESULTS_GSURI,
                                  DEFAULT_CTS_APFE_GSURI)
                    # TODO(rohitbm): make better comparison using regex.
                    # plan_follower CTS results go to plan_follower specific
                    # gs buckets apart from standard gs buckets.
                    if 'plan_follower' in path:
                        _upload_files(host, path, result_pattern,
                                      multiprocessing,
                                      DEFAULT_CTS_DELTA_RESULTS_GSURI,
                                      DEFAULT_CTS_DELTA_APFE_GSURI)
                except Exception as e:
                    logging.error('ERROR uploading test results %s to GS: %s',
                                  path, e)


def _is_valid_result(build, result_pattern, suite):
    """Check if the result should be uploaded to CTS/GTS buckets.

    @param build: Builder name.
    @param result_pattern: XML result file pattern.
    @param suite: Test suite name.

    @returns: Bool flag indicating whether the result is valid.
    """
    if build is None or suite is None:
        return False

    # Not valid if it's not a release build.
    if not re.match(r'(?!trybot-).*-release/.*', build):
        return False

    # Not valid if it's a CTS/GTS result but not from an 'arc-cts*',
    # 'arc-gts*', 'bvt-arc' or 'test_that_wrapper' suite.
    result_patterns = [CTS_RESULT_PATTERN, CTS_V2_RESULT_PATTERN]
    if result_pattern in result_patterns and not (
            suite.startswith('arc-cts') or
            suite.startswith('arc-gts') or
            suite.startswith('bvt-arc') or
            suite.startswith('test_that_wrapper')):
        return False

    return True

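# Illustrative example (hypothetical build strings): 'samus-release/R60-9592.0.0'
# passes the release check above, while 'trybot-samus-release/R60-9592.0.0' is
# rejected by the (?!trybot-) lookahead.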

def _is_test_collector(package):
    """Return True if the test run is just to collect the list of CTS tests.

    @param package: Autotest package name, e.g. cheets_CTS_N.CtsGraphicsTestCase

    @return Bool flag indicating whether the test package is the CTS test
            list collector.
    """
    return TEST_LIST_COLLECTOR in package


def _upload_files(host, path, result_pattern, multiprocessing,
                  result_gs_bucket, apfe_gs_bucket):
    """Upload CTS/GTS results of a single host to the configured gs buckets.

    @param host: Path to the host's results folder.
    @param path: Path to the timestamped CTS/GTS results directory.
    @param result_pattern: XML result file pattern.
    @param multiprocessing: True to turn on -m option for gsutil.
    @param result_gs_bucket: Results bucket URI, or None to skip the results
                             upload.
    @param apfe_gs_bucket: APFE bucket URI.
    """
    keyval = models.test.parse_job_keyval(host)
    build = keyval.get('build')
    suite = keyval.get('suite')

    if not _is_valid_result(build, result_pattern, suite):
        # No need to upload current folder, return.
        return

    parent_job_id = str(keyval['parent_job_id'])

    folders = path.split(os.sep)
    job_id = folders[-6]
    package = folders[-4]
    timestamp = folders[-1]

    # Results produced by the CTS test list collector are dummy results;
    # they don't need to be copied to the APFE bucket, which is mainly used
    # for CTS APFE submission.
    if not _is_test_collector(package):
        # Path: bucket/build/parent_job_id/cheets_CTS.*/job_id_timestamp/
        # or bucket/build/parent_job_id/cheets_GTS.*/job_id_timestamp/
        cts_apfe_gs_path = os.path.join(
                apfe_gs_bucket, build, parent_job_id,
                package, job_id + '_' + timestamp) + '/'

        for zip_file in glob.glob('%s.zip' % path):
            utils.run(' '.join(_get_cmd_list(
                    multiprocessing, zip_file, cts_apfe_gs_path)))
            logging.debug('Upload %s to %s ', zip_file, cts_apfe_gs_path)
    else:
        logging.debug('%s is a CTS Test collector Autotest test run.', package)
        logging.debug('Skipping CTS results upload to APFE gs:// bucket.')

    if result_gs_bucket:
        # Path: bucket/cheets_CTS.*/job_id_timestamp/
        # or bucket/cheets_GTS.*/job_id_timestamp/
        test_result_gs_path = os.path.join(
                result_gs_bucket, package, job_id + '_' + timestamp) + '/'

        for test_result_file in glob.glob(os.path.join(path, result_pattern)):
            # gzip test_result_file (testResult.xml/test_result.xml).
            test_result_file_gz = '%s.gz' % test_result_file
            with open(test_result_file, 'r') as f_in, (
                    gzip.open(test_result_file_gz, 'w')) as f_out:
                shutil.copyfileobj(f_in, f_out)
            utils.run(' '.join(_get_cmd_list(
                    multiprocessing, test_result_file_gz, test_result_gs_path)))
            logging.debug('Zip and upload %s to %s',
                          test_result_file_gz, test_result_gs_path)
            # Remove test_result_file_gz (testResult.xml.gz/test_result.xml.gz).
            os.remove(test_result_file_gz)


def _emit_gs_returncode_metric(returncode):
    """Increment the gs_returncode counter based on |returncode|."""
    m_gs_returncode = 'chromeos/autotest/gs_offloader/gs_returncode'
    rcode = int(returncode)
    if rcode < 0 or rcode > 255:
        rcode = -1
    metrics.Counter(m_gs_returncode).increment(fields={'return_code': rcode})


def _handle_dir_os_error(dir_entry, fix_permission=False):
    """Try to fix the result directory's permission issue if needed.

    @param dir_entry: Directory entry to offload.
    @param fix_permission: True to change the directory's owner to the same one
            running gs_offloader.
    """
    if fix_permission:
        correct_results_folder_permission(dir_entry)
    m_permission_error = ('chromeos/autotest/errors/gs_offloader/'
                          'wrong_permissions_count')
    metrics_fields = _get_metrics_fields(dir_entry)
    metrics.Counter(m_permission_error).increment(fields=metrics_fields)


class BaseGSOffloader(object):

    """Google Storage offloader interface."""

    __metaclass__ = abc.ABCMeta

    def offload(self, dir_entry, dest_path, job_complete_time):
        """Safely offload a directory entry to Google Storage.

        This method is responsible for copying the contents of
        `dir_entry` to Google storage at `dest_path`.

        When successful, the method must delete all of `dir_entry`.
        On failure, `dir_entry` should be left undisturbed, in order
        to allow for retry.

        Errors are conveyed simply and solely by two methods:
          * At the time of failure, write enough information to the log
            to allow later debug, if necessary.
          * Don't delete the content.

        In order to guarantee robustness, this method must not raise any
        exceptions.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        try:
            self._full_offload(dir_entry, dest_path, job_complete_time)
        except Exception as e:
            logging.debug('Exception in offload for %s', dir_entry)
            logging.debug('Ignoring this error: %s', str(e))

    @abc.abstractmethod
    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Offload a directory entry to Google Storage.

        This method implements the actual offload behavior of its
        subclass.  To guarantee effective debugging, this method should
        catch all exceptions, and perform any reasonable diagnosis
        or other handling.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """


class GSOffloader(BaseGSOffloader):
    """Google Storage Offloader."""

    def __init__(self, gs_uri, multiprocessing, delete_age,
            console_client=None):
        """Initialize the Google Storage offloader for the given gs_uri.

        @param gs_uri: Google storage bucket uri to offload to.
        @param multiprocessing: True to turn on -m option for gsutil.
        @param delete_age: Minimum age in days at which an already offloaded
          job directory may be deleted from local storage.
        @param console_client: The cloud console client. If None,
          cloud console APIs are not called.
        """
        self._gs_uri = gs_uri
        self._multiprocessing = multiprocessing
        self._delete_age = delete_age
        self._console_client = console_client

    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/gs_offloader/job_offload_duration')
    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Offload the specified directory entry to Google storage.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        with tempfile.TemporaryFile('w+') as stdout_file, \
             tempfile.TemporaryFile('w+') as stderr_file:
            try:
                try:
                    self._try_offload(dir_entry, dest_path, stdout_file,
                                      stderr_file)
                except OSError as e:
                    # Try to correct the directory's file permissions if that
                    # was the cause of the error.
                    _handle_dir_os_error(dir_entry, e.errno == errno.EACCES)
                    # Try again after the permission issue is fixed.
                    self._try_offload(dir_entry, dest_path, stdout_file,
                                      stderr_file)
            except _OffloadError:
                metrics_fields = _get_metrics_fields(dir_entry)
                m_any_error = 'chromeos/autotest/errors/gs_offloader/any_error'
                metrics.Counter(m_any_error).increment(fields=metrics_fields)

                # Rewind the log files for stdout and stderr and log
                # their contents.
                stdout_file.seek(0)
                stderr_file.seek(0)
                stderr_content = stderr_file.read()
                logging.warning('Error occurred when offloading %s:', dir_entry)
                logging.warning('Stdout:\n%s \nStderr:\n%s', stdout_file.read(),
                                stderr_content)

                # Some result files may have the wrong file permissions. Try
                # to correct such errors so that a later attempt can succeed.
                # TODO(dshi): The code is added to correct result files
                # with wrong file permission caused by bug 511778. After
                # this code is pushed to lab and run for a while to
                # clean up these files, following code and function
                # correct_results_folder_permission can be deleted.
                if 'CommandException: Error opening file' in stderr_content:
                    correct_results_folder_permission(dir_entry)
            else:
                self._prune(dir_entry, job_complete_time)

    def _try_offload(self, dir_entry, dest_path, stdout_file, stderr_file):
        """Offload the specified directory entry to Google storage.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param stdout_file: Log file.
        @param stderr_file: Log file.
        """
        if _is_uploaded(dir_entry):
            return
        start_time = time.time()
        metrics_fields = _get_metrics_fields(dir_entry)
        error_obj = _OffloadError(start_time)
        try:
            sanitize_dir(dir_entry)
            if DEFAULT_CTS_RESULTS_GSURI:
                _upload_cts_testresult(dir_entry, self._multiprocessing)

            if LIMIT_FILE_COUNT:
                limit_file_count(dir_entry)

            process = None
            with timeout_util.Timeout(OFFLOAD_TIMEOUT_SECS):
                gs_path = '%s%s' % (self._gs_uri, dest_path)
                cmd = _get_cmd_list(self._multiprocessing, dir_entry, gs_path)
                logging.debug('Attempting an offload command %s', cmd)
                process = subprocess.Popen(
                    cmd, stdout=stdout_file, stderr=stderr_file)
                process.wait()
                logging.debug('Offload command %s completed.', cmd)

            _emit_gs_returncode_metric(process.returncode)
            if process.returncode != 0:
                raise error_obj
            _emit_offload_metrics(dir_entry)

            if self._console_client:
                gcs_uri = os.path.join(gs_path,
                        os.path.basename(dir_entry))
                if not self._console_client.send_test_job_offloaded_message(
                        gcs_uri):
                    raise error_obj

            _mark_uploaded(dir_entry)
        except timeout_util.TimeoutError:
            m_timeout = 'chromeos/autotest/errors/gs_offloader/timed_out_count'
            metrics.Counter(m_timeout).increment(fields=metrics_fields)
            # If we finished the call to Popen(), we may need to
            # terminate the child process.  We don't bother calling
            # process.poll(); that inherently races because the child
            # can die any time it wants.
            if process:
                try:
                    process.terminate()
                except OSError:
                    # We don't expect any error other than "No such
                    # process".
                    pass
            logging.error('Offloading %s timed out after waiting %d '
                          'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS)
            raise error_obj

    def _prune(self, dir_entry, job_complete_time):
        """Prune directory if it is uploaded and expired.

        @param dir_entry: Directory entry to offload.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        if not (_is_uploaded(dir_entry)
                and job_directories.is_job_expired(self._delete_age,
                                                   job_complete_time)):
            return
        try:
            logging.debug('Pruning uploaded directory %s', dir_entry)
            shutil.rmtree(dir_entry)
            job_timestamp_cache.delete(dir_entry)
        except OSError as e:
            # Wrong file permissions can cause the `shutil.rmtree(dir_entry)`
            # call to raise OSError with the message 'Permission denied'.
            # Details can be found in crbug.com/536151.
            _handle_dir_os_error(dir_entry, e.errno == errno.EACCES)
            # Try again after the permission issue is fixed.
            shutil.rmtree(dir_entry)


class _OffloadError(Exception):
    """Google Storage offload failed."""

    def __init__(self, start_time):
        super(_OffloadError, self).__init__(start_time)
        self.start_time = start_time


class FakeGSOffloader(BaseGSOffloader):

    """Fake Google Storage Offloader that only deletes directories."""

    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Pretend to offload a directory and delete it.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        shutil.rmtree(dir_entry)


class OptionalMemoryCache(object):
    """Implements memory cache if cachetools module can be loaded.

    If the platform has cachetools available then the cache will
    be created, otherwise the get calls will always act as if there
    was a cache miss and the set/delete will be no-ops.
    """
    cache = None

    def setup(self, age_to_delete):
        """Set up a TTL cache size based on how long the job will be handled.

        Autotest jobs are handled by gs_offloader until they are deleted from
        local storage; base the cache size on how long that is.

        @param age_to_delete: Number of days after which items in the cache
                              should expire.
        """
        if cachetools:
            # Min cache is 1000 items for 10 mins. If the age to delete is 0
            # days you still want a short / small cache.
            # 2000 items is a good approximation for the max number of jobs a
            # moblab can produce in a day; the lab offloads immediately, so
            # the number of carried jobs should be very small in the normal
            # case.
            ttl = max(age_to_delete * 24 * 60 * 60, 600)
            maxsize = max(age_to_delete * 2000, 1000)
            job_timestamp_cache.cache = cachetools.TTLCache(maxsize=maxsize,
                                                            ttl=ttl)

    def get(self, key):
        """If we have a cache try to retrieve from it."""
        if self.cache is not None:
            result = self.cache.get(key)
            return result
        return None

    def add(self, key, value):
        """If we have a cache try to store key/value."""
        if self.cache is not None:
            self.cache[key] = value

    def delete(self, key):
        """If we have a cache try to remove a key."""
        if self.cache is not None:
            # cachetools caches have no `delete` method; pop the key instead
            # (a no-op if the key is absent).
            return self.cache.pop(key, None)


job_timestamp_cache = OptionalMemoryCache()


def _cached_get_timestamp_if_finished(job):
    """Retrieve a job finished timestamp from cache or AFE.

    @param job       _JobDirectory instance to retrieve
                     finished timestamp of.

    @returns: None if the job is not finished, or the
              last job finished time recorded by Autotest.
    """
    job_timestamp = job_timestamp_cache.get(job.dirname)
    if not job_timestamp:
        job_timestamp = job.get_timestamp_if_finished()
        if job_timestamp:
            job_timestamp_cache.add(job.dirname, job_timestamp)
    return job_timestamp


def _is_expired(job, age_limit):
    """Return whether job directory is expired for uploading.

    @param job: _JobDirectory instance.
    @param age_limit:  Minimum age in days at which a job may be offloaded.
    """
    job_timestamp = _cached_get_timestamp_if_finished(job)
    if not job_timestamp:
        return False
    return job_directories.is_job_expired(age_limit, job_timestamp)


def _emit_offload_metrics(dirpath):
    """Emit gs offload metrics.

    @param dirpath: Offloaded directory path.
    """
    dir_size = file_utils.get_directory_size_kibibytes(dirpath)
    metrics_fields = _get_metrics_fields(dirpath)

    m_offload_count = (
            'chromeos/autotest/gs_offloader/jobs_offloaded')
    metrics.Counter(m_offload_count).increment(
            fields=metrics_fields)
    m_offload_size = ('chromeos/autotest/gs_offloader/'
                      'kilobytes_transferred')
    metrics.Counter(m_offload_size).increment_by(
            dir_size, fields=metrics_fields)


def _is_uploaded(dirpath):
    """Return whether directory has been uploaded.

    @param dirpath: Directory path string.
    """
    return os.path.isfile(_get_uploaded_marker_file(dirpath))


def _mark_uploaded(dirpath):
    """Mark directory as uploaded.

    @param dirpath: Directory path string.
    """
    logging.debug('Creating uploaded marker for directory %s', dirpath)
    with open(_get_uploaded_marker_file(dirpath), 'a'):
        pass


def _get_uploaded_marker_file(dirpath):
    """Return path to upload marker file for directory.

    @param dirpath: Directory path string.
    """
    return '%s/.GS_UPLOADED' % (dirpath,)


def _format_job_for_failure_reporting(job):
    """Formats a _JobDirectory for reporting / logging.

    @param job: The _JobDirectory to format.
    """
    d = datetime.datetime.fromtimestamp(job.first_offload_start)
    data = (d.strftime(FAILED_OFFLOADS_TIME_FORMAT),
            job.offload_count,
            job.dirname)
    return FAILED_OFFLOADS_LINE_FORMAT % data


def wait_for_gs_write_access(gs_uri):
    """Verify and wait until we have write access to Google Storage.

    @param gs_uri: The Google Storage URI we are trying to offload to.
    """
    # TODO (sbasi) Try to use the gsutil command to check write access.
    # Ensure we have write access to gs_uri.
    dummy_file = tempfile.NamedTemporaryFile()
    test_cmd = _get_cmd_list(False, dummy_file.name, gs_uri)
    while True:
        logging.debug('Checking for write access with dummy file %s',
                      dummy_file.name)
        try:
            subprocess.check_call(test_cmd)
            subprocess.check_call(
                    ['gsutil', 'rm',
                     os.path.join(gs_uri,
                                  os.path.basename(dummy_file.name))])
            break
        except subprocess.CalledProcessError:
            t = 120
            logging.debug('Unable to offload dummy file to %s, sleeping for %s '
                          'seconds.', gs_uri, t)
            time.sleep(t)
    logging.debug('Dummy file write check to gs succeeded.')


class Offloader(object):
    """State of the offload process.

    Contains the following member fields:
      * _gs_offloader:  BaseGSOffloader to use to offload a job directory.
      * _jobdir_classes:  List of classes of job directory to be
        offloaded.
      * _processes:  Maximum number of outstanding offload processes
        to allow during an offload cycle.
      * _upload_age_limit:  Minimum age in days at which a job may be
        offloaded.
      * _delete_age_limit:  Minimum age in days at which an offloaded job
        may be deleted from local storage.
      * _open_jobs: a dictionary mapping directory paths to Job
        objects.
    """

    def __init__(self, options):
        self._upload_age_limit = options.age_to_upload
        self._delete_age_limit = options.age_to_delete
        if options.delete_only:
            self._gs_offloader = FakeGSOffloader()
        else:
            self.gs_uri = utils.get_offload_gsuri()
            logging.debug('Offloading to: %s', self.gs_uri)
            multiprocessing = False
            if options.multiprocessing:
                multiprocessing = True
            elif options.multiprocessing is None:
                multiprocessing = GS_OFFLOADER_MULTIPROCESSING
            logging.info(
                    'Offloader multiprocessing is set to: %r', multiprocessing)
            console_client = None
            if (cloud_console_client and
                    cloud_console_client.is_cloud_notification_enabled()):
                console_client = cloud_console_client.PubSubBasedClient()
            self._gs_offloader = GSOffloader(
                    self.gs_uri, multiprocessing, self._delete_age_limit,
                    console_client)
        classlist = [
                job_directories.SwarmingJobDirectory,
        ]
        if options.process_hosts_only or options.process_all:
            classlist.append(job_directories.SpecialJobDirectory)
        if not options.process_hosts_only:
            classlist.append(job_directories.RegularJobDirectory)
        self._jobdir_classes = classlist
        assert self._jobdir_classes
        self._processes = options.parallelism
        self._open_jobs = {}
        self._pubsub_topic = None
        self._offload_count_limit = 3


    def _add_new_jobs(self):
        """Find new job directories that need offloading.

        Go through the file system looking for valid job directories
        that are currently not in `self._open_jobs`, and add them in.

        """
        new_job_count = 0
        for cls in self._jobdir_classes:
            for resultsdir in cls.get_job_directories():
                if resultsdir in self._open_jobs:
                    continue
                self._open_jobs[resultsdir] = cls(resultsdir)
                new_job_count += 1
        logging.debug('Start of offload cycle - found %d new jobs',
                      new_job_count)


    def _remove_offloaded_jobs(self):
        """Remove offloaded jobs from `self._open_jobs`."""
        removed_job_count = 0
        for jobkey, job in self._open_jobs.items():
            if (
                    not os.path.exists(job.dirname)
                    or _is_uploaded(job.dirname)):
                del self._open_jobs[jobkey]
                removed_job_count += 1
        logging.debug('End of offload cycle - cleared %d jobs, '
                      'carrying %d open jobs',
                      removed_job_count, len(self._open_jobs))


    def _report_failed_jobs(self):
        """Report status after attempting offload.

        This function processes all jobs in `self._open_jobs`, assuming
        an attempt has just been made to offload all of them.

        If any jobs have reportable errors, and we haven't generated
        an e-mail report in the last `REPORT_INTERVAL_SECS` seconds,
        send new e-mail describing the failures.

        """
        failed_jobs = [j for j in self._open_jobs.values() if
                       j.first_offload_start]
        self._report_failed_jobs_count(failed_jobs)
        self._log_failed_jobs_locally(failed_jobs)


    def offload_once(self):
        """Perform one offload cycle.

        Find all job directories for new jobs that we haven't seen
        before.  Then, attempt to offload the directories for any
        jobs that have finished running.  Offload of multiple jobs
        is done in parallel, up to `self._processes` at a time.

        After we've tried uploading all directories, go through the list
        checking the status of all uploaded directories.  If necessary,
        report failures via e-mail.

        """
        self._add_new_jobs()
        self._report_current_jobs_count()
        with parallel.BackgroundTaskRunner(
                self._gs_offloader.offload, processes=self._processes) as queue:
            for job in self._open_jobs.values():
                _enqueue_offload(job, queue, self._upload_age_limit)
        self._give_up_on_jobs_over_limit()
        self._remove_offloaded_jobs()
        self._report_failed_jobs()


    def _give_up_on_jobs_over_limit(self):
        """Give up on jobs that have gone over the offload limit.

        We mark them as uploaded as we won't try to offload them any more.
        """
        for job in self._open_jobs.values():
            if job.offload_count >= self._offload_count_limit:
                _mark_uploaded(job.dirname)


    def _log_failed_jobs_locally(self, failed_jobs,
                                 log_file=FAILED_OFFLOADS_FILE):
        """Updates a local file listing all the failed jobs.

        The dropped file can be used by the developers to list jobs that we have
        failed to upload.

        @param failed_jobs: A list of failed _JobDirectory objects.
        @param log_file: The file to log the failed jobs to.
        """
        now = datetime.datetime.now()
        now_str = now.strftime(FAILED_OFFLOADS_TIME_FORMAT)
        formatted_jobs = [_format_job_for_failure_reporting(job)
                            for job in failed_jobs]
        formatted_jobs.sort()

        with open(log_file, 'w') as logfile:
            logfile.write(FAILED_OFFLOADS_FILE_HEADER %
                          (now_str, len(failed_jobs)))
            logfile.writelines(formatted_jobs)


    def _report_current_jobs_count(self):
        """Report the number of outstanding jobs to monarch."""
        metrics.Gauge('chromeos/autotest/gs_offloader/current_jobs_count').set(
                len(self._open_jobs))


    def _report_failed_jobs_count(self, failed_jobs):
        """Report the number of outstanding failed offload jobs to monarch.

        @param failed_jobs: List of failed jobs.
        """
        metrics.Gauge('chromeos/autotest/gs_offloader/failed_jobs_count').set(
                len(failed_jobs))


def _enqueue_offload(job, queue, age_limit):
    """Enqueue the job for offload, if it's eligible.

    The job is eligible for offloading if the database has marked
    it finished, and the job is older than the `age_limit`
    parameter.

    If the job is eligible, offload processing is requested by
    passing the `queue` parameter's `put()` method a sequence with
    the job's `dirname` attribute, the name of its parent directory,
    and the job's finished timestamp.

    @param job       _JobDirectory instance to offload.
    @param queue     If the job should be offloaded, put the offload
                     parameters into this queue for processing.
    @param age_limit Minimum age for a job to be offloaded.  A value
                     of 0 means that the job will be offloaded as
                     soon as it is finished.

    """
    if not job.offload_count:
        if not _is_expired(job, age_limit):
            return
        job.first_offload_start = time.time()
    job.offload_count += 1
    if job.process_gs_instructions():
        timestamp = _cached_get_timestamp_if_finished(job)
        queue.put([job.dirname, os.path.dirname(job.dirname), timestamp])

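# Note: the three-element list queued above corresponds to the
# (dir_entry, dest_path, job_complete_time) parameters of
# BaseGSOffloader.offload; chromite's BackgroundTaskRunner (see
# Offloader.offload_once) passes each queued item to the task as its
# positional arguments.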

def parse_options():
    """Parse the args passed into gs_offloader."""
    defaults = 'Defaults:\n  Destination: %s\n  Results Path: %s' % (
            utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR)
    usage = 'usage: %prog [options]\n' + defaults
    parser = OptionParser(usage)
    parser.add_option('-a', '--all', dest='process_all',
                      action='store_true',
                      help='Offload all files in the results directory.')
    parser.add_option('-s', '--hosts', dest='process_hosts_only',
                      action='store_true',
                      help='Offload only the special tasks result files '
                      'located in the results/hosts subdirectory.')
    parser.add_option('-p', '--parallelism', dest='parallelism',
                      type='int', default=1,
                      help='Number of parallel workers to use.')
    parser.add_option('-o', '--delete_only', dest='delete_only',
                      action='store_true',
                      help='GS Offloader will only delete the '
                      'directories and will not offload them to Google '
                      'Storage. NOTE: If the global_config variable '
                      'CROS.gs_offloading_enabled is False, --delete_only '
                      'is automatically True.',
                      default=not GS_OFFLOADING_ENABLED)
    parser.add_option('-d', '--days_old', dest='days_old',
                      help='Minimum job age in days before a result can be '
                      'offloaded.', type='int', default=0)
    parser.add_option('-l', '--log_size', dest='log_size',
                      help='Limit the offloader logs to a specified '
                      'number of megabytes.', type='int', default=0)
    parser.add_option('-m', dest='multiprocessing', action='store_true',
                      help='Turn on -m option for gsutil. If not set, the '
                      'global config setting gs_offloader_multiprocessing '
                      'under the CROS section is applied.')
    parser.add_option('-i', '--offload_once', dest='offload_once',
                      action='store_true',
                      help='Upload all available results and then exit.')
    parser.add_option('-y', '--normal_priority', dest='normal_priority',
                      action='store_true',
                      help='Upload using normal process priority.')
    parser.add_option('-u', '--age_to_upload', dest='age_to_upload',
                      help='Minimum job age in days before a result can be '
                      'offloaded, but not removed from local storage.',
                      type='int', default=None)
    parser.add_option('-n', '--age_to_delete', dest='age_to_delete',
                      help='Minimum job age in days before a result can be '
                      'removed from local storage.',
                      type='int', default=None)
    parser.add_option(
            '--metrics-file',
            help='If provided, drop metrics to this local file instead of '
                 'reporting to ts_mon.',
            type=str,
            default=None,
    )
    parser.add_option('-t', '--enable_timestamp_cache',
                      dest='enable_timestamp_cache',
                      action='store_true',
                      help='Cache the finished timestamps from AFE.')

    options = parser.parse_args()[0]
    if options.process_all and options.process_hosts_only:
        parser.print_help()
        print ('Cannot process all files and only the hosts '
               'subdirectory. Please remove an argument.')
        sys.exit(1)

    if options.days_old and (options.age_to_upload or options.age_to_delete):
        parser.print_help()
        print('Use the days_old option or the age_to_* options, but not both.')
        sys.exit(1)

    if options.age_to_upload is None:
        options.age_to_upload = options.days_old
    if options.age_to_delete is None:
        options.age_to_delete = options.days_old

    return options


def main():
    """Main method of gs_offloader."""
    options = parse_options()

    if options.process_all:
        offloader_type = 'all'
    elif options.process_hosts_only:
        offloader_type = 'hosts'
    else:
        offloader_type = 'jobs'

    _setup_logging(options, offloader_type)

    if options.enable_timestamp_cache:
        # Extend the cache expiry time by another 1% so the timestamps
        # are available as the results are purged.
        job_timestamp_cache.setup(options.age_to_delete * 1.01)

    # Nice our process (carried to subprocesses) so we don't overload
    # the system.
    if not options.normal_priority:
        logging.debug('Set process to nice value: %d', NICENESS)
        os.nice(NICENESS)
    if psutil:
        proc = psutil.Process()
        logging.debug('Set process to ionice IDLE')
        proc.ionice(psutil.IOPRIO_CLASS_IDLE)

    # os.listdir returns relative paths, so change to where we need to
    # be to avoid an os.path.join on each loop.
    logging.debug('Offloading Autotest results in %s', RESULTS_DIR)
    os.chdir(RESULTS_DIR)

    service_name = 'gs_offloader(%s)' % offloader_type
    with ts_mon_config.SetupTsMonGlobalState(service_name, indirect=True,
                                             short_lived=False,
                                             debug_file=options.metrics_file):
        with metrics.SuccessCounter('chromeos/autotest/gs_offloader/exit'):
            offloader = Offloader(options)
            if not options.delete_only:
                wait_for_gs_write_access(offloader.gs_uri)
            while True:
                offloader.offload_once()
                if options.offload_once:
                    break
                time.sleep(SLEEP_TIME_SECS)


_LOG_LOCATION = '/usr/local/autotest/logs/'
_LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt'
_LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S'
_LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'


def _setup_logging(options, offloader_type):
    """Set up logging.

    @param options: Parsed options.
    @param offloader_type: Type of offloader action as string.
    """
    log_filename = _get_log_filename(options, offloader_type)
    log_formatter = logging.Formatter(_LOGGING_FORMAT)
    # Replace the default logging handler with a RotatingFileHandler. If
    # options.log_size is 0, the file size will not be limited. Keeps
    # one backup just in case.
    handler = logging.handlers.RotatingFileHandler(
            log_filename, maxBytes=1024 * options.log_size, backupCount=1)
    handler.setFormatter(log_formatter)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)


def _get_log_filename(options, offloader_type):
    """Get log filename.

    @param options: Parsed options.
    @param offloader_type: Type of offloader action as string.
    """
    if options.log_size > 0:
        log_timestamp = ''
    else:
        log_timestamp = time.strftime(_LOG_TIMESTAMP_FORMAT)
    log_basename = _LOG_FILENAME_FORMAT % (offloader_type, log_timestamp)
    return os.path.join(_LOG_LOCATION, log_basename)
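# Illustrative example (hypothetical values): with offloader_type 'jobs' and
# the default log_size of 0, this returns a path such as
# '/usr/local/autotest/logs/gs_offloader_jobs_log_20180123_140352.txt'.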


if __name__ == '__main__':
    main()