#!/usr/bin/python
#
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Script to archive old Autotest results to Google Storage.

Uses gsutil to archive files to the configured Google Storage bucket.
Upon successful copy, the local results directory is deleted.
"""

import abc
import datetime
import errno
import glob
import gzip
import logging
import logging.handlers
import os
import re
import shutil
import socket
import stat
import subprocess
import sys
import tarfile
import tempfile
import time

from optparse import OptionParser

import common
from autotest_lib.client.common_lib import file_utils
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.site_utils import job_directories
from autotest_lib.site_utils import cloud_console_client
from autotest_lib.tko import models
from autotest_lib.utils import labellib
from autotest_lib.utils import gslib
from chromite.lib import timeout_util

# Autotest requires the psutil module from site-packages, so it must be imported
# after "import common".
try:
    # Does not exist, nor is needed, on moblab.
    import psutil
except ImportError:
    psutil = None

from chromite.lib import parallel
try:
    from chromite.lib import metrics
    from chromite.lib import ts_mon_config
except ImportError:
    from autotest_lib import site_utils
    metrics = site_utils.metrics_mock
    ts_mon_config = site_utils.metrics_mock


GS_OFFLOADING_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloading_enabled', type=bool, default=True)

# Nice setting for the process; the higher the number, the lower the priority.
NICENESS = 10

# Maximum number of seconds to allow for offloading a single
# directory.
OFFLOAD_TIMEOUT_SECS = 60 * 60

# Sleep time per loop.
SLEEP_TIME_SECS = 5

# Minimum number of seconds between e-mail reports.
REPORT_INTERVAL_SECS = 60 * 60

# Location of Autotest results on disk.
RESULTS_DIR = '/usr/local/autotest/results'
FAILED_OFFLOADS_FILE = os.path.join(RESULTS_DIR, 'FAILED_OFFLOADS')

# Hosts sub-directory that contains cleanup, verify and repair jobs.
HOSTS_SUB_DIR = 'hosts'

FAILED_OFFLOADS_FILE_HEADER = '''
This is the list of gs_offloader failed jobs.
Last offloader attempt at %s failed to offload %d files.
Check http://go/cros-triage-gsoffloader to triage the issue


First failure       Count   Directory name
=================== ======  ==============================
'''
# --+----1----+----  ----+  ----+----1----+----2----+----3

FAILED_OFFLOADS_LINE_FORMAT = '%19s  %5d  %-1s\n'
FAILED_OFFLOADS_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

USE_RSYNC_ENABLED = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_use_rsync', type=bool, default=False)

LIMIT_FILE_COUNT = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_limit_file_count', type=bool, default=False)

# Use multiprocessing for gsutil uploading.
GS_OFFLOADER_MULTIPROCESSING = global_config.global_config.get_config_value(
        'CROS', 'gs_offloader_multiprocessing', type=bool, default=False)

D = '[0-9][0-9]'
TIMESTAMP_PATTERN = '%s%s.%s.%s_%s.%s.%s' % (D, D, D, D, D, D, D)
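# TIMESTAMP_PATTERN is used as a glob and matches timestamped result
# directories such as 2017.05.01_18.30.45 (YYYY.MM.DD_HH.MM.SS).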
CTS_RESULT_PATTERN = 'testResult.xml'
CTS_V2_RESULT_PATTERN = 'test_result.xml'
# Google Storage bucket URI to store results in.
DEFAULT_CTS_RESULTS_GSURI = global_config.global_config.get_config_value(
        'CROS', 'cts_results_server', default='')
DEFAULT_CTS_APFE_GSURI = global_config.global_config.get_config_value(
        'CROS', 'cts_apfe_server', default='')

# metadata type
GS_OFFLOADER_SUCCESS_TYPE = 'gs_offloader_success'
GS_OFFLOADER_FAILURE_TYPE = 'gs_offloader_failure'

def _get_metrics_fields(dir_entry):
    """Get metrics fields for the given test result directory, including board
    and milestone.

    @param dir_entry: Directory entry to offload.
    @return A dictionary for the metrics data to be uploaded.
    """
    fields = {'board': 'unknown',
              'milestone': 'unknown'}
    if dir_entry:
        # There could be multiple hosts in the job directory; use the first
        # one available.
        for host in glob.glob(os.path.join(dir_entry, '*')):
            try:
                keyval = models.test.parse_job_keyval(host)
            except ValueError:
                continue
            build = keyval.get('build')
            if build:
                try:
                    cros_version = labellib.parse_cros_version(build)
                    fields['board'] = cros_version.board
                    fields['milestone'] = cros_version.milestone
                    break
                except ValueError:
                    # Ignore version parsing errors so they won't crash
                    # gs_offloader.
                    pass

    return fields


def _get_es_metadata(dir_entry):
    """Get ES metadata for the given test result directory.

    @param dir_entry: Directory entry to offload.
    @return A dictionary for the metadata to be uploaded.
    """
    fields = _get_metrics_fields(dir_entry)
    fields['hostname'] = socket.gethostname()
    # Include more data about the test job in metadata.
    if dir_entry:
        fields['dir_entry'] = dir_entry
        fields['job_id'] = job_directories.get_job_id_or_task_id(dir_entry)

    return fields


def _get_cmd_list(multiprocessing, dir_entry, gs_path):
    """Return the command to offload a specified directory.

    @param multiprocessing: True to turn on -m option for gsutil.
    @param dir_entry: Directory entry/path for which we need a cmd_list
                      to offload.
    @param gs_path: Location in google storage where we will
                    offload the directory.

    @return A command list to be executed by Popen.
    """
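    # The '-eR' flags below ask gsutil to skip symlinks (-e) and to recurse
    # into the directory (-R); sanitize_dir has already replaced symlinks
    # with plain marker files before upload.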
    cmd = ['gsutil']
    if multiprocessing:
        cmd.append('-m')
    if USE_RSYNC_ENABLED:
        cmd.append('rsync')
        target = os.path.join(gs_path, os.path.basename(dir_entry))
    else:
        cmd.append('cp')
        target = gs_path
    cmd += ['-eR', dir_entry, target]
    return cmd


def sanitize_dir(dirpath):
    """Sanitize a directory for gs upload.

    Filenames are escaped for gs, and symlinks and FIFOs are converted to
    regular placeholder files to avoid upload problems
    (see crbug.com/684122 and crbug.com/692788).

    @param dirpath: Directory entry to be sanitized.
    """
    if not os.path.exists(dirpath):
        return
    _escape_rename(dirpath)
    _escape_rename_dir_contents(dirpath)
    _sanitize_fifos(dirpath)
    _sanitize_symlinks(dirpath)


def _escape_rename_dir_contents(dirpath):
    """Recursively rename directory to escape filenames for gs upload.

    @param dirpath: Directory path string.
    """
    for filename in os.listdir(dirpath):
        path = os.path.join(dirpath, filename)
        _escape_rename(path)
    for filename in os.listdir(dirpath):
        path = os.path.join(dirpath, filename)
        if os.path.isdir(path):
            _escape_rename_dir_contents(path)


def _escape_rename(path):
    """Rename file to escape filenames for gs upload.

    @param path: File path string.
    """
    dirpath, filename = os.path.split(path)
    sanitized_filename = gslib.escape(filename)
    sanitized_path = os.path.join(dirpath, sanitized_filename)
    os.rename(path, sanitized_path)


def _sanitize_fifos(dirpath):
    """Convert fifos to regular files (fixes crbug.com/684122).

    @param dirpath: Directory path string.
    """
    for root, _, files in os.walk(dirpath):
        for filename in files:
            path = os.path.join(root, filename)
            file_stat = os.lstat(path)
            if stat.S_ISFIFO(file_stat.st_mode):
                _replace_fifo_with_file(path)


def _replace_fifo_with_file(path):
    """Replace a fifo with a normal file.

    @param path: Fifo path string.
    """
    logging.debug('Removing fifo %s', path)
    os.remove(path)
    logging.debug('Creating marker %s', path)
    with open(path, 'w') as f:
        f.write('<FIFO>')


def _sanitize_symlinks(dirpath):
    """Convert Symlinks to regular files (fixes crbug.com/692788).

    @param dirpath: Directory path string.
    """
    for root, _, files in os.walk(dirpath):
        for filename in files:
            path = os.path.join(root, filename)
            file_stat = os.lstat(path)
            if stat.S_ISLNK(file_stat.st_mode):
                _replace_symlink_with_file(path)


def _replace_symlink_with_file(path):
    """Replace a symlink with a normal file.

    @param path: Symlink path string.
    """
    target = os.readlink(path)
    logging.debug('Removing symlink %s', path)
    os.remove(path)
    logging.debug('Creating marker %s', path)
    with open(path, 'w') as f:
        f.write('<symlink to %s>' % target)


# Maximum number of files in the folder.
_MAX_FILE_COUNT = 500
_FOLDERS_NEVER_ZIP = ['debug', 'ssp_logs', 'autoupdate_logs']


def _get_zippable_folders(dir_entry):
    """Return the sub-folders of `dir_entry` that may be compressed.

    Folders listed in _FOLDERS_NEVER_ZIP are excluded.

    @param dir_entry: Directory entry to be checked.
    """
    folders_list = []
    for folder in os.listdir(dir_entry):
        folder_path = os.path.join(dir_entry, folder)
        if (not os.path.isfile(folder_path) and
                folder not in _FOLDERS_NEVER_ZIP):
            folders_list.append(folder_path)
    return folders_list


def limit_file_count(dir_entry):
    """Limit the number of files in given directory.

    The method checks the total number of files in the given directory.
    If the number is greater than _MAX_FILE_COUNT, the method will
    compress each folder in the given directory, except folders in
    _FOLDERS_NEVER_ZIP.

    @param dir_entry: Directory entry to be checked.
    """
    try:
        count = _count_files(dir_entry)
    except ValueError:
        logging.warning('Failed to get the file count in folder %s.', dir_entry)
        return
    if count < _MAX_FILE_COUNT:
        return

    # For a test job, zip folders at the second level, e.g. 123-debug/host1.
    # This keeps the autoserv debug folder accessible.
    # A special task does not need to dig one level deeper.
    is_special_task = re.match(job_directories.SPECIAL_TASK_PATTERN,
                               dir_entry)

    folders = _get_zippable_folders(dir_entry)
    if not is_special_task:
        subfolders = []
        for folder in folders:
            subfolders.extend(_get_zippable_folders(folder))
        folders = subfolders

    for folder in folders:
        _make_into_tarball(folder)


def _count_files(dirpath):
    """Count the number of files in a directory recursively.

    @param dirpath: Directory path string.
    """
    return sum(len(files) for _path, _dirs, files in os.walk(dirpath))


def _make_into_tarball(dirpath):
    """Make directory into tarball.

    @param dirpath: Directory path string.
    """
    tarpath = '%s.tgz' % dirpath
    with tarfile.open(tarpath, 'w:gz') as tar:
        tar.add(dirpath, arcname=os.path.basename(dirpath))
    shutil.rmtree(dirpath)


def correct_results_folder_permission(dir_entry):
    """Make sure the results folder has the right permission settings.

    For tests running with server-side packaging, the results folder is
    owned by root. This must be changed to the user running the autoserv
    process, so that the parsing job can access the results folder.

    @param dir_entry: Path to the results folder.
    """
    if not dir_entry:
        return

    logging.info('Trying to correct file permission of %s.', dir_entry)
    try:
        subprocess.check_call(
                ['sudo', '-n', 'chown', '-R', str(os.getuid()), dir_entry])
        subprocess.check_call(
                ['sudo', '-n', 'chgrp', '-R', str(os.getgid()), dir_entry])
    except subprocess.CalledProcessError as e:
        logging.error('Failed to modify permission for %s: %s',
                      dir_entry, e)


def _upload_cts_testresult(dir_entry, multiprocessing):
    """Upload test results to separate gs buckets.

    Upload testResult.xml.gz/test_result.xml.gz file to cts_results_bucket.
    Upload timestamp.zip to cts_apfe_bucket.

    @param dir_entry: Path to the results folder.
    @param multiprocessing: True to turn on -m option for gsutil.
    """
    for host in glob.glob(os.path.join(dir_entry, '*')):
        cts_path = os.path.join(host, 'cheets_CTS.*', 'results', '*',
                                TIMESTAMP_PATTERN)
        cts_v2_path = os.path.join(host, 'cheets_CTS_*', 'results', '*',
                                   TIMESTAMP_PATTERN)
        gts_v2_path = os.path.join(host, 'cheets_GTS.*', 'results', '*',
                                   TIMESTAMP_PATTERN)
        for result_path, result_pattern in [(cts_path, CTS_RESULT_PATTERN),
                            (cts_v2_path, CTS_V2_RESULT_PATTERN),
                            (gts_v2_path, CTS_V2_RESULT_PATTERN)]:
            for path in glob.glob(result_path):
                try:
                    _upload_files(host, path, result_pattern, multiprocessing)
                except Exception as e:
                    logging.error('ERROR uploading test results %s to GS: %s',
                                  path, e)


def _is_valid_result(build, result_pattern, suite):
    """Check if the result should be uploaded to CTS/GTS buckets.

    @param build: Builder name.
    @param result_pattern: XML result file pattern.
    @param suite: Test suite name.

    @returns: Bool flag indicating whether the result is valid.
    """
    if build is None or suite is None:
        return False

    # Not valid if it's not a release build.
    if not re.match(r'(?!trybot-).*-release/.*', build):
        return False

    # Not valid if it's a CTS/GTS result but the suite is not an
    # 'arc-cts*', 'arc-gts*' or 'test_that_wrapper' suite.
    result_patterns = [CTS_RESULT_PATTERN, CTS_V2_RESULT_PATTERN]
    if result_pattern in result_patterns and not (
            suite.startswith('arc-cts') or suite.startswith('arc-gts') or
            suite.startswith('test_that_wrapper')):
        return False

    return True


def _upload_files(host, path, result_pattern, multiprocessing):
    """Upload CTS/GTS results and the zipped result folder to GS buckets.

    @param host: Path to the host directory under the job results folder.
    @param path: Path to the timestamped CTS/GTS result directory.
    @param result_pattern: XML result file pattern to look for under `path`.
    @param multiprocessing: True to turn on -m option for gsutil.
    """
    keyval = models.test.parse_job_keyval(host)
    build = keyval.get('build')
    suite = keyval.get('suite')

    if not _is_valid_result(build, result_pattern, suite):
        # No need to upload the current folder; return.
        return

    parent_job_id = str(keyval['parent_job_id'])

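    # Illustrative layout of `path` (not an actual directory):
    #   <job_id dir>/<host>/cheets_CTS.arm/results/<session>/2017.05.01_18.30.45
    # so folders[-6] is the job directory name, folders[-4] the CTS/GTS
    # package and folders[-1] the timestamp.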
    folders = path.split(os.sep)
    job_id = folders[-6]
    package = folders[-4]
    timestamp = folders[-1]

    # Path: bucket/build/parent_job_id/cheets_CTS.*/job_id_timestamp/
    # or bucket/build/parent_job_id/cheets_GTS.*/job_id_timestamp/
    cts_apfe_gs_path = os.path.join(
            DEFAULT_CTS_APFE_GSURI, build, parent_job_id,
            package, job_id + '_' + timestamp) + '/'

    # Path: bucket/cheets_CTS.*/job_id_timestamp/
    # or bucket/cheets_GTS.*/job_id_timestamp/
    test_result_gs_path = os.path.join(
            DEFAULT_CTS_RESULTS_GSURI, package,
            job_id + '_' + timestamp) + '/'

    for zip_file in glob.glob(os.path.join('%s.zip' % path)):
        utils.run(' '.join(_get_cmd_list(
                multiprocessing, zip_file, cts_apfe_gs_path)))
        logging.debug('Upload %s to %s ', zip_file, cts_apfe_gs_path)

    for test_result_file in glob.glob(os.path.join(path, result_pattern)):
        # gzip test_result_file (testResult.xml/test_result.xml).
        test_result_file_gz = '%s.gz' % test_result_file
        with open(test_result_file, 'r') as f_in, (
                gzip.open(test_result_file_gz, 'w')) as f_out:
            shutil.copyfileobj(f_in, f_out)
        utils.run(' '.join(_get_cmd_list(
                multiprocessing, test_result_file_gz, test_result_gs_path)))
        logging.debug('Zip and upload %s to %s',
                      test_result_file_gz, test_result_gs_path)
        # Remove test_result_file_gz (testResult.xml.gz/test_result.xml.gz).
        os.remove(test_result_file_gz)


def _emit_gs_returncode_metric(returncode):
    """Increment the gs_returncode counter based on |returncode|."""
    m_gs_returncode = 'chromeos/autotest/gs_offloader/gs_returncode'
    rcode = int(returncode)
    if rcode < 0 or rcode > 255:
        rcode = -1
    metrics.Counter(m_gs_returncode).increment(fields={'return_code': rcode})


class BaseGSOffloader(object):

    """Google Storage offloader interface."""

    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def offload(self, dir_entry, dest_path, job_complete_time):
        """Offload a directory entry to Google Storage.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """


class GSOffloader(BaseGSOffloader):
    """Google Storage Offloader."""

    def __init__(self, gs_uri, multiprocessing, delete_age,
            console_client=None):
        """Initialize the offloader for the given gs_uri.

        @param gs_uri: Google storage bucket uri to offload to.
        @param multiprocessing: True to turn on -m option for gsutil.
        @param delete_age: Minimum age in days at which an offloaded job
          may be deleted from local storage.
        @param console_client: The cloud console client. If None,
          cloud console APIs are not called.
        """
        self._gs_uri = gs_uri
        self._multiprocessing = multiprocessing
        self._delete_age = delete_age
        self._console_client = console_client

    @metrics.SecondsTimerDecorator(
            'chromeos/autotest/gs_offloader/job_offload_duration')
    def offload(self, dir_entry, dest_path, job_complete_time):
        """Offload the specified directory entry to Google storage.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        with tempfile.TemporaryFile('w+') as stdout_file, \
             tempfile.TemporaryFile('w+') as stderr_file:
            try:
                self._offload(dir_entry, dest_path, stdout_file, stderr_file)
            except _OffloadError as e:
                metrics_fields = _get_metrics_fields(dir_entry)
                m_any_error = 'chromeos/autotest/errors/gs_offloader/any_error'
                metrics.Counter(m_any_error).increment(fields=metrics_fields)

                # Rewind the log files for stdout and stderr and log
                # their contents.
                stdout_file.seek(0)
                stderr_file.seek(0)
                stderr_content = stderr_file.read()
                logging.warning('Error occurred when offloading %s:', dir_entry)
                logging.warning('Stdout:\n%s \nStderr:\n%s', stdout_file.read(),
                                stderr_content)

                # Some result files may have the wrong file permissions. Try
                # to correct such errors so a later attempt can succeed.
                # TODO(dshi): The code is added to correct result files
                # with wrong file permission caused by bug 511778. After
                # this code is pushed to lab and run for a while to
                # clean up these files, following code and function
                # correct_results_folder_permission can be deleted.
                if 'CommandException: Error opening file' in stderr_content:
                    correct_results_folder_permission(dir_entry)
            else:
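                # The offload attempt succeeded; delete the local copy
                # once it is old enough.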
                self._prune(dir_entry, job_complete_time)

    def _offload(self, dir_entry, dest_path,
                 stdout_file, stderr_file):
        """Offload the specified directory entry to Google storage.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param stdout_file: Log file for the offload command's stdout.
        @param stderr_file: Log file for the offload command's stderr.
        """
        if _is_uploaded(dir_entry):
            return
        start_time = time.time()
        metrics_fields = _get_metrics_fields(dir_entry)
        es_metadata = _get_es_metadata(dir_entry)
        error_obj = _OffloadError(start_time, es_metadata)
        try:
            sanitize_dir(dir_entry)
            if DEFAULT_CTS_RESULTS_GSURI:
                _upload_cts_testresult(dir_entry, self._multiprocessing)

            if LIMIT_FILE_COUNT:
                limit_file_count(dir_entry)
            es_metadata['size_kb'] = file_utils.get_directory_size_kibibytes(dir_entry)

            process = None
            with timeout_util.Timeout(OFFLOAD_TIMEOUT_SECS):
                gs_path = '%s%s' % (self._gs_uri, dest_path)
                process = subprocess.Popen(
                        _get_cmd_list(self._multiprocessing, dir_entry, gs_path),
                        stdout=stdout_file, stderr=stderr_file)
                process.wait()

            _emit_gs_returncode_metric(process.returncode)
            if process.returncode != 0:
                raise error_obj
            _emit_offload_metrics(dir_entry)

            if self._console_client:
                gcs_uri = os.path.join(gs_path,
                        os.path.basename(dir_entry))
                if not self._console_client.send_test_job_offloaded_message(
                        gcs_uri):
                    raise error_obj

            _mark_uploaded(dir_entry)
        except timeout_util.TimeoutError:
            m_timeout = 'chromeos/autotest/errors/gs_offloader/timed_out_count'
            metrics.Counter(m_timeout).increment(fields=metrics_fields)
            # If we finished the call to Popen(), we may need to
            # terminate the child process.  We don't bother calling
            # process.poll(); that inherently races because the child
            # can die any time it wants.
            if process:
                try:
                    process.terminate()
                except OSError:
                    # We don't expect any error other than "No such
                    # process".
                    pass
            logging.error('Offloading %s timed out after waiting %d '
                          'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS)
            raise error_obj

    def _prune(self, dir_entry, job_complete_time):
        """Prune directory if it is uploaded and expired.

        @param dir_entry: Directory entry to offload.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        if not (_is_uploaded(dir_entry)
                and job_directories.is_job_expired(self._delete_age,
                                                   job_complete_time)):
            return
        try:
            shutil.rmtree(dir_entry)
        except OSError as e:
            # Wrong file permissions can cause `shutil.rmtree(dir_entry)`
            # to raise OSError with the message 'Permission denied'.
            # Details can be found in crbug.com/536151.
            if e.errno == errno.EACCES:
                correct_results_folder_permission(dir_entry)
            m_permission_error = ('chromeos/autotest/errors/gs_offloader/'
                                  'wrong_permissions_count')
            metrics_fields = _get_metrics_fields(dir_entry)
            metrics.Counter(m_permission_error).increment(fields=metrics_fields)


class _OffloadError(Exception):
    """Google Storage offload failed."""

    def __init__(self, start_time, es_metadata):
        super(_OffloadError, self).__init__(start_time, es_metadata)
        self.start_time = start_time
        self.es_metadata = es_metadata


class FakeGSOffloader(BaseGSOffloader):

    """Fake Google Storage Offloader that only deletes directories."""

    def offload(self, dir_entry, dest_path, job_complete_time):
        """Pretend to offload a directory and delete it.

        @param dir_entry: Directory entry to offload.
        @param dest_path: Location in google storage where we will
                          offload the directory.
        @param job_complete_time: The complete time of the job from the AFE
                                  database.
        """
        shutil.rmtree(dir_entry)


def _is_expired(job, age_limit):
    """Return whether job directory is expired for uploading.

    @param job: _JobDirectory instance.
    @param age_limit:  Minimum age in days at which a job may be offloaded.
    """
    job_timestamp = job.get_timestamp_if_finished()
    if not job_timestamp:
        return False
    return job_directories.is_job_expired(age_limit, job_timestamp)


def _emit_offload_metrics(dirpath):
    """Emit gs offload metrics.

    @param dirpath: Offloaded directory path.
    """
    dir_size = file_utils.get_directory_size_kibibytes(dirpath)
    metrics_fields = _get_metrics_fields(dirpath)

    m_offload_count = (
            'chromeos/autotest/gs_offloader/jobs_offloaded')
    metrics.Counter(m_offload_count).increment(
            fields=metrics_fields)
    m_offload_size = ('chromeos/autotest/gs_offloader/'
                      'kilobytes_transferred')
    metrics.Counter(m_offload_size).increment_by(
            dir_size, fields=metrics_fields)


def _is_uploaded(dirpath):
    """Return whether directory has been uploaded.

    @param dirpath: Directory path string.
    """
    return os.path.isfile(_get_uploaded_marker_file(dirpath))


def _mark_uploaded(dirpath):
    """Mark directory as uploaded.

    @param dirpath: Directory path string.
    """
    with open(_get_uploaded_marker_file(dirpath), 'a'):
        pass


def _get_uploaded_marker_file(dirpath):
    """Return path to upload marker file for directory.

    @param dirpath: Directory path string.
    """
    return '%s/.GS_UPLOADED' % (dirpath,)


def _format_job_for_failure_reporting(job):
    """Formats a _JobDirectory for reporting / logging.

    @param job: The _JobDirectory to format.
    """
    d = datetime.datetime.fromtimestamp(job.first_offload_start)
    data = (d.strftime(FAILED_OFFLOADS_TIME_FORMAT),
            job.offload_count,
            job.dirname)
    return FAILED_OFFLOADS_LINE_FORMAT % data


def wait_for_gs_write_access(gs_uri):
    """Verify and wait until we have write access to Google Storage.

    @param gs_uri: The Google Storage URI we are trying to offload to.
    """
    # TODO (sbasi) Try to use the gsutil command to check write access.
    # Ensure we have write access to gs_uri.
    dummy_file = tempfile.NamedTemporaryFile()
    test_cmd = _get_cmd_list(False, dummy_file.name, gs_uri)
    while True:
        try:
            subprocess.check_call(test_cmd)
            subprocess.check_call(
                    ['gsutil', 'rm',
                     os.path.join(gs_uri,
                                  os.path.basename(dummy_file.name))])
            break
        except subprocess.CalledProcessError:
            logging.debug('Unable to offload to %s, sleeping.', gs_uri)
            time.sleep(120)


class Offloader(object):
    """State of the offload process.

    Contains the following member fields:
      * _gs_offloader:  BaseGSOffloader to use to offload a job directory.
      * _jobdir_classes:  List of classes of job directory to be
        offloaded.
      * _processes:  Maximum number of outstanding offload processes
        to allow during an offload cycle.
      * _upload_age_limit:  Minimum age in days at which a job may be
        offloaded.
      * _delete_age_limit:  Minimum age in days at which an offloaded
        job may be deleted from local storage.
      * _open_jobs: a dictionary mapping directory paths to Job
        objects.
    """

    def __init__(self, options):
        self._upload_age_limit = options.age_to_upload
        self._delete_age_limit = options.age_to_delete
        if options.delete_only:
            self._gs_offloader = FakeGSOffloader()
        else:
            self.gs_uri = utils.get_offload_gsuri()
            logging.debug('Offloading to: %s', self.gs_uri)
            multiprocessing = False
            if options.multiprocessing:
                multiprocessing = True
            elif options.multiprocessing is None:
                multiprocessing = GS_OFFLOADER_MULTIPROCESSING
            logging.info(
                    'Offloader multiprocessing is set to:%r', multiprocessing)
            console_client = None
            if cloud_console_client.is_cloud_notification_enabled():
                console_client = cloud_console_client.PubSubBasedClient()
            self._gs_offloader = GSOffloader(
                    self.gs_uri, multiprocessing, self._delete_age_limit,
                    console_client)
        classlist = []
        if options.process_hosts_only or options.process_all:
            classlist.append(job_directories.SpecialJobDirectory)
        if not options.process_hosts_only:
            classlist.append(job_directories.RegularJobDirectory)
        self._jobdir_classes = classlist
        assert self._jobdir_classes
        self._processes = options.parallelism
        self._open_jobs = {}
        self._pusub_topic = None
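        # A job is abandoned (marked as uploaded) after this many failed
        # offload attempts; see _give_up_on_jobs_over_limit.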
        self._offload_count_limit = 3


    def _add_new_jobs(self):
        """Find new job directories that need offloading.

        Go through the file system looking for valid job directories
        that are currently not in `self._open_jobs`, and add them in.

        """
        new_job_count = 0
        for cls in self._jobdir_classes:
            for resultsdir in cls.get_job_directories():
                if (
                        resultsdir in self._open_jobs
                        or _is_uploaded(resultsdir)):
                    continue
                self._open_jobs[resultsdir] = cls(resultsdir)
                new_job_count += 1
        logging.debug('Start of offload cycle - found %d new jobs',
                      new_job_count)


    def _remove_offloaded_jobs(self):
        """Remove offloaded jobs from `self._open_jobs`."""
        removed_job_count = 0
        for jobkey, job in self._open_jobs.items():
            if (
                    not os.path.exists(job.dirname)
                    or _is_uploaded(job.dirname)):
                del self._open_jobs[jobkey]
                removed_job_count += 1
        logging.debug('End of offload cycle - cleared %d new jobs, '
                      'carrying %d open jobs',
                      removed_job_count, len(self._open_jobs))


    def _report_failed_jobs(self):
        """Report status after attempting offload.

        This function processes all jobs in `self._open_jobs`, assuming
        an attempt has just been made to offload all of them.

        The number of failed jobs is reported to monarch, and the
        failures are recorded in the local failed-offloads log file.

        """
        failed_jobs = [j for j in self._open_jobs.values() if
                       j.first_offload_start]
        self._report_failed_jobs_count(failed_jobs)
        self._log_failed_jobs_locally(failed_jobs)


    def offload_once(self):
        """Perform one offload cycle.

        Find all job directories for new jobs that we haven't seen
        before.  Then, attempt to offload the directories for any
        jobs that have finished running.  Offload of multiple jobs
        is done in parallel, up to `self._processes` at a time.

        After we've tried uploading all directories, go through the list
        checking the status of all uploaded directories.  If necessary,
        report failures to monarch and to the local failed-offloads file.

        """
        self._add_new_jobs()
        self._report_current_jobs_count()
        with parallel.BackgroundTaskRunner(
                self._gs_offloader.offload, processes=self._processes) as queue:
            for job in self._open_jobs.values():
                _enqueue_offload(job, queue, self._upload_age_limit)
        self._give_up_on_jobs_over_limit()
        self._remove_offloaded_jobs()
        self._report_failed_jobs()


    def _give_up_on_jobs_over_limit(self):
        """Give up on jobs that have gone over the offload limit.

        We mark them as uploaded as we won't try to offload them any more.
        """
        for job in self._open_jobs.values():
            if job.offload_count >= self._offload_count_limit:
                _mark_uploaded(job.dirname)


    def _log_failed_jobs_locally(self, failed_jobs,
                                 log_file=FAILED_OFFLOADS_FILE):
        """Update a local file listing all the failed jobs.

        The file gives developers a list of the jobs that we have
        failed to upload.

        @param failed_jobs: A list of failed _JobDirectory objects.
        @param log_file: The file to log the failed jobs to.
        """
        now = datetime.datetime.now()
        now_str = now.strftime(FAILED_OFFLOADS_TIME_FORMAT)
        formatted_jobs = [_format_job_for_failure_reporting(job)
                            for job in failed_jobs]
        formatted_jobs.sort()

        with open(log_file, 'w') as logfile:
            logfile.write(FAILED_OFFLOADS_FILE_HEADER %
                          (now_str, len(failed_jobs)))
            logfile.writelines(formatted_jobs)


    def _report_current_jobs_count(self):
        """Report the number of outstanding jobs to monarch."""
        metrics.Gauge('chromeos/autotest/gs_offloader/current_jobs_count').set(
                len(self._open_jobs))


    def _report_failed_jobs_count(self, failed_jobs):
        """Report the number of outstanding failed offload jobs to monarch.

        @param failed_jobs: List of failed jobs.
        """
        metrics.Gauge('chromeos/autotest/gs_offloader/failed_jobs_count').set(
                len(failed_jobs))


def _enqueue_offload(job, queue, age_limit):
    """Enqueue the job for offload, if it's eligible.

    The job is eligible for offloading if the database has marked
    it finished, and the job is older than the `age_limit`
    parameter.

    If the job is eligible, offload processing is requested by
    passing the `queue` parameter's `put()` method a sequence
    containing the job's `dirname`, its parent directory, and its
    completion timestamp.

    @param job       _JobDirectory instance to offload.
    @param queue     If the job should be offloaded, put the offload
                     parameters into this queue for processing.
    @param age_limit Minimum age for a job to be offloaded.  A value
                     of 0 means that the job will be offloaded as
                     soon as it is finished.

    """
    if not job.offload_count:
        if not _is_expired(job, age_limit):
            return
        job.first_offload_start = time.time()
    job.offload_count += 1
    if job.process_gs_instructions():
        timestamp = job.get_timestamp_if_finished()
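        # The queued arguments are consumed by BaseGSOffloader.offload() as
        # (dir_entry, dest_path, job_complete_time); see Offloader.offload_once.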
        queue.put([job.dirname, os.path.dirname(job.dirname), timestamp])


def parse_options():
    """Parse the args passed into gs_offloader."""
    defaults = 'Defaults:\n  Destination: %s\n  Results Path: %s' % (
            utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR)
    usage = 'usage: %prog [options]\n' + defaults
    parser = OptionParser(usage)
    parser.add_option('-a', '--all', dest='process_all',
                      action='store_true',
                      help='Offload all files in the results directory.')
    parser.add_option('-s', '--hosts', dest='process_hosts_only',
                      action='store_true',
                      help='Offload only the special tasks result files '
                      'located in the results/hosts subdirectory.')
    parser.add_option('-p', '--parallelism', dest='parallelism',
                      type='int', default=1,
                      help='Number of parallel workers to use.')
    parser.add_option('-o', '--delete_only', dest='delete_only',
                      action='store_true',
                      help='GS Offloader will only delete the '
                      'directories and will not offload them to google '
                      'storage. NOTE: If global_config variable '
                      'CROS.gs_offloading_enabled is False, --delete_only '
                      'is automatically True.',
                      default=not GS_OFFLOADING_ENABLED)
    parser.add_option('-d', '--days_old', dest='days_old',
                      help='Minimum job age in days before a result can be '
                      'offloaded.', type='int', default=0)
    parser.add_option('-l', '--log_size', dest='log_size',
                      help='Limit the offloader logs to a specified '
                      'number of megabytes.', type='int', default=0)
    parser.add_option('-m', dest='multiprocessing', action='store_true',
                      help='Turn on -m option for gsutil. If not set, the '
                      'global config setting gs_offloader_multiprocessing '
                      'under CROS section is applied.')
    parser.add_option('-i', '--offload_once', dest='offload_once',
                      action='store_true',
                      help='Upload all available results and then exit.')
    parser.add_option('-y', '--normal_priority', dest='normal_priority',
                      action='store_true',
                      help='Upload using normal process priority.')
    parser.add_option('-u', '--age_to_upload', dest='age_to_upload',
                      help='Minimum job age in days before a result can be '
                      'offloaded, but not removed from local storage.',
                      type='int', default=None)
    parser.add_option('-n', '--age_to_delete', dest='age_to_delete',
                      help='Minimum job age in days before a result can be '
                      'removed from local storage.',
                      type='int', default=None)

    options = parser.parse_args()[0]
    if options.process_all and options.process_hosts_only:
        parser.print_help()
        print('Cannot process all files and only the hosts '
              'subdirectory. Please remove an argument.')
        sys.exit(1)

    if options.days_old and (options.age_to_upload or options.age_to_delete):
        parser.print_help()
        print('Use the days_old option or the age_to_* options but not both.')
        sys.exit(1)

    if options.age_to_upload is None:
        options.age_to_upload = options.days_old
    if options.age_to_delete is None:
        options.age_to_delete = options.days_old

    return options


def main():
    """Main method of gs_offloader."""
    options = parse_options()

    if options.process_all:
        offloader_type = 'all'
    elif options.process_hosts_only:
        offloader_type = 'hosts'
    else:
        offloader_type = 'jobs'

    _setup_logging(options, offloader_type)

    # Nice our process (carried to subprocesses) so we don't overload
    # the system.
    if not options.normal_priority:
        logging.debug('Set process to nice value: %d', NICENESS)
        os.nice(NICENESS)
    if psutil:
        proc = psutil.Process()
        logging.debug('Set process to ionice IDLE')
        proc.ionice(psutil.IOPRIO_CLASS_IDLE)

    # os.listdir returns relative paths, so change to where we need to
    # be to avoid an os.path.join on each loop.
    logging.debug('Offloading Autotest results in %s', RESULTS_DIR)
    os.chdir(RESULTS_DIR)

    service_name = 'gs_offloader(%s)' % offloader_type
    with ts_mon_config.SetupTsMonGlobalState(service_name, indirect=True,
                                             short_lived=False):
        offloader = Offloader(options)
        if not options.delete_only:
            wait_for_gs_write_access(offloader.gs_uri)
        while True:
            offloader.offload_once()
            if options.offload_once:
                break
            time.sleep(SLEEP_TIME_SECS)


_LOG_LOCATION = '/usr/local/autotest/logs/'
_LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt'
_LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S'
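# Example log name (illustrative): gs_offloader_jobs_log_20170501_183045.txt.
# The timestamp is left empty when size-based rotation is enabled; see
# _get_log_filename below.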
_LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'


def _setup_logging(options, offloader_type):
    """Set up logging.

    @param options: Parsed options.
    @param offloader_type: Type of offloader action as string.
    """
    log_filename = _get_log_filename(options, offloader_type)
    log_formatter = logging.Formatter(_LOGGING_FORMAT)
    # Send root logger output to a RotatingFileHandler. If
    # options.log_size is 0, the file size will not be limited. Keep
    # one backup just in case.
    handler = logging.handlers.RotatingFileHandler(
            log_filename, maxBytes=1024 * options.log_size, backupCount=1)
    handler.setFormatter(log_formatter)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)


def _get_log_filename(options, offloader_type):
    """Get log filename.

    @param options: Parsed options.
    @param offloader_type: Type of offloader action as string.
    """
    if options.log_size > 0:
        log_timestamp = ''
    else:
        log_timestamp = time.strftime(_LOG_TIMESTAMP_FORMAT)
    log_basename = _LOG_FILENAME_FORMAT % (offloader_type, log_timestamp)
    return os.path.join(_LOG_LOCATION, log_basename)


if __name__ == '__main__':
    main()