#!/usr/bin/python2
#
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Script to archive old Autotest results to Google Storage.

Uses gsutil to archive files to the configured Google Storage bucket.
Upon successful copy, the local results directory is deleted.
"""

import abc
try:
    import cachetools
except ImportError:
    cachetools = None
import datetime
import errno
import glob
import gzip
import logging
import logging.handlers
import os
import re
import shutil
import stat
import subprocess
import sys
import tarfile
import tempfile
import time
import urllib

from optparse import OptionParser

import common
from autotest_lib.client.common_lib import file_utils
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.site_utils import job_directories
# For unittest, the cloud_console.proto is not compiled yet.
try:
    from autotest_lib.site_utils import cloud_console_client
except ImportError:
    cloud_console_client = None
from autotest_lib.tko import models
from autotest_lib.utils import labellib
from autotest_lib.utils import gslib
from autotest_lib.utils.side_effects import config_loader
from chromite.lib import timeout_util

# Autotest requires the psutil module from site-packages, so it must be imported
# after "import common".
try:
    # Does not exist, nor is needed, on moblab.
    import psutil
except ImportError:
    psutil = None

from chromite.lib import parallel
try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    metrics = utils.metrics_mock
    ts_mon_config = utils.metrics_mock


GS_OFFLOADING_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloading_enabled', type=bool, default=True)

# Nice setting for the process; the higher the number, the lower the priority.
NICENESS = 10

# Maximum number of seconds to allow for offloading a single
# directory.
OFFLOAD_TIMEOUT_SECS = 60 * 60

# Sleep time in seconds between offload cycles.
SLEEP_TIME_SECS = 5

# Minimum number of seconds between e-mail reports.
REPORT_INTERVAL_SECS = 60 * 60

# Location of Autotest results on disk.
RESULTS_DIR = '/usr/local/autotest/results'
FAILED_OFFLOADS_FILE = os.path.join(RESULTS_DIR, 'FAILED_OFFLOADS')

FAILED_OFFLOADS_FILE_HEADER = '''
This is the list of gs_offloader failed jobs.
Last offloader attempt at %s failed to offload %d files.
Check http://go/cros-triage-gsoffloader to triage the issue


First failure       Count   Directory name
=================== ======  ==============================
'''
# --+----1----+----  ----+  ----+----1----+----2----+----3

FAILED_OFFLOADS_LINE_FORMAT = '%19s  %5d  %-1s\n'
FAILED_OFFLOADS_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

USE_RSYNC_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_use_rsync', type=bool, default=False)

LIMIT_FILE_COUNT = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_limit_file_count', type=bool, default=False)

# Use multiprocessing for gsutil uploading.
GS_OFFLOADER_MULTIPROCESSING = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_multiprocessing', type=bool, default=False)

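# TIMESTAMP_PATTERN below matches CTS/GTS result directory names such as
# 2016.04.28_01.41.44 (see the example paths in
# _parse_cts_job_results_file_path).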
D = '[0-9][0-9]'
TIMESTAMP_PATTERN = '%s%s.%s.%s_%s.%s.%s' % (D, D, D, D, D, D, D)
CTS_RESULT_PATTERN = 'testResult.xml'
CTS_COMPRESSED_RESULT_PATTERN = 'testResult.xml.tgz'
CTS_V2_RESULT_PATTERN = 'test_result.xml'
CTS_V2_COMPRESSED_RESULT_PATTERN = 'test_result.xml.tgz'

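# Maps each compressed result archive pattern to the XML file that is
# extracted from it before upload (used when unpacking .tgz results in
# _upload_files).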
CTS_COMPRESSED_RESULT_TYPES = {
        CTS_COMPRESSED_RESULT_PATTERN: CTS_RESULT_PATTERN,
        CTS_V2_COMPRESSED_RESULT_PATTERN: CTS_V2_RESULT_PATTERN}

# Google Storage bucket URI to store results in.
DEFAULT_CTS_RESULTS_GSURI = global_config.global_config.get_config_value(
        'CROS', 'cts_results_server', default='')
DEFAULT_CTS_APFE_GSURI = global_config.global_config.get_config_value(
        'CROS', 'cts_apfe_server', default='')
DEFAULT_CTS_DELTA_RESULTS_GSURI = global_config.global_config.get_config_value(
        'CROS', 'ctsdelta_results_server', default='')
DEFAULT_CTS_DELTA_APFE_GSURI = global_config.global_config.get_config_value(
        'CROS', 'ctsdelta_apfe_server', default='')
DEFAULT_CTS_BVT_APFE_GSURI = global_config.global_config.get_config_value(
        'CROS', 'ctsbvt_apfe_server', default='')

# metadata type
GS_OFFLOADER_SUCCESS_TYPE = 'gs_offloader_success'
GS_OFFLOADER_FAILURE_TYPE = 'gs_offloader_failure'

# Autotest test to collect list of CTS tests
TEST_LIST_COLLECTOR = 'tradefed-run-collect-tests-only'

def _get_metrics_fields(dir_entry):
    """Get metrics fields for the given test result directory, including board
    and milestone.

    @param dir_entry: Directory entry to offload.
    @return A dictionary for the metrics data to be uploaded.
    """
    fields = {'board': 'unknown',
              'milestone': 'unknown'}
    if dir_entry:
        # There could be multiple hosts in the job directory; use the first one
        # available.
        for host in glob.glob(os.path.join(dir_entry, '*')):
            try:
                keyval = models.test.parse_job_keyval(host)
            except ValueError:
                continue
            build = keyval.get('build')
            if build:
                try:
                    cros_version = labellib.parse_cros_version(build)
                    fields['board'] = cros_version.board
                    fields['milestone'] = cros_version.milestone
                    break
                except ValueError:
                    # Ignore version parsing error so it won't crash
                    # gs_offloader.
                    pass

    return fields


def _get_cmd_list(multiprocessing, dir_entry, gs_path):
    """Return the command to offload a specified directory.

    @param multiprocessing: True to turn on -m option for gsutil.
    @param dir_entry: Directory entry/path for which we need a cmd_list
                      to offload.
    @param gs_path: Location in google storage where we will
                    offload the directory.

    @return A command list to be executed by Popen.
    """
    cmd = ['gsutil']
    if multiprocessing:
        cmd.append('-m')
    if USE_RSYNC_ENABLED:
        cmd.append('rsync')
        target = os.path.join(gs_path, os.path.basename(dir_entry))
    else:
        cmd.append('cp')
        target = gs_path
    cmd += ['-eR', dir_entry, target]
    return cmd


def _get_finish_cmd_list(gs_path):
    """Returns a command to remotely mark a given gs path as finished.

    @param gs_path: Location in google storage where the offload directory
                    should be marked as finished.

    @return A command list to be executed by Popen.
    """
    target = os.path.join(gs_path, '.finished_offload')
    return [
        'gsutil',
        'cp',
        '/dev/null',
        target,
        ]


def sanitize_dir(dirpath):
    """Sanitize directory for gs upload.

    Symlinks and FIFOs are converted to regular files to fix bugs.

    @param dirpath: Directory entry to be sanitized.
    """
    if not os.path.exists(dirpath):
        return
    _escape_rename(dirpath)
    _escape_rename_dir_contents(dirpath)
    _sanitize_fifos(dirpath)
    _sanitize_symlinks(dirpath)


def _escape_rename_dir_contents(dirpath):
    """Recursively rename a directory's contents to escape filenames for gs upload.

    @param dirpath: Directory path string.
    """
    for filename in os.listdir(dirpath):
        path = os.path.join(dirpath, filename)
        _escape_rename(path)
    for filename in os.listdir(dirpath):
        path = os.path.join(dirpath, filename)
        if os.path.isdir(path):
            _escape_rename_dir_contents(path)


def _escape_rename(path):
    """Rename a file to escape its filename for gs upload.

    @param path: File path string.
    """
    dirpath, filename = os.path.split(path)
    sanitized_filename = gslib.escape(filename)
    sanitized_path = os.path.join(dirpath, sanitized_filename)
    os.rename(path, sanitized_path)


def _sanitize_fifos(dirpath):
    """Convert fifos to regular files (fixes crbug.com/684122).

    @param dirpath: Directory path string.
    """
    for root, _, files in os.walk(dirpath):
        for filename in files:
            path = os.path.join(root, filename)
            file_stat = os.lstat(path)
            if stat.S_ISFIFO(file_stat.st_mode):
                _replace_fifo_with_file(path)


def _replace_fifo_with_file(path):
    """Replace a fifo with a normal file.

    @param path: Fifo path string.
    """
    logging.debug('Removing fifo %s', path)
    os.remove(path)
    logging.debug('Creating fifo marker %s', path)
    with open(path, 'w') as f:
        f.write('<FIFO>')


def _sanitize_symlinks(dirpath):
    """Convert symlinks to regular files (fixes crbug.com/692788).

    @param dirpath: Directory path string.
    """
    for root, _, files in os.walk(dirpath):
        for filename in files:
            path = os.path.join(root, filename)
            file_stat = os.lstat(path)
            if stat.S_ISLNK(file_stat.st_mode):
                _replace_symlink_with_file(path)


def _replace_symlink_with_file(path):
    """Replace a symlink with a normal file.

    @param path: Symlink path string.
    """
    target = os.readlink(path)
    logging.debug('Removing symlink %s', path)
    os.remove(path)
    logging.debug('Creating symlink marker %s', path)
    with open(path, 'w') as f:
        f.write('<symlink to %s>' % target)


# Maximum number of files in the folder.
_MAX_FILE_COUNT = 3000
_FOLDERS_NEVER_ZIP = ['debug', 'ssp_logs', 'autoupdate_logs']
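# limit_file_count() never compresses the folders listed above, so their logs
# stay directly accessible.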


def _get_zippable_folders(dir_entry):
    """Return the subfolders of dir_entry that may be compressed."""
    folders_list = []
    for folder in os.listdir(dir_entry):
        folder_path = os.path.join(dir_entry, folder)
        if (not os.path.isfile(folder_path) and
                folder not in _FOLDERS_NEVER_ZIP):
            folders_list.append(folder_path)
    return folders_list


def limit_file_count(dir_entry):
    """Limit the number of files in the given directory.

    The method checks the total number of files in the given directory.
    If the number is greater than _MAX_FILE_COUNT, the method will
    compress each folder in the given directory, except folders in
    _FOLDERS_NEVER_ZIP.

    @param dir_entry: Directory entry to be checked.
    """
    try:
        count = _count_files(dir_entry)
    except ValueError:
        logging.warning('Failed to get the file count in folder %s.', dir_entry)
        return
    if count < _MAX_FILE_COUNT:
        return

    # For test jobs, zip folders at the second level, e.g. 123-debug/host1,
    # so the autoserv debug folder remains accessible.
    # Special tasks do not need to dig one level deeper.
    is_special_task = re.match(job_directories.SPECIAL_TASK_PATTERN,
                               dir_entry)

    folders = _get_zippable_folders(dir_entry)
    if not is_special_task:
        subfolders = []
        for folder in folders:
            subfolders.extend(_get_zippable_folders(folder))
        folders = subfolders

    for folder in folders:
        _make_into_tarball(folder)


def _count_files(dirpath):
    """Count the number of files in a directory recursively.

    @param dirpath: Directory path string.
    """
    return sum(len(files) for _path, _dirs, files in os.walk(dirpath))


def _make_into_tarball(dirpath):
    """Make a directory into a tarball and delete the original directory.

    @param dirpath: Directory path string.
    """
    tarpath = '%s.tgz' % dirpath
    with tarfile.open(tarpath, 'w:gz') as tar:
        tar.add(dirpath, arcname=os.path.basename(dirpath))
    shutil.rmtree(dirpath)


def correct_results_folder_permission(dir_entry):
    """Make sure the results folder has the right permission settings.

    For tests running with server-side packaging, the results folder is
    owned by root. This must be changed to the user running the
    autoserv process, so the parsing job can access the results folder.

    @param dir_entry: Path to the results folder.
    """
    if not dir_entry:
        return

    logging.info('Trying to correct file permission of %s.', dir_entry)
    try:
        owner = '%s:%s' % (os.getuid(), os.getgid())
        subprocess.check_call(
                ['sudo', '-n', 'chown', '-R', owner, dir_entry])
        subprocess.check_call(['chmod', '-R', 'u+rw', dir_entry])
        subprocess.check_call(
                ['find', dir_entry, '-type', 'd',
                 '-exec', 'chmod', 'u+x', '{}', ';'])
    except subprocess.CalledProcessError as e:
        logging.error('Failed to modify permission for %s: %s',
                      dir_entry, e)


def _upload_cts_testresult(dir_entry, multiprocessing):
    """Upload test results to separate gs buckets.

    Upload the testResult.xml.gz/test_result.xml.gz file to cts_results_bucket.
    Upload timestamp.zip to cts_apfe_bucket.

    @param dir_entry: Path to the results folder.
    @param multiprocessing: True to turn on -m option for gsutil.
    """
    for host in glob.glob(os.path.join(dir_entry, '*')):
        cts_path = os.path.join(host, 'cheets_CTS.*', 'results', '*',
                                TIMESTAMP_PATTERN)
        cts_v2_path = os.path.join(host, 'cheets_CTS_*', 'results', '*',
                                   TIMESTAMP_PATTERN)
        gts_v2_path = os.path.join(host, 'cheets_GTS*', 'results', '*',
                                   TIMESTAMP_PATTERN)
        for result_path, result_pattern in [(cts_path, CTS_RESULT_PATTERN),
                            (cts_path, CTS_COMPRESSED_RESULT_PATTERN),
                            (cts_v2_path, CTS_V2_RESULT_PATTERN),
                            (cts_v2_path, CTS_V2_COMPRESSED_RESULT_PATTERN),
                            (gts_v2_path, CTS_V2_RESULT_PATTERN)]:
            for path in glob.glob(result_path):
                try:
                    # CTS results from bvt-arc suites only need to be uploaded
                    # to APFE from their designated gs bucket, for early EDI
                    # entries in APFE. These results need to be copied only
                    # into the APFE bucket; copying to the results bucket is
                    # not required.
                    if 'bvt-arc' in path:
                        _upload_files(host, path, result_pattern,
                                      multiprocessing,
                                      None,
                                      DEFAULT_CTS_BVT_APFE_GSURI)
                        return
                    # Non-bvt CTS results need to be uploaded to standard gs
                    # buckets.
                    _upload_files(host, path, result_pattern,
                                  multiprocessing,
                                  DEFAULT_CTS_RESULTS_GSURI,
                                  DEFAULT_CTS_APFE_GSURI)
                    # TODO(rohitbm): make better comparison using regex.
                    # plan_follower CTS results go to plan_follower specific
                    # gs buckets apart from standard gs buckets.
                    if 'plan_follower' in path:
                        _upload_files(host, path, result_pattern,
                                      multiprocessing,
                                      DEFAULT_CTS_DELTA_RESULTS_GSURI,
                                      DEFAULT_CTS_DELTA_APFE_GSURI)
                except Exception as e:
                    logging.error('ERROR uploading test results %s to GS: %s',
                                  path, e)


def _is_valid_result(build, result_pattern, suite):
    """Check if the result should be uploaded to CTS/GTS buckets.

    @param build: Builder name.
    @param result_pattern: XML result file pattern.
    @param suite: Test suite name.

    @returns: Bool flag indicating whether the result is valid.
    """
    if build is None or suite is None:
        return False

    # Not valid if it's not a release build.
    if not re.match(r'(?!trybot-).*-release/.*', build):
        return False

    # Not valid if it's a cts result but not from an 'arc-cts*' or
    # 'test_that_wrapper' suite.
    result_patterns = [CTS_RESULT_PATTERN, CTS_V2_RESULT_PATTERN]
    if result_pattern in result_patterns and not (
            suite.startswith('arc-cts') or
            suite.startswith('arc-gts') or
            suite.startswith('bvt-arc') or
            suite.startswith('cros_test_platform') or
            suite.startswith('test_that_wrapper')):
        return False

    return True


def _is_test_collector(package):
    """Returns true if the test run is just to collect the list of CTS tests.

    @param package: Autotest package name. e.g. cheets_CTS_N.CtsGraphicsTestCase

    @return Bool flag indicating whether the test package is a CTS list generator.
    """
    return TEST_LIST_COLLECTOR in package


def _get_swarming_req_dir(path):
    """
    Returns the parent directory of |path|, if |path| is a swarming task result.

    @param path: Full path to the result of a task.
                      e.g. /tmp/results/swarming-44466815c4bc951/1

    @return string of the parent dir or None if not a swarming task.
    """
    m_parent = re.match(
            '(?P<parent_dir>.*/swarming-[0-9a-fA-F]*0)/[1-9a-fA-F]$', path)
    if m_parent:
        return m_parent.group('parent_dir')
    return None


def _parse_cts_job_results_file_path(path):
    """Parse CTS file paths and extract required information from them."""

    # Autotest paths look like:
    # /317739475-chromeos-test/chromeos4-row9-rack11-host22/
    # cheets_CTS.android.dpi/results/cts-results/2016.04.28_01.41.44

    # Swarming paths look like:
    # /swarming-458e3a3a7fc6f210/1/autoserv_test/
    # cheets_CTS.android.dpi/results/cts-results/2016.04.28_01.41.44

    folders = path.split(os.sep)
    if 'swarming' in folders[1]:
        # Swarming job and attempt combined
        job_id = "%s-%s" % (folders[-7], folders[-6])
    else:
        job_id = folders[-6]

    cts_package = folders[-4]
    timestamp = folders[-1]

    return job_id, cts_package, timestamp


def _upload_files(host, path, result_pattern, multiprocessing,
                  result_gs_bucket, apfe_gs_bucket):
    """Upload CTS/GTS results at |path| to the result and APFE gs buckets."""
    keyval = models.test.parse_job_keyval(host)
    build = keyval.get('build')
    suite = keyval.get('suite')

    host_keyval = models.test.parse_host_keyval(host, keyval.get('hostname'))
    labels = urllib.unquote(host_keyval.get('labels'))
    try:
        host_model_name = re.search(r'model:(\w+)', labels).group(1)
    except AttributeError:
        logging.error('Model name attribute is missing in %s/host_keyval/%s.',
                      host, keyval.get('hostname'))
        return

    if not _is_valid_result(build, result_pattern, suite):
        # No need to upload the current folder, return.
        return

    parent_job_id = str(keyval['parent_job_id'])

    job_id, package, timestamp = _parse_cts_job_results_file_path(path)

    # Results produced by the CTS test list collector are dummy results.
    # They don't need to be copied to the APFE bucket, which is mainly used
    # for CTS APFE submission.
    if not _is_test_collector(package):
        # Path: bucket/build/parent_job_id/cheets_CTS.*/job_id_timestamp/
        # or bucket/build/parent_job_id/cheets_GTS.*/job_id_timestamp/
        index = build.find('-release')
        build_with_model_name = ''
        if index == -1:
            logging.info('Not a release build. '
                         'Non-release build results can be skipped from offloading.')
            return

        # CTS v2 pipeline requires device info in 'board.model' format.
        # e.g. coral.robo360-release, eve.eve-release
        build_with_model_name = (build[:index] + '.' + host_model_name +
                                 build[index:])

        cts_apfe_gs_path = os.path.join(
                apfe_gs_bucket, build_with_model_name, parent_job_id,
                package, job_id + '_' + timestamp) + '/'

        for zip_file in glob.glob(os.path.join('%s.zip' % path)):
            utils.run(' '.join(_get_cmd_list(
                    multiprocessing, zip_file, cts_apfe_gs_path)))
            logging.debug('Upload %s to %s ', zip_file, cts_apfe_gs_path)
    else:
        logging.debug('%s is a CTS Test collector Autotest test run.', package)
        logging.debug('Skipping CTS results upload to APFE gs:// bucket.')

    if result_gs_bucket:
        # Path: bucket/cheets_CTS.*/job_id_timestamp/
        # or bucket/cheets_GTS.*/job_id_timestamp/
        test_result_gs_path = os.path.join(
                result_gs_bucket, package, job_id + '_' + timestamp) + '/'

        for test_result_file in glob.glob(os.path.join(path, result_pattern)):
            # gzip test_result_file (testResult.xml/test_result.xml).

            test_result_tgz_file = ''
            if test_result_file.endswith('tgz'):
                # Extract .xml file from tgz file for better handling in the
                # CTS dashboard pipeline.
                # TODO(rohitbm): work with infra team to produce .gz file so
                # tgz to gz middle conversion is not needed.
                try:
                    with tarfile.open(test_result_file, 'r:gz') as tar_file:
                        tar_file.extract(
                                CTS_COMPRESSED_RESULT_TYPES[result_pattern])
                        test_result_tgz_file = test_result_file
                        test_result_file = os.path.join(path,
                                CTS_COMPRESSED_RESULT_TYPES[result_pattern])
                except tarfile.ReadError as error:
                    logging.debug(error)
                except KeyError as error:
                    logging.debug(error)

            test_result_file_gz = '%s.gz' % test_result_file
            with open(test_result_file, 'r') as f_in, (
                    gzip.open(test_result_file_gz, 'w')) as f_out:
                shutil.copyfileobj(f_in, f_out)
            utils.run(' '.join(_get_cmd_list(
                    multiprocessing, test_result_file_gz, test_result_gs_path)))
            logging.debug('Zip and upload %s to %s',
                          test_result_file_gz, test_result_gs_path)
            # Remove test_result_file_gz (testResult.xml.gz/test_result.xml.gz).
            os.remove(test_result_file_gz)
            # Remove extracted test_result.xml file.
            if test_result_tgz_file:
                os.remove(test_result_file)


def _emit_gs_returncode_metric(returncode):
    """Increment the gs_returncode counter based on |returncode|."""
    m_gs_returncode = 'chromeos/autotest/gs_offloader/gs_returncode'
    rcode = int(returncode)
    if rcode < 0 or rcode > 255:
        rcode = -1
    metrics.Counter(m_gs_returncode).increment(fields={'return_code': rcode})


def _handle_dir_os_error(dir_entry, fix_permission=False):
    """Try to fix the result directory's permission issue if needed.

    @param dir_entry: Directory entry to offload.
    @param fix_permission: True to change the directory's owner to the same one
            running gs_offloader.
    """
    if fix_permission:
        correct_results_folder_permission(dir_entry)
    m_permission_error = ('chromeos/autotest/errors/gs_offloader/'
                          'wrong_permissions_count')
    metrics_fields = _get_metrics_fields(dir_entry)
    metrics.Counter(m_permission_error).increment(fields=metrics_fields)


class BaseGSOffloader(object):

    """Google Storage offloader interface."""

    __metaclass__ = abc.ABCMeta

    def offload(self, dir_entry, dest_path, job_complete_time):
        """Safely offload a directory entry to Google Storage.

        This method is responsible for copying the contents of
        `dir_entry` to Google storage at `dest_path`.

        When successful, the method must delete all of `dir_entry`.
        On failure, `dir_entry` should be left undisturbed, in order
        to allow for retry.

        Errors are conveyed simply and solely in two ways:
          * At the time of failure, write enough information to the log
            to allow later debugging, if necessary.
          * Don't delete the content.

        In order to guarantee robustness, this method must not raise any
        exceptions.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        try:
            self._full_offload(dir_entry, dest_path, job_complete_time)
        except Exception as e:
            logging.debug('Exception in offload for %s', dir_entry)
            logging.debug('Ignoring this error: %s', str(e))

    @abc.abstractmethod
    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Offload a directory entry to Google Storage.

        This method implements the actual offload behavior of its
        subclass.  To guarantee effective debugging, this method should
        catch all exceptions, and perform any reasonable diagnosis
        or other handling.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """


class GSOffloader(BaseGSOffloader):
    """Google Storage Offloader."""

    def __init__(self, gs_uri, multiprocessing, delete_age,
                 console_client=None):
        """Initialize the offloader for the given gs_uri.

        @param gs_uri: Google storage bucket uri to offload to.
        @param multiprocessing: True to turn on -m option for gsutil.
        @param delete_age: Number of days after which a successfully
          offloaded directory may be deleted from local storage.
        @param console_client: The cloud console client. If None,
          cloud console APIs are not called.
        """
        self._gs_uri = gs_uri
        self._multiprocessing = multiprocessing
        self._delete_age = delete_age
        self._console_client = console_client

    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/gs_offloader/job_offload_duration')
    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Offload the specified directory entry to Google storage.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        with tempfile.TemporaryFile('w+') as stdout_file, \
             tempfile.TemporaryFile('w+') as stderr_file:
            try:
                try:
                    self._try_offload(dir_entry, dest_path, stdout_file,
                                      stderr_file)
                except OSError as e:
                    # Correct the directory's file permissions if that was
                    # the problem, so the retry below has a chance to succeed.
                    _handle_dir_os_error(dir_entry, e.errno == errno.EACCES)
                    # Try again after the permission issue is fixed.
                    self._try_offload(dir_entry, dest_path, stdout_file,
                                      stderr_file)
            except _OffloadError:
                metrics_fields = _get_metrics_fields(dir_entry)
                m_any_error = 'chromeos/autotest/errors/gs_offloader/any_error'
                metrics.Counter(m_any_error).increment(fields=metrics_fields)

                # Rewind the log files for stdout and stderr and log
                # their contents.
                stdout_file.seek(0)
                stderr_file.seek(0)
                stderr_content = stderr_file.read()
                logging.warning('Error occurred when offloading %s:', dir_entry)
                logging.warning('Stdout:\n%s \nStderr:\n%s', stdout_file.read(),
                                stderr_content)

                # Some result files may have wrong file permissions. Try
                # to correct such errors so a later try can succeed.
                # TODO(dshi): The code is added to correct result files
                # with wrong file permission caused by bug 511778. After
                # this code is pushed to lab and run for a while to
                # clean up these files, the following code and the function
                # correct_results_folder_permission can be deleted.
                if 'CommandException: Error opening file' in stderr_content:
                    correct_results_folder_permission(dir_entry)
            else:
                self._prune(dir_entry, job_complete_time)
                swarming_req_dir = _get_swarming_req_dir(dir_entry)
                if swarming_req_dir:
                    self._prune_swarming_req_dir(swarming_req_dir)


    def _try_offload(self, dir_entry, dest_path,
                     stdout_file, stderr_file):
        """Offload the specified directory entry to Google storage.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param stdout_file: Log file.
        @param stderr_file: Log file.
        """
        if _is_uploaded(dir_entry):
            return
        start_time = time.time()
        metrics_fields = _get_metrics_fields(dir_entry)
        error_obj = _OffloadError(start_time)
        config = config_loader.load(dir_entry)
        cts_enabled = True
        if config:
            # TODO(linxinan): use credential file assigned by the side_effect
            # config.
            if not config.cts.enabled:
                cts_enabled = config.cts.enabled
            if config.google_storage.bucket:
                gs_prefix = ('' if config.google_storage.bucket.startswith('gs://')
                             else 'gs://')
                self._gs_uri = gs_prefix + config.google_storage.bucket
        else:
            # For now, the absence of config does not block gs_offloader
            # from uploading files via default credential.
            logging.debug('Failed to load the side effects config in %s.',
                          dir_entry)
        try:
            sanitize_dir(dir_entry)
            if DEFAULT_CTS_RESULTS_GSURI and cts_enabled:
                _upload_cts_testresult(dir_entry, self._multiprocessing)

            if LIMIT_FILE_COUNT:
                limit_file_count(dir_entry)

            process = None
            with timeout_util.Timeout(OFFLOAD_TIMEOUT_SECS):
                gs_path = '%s%s' % (self._gs_uri, dest_path)
                cmd = _get_cmd_list(self._multiprocessing, dir_entry, gs_path)
                logging.debug('Attempting an offload command %s', cmd)
                process = subprocess.Popen(
                    cmd, stdout=stdout_file, stderr=stderr_file)
                process.wait()
                logging.debug('Offload command %s completed; '
                              'marking offload complete.', cmd)
                _mark_upload_finished(gs_path, stdout_file, stderr_file)

            _emit_gs_returncode_metric(process.returncode)
            if process.returncode != 0:
                raise error_obj
            _emit_offload_metrics(dir_entry)

            if self._console_client:
                gcs_uri = os.path.join(gs_path,
                        os.path.basename(dir_entry))
                if not self._console_client.send_test_job_offloaded_message(
                        gcs_uri):
                    raise error_obj

            _mark_uploaded(dir_entry)
        except timeout_util.TimeoutError:
            m_timeout = 'chromeos/autotest/errors/gs_offloader/timed_out_count'
            metrics.Counter(m_timeout).increment(fields=metrics_fields)
            # If we finished the call to Popen(), we may need to
            # terminate the child process.  We don't bother calling
            # process.poll(); that inherently races because the child
            # can die any time it wants.
            if process:
                try:
                    process.terminate()
                except OSError:
                    # We don't expect any error other than "No such
                    # process".
                    pass
            logging.error('Offloading %s timed out after waiting %d '
                          'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS)
            raise error_obj

    def _prune(self, dir_entry, job_complete_time):
        """Prune directory if it is uploaded and expired.

        @param dir_entry: Directory entry to offload.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        if not (_is_uploaded(dir_entry)
                and job_directories.is_job_expired(self._delete_age,
                                                   job_complete_time)):
            return
        try:
            logging.debug('Pruning uploaded directory %s', dir_entry)
            shutil.rmtree(dir_entry)
            job_timestamp_cache.delete(dir_entry)
        except OSError as e:
            # Wrong file permissions can cause `shutil.rmtree(dir_entry)`
            # to raise OSError with message 'Permission denied'. Details can
            # be found in crbug.com/536151.
            _handle_dir_os_error(dir_entry, e.errno == errno.EACCES)
            # Try again after the permission issue is fixed.
            shutil.rmtree(dir_entry)

    def _prune_swarming_req_dir(self, swarming_req_dir):
        """Prune swarming request directory, if it is empty.

        @param swarming_req_dir: Directory entry of a swarming request.
        """
        try:
            logging.debug('Pruning swarming request directory %s',
                          swarming_req_dir)
            os.rmdir(swarming_req_dir)
        except OSError:
            # Do nothing and leave this directory to the next removal attempt.
            logging.debug('Failed to prune swarming request directory %s',
                          swarming_req_dir)


class _OffloadError(Exception):
    """Google Storage offload failed."""

    def __init__(self, start_time):
        super(_OffloadError, self).__init__(start_time)
        self.start_time = start_time


class FakeGSOffloader(BaseGSOffloader):

    """Fake Google Storage Offloader that only deletes directories."""

    def _full_offload(self, dir_entry, dest_path, job_complete_time):
        """Pretend to offload a directory and delete it.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        shutil.rmtree(dir_entry)


class OptionalMemoryCache(object):
    """Implements a memory cache if the cachetools module can be loaded.

    If the platform has cachetools available then the cache will
    be created; otherwise the get calls will always act as if there
    was a cache miss and the add/delete calls will be no-ops.
    """
    cache = None

    def setup(self, age_to_delete):
        """Set up a TTL cache sized based on how long jobs will be handled.

        Autotest jobs are handled by gs_offloader until they are deleted from
        local storage; base the cache size on how long that is.

        @param age_to_delete: Number of days after which items in the cache
                              should expire.
        """
        if cachetools:
            # Min cache is 1000 items for 10 mins. If the age to delete is 0
            # days you still want a short / small cache.
            # 2000 items is a good approximation for the max number of jobs a
            # moblab can produce in a day; the lab offloads immediately so
            # the number of carried jobs should be very small in the normal
            # case.
            ttl = max(age_to_delete * 24 * 60 * 60, 600)
            maxsize = max(age_to_delete * 2000, 1000)
            job_timestamp_cache.cache = cachetools.TTLCache(maxsize=maxsize,
                                                            ttl=ttl)

    def get(self, key):
        """If we have a cache try to retrieve from it."""
        if self.cache is not None:
            result = self.cache.get(key)
            return result
        return None

    def add(self, key, value):
        """If we have a cache try to store key/value."""
        if self.cache is not None:
            self.cache[key] = value

    def delete(self, key):
        """If we have a cache try to remove a key."""
        if self.cache is not None:
            # cachetools caches have no delete(); pop the key if present.
            self.cache.pop(key, None)


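# Module-level cache of job finished timestamps; it is only populated when
# the --enable_timestamp_cache flag is passed (see main()).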
job_timestamp_cache = OptionalMemoryCache()


def _cached_get_timestamp_if_finished(job):
    """Retrieve a job finished timestamp from cache or AFE.

    @param job       _JobDirectory instance to retrieve the
                     finished timestamp of.

    @returns: None if the job is not finished, or the
              last job finished time recorded by Autotest.
    """
    job_timestamp = job_timestamp_cache.get(job.dirname)
    if not job_timestamp:
        job_timestamp = job.get_timestamp_if_finished()
        if job_timestamp:
            job_timestamp_cache.add(job.dirname, job_timestamp)
    return job_timestamp


def _is_expired(job, age_limit):
    """Return whether the job directory is old enough to be offloaded.

    @param job: _JobDirectory instance.
    @param age_limit: Minimum age in days at which a job may be offloaded.
    """
    job_timestamp = _cached_get_timestamp_if_finished(job)
    if not job_timestamp:
        return False
    return job_directories.is_job_expired(age_limit, job_timestamp)


def _emit_offload_metrics(dirpath):
    """Emit gs offload metrics.

    @param dirpath: Offloaded directory path.
    """
    dir_size = file_utils.get_directory_size_kibibytes(dirpath)
    metrics_fields = _get_metrics_fields(dirpath)

    m_offload_count = (
            'chromeos/autotest/gs_offloader/jobs_offloaded')
    metrics.Counter(m_offload_count).increment(
            fields=metrics_fields)
    m_offload_size = ('chromeos/autotest/gs_offloader/'
                      'kilobytes_transferred')
    metrics.Counter(m_offload_size).increment_by(
            dir_size, fields=metrics_fields)


def _is_uploaded(dirpath):
    """Return whether directory has been uploaded.

    @param dirpath: Directory path string.
    """
    return os.path.isfile(_get_uploaded_marker_file(dirpath))


def _mark_uploaded(dirpath):
    """Mark directory as uploaded.

    @param dirpath: Directory path string.
    """
    logging.debug('Creating uploaded marker for directory %s', dirpath)
    with open(_get_uploaded_marker_file(dirpath), 'a'):
        pass


def _mark_upload_finished(gs_path, stdout_file, stderr_file):
    """Mark a given gs_path upload as finished (remotely).

    @param gs_path: gs:// url of the remote directory that has finished
                    uploading.
    @param stdout_file: Log file.
    @param stderr_file: Log file.
    """
    cmd = _get_finish_cmd_list(gs_path)
    process = subprocess.Popen(cmd, stdout=stdout_file, stderr=stderr_file)
    process.wait()
    logging.debug('Finished marking as complete %s', cmd)


def _get_uploaded_marker_file(dirpath):
    """Return path to upload marker file for directory.

    @param dirpath: Directory path string.
    """
    return '%s/.GS_UPLOADED' % (dirpath,)


def _format_job_for_failure_reporting(job):
    """Formats a _JobDirectory for reporting / logging.

    @param job: The _JobDirectory to format.
    """
    d = datetime.datetime.fromtimestamp(job.first_offload_start)
    data = (d.strftime(FAILED_OFFLOADS_TIME_FORMAT),
            job.offload_count,
            job.dirname)
    return FAILED_OFFLOADS_LINE_FORMAT % data


def wait_for_gs_write_access(gs_uri):
    """Verify and wait until we have write access to Google Storage.

    @param gs_uri: The Google Storage URI we are trying to offload to.
    """
    # TODO (sbasi) Try to use the gsutil command to check write access.
    # Ensure we have write access to gs_uri.
    dummy_file = tempfile.NamedTemporaryFile()
    test_cmd = _get_cmd_list(False, dummy_file.name, gs_uri)
    while True:
        logging.debug('Checking for write access with dummy file %s',
                      dummy_file.name)
        try:
            subprocess.check_call(test_cmd)
            subprocess.check_call(
                    ['gsutil', 'rm',
                     os.path.join(gs_uri,
                                  os.path.basename(dummy_file.name))])
            break
        except subprocess.CalledProcessError:
            t = 120
            logging.debug('Unable to offload dummy file to %s, sleeping for %s '
                          'seconds.', gs_uri, t)
            time.sleep(t)
    logging.debug('Dummy file write check to gs succeeded.')


class Offloader(object):
    """State of the offload process.

    Contains the following member fields:
      * _gs_offloader:  BaseGSOffloader to use to offload a job directory.
      * _jobdir_classes:  List of classes of job directory to be
        offloaded.
      * _processes:  Maximum number of outstanding offload processes
        to allow during an offload cycle.
      * _upload_age_limit:  Minimum age in days at which a job may be
        offloaded.
      * _open_jobs: a dictionary mapping directory paths to Job
        objects.
    """

    def __init__(self, options):
        self._upload_age_limit = options.age_to_upload
        self._delete_age_limit = options.age_to_delete
        if options.delete_only:
            self._gs_offloader = FakeGSOffloader()
        else:
            self.gs_uri = utils.get_offload_gsuri()
            logging.debug('Offloading to: %s', self.gs_uri)
            multiprocessing = False
            if options.multiprocessing:
                multiprocessing = True
            elif options.multiprocessing is None:
                multiprocessing = GS_OFFLOADER_MULTIPROCESSING
            logging.info(
                    'Offloader multiprocessing is set to: %r', multiprocessing)
            console_client = None
            if (cloud_console_client and
                    cloud_console_client.is_cloud_notification_enabled()):
                console_client = cloud_console_client.PubSubBasedClient()
            self._gs_offloader = GSOffloader(
                    self.gs_uri, multiprocessing, self._delete_age_limit,
                    console_client)
        classlist = [
                job_directories.SwarmingJobDirectory,
        ]
        if options.process_hosts_only or options.process_all:
            classlist.append(job_directories.SpecialJobDirectory)
        if not options.process_hosts_only:
            classlist.append(job_directories.RegularJobDirectory)
        self._jobdir_classes = classlist
        assert self._jobdir_classes
        self._processes = options.parallelism
        self._open_jobs = {}
        self._pusub_topic = None
        self._offload_count_limit = 3


    def _add_new_jobs(self):
        """Find new job directories that need offloading.

        Go through the file system looking for valid job directories
        that are currently not in `self._open_jobs`, and add them in.

        """
        new_job_count = 0
        for cls in self._jobdir_classes:
            for resultsdir in cls.get_job_directories():
                if resultsdir in self._open_jobs:
                    continue
                self._open_jobs[resultsdir] = cls(resultsdir)
                new_job_count += 1
        logging.debug('Start of offload cycle - found %d new jobs',
                      new_job_count)


    def _remove_offloaded_jobs(self):
        """Remove offloaded jobs from `self._open_jobs`."""
        removed_job_count = 0
        for jobkey, job in self._open_jobs.items():
            if (
                    not os.path.exists(job.dirname)
                    or _is_uploaded(job.dirname)):
                del self._open_jobs[jobkey]
                removed_job_count += 1
        logging.debug('End of offload cycle - cleared %d jobs, '
                      'carrying %d open jobs',
                      removed_job_count, len(self._open_jobs))


    def _report_failed_jobs(self):
        """Report status after attempting offload.

        This function processes all jobs in `self._open_jobs`, assuming
        an attempt has just been made to offload all of them.

        If any jobs have reportable errors, and we haven't generated
        an e-mail report in the last `REPORT_INTERVAL_SECS` seconds,
        send a new e-mail describing the failures.

        """
        failed_jobs = [j for j in self._open_jobs.values() if
                       j.first_offload_start]
        self._report_failed_jobs_count(failed_jobs)
        self._log_failed_jobs_locally(failed_jobs)


    def offload_once(self):
        """Perform one offload cycle.

        Find all job directories for new jobs that we haven't seen
        before.  Then, attempt to offload the directories for any
        jobs that have finished running.  Offload of multiple jobs
        is done in parallel, up to `self._processes` at a time.

        After we've tried uploading all directories, go through the list
        checking the status of all uploaded directories.  If necessary,
        report failures via e-mail.

        """
        self._add_new_jobs()
        self._report_current_jobs_count()
        with parallel.BackgroundTaskRunner(
                self._gs_offloader.offload, processes=self._processes) as queue:
            for job in self._open_jobs.values():
                _enqueue_offload(job, queue, self._upload_age_limit)
        self._give_up_on_jobs_over_limit()
        self._remove_offloaded_jobs()
        self._report_failed_jobs()


    def _give_up_on_jobs_over_limit(self):
        """Give up on jobs that have gone over the offload limit.

        We mark them as uploaded as we won't try to offload them any more.
        """
        for job in self._open_jobs.values():
            if job.offload_count >= self._offload_count_limit:
                _mark_uploaded(job.dirname)


    def _log_failed_jobs_locally(self, failed_jobs,
                                 log_file=FAILED_OFFLOADS_FILE):
        """Updates a local file listing all the failed jobs.

        The dropped file can be used by the developers to list jobs that we have
        failed to upload.

        @param failed_jobs: A list of failed _JobDirectory objects.
        @param log_file: The file to log the failed jobs to.
        """
        now = datetime.datetime.now()
        now_str = now.strftime(FAILED_OFFLOADS_TIME_FORMAT)
        formatted_jobs = [_format_job_for_failure_reporting(job)
                            for job in failed_jobs]
        formatted_jobs.sort()

        with open(log_file, 'w') as logfile:
            logfile.write(FAILED_OFFLOADS_FILE_HEADER %
                          (now_str, len(failed_jobs)))
            logfile.writelines(formatted_jobs)


    def _report_current_jobs_count(self):
        """Report the number of outstanding jobs to monarch."""
        metrics.Gauge('chromeos/autotest/gs_offloader/current_jobs_count').set(
                len(self._open_jobs))


    def _report_failed_jobs_count(self, failed_jobs):
        """Report the number of outstanding failed offload jobs to monarch.

        @param failed_jobs: List of failed jobs.
        """
        metrics.Gauge('chromeos/autotest/gs_offloader/failed_jobs_count').set(
                len(failed_jobs))


def _enqueue_offload(job, queue, age_limit):
    """Enqueue the job for offload, if it's eligible.

    The job is eligible for offloading if the database has marked
    it finished, and the job is older than the `age_limit`
    parameter.

    If the job is eligible, offload processing is requested by
    passing the `queue` parameter's `put()` method a sequence with
    the job's `dirname` attribute, its parent directory, and the
    job's finished timestamp.

    @param job       _JobDirectory instance to offload.
    @param queue     If the job should be offloaded, put the offload
                     parameters into this queue for processing.
    @param age_limit Minimum age for a job to be offloaded.  A value
                     of 0 means that the job will be offloaded as
                     soon as it is finished.

    """
    if not job.offload_count:
        if not _is_expired(job, age_limit):
            return
        job.first_offload_start = time.time()
    job.offload_count += 1
    if job.process_gs_instructions():
        timestamp = _cached_get_timestamp_if_finished(job)
        queue.put([job.dirname, os.path.dirname(job.dirname), timestamp])


def parse_options():
    """Parse the args passed into gs_offloader."""
    defaults = 'Defaults:\n  Destination: %s\n  Results Path: %s' % (
            utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR)
    usage = 'usage: %prog [options]\n' + defaults
    parser = OptionParser(usage)
    parser.add_option('-a', '--all', dest='process_all',
                      action='store_true',
                      help='Offload all files in the results directory.')
    parser.add_option('-s', '--hosts', dest='process_hosts_only',
                      action='store_true',
                      help='Offload only the special tasks result files '
                      'located in the results/hosts subdirectory.')
    parser.add_option('-p', '--parallelism', dest='parallelism',
                      type='int', default=1,
                      help='Number of parallel workers to use.')
    parser.add_option('-o', '--delete_only', dest='delete_only',
                      action='store_true',
                      help='GS Offloader will only delete the '
                      'directories and will not offload them to Google '
                      'Storage. NOTE: If global_config variable '
                      'CROS.gs_offloading_enabled is False, --delete_only '
                      'is automatically True.',
                      default=not GS_OFFLOADING_ENABLED)
    parser.add_option('-d', '--days_old', dest='days_old',
                      help='Minimum job age in days before a result can be '
                      'offloaded.', type='int', default=0)
    parser.add_option('-l', '--log_size', dest='log_size',
                      help='Limit the offloader logs to a specified '
                      'number of megabytes.', type='int', default=0)
    parser.add_option('-m', dest='multiprocessing', action='store_true',
                      help='Turn on -m option for gsutil. If not set, the '
                      'global config setting gs_offloader_multiprocessing '
                      'under CROS section is applied.')
    parser.add_option('-i', '--offload_once', dest='offload_once',
                      action='store_true',
                      help='Upload all available results and then exit.')
    parser.add_option('-y', '--normal_priority', dest='normal_priority',
                      action='store_true',
                      help='Upload using normal process priority.')
    parser.add_option('-u', '--age_to_upload', dest='age_to_upload',
                      help='Minimum job age in days before a result can be '
                      'offloaded, but not removed from local storage.',
                      type='int', default=None)
    parser.add_option('-n', '--age_to_delete', dest='age_to_delete',
                      help='Minimum job age in days before a result can be '
                      'removed from local storage.',
                      type='int', default=None)
    parser.add_option(
            '--metrics-file',
            help='If provided, drop metrics to this local file instead of '
                 'reporting to ts_mon.',
            type=str,
            default=None,
    )
    parser.add_option('-t', '--enable_timestamp_cache',
                      dest='enable_timestamp_cache',
                      action='store_true',
                      help='Cache the finished timestamps from AFE.')

    options = parser.parse_args()[0]
    if options.process_all and options.process_hosts_only:
        parser.print_help()
        print('Cannot process all files and only the hosts '
              'subdirectory. Please remove an argument.')
        sys.exit(1)

    if options.days_old and (options.age_to_upload or options.age_to_delete):
        parser.print_help()
        print('Use the days_old option or the age_to_* options but not both.')
        sys.exit(1)

    if options.age_to_upload is None:
        options.age_to_upload = options.days_old
    if options.age_to_delete is None:
        options.age_to_delete = options.days_old

    return options


def main():
    """Main method of gs_offloader."""
    options = parse_options()

    if options.process_all:
        offloader_type = 'all'
    elif options.process_hosts_only:
        offloader_type = 'hosts'
    else:
        offloader_type = 'jobs'

    _setup_logging(options, offloader_type)

    if options.enable_timestamp_cache:
        # Extend the cache expiry time by another 1% so the timestamps
        # are available as the results are purged.
        job_timestamp_cache.setup(options.age_to_delete * 1.01)

    # Nice our process (carried to subprocesses) so we don't overload
    # the system.
    if not options.normal_priority:
        logging.debug('Set process to nice value: %d', NICENESS)
        os.nice(NICENESS)
    if psutil:
        proc = psutil.Process()
        logging.debug('Set process to ionice IDLE')
        proc.ionice(psutil.IOPRIO_CLASS_IDLE)

    # os.listdir returns relative paths, so change to where we need to
    # be to avoid an os.path.join on each loop.
    logging.debug('Offloading Autotest results in %s', RESULTS_DIR)
    os.chdir(RESULTS_DIR)

    service_name = 'gs_offloader(%s)' % offloader_type
    with ts_mon_config.SetupTsMonGlobalState(service_name, indirect=True,
                                             short_lived=False,
                                             debug_file=options.metrics_file):
        with metrics.SuccessCounter('chromeos/autotest/gs_offloader/exit'):
            offloader = Offloader(options)
            if not options.delete_only:
                wait_for_gs_write_access(offloader.gs_uri)
            while True:
                offloader.offload_once()
                if options.offload_once:
                    break
                time.sleep(SLEEP_TIME_SECS)


_LOG_LOCATION = '/usr/local/autotest/logs/'
_LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt'
_LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S'
_LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'


def _setup_logging(options, offloader_type):
    """Set up logging.

    @param options: Parsed options.
    @param offloader_type: Type of offloader action as string.
    """
    log_filename = _get_log_filename(options, offloader_type)
    log_formatter = logging.Formatter(_LOGGING_FORMAT)
    # Replace the default logging handler with a RotatingFileHandler. If
    # options.log_size is 0, the file size will not be limited. Keeps
    # one backup just in case.
    handler = logging.handlers.RotatingFileHandler(
            log_filename, maxBytes=1024 * options.log_size, backupCount=1)
    handler.setFormatter(log_formatter)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)


def _get_log_filename(options, offloader_type):
    """Get log filename.

    @param options: Parsed options.
    @param offloader_type: Type of offloader action as string.
    """
    if options.log_size > 0:
        log_timestamp = ''
    else:
        log_timestamp = time.strftime(_LOG_TIMESTAMP_FORMAT)
    log_basename = _LOG_FILENAME_FORMAT % (offloader_type, log_timestamp)
    return os.path.join(_LOG_LOCATION, log_basename)


if __name__ == '__main__':
    main()