• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- coding: utf-8 -*-
2# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6"""Library to make common google storage operations more reliable."""
7
8from __future__ import print_function
9
10import collections
11import contextlib
12import datetime
13import errno
14import fnmatch
15import getpass
16import glob
17import hashlib
18import os
19import re
20import shutil
21import subprocess
22import tempfile
23
24import six
25from six.moves import urllib
26
27from autotest_lib.utils.frozen_chromite.lib import constants
28from autotest_lib.utils.frozen_chromite.lib import cache
29from autotest_lib.utils.frozen_chromite.lib import cros_build_lib
30from autotest_lib.utils.frozen_chromite.lib import cros_collections
31from autotest_lib.utils.frozen_chromite.lib import cros_logging as logging
32from autotest_lib.utils.frozen_chromite.lib import osutils
33from autotest_lib.utils.frozen_chromite.lib import path_util
34from autotest_lib.utils.frozen_chromite.lib import retry_stats
35from autotest_lib.utils.frozen_chromite.lib import retry_util
36from autotest_lib.utils.frozen_chromite.lib import signals
37from autotest_lib.utils.frozen_chromite.lib import timeout_util
38
39
# This bucket has the allAuthenticatedUsers:READER ACL, so listing it with
# any authenticated account succeeds; used to sanity-check credentials.
AUTHENTICATION_BUCKET = 'gs://chromeos-authentication-bucket/'

# Public path, only really works for files.
PUBLIC_BASE_HTTPS_URL = 'https://storage.googleapis.com/'

# Private path for files.
PRIVATE_BASE_HTTPS_URL = 'https://storage.cloud.google.com/'

# Private path for directories.
# TODO(akeshet): this is a workaround for b/27653354. If that is ultimately
# fixed, revisit this workaround.
PRIVATE_BASE_HTTPS_DOWNLOAD_URL = 'https://stainless.corp.google.com/browse/'

# URI scheme prefix shared by all Google Storage paths.
BASE_GS_URL = 'gs://'

# Format used by "gsutil ls -l" when reporting modified time.
DATETIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'

# Regexp for parsing each line of output from "gsutil ls -l".
# This regexp is prepared for the generation and meta_generation values,
# too, even though they are not expected until we use "-a".
#
# A detailed listing looks like:
#    99908  2014-03-01T05:50:08Z  gs://bucket/foo/abc#1234  metageneration=1
#                                 gs://bucket/foo/adir/
#    99908  2014-03-04T01:16:55Z  gs://bucket/foo/def#5678  metageneration=1
# TOTAL: 2 objects, 199816 bytes (495.36 KB)
LS_LA_RE = re.compile(
    r'^\s*(?P<content_length>\d*?)\s+'
    r'(?P<creation_time>\S*?)\s+'
    r'(?P<url>[^#$]+).*?'
    r'('
    r'#(?P<generation>\d+)\s+'
    r'meta_?generation=(?P<metageneration>\d+)'
    r')?\s*$')
# Plain "gsutil ls" output: only the url group captures anything; the other
# groups are deliberately empty so both regexps expose the same group names.
LS_RE = re.compile(r'^\s*(?P<content_length>)(?P<creation_time>)(?P<url>.*)'
                   r'(?P<generation>)(?P<metageneration>)\s*$')

# Format used by ContainsWildCard, which is duplicated from
# https://github.com/GoogleCloudPlatform/gsutil/blob/v4.21/gslib/storage_url.py#L307.
WILDCARD_REGEX = re.compile(r'[*?\[\]]')
81
82
def PathIsGs(path):
  """Return True if |path| is a Google Storage URI (i.e. starts with gs://)."""
  return path[:len(BASE_GS_URL)] == BASE_GS_URL
86
87
def CanonicalizeURL(url, strict=False):
  """Rewrite a known https:// storage URL into its gs:// equivalent.

  Args:
    url: URL to canonicalize.
    strict: Raises exception if URL cannot be canonicalized.

  Returns:
    The gs:// form of |url| if a known prefix matched, else |url| unchanged.

  Raises:
    ValueError: If |strict| and |url| has no gs:// form.
  """
  known_prefixes = (
      PUBLIC_BASE_HTTPS_URL,
      PRIVATE_BASE_HTTPS_URL,
      PRIVATE_BASE_HTTPS_DOWNLOAD_URL,
      'https://pantheon.corp.google.com/storage/browser/',
      'https://commondatastorage.googleapis.com/',
  )
  matched = next((p for p in known_prefixes if url.startswith(p)), None)
  if matched is not None:
    # The prefix is anchored at position 0, so swapping it for the gs://
    # scheme is a simple slice.
    return BASE_GS_URL + url[len(matched):]

  if strict and not PathIsGs(url):
    raise ValueError('Url %r cannot be canonicalized.' % url)

  return url
107
108
def GetGsURL(bucket, for_gsutil=False, public=True, suburl=''):
  """Construct a Google Storage URL

  Args:
    bucket: The Google Storage bucket to use
    for_gsutil: Do you want a URL for passing to `gsutil`?
    public: Do we want the public or private url
    suburl: A url fragment to tack onto the end

  Returns:
    The fully constructed URL
  """
  gs_url = 'gs://%s/%s' % (bucket, suburl)
  return gs_url if for_gsutil else GsUrlToHttp(gs_url, public=public)
127
128
def GsUrlToHttp(path, public=True, directory=False):
  """Convert a GS URL to a HTTP URL for the same resource.

  Because the HTTP Urls are not fixed (and may not always be simple prefix
  replacements), use this method to centralize the conversion.

  Directories need to have different URLs from files, because the Web UIs for
  GS are weird and really inconsistent. Also public directories probably
  don't work, and probably never will (permissions as well as UI).

  e.g. 'gs://chromeos-image-archive/path/file' ->
       'https://pantheon/path/file'

  Args:
    path: GS URL to convert.
    public: Is this URL for Googler access, or publicly visible?
    directory: Force this URL to be treated as a directory?
               We try to autodetect on False.

  Returns:
    https URL as a string.
  """
  assert PathIsGs(path)
  is_directory = directory or path.endswith('/')

  # NOTE(review): public HTTP URLs for directories don't really work, but we
  # still emit them rather than asserting (matching historical behavior).
  if public:
    base = PUBLIC_BASE_HTTPS_URL
  elif is_directory:
    base = PRIVATE_BASE_HTTPS_DOWNLOAD_URL
  else:
    base = PRIVATE_BASE_HTTPS_URL
  return path.replace(BASE_GS_URL, base, 1)
164
165
class GSContextException(Exception):
  """Base exception for all exceptions thrown by GSContext.

  Catch this to handle any Google Storage failure generically.
  """
168
169
# Since the underlying code uses run, some callers might be trying to
# catch cros_build_lib.RunCommandError themselves.  Extend that class so that
# code continues to work.
class GSCommandError(GSContextException, cros_build_lib.RunCommandError):
  """Thrown when an error happened we couldn't decode."""
175
176
class GSContextPreconditionFailed(GSContextException):
  """Thrown when google storage returns code=PreconditionFailed.

  Typically means a generation-conditioned write lost a race with another
  writer (see GSCounter.AtomicCounterOperation).
  """
179
180
class GSNoSuchKey(GSContextException):
  """Thrown when google storage returns code=NoSuchKey.

  Also raised by Cat() when a local file path does not exist.
  """
183
184
# Detailed results of GSContext.Stat.
#
# The fields directly correspond to gsutil stat results.
#
#  Field name        Type         Example
#   creation_time     datetime     Sat, 23 Aug 2014 06:53:20 GMT
#   content_length    int          74
#   content_type      string       application/octet-stream
#   hash_crc32c       string       BBPMPA==
#   hash_md5          string       ms+qSYvgI9SjXn8tW/5UpQ==
#   etag              string       CNCgocbmqMACEAE=
#   generation        int          1408776800850000
#   metageneration    int          1
#
# Note: We omit a few stat fields as they are not always available, and we
# have no callers that want this currently.
#
#   content_language  string/None  en   # This field may be None.
GSStatResult = collections.namedtuple(
    'GSStatResult',
    ('creation_time', 'content_length', 'content_type', 'hash_crc32c',
     'hash_md5', 'etag', 'generation', 'metageneration'))


# Detailed results of GSContext.List.  Field names match the named groups
# of LS_LA_RE / LS_RE above.
GSListResult = collections.namedtuple(
    'GSListResult',
    ('url', 'creation_time', 'content_length', 'generation', 'metageneration'))


# Classification of a known gsutil error (see _MatchKnownError): the error
# type, the message pattern that matched, whether retrying may help, and an
# optional more-specific exception to raise instead.
ErrorDetails = cros_collections.Collection(
    'ErrorDetails',
    type=None, message_pattern='', retriable=None, exception=None)
218
219
class GSCounter(object):
  """A shared counter stored as a small object in Google Storage."""

  def __init__(self, ctx, path):
    """Create a counter object.

    Args:
      ctx: A GSContext object.
      path: The path to the counter in Google Storage.
    """
    self.ctx = ctx
    self.path = path

  def Get(self):
    """Return the current counter value, or 0 if it does not exist."""
    try:
      return int(self.ctx.Cat(self.path))
    except GSNoSuchKey:
      return 0

  def AtomicCounterOperation(self, default_value, operation):
    """Atomically set the counter value using |operation|.

    Uses generation-conditioned copies so concurrent writers cannot clobber
    each other: the write only succeeds if the object is still at the
    generation we last observed.

    Args:
      default_value: Default value to use for counter, if counter
                     does not exist.
      operation: Function that takes the current counter value as a
                 parameter, and returns the new desired value.

    Returns:
      The new counter value. None if value could not be set.
    """
    generation, _ = self.ctx.GetGeneration(self.path)
    for _attempt in range(self.ctx.retries + 1):
      try:
        new_value = (default_value if generation == 0
                     else operation(self.Get()))
        self.ctx.Copy('-', self.path, input=str(new_value),
                      version=generation)
        return new_value
      except (GSContextPreconditionFailed, GSNoSuchKey):
        # GSContextPreconditionFailed: another writer won the race.
        # GSNoSuchKey: another writer deleted the counter.  Either way,
        # re-read the generation; if it moved, retry against the new one,
        # otherwise propagate the failure.
        current_generation, _ = self.ctx.GetGeneration(self.path)
        if current_generation == generation:
          raise
        generation = current_generation

  def Increment(self):
    """Add one to the counter.

    Returns:
      The new counter value. None if value could not be set.
    """
    return self.AtomicCounterOperation(1, lambda cur: cur + 1)

  def Decrement(self):
    """Subtract one from the counter.

    Returns:
      The new counter value. None if value could not be set.
    """
    return self.AtomicCounterOperation(-1, lambda cur: cur - 1)

  def Reset(self):
    """Set the counter back to zero.

    Returns:
      The new counter value. None if value could not be set.
    """
    return self.AtomicCounterOperation(0, lambda _cur: 0)

  def StreakIncrement(self):
    """Extend a positive streak by one, or start a new streak at 1.

    Returns:
      The new counter value. None if value could not be set.
    """
    return self.AtomicCounterOperation(
        1, lambda cur: cur + 1 if cur > 0 else 1)

  def StreakDecrement(self):
    """Extend a negative streak by one, or start a new streak at -1.

    Returns:
      The new counter value. None if value could not be set.
    """
    return self.AtomicCounterOperation(
        -1, lambda cur: cur - 1 if cur < 0 else -1)
307
308
class GSContext(object):
  """A class to wrap common google storage operations."""

  # Error messages that indicate an invalid BOTO config.
  AUTHORIZATION_ERRORS = ('no configured', 'none configured',
                          'detail=Authorization', '401 Anonymous caller')

  # Default boto credential file and gsutil resumable-transfer tracker dir.
  DEFAULT_BOTO_FILE = os.path.expanduser('~/.boto')
  DEFAULT_GSUTIL_TRACKER_DIR = os.path.expanduser('~/.gsutil/tracker-files')
  # This is set for ease of testing.
  DEFAULT_GSUTIL_BIN = None
  DEFAULT_GSUTIL_BUILDER_BIN = '/b/build/third_party/gsutil/gsutil'
  # How many times to retry uploads.
  DEFAULT_RETRIES = 3

  # Multiplier for how long to sleep (in seconds) between retries; will delay
  # (1*sleep) the first time, then (2*sleep), continuing via attempt * sleep.
  DEFAULT_SLEEP_TIME = 60

  # Pinned gsutil version, fetched as a tarball from the mirror below.
  GSUTIL_VERSION = '4.51'
  GSUTIL_TAR = 'gsutil_%s.tar.gz' % GSUTIL_VERSION
  GSUTIL_URL = (PUBLIC_BASE_HTTPS_URL +
                'chromeos-mirror/gentoo/distfiles/%s' % GSUTIL_TAR)
  GSUTIL_API_SELECTOR = 'JSON'

  # Byte strings: gsutil output is normalized to and matched as bytes in
  # _MatchKnownError.
  RESUMABLE_UPLOAD_ERROR = (b'Too many resumable upload attempts failed '
                            b'without progress')
  RESUMABLE_DOWNLOAD_ERROR = (b'Too many resumable download attempts failed '
                              b'without progress')

  # TODO: Below is a list of known flaky errors that we should
  # retry. The list needs to be extended.
  RESUMABLE_ERROR_MESSAGE = (
      RESUMABLE_DOWNLOAD_ERROR,
      RESUMABLE_UPLOAD_ERROR,
      b'ResumableUploadException',
      b'ResumableUploadAbortException',
      b'ResumableDownloadException',
      b'ssl.SSLError: The read operation timed out',
      # TODO: Error messages may change in different library versions,
      # use regexes to match resumable error messages.
      b"ssl.SSLError: ('The read operation timed out',)",
      b'ssl.SSLError: _ssl.c:495: The handshake operation timed out',
      b'Unable to find the server',
      b"doesn't match cloud-supplied digest",
      b'ssl.SSLError: [Errno 8]',
      b'EOF occurred in violation of protocol',
      # TODO(nxia): crbug.com/775330 narrow down the criteria for retrying
      b'AccessDeniedException',
  )

  # We have seen flaky errors with 5xx return codes
  # See b/17376491 for the "JSON decoding" error.
  # We have seen transient Oauth 2.0 credential errors (crbug.com/414345).
  TRANSIENT_ERROR_MESSAGE = (
      b'ServiceException: 5',
      b'Failure: No JSON object could be decoded',
      b'Oauth 2.0 User Account',
      b'InvalidAccessKeyId',
      b'socket.error: [Errno 104] Connection reset by peer',
      b'Received bad request from server',
      b"can't start new thread",
  )
372
373  @classmethod
374  def GetDefaultGSUtilBin(cls, cache_dir=None, cache_user=None):
375    if cls.DEFAULT_GSUTIL_BIN is None:
376      if cache_dir is None:
377        cache_dir = path_util.GetCacheDir()
378      if cache_dir is not None:
379        common_path = os.path.join(cache_dir, constants.COMMON_CACHE)
380        tar_cache = cache.TarballCache(common_path, cache_user=cache_user)
381        key = (cls.GSUTIL_TAR,)
382        # The common cache will not be LRU, removing the need to hold a read
383        # lock on the cached gsutil.
384        ref = tar_cache.Lookup(key)
385        ref.SetDefault(cls.GSUTIL_URL)
386        cls.DEFAULT_GSUTIL_BIN = os.path.join(ref.path, 'gsutil', 'gsutil')
387        cls._CompileCrcmod(ref.path)
388      else:
389        # Check if the default gsutil path for builders exists. If
390        # not, try locating gsutil. If none exists, simply use 'gsutil'.
391        gsutil_bin = cls.DEFAULT_GSUTIL_BUILDER_BIN
392        if not os.path.exists(gsutil_bin):
393          gsutil_bin = osutils.Which('gsutil')
394        if gsutil_bin is None:
395          gsutil_bin = 'gsutil'
396        cls.DEFAULT_GSUTIL_BIN = gsutil_bin
397
398    return cls.DEFAULT_GSUTIL_BIN
399
  @classmethod
  def _CompileCrcmod(cls, path):
    """Try to setup a compiled crcmod for gsutil.

    The native crcmod code is much faster than the python implementation, and
    enables some more features (otherwise gsutil internally disables them).
    Try to compile the module on demand in the crcmod tree bundled with gsutil.

    For more details, see:
    https://cloud.google.com/storage/docs/gsutil/addlhelp/CRC32CandInstallingcrcmod

    Args:
      path: Root of the unpacked gsutil tarball (contains the bundled
        gsutil/third_party/crcmod tree).
    """
    src_root = os.path.join(path, 'gsutil', 'third_party', 'crcmod')

    # Try to build it once.  The flag file marks that an attempt was made,
    # successful or not, so we never retry on later runs.
    flag = os.path.join(src_root, '.chromite.tried.build')
    if os.path.exists(flag):
      return
    # Flag things now regardless of how the attempt below works out.
    try:
      osutils.Touch(flag)
    except IOError as e:
      # If the gsutil dir was cached previously as root, but now we're
      # non-root, just flag it and return.
      if e.errno == errno.EACCES:
        logging.debug('Skipping gsutil crcmod compile due to permissions')
        cros_build_lib.sudo_run(['touch', flag], debug_level=logging.DEBUG)
        return
      else:
        raise

    # See if the system includes one in which case we're done.
    # We probe `python` as that's what gsutil uses for its shebang.
    result = cros_build_lib.run(
        ['python', '-c', 'from crcmod.crcmod import _usingExtension; '
         'exit(0 if _usingExtension else 1)'], check=False, capture_output=True)
    if result.returncode == 0:
      return

    # See if the local copy has one.  Build the extension for each
    # interpreter the bundled gsutil might run under, copying the result
    # into a per-interpreter subdir of the crcmod tree.
    for pyver in ('python2', 'python3'):
      logging.debug('Attempting to compile local crcmod for %s gsutil', pyver)
      with osutils.TempDir(prefix='chromite.gsutil.crcmod') as tempdir:
        result = cros_build_lib.run(
            [pyver, 'setup.py', 'build', '--build-base', tempdir,
             '--build-platlib', tempdir],
            cwd=src_root, capture_output=True, check=False,
            debug_level=logging.DEBUG)
        if result.returncode:
          continue

        # Locate the module in the build dir.
        copied = False
        for mod_path in glob.glob(
            os.path.join(tempdir, 'crcmod', '_crcfunext*.so')):
          dst_mod_path = os.path.join(src_root, pyver, 'crcmod',
                                      os.path.basename(mod_path))
          try:
            shutil.copy2(mod_path, dst_mod_path)
            copied = True
          except shutil.Error:
            pass

        if not copied:
          # If the module compile failed (missing compiler/headers/whatever),
          # then the setup.py build command above would have passed, but there
          # won't actually be a _crcfunext.so module.  Check for it here to
          # disambiguate other errors from shutil.copy2.
          logging.debug('No crcmod module produced (missing host compiler?)')
          continue
469
  def __init__(self, boto_file=None, cache_dir=None, acl=None,
               dry_run=False, gsutil_bin=None, init_boto=False, retries=None,
               sleep=None, cache_user=None):
    """Constructor.

    Args:
      boto_file: Fully qualified path to user's .boto credential file.
      cache_dir: The absolute path to the cache directory. Use the default
        fallback if not given.
      acl: If given, a canned ACL. It is not valid to pass in an ACL file
        here, because most gsutil commands do not accept ACL files. If you
        would like to use an ACL file, use the SetACL command instead.
      dry_run: Testing mode that prints commands that would be run.
      gsutil_bin: If given, the absolute path to the gsutil binary.  Else
        the default fallback will be used.
      init_boto: If set to True, GSContext will check during __init__ if a
        valid boto config is configured, and if not, will attempt to ask the
        user to interactively set up the boto config.
      retries: Number of times to retry a command before failing.
      sleep: Amount of time to sleep between failures.
      cache_user: user for creating cache_dir for gsutil. Default is None.
    """
    if gsutil_bin is None:
      gsutil_bin = self.GetDefaultGSUtilBin(cache_dir, cache_user=cache_user)
    else:
      self._CheckFile('gsutil not found', gsutil_bin)
    self.gsutil_bin = gsutil_bin

    # The version of gsutil is retrieved on demand and cached here.
    self._gsutil_version = None

    # Increase the number of retries. With 10 retries, Boto will try a total of
    # 11 times and wait up to 2**11 seconds (~30 minutes) in total, not
    # including the time spent actually uploading or downloading.
    self.gsutil_flags = ['-o', 'Boto:num_retries=10']

    # Set HTTP proxy if environment variable http_proxy is set
    # (crbug.com/325032).
    if 'http_proxy' in os.environ:
      url = urllib.parse.urlparse(os.environ['http_proxy'])
      # Reject a proxy URL with no hostname, or one that supplies a
      # password without a username (boto can't use the latter).
      if not url.hostname or (not url.username and url.password):
        logging.warning('GS_ERROR: Ignoring env variable http_proxy because it '
                        'is not properly set: %s', os.environ['http_proxy'])
      else:
        self.gsutil_flags += ['-o', 'Boto:proxy=%s' % url.hostname]
        if url.username:
          self.gsutil_flags += ['-o', 'Boto:proxy_user=%s' % url.username]
        if url.password:
          self.gsutil_flags += ['-o', 'Boto:proxy_pass=%s' % url.password]
        if url.port:
          self.gsutil_flags += ['-o', 'Boto:proxy_port=%d' % url.port]

    # Prefer boto_file if specified, else prefer the env then the default.
    if boto_file is None:
      boto_file = os.environ.get('BOTO_CONFIG')
    if boto_file is None and os.path.isfile(self.DEFAULT_BOTO_FILE):
      # Only set boto file to DEFAULT_BOTO_FILE if it exists.
      boto_file = self.DEFAULT_BOTO_FILE

    self.boto_file = boto_file

    self.acl = acl

    self.dry_run = dry_run
    self.retries = self.DEFAULT_RETRIES if retries is None else int(retries)
    self._sleep_time = self.DEFAULT_SLEEP_TIME if sleep is None else int(sleep)

    if init_boto and not dry_run:
      # We can't really expect gsutil to even be present in dry_run mode.
      self._InitBoto()
540
541  @property
542  def gsutil_version(self):
543    """Return the version of the gsutil in this context."""
544    if not self._gsutil_version:
545      if self.dry_run:
546        self._gsutil_version = self.GSUTIL_VERSION
547      else:
548        cmd = ['-q', 'version']
549
550        # gsutil has been known to return version to stderr in the past, so
551        # use stderr=subprocess.STDOUT.
552        result = self.DoCommand(cmd, stdout=True, stderr=subprocess.STDOUT)
553
554        # Expect output like: 'gsutil version 3.35' or 'gsutil version: 4.5'.
555        match = re.search(r'^\s*gsutil\s+version:?\s+([\d.]+)', result.output,
556                          re.IGNORECASE)
557        if match:
558          self._gsutil_version = match.group(1)
559        else:
560          raise GSContextException('Unexpected output format from "%s":\n%s.' %
561                                   (result.cmdstr, result.output))
562
563    return self._gsutil_version
564
565  def _CheckFile(self, errmsg, afile):
566    """Pre-flight check for valid inputs.
567
568    Args:
569      errmsg: Error message to display.
570      afile: Fully qualified path to test file existance.
571    """
572    if not os.path.isfile(afile):
573      raise GSContextException('%s, %s is not a file' % (errmsg, afile))
574
575  def _TestGSLs(self):
576    """Quick test of gsutil functionality."""
577    # The bucket in question is readable by any authenticated account.
578    # If we can list it's contents, we have valid authentication.
579    cmd = ['ls', AUTHENTICATION_BUCKET]
580    result = self.DoCommand(cmd, retries=0, debug_level=logging.DEBUG,
581                            stderr=True, check=False)
582
583    # Did we fail with an authentication error?
584    if (result.returncode == 1 and
585        any(e in result.error for e in self.AUTHORIZATION_ERRORS)):
586      logging.warning('gsutil authentication failure msg: %s', result.error)
587      return False
588
589    return True
590
591  def _ConfigureBotoConfig(self):
592    """Make sure we can access protected bits in GS."""
593    print('Configuring gsutil. **Please use your @google.com account.**')
594    try:
595      if not self.boto_file:
596        self.boto_file = self.DEFAULT_BOTO_FILE
597      self.DoCommand(['config'], retries=0, debug_level=logging.CRITICAL,
598                     print_cmd=False)
599    finally:
600      if (os.path.exists(self.boto_file) and not
601          os.path.getsize(self.boto_file)):
602        os.remove(self.boto_file)
603        raise GSContextException('GS config could not be set up.')
604
605  def _InitBoto(self):
606    if not self._TestGSLs():
607      self._ConfigureBotoConfig()
608
609  def Cat(self, path, **kwargs):
610    """Returns the contents of a GS object."""
611    kwargs.setdefault('stdout', True)
612    encoding = kwargs.setdefault('encoding', None)
613    errors = kwargs.setdefault('errors', None)
614    if not PathIsGs(path):
615      # gsutil doesn't support cat-ting a local path, so read it ourselves.
616      mode = 'rb' if encoding is None else 'r'
617      try:
618        return osutils.ReadFile(path, mode=mode, encoding=encoding,
619                                errors=errors)
620      except Exception as e:
621        if getattr(e, 'errno', None) == errno.ENOENT:
622          raise GSNoSuchKey('Cat Error: file %s does not exist' % path)
623        else:
624          raise GSContextException(str(e))
625    elif self.dry_run:
626      return b'' if encoding is None else ''
627    else:
628      return self.DoCommand(['cat', path], **kwargs).output
629
  def StreamingCat(self, path, chunksize=0x100000):
    """Returns the content of a GS file as a stream.

    Unlike Cat or Copy, this function doesn't support any internal retry or
    validation by computing checksum of downloaded data. Users should perform
    their own validation, or use Cat() instead.

    Args:
      path: Full gs:// path of the src file.
      chunksize: At most how much data read from upstream and yield to callers
        at a time. The default value is 1 MB.

    Yields:
      The file content, chunk by chunk, as bytes.

    Raises:
      GSCommandError: If the gsutil subprocess exits non-zero.
    """
    assert PathIsGs(path)

    if self.dry_run:
      # Return an empty, already-started generator so callers can iterate
      # it exactly like the real thing.
      # NOTE(review): this yields '' (str) while the real path yields bytes;
      # presumably harmless for dry runs, but worth confirming.
      return (lambda: (yield ''))()

    cmd = [self.gsutil_bin] + self.gsutil_flags + ['cat', path]
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)

    def read_content():
      try:
        while True:
          data = proc.stdout.read(chunksize)
          # Only stop once the pipe is drained AND the process has exited;
          # a momentarily-empty read from a live process is not EOF.
          if not data and proc.poll() is not None:
            break
          if data:
            yield data

        rc = proc.poll()
        if rc:
          raise GSCommandError(
              'Cannot stream cat %s from Google Storage!' % path, rc, None)
      finally:
        # If the generator is abandoned before gsutil finished, close the
        # pipe and terminate the child so we don't leak the subprocess.
        if proc.returncode is None:
          proc.stdout.close()
          proc.terminate()

    return read_content()
672
673  def CopyInto(self, local_path, remote_dir, filename=None, **kwargs):
674    """Upload a local file into a directory in google storage.
675
676    Args:
677      local_path: Local file path to copy.
678      remote_dir: Full gs:// url of the directory to transfer the file into.
679      filename: If given, the filename to place the content at; if not given,
680        it's discerned from basename(local_path).
681      **kwargs: See Copy() for documentation.
682
683    Returns:
684      The generation of the remote file.
685    """
686    filename = filename if filename is not None else local_path
687    # Basename it even if an explicit filename was given; we don't want
688    # people using filename as a multi-directory path fragment.
689    return self.Copy(local_path,
690                     '%s/%s' % (remote_dir, os.path.basename(filename)),
691                     **kwargs)
692
693  @staticmethod
694  def GetTrackerFilenames(dest_path):
695    """Returns a list of gsutil tracker filenames.
696
697    Tracker files are used by gsutil to resume downloads/uploads. This
698    function does not handle parallel uploads.
699
700    Args:
701      dest_path: Either a GS path or an absolute local path.
702
703    Returns:
704      The list of potential tracker filenames.
705    """
706    dest = urllib.parse.urlsplit(dest_path)
707    filenames = []
708    if dest.scheme == 'gs':
709      prefix = 'upload'
710      bucket_name = dest.netloc
711      object_name = dest.path.lstrip('/')
712      filenames.append(
713          re.sub(r'[/\\]', '_', 'resumable_upload__%s__%s__%s.url' %
714                 (bucket_name, object_name, GSContext.GSUTIL_API_SELECTOR)))
715    else:
716      prefix = 'download'
717      filenames.append(
718          re.sub(r'[/\\]', '_', 'resumable_download__%s__%s.etag' %
719                 (dest.path, GSContext.GSUTIL_API_SELECTOR)))
720
721    hashed_filenames = []
722    for filename in filenames:
723      m = hashlib.sha1(filename.encode())
724      hashed_filenames.append('%s_TRACKER_%s.%s' %
725                              (prefix, m.hexdigest(), filename[-16:]))
726
727    return hashed_filenames
728
729  def _RetryFilter(self, e):
730    """Returns whether to retry RunCommandError exception |e|.
731
732    Args:
733      e: Exception object to filter. Exception may be re-raised as
734         as different type, if _RetryFilter determines a more appropriate
735         exception type based on the contents of |e|.
736    """
737    error_details = self._MatchKnownError(e)
738    if error_details.exception:
739      raise error_details.exception
740    return error_details.retriable
741
742  def _MatchKnownError(self, e):
743    """Function to match known RunCommandError exceptions.
744
745    Args:
746      e: Exception object to filter.
747
748    Returns:
749      An ErrorDetails instance with details about the message pattern found.
750    """
751    if not retry_util.ShouldRetryCommandCommon(e):
752      if not isinstance(e, cros_build_lib.RunCommandError):
753        error_type = 'unknown'
754      else:
755        error_type = 'failed_to_launch'
756      return ErrorDetails(type=error_type, retriable=False)
757
758    # e is guaranteed by above filter to be a RunCommandError
759    if e.result.returncode < 0:
760      sig_name = signals.StrSignal(-e.result.returncode)
761      logging.info('Child process received signal %d; not retrying.', sig_name)
762      return ErrorDetails(type='received_signal', message_pattern=sig_name,
763                          retriable=False)
764
765    error = e.result.error
766    if error:
767      # Since the captured error will use the encoding the user requested,
768      # normalize to bytes for testing below.
769      if isinstance(error, six.text_type):
770        error = error.encode('utf-8')
771
772      # gsutil usually prints PreconditionException when a precondition fails.
773      # It may also print "ResumableUploadAbortException: 412 Precondition
774      # Failed", so the logic needs to be a little more general.
775      if (b'PreconditionException' in error or
776          b'412 Precondition Failed' in error):
777        return ErrorDetails(type='precondition_exception', retriable=False,
778                            exception=GSContextPreconditionFailed(e))
779
780      # If the file does not exist, one of the following errors occurs. The
781      # "stat" command leaves off the "CommandException: " prefix, but it also
782      # outputs to stdout instead of stderr and so will not be caught here
783      # regardless.
784      if (b'CommandException: No URLs matched' in error or
785          b'NotFoundException:' in error or
786          b'One or more URLs matched no objects' in error):
787        return ErrorDetails(type='no_such_key', retriable=False,
788                            exception=GSNoSuchKey(e))
789
790      logging.warning('GS_ERROR: %s ', error)
791
792      # Temporary fix: remove the gsutil tracker files so that our retry
793      # can hit a different backend. This should be removed after the
794      # bug is fixed by the Google Storage team (see crbug.com/308300).
795      resumable_error = _FirstSubstring(error, self.RESUMABLE_ERROR_MESSAGE)
796      if resumable_error:
797        # Only remove the tracker files if we try to upload/download a file.
798        if 'cp' in e.result.cmd[:-2]:
799          # Assume a command: gsutil [options] cp [options] src_path dest_path
800          # dest_path needs to be a fully qualified local path, which is already
801          # required for GSContext.Copy().
802          tracker_filenames = self.GetTrackerFilenames(e.result.cmd[-1])
803          logging.info('Potential list of tracker files: %s',
804                       tracker_filenames)
805          for tracker_filename in tracker_filenames:
806            tracker_file_path = os.path.join(self.DEFAULT_GSUTIL_TRACKER_DIR,
807                                             tracker_filename)
808            if os.path.exists(tracker_file_path):
809              logging.info('Deleting gsutil tracker file %s before retrying.',
810                           tracker_file_path)
811              logging.info('The content of the tracker file: %s',
812                           osutils.ReadFile(tracker_file_path))
813              osutils.SafeUnlink(tracker_file_path)
814        return ErrorDetails(type='resumable',
815                            message_pattern=resumable_error.decode('utf-8'),
816                            retriable=True)
817
818      transient_error = _FirstSubstring(error, self.TRANSIENT_ERROR_MESSAGE)
819      if transient_error:
820        return ErrorDetails(type='transient',
821                            message_pattern=transient_error.decode('utf-8'),
822                            retriable=True)
823
824    return ErrorDetails(type='unknown', retriable=False)
825
826  # TODO(mtennant): Make a private method.
827  def DoCommand(self, gsutil_cmd, headers=(), retries=None, version=None,
828                parallel=False, **kwargs):
829    """Run a gsutil command, suppressing output, and setting retry/sleep.
830
831    Args:
832      gsutil_cmd: The (mostly) constructed gsutil subcommand to run.
833      headers: A list of raw headers to pass down.
834      parallel: Whether gsutil should enable parallel copy/update of multiple
835        files. NOTE: This option causes gsutil to use significantly more
836        memory, even if gsutil is only uploading one file.
837      retries: How many times to retry this command (defaults to setting given
838        at object creation).
839      version: If given, the generation; essentially the timestamp of the last
840        update.  Note this is not the same as sequence-number; it's
841        monotonically increasing bucket wide rather than reset per file.
842        The usage of this is if we intend to replace/update only if the version
843        is what we expect.  This is useful for distributed reasons- for example,
844        to ensure you don't overwrite someone else's creation, a version of
845        0 states "only update if no version exists".
846
847    Returns:
848      A RunCommandResult object.
849    """
850    kwargs = kwargs.copy()
851    kwargs.setdefault('stderr', True)
852    kwargs.setdefault('encoding', 'utf-8')
853
854    cmd = [self.gsutil_bin]
855    cmd += self.gsutil_flags
856    for header in headers:
857      cmd += ['-h', header]
858    if version is not None:
859      cmd += ['-h', 'x-goog-if-generation-match:%d' % int(version)]
860
861    # Enable parallel copy/update of multiple files if stdin is not to
862    # be piped to the command. This does not split a single file into
863    # smaller components for upload.
864    if parallel and kwargs.get('input') is None:
865      cmd += ['-m']
866
867    cmd.extend(gsutil_cmd)
868
869    if retries is None:
870      retries = self.retries
871
872    extra_env = kwargs.pop('extra_env', {})
873    if self.boto_file and os.path.isfile(self.boto_file):
874      extra_env.setdefault('BOTO_CONFIG', self.boto_file)
875
876    if self.dry_run:
877      logging.debug("%s: would've run: %s", self.__class__.__name__,
878                    cros_build_lib.CmdToStr(cmd))
879    else:
880      try:
881        return retry_stats.RetryWithStats(retry_stats.GSUTIL,
882                                          self._RetryFilter,
883                                          retries, cros_build_lib.run,
884                                          cmd, sleep=self._sleep_time,
885                                          extra_env=extra_env, **kwargs)
886      except cros_build_lib.RunCommandError as e:
887        raise GSCommandError(e.msg, e.result, e.exception)
888
  def Copy(self, src_path, dest_path, acl=None, recursive=False,
           skip_symlinks=True, auto_compress=False, **kwargs):
    """Copy to/from GS bucket.

    Canned ACL permissions can be specified on the gsutil cp command line.

    More info:
    https://developers.google.com/storage/docs/accesscontrol#applyacls

    Args:
      src_path: Fully qualified local path or full gs:// path of the src file.
      dest_path: Fully qualified local path or full gs:// path of the dest
                 file.
      acl: One of the google storage canned_acls to apply.
      recursive: Whether to copy recursively.
      skip_symlinks: Skip symbolic links when copying recursively.
      auto_compress: Automatically compress with gzip when uploading.
      kwargs: Additional options passed through to DoCommand; 'input' may
              carry the data to upload when src_path is '-'.

    Returns:
      The generation of the remote file.

    Raises:
      RunCommandError if the command failed despite retries.
    """
    # -v causes gs://bucket/path#generation to be listed in output.
    cmd = ['cp', '-v']

    # Certain versions of gsutil (at least 4.3) assume the source of a copy is
    # a directory if the -r option is used. If it's really a file, gsutil will
    # look like it's uploading it but not actually do anything. We'll work
    # around that problem by surpressing the -r flag if we detect the source
    # is a local file.
    if recursive and not os.path.isfile(src_path):
      cmd.append('-r')
      if skip_symlinks:
        cmd.append('-e')

    if auto_compress:
      cmd.append('-Z')

    # Fall back to the context-wide default ACL when none is given here.
    acl = self.acl if acl is None else acl
    if acl is not None:
      cmd += ['-a', acl]

    with cros_build_lib.ContextManagerStack() as stack:
      # Write the input into a tempfile if possible. This is needed so that
      # gsutil can retry failed requests.  We allow the input to be a string
      # or bytes regardless of the output encoding.
      if src_path == '-' and kwargs.get('input') is not None:
        # The tempfile stays alive (via the stack) until DoCommand finishes.
        f = stack.Add(tempfile.NamedTemporaryFile, mode='wb')
        data = kwargs['input']
        if isinstance(data, six.text_type):
          data = data.encode('utf-8')
        f.write(data)
        f.flush()
        del kwargs['input']
        src_path = f.name

      cmd += ['--', src_path, dest_path]

      if not (PathIsGs(src_path) or PathIsGs(dest_path)):
        # Don't retry on local copies.
        kwargs.setdefault('retries', 0)

      # Capture output so the generation number can be parsed out below.
      kwargs['capture_output'] = True
      try:
        result = self.DoCommand(cmd, **kwargs)
        if self.dry_run:
          return None

        # Now we parse the output for the current generation number.  Example:
        #   Created: gs://chromeos-throw-away-bucket/foo#1360630664537000.1
        # The 'Created:' line is searched for in the captured result.error.
        m = re.search(r'Created: .*#(\d+)([.](\d+))?\n', result.error)
        if m:
          return int(m.group(1))
        else:
          return None
      except GSNoSuchKey as e:
        # If the source was a local file, the error is a quirk of gsutil 4.5
        # and should be ignored. If the source was remote, there might
        # legitimately be no such file. See crbug.com/393419.
        if os.path.isfile(src_path):
          return None

        # Temp log for crbug.com/642986, should be removed when the bug
        # is fixed.
        logging.warning('Copy Error: src %s dest %s: %s '
                        '(Temp log for crbug.com/642986)',
                        src_path, dest_path, e)
        raise
979
980  def CreateWithContents(self, gs_uri, contents, **kwargs):
981    """Creates the specified file with specified contents.
982
983    Args:
984      gs_uri: The URI of a file on Google Storage.
985      contents: String or bytes with contents to write to the file.
986      kwargs: See additional options that Copy takes.
987
988    Raises:
989      See Copy.
990    """
991    self.Copy('-', gs_uri, input=contents, **kwargs)
992
993  # TODO: Merge LS() and List()?
994  def LS(self, path, **kwargs):
995    """Does a directory listing of the given gs path.
996
997    Args:
998      path: The path to get a listing of.
999      kwargs: See options that DoCommand takes.
1000
1001    Returns:
1002      A list of paths that matched |path|.  Might be more than one if a
1003      directory or path include wildcards/etc...
1004    """
1005    if self.dry_run:
1006      return []
1007
1008    if not PathIsGs(path):
1009      # gsutil doesn't support listing a local path, so just run 'ls'.
1010      kwargs.pop('retries', None)
1011      kwargs.pop('headers', None)
1012      kwargs['capture_output'] = True
1013      kwargs.setdefault('encoding', 'utf-8')
1014      result = cros_build_lib.run(['ls', path], **kwargs)
1015      return result.output.splitlines()
1016    else:
1017      return [x.url for x in self.List(path, **kwargs)]
1018
1019  def List(self, path, details=False, **kwargs):
1020    """Does a directory listing of the given gs path.
1021
1022    Args:
1023      path: The path to get a listing of.
1024      details: Whether to include size/timestamp info.
1025      kwargs: See options that DoCommand takes.
1026
1027    Returns:
1028      A list of GSListResult objects that matched |path|.  Might be more
1029      than one if a directory or path include wildcards/etc...
1030    """
1031    ret = []
1032    if self.dry_run:
1033      return ret
1034
1035    cmd = ['ls']
1036    if details:
1037      cmd += ['-l']
1038    cmd += ['--', path]
1039
1040    # We always request the extended details as the overhead compared to a plain
1041    # listing is negligible.
1042    kwargs['stdout'] = True
1043    lines = self.DoCommand(cmd, **kwargs).output.splitlines()
1044
1045    if details:
1046      # The last line is expected to be a summary line.  Ignore it.
1047      lines = lines[:-1]
1048      ls_re = LS_LA_RE
1049    else:
1050      ls_re = LS_RE
1051
1052    # Handle optional fields.
1053    intify = lambda x: int(x) if x else None
1054
1055    # Parse out each result and build up the results list.
1056    for line in lines:
1057      match = ls_re.search(line)
1058      if not match:
1059        raise GSContextException('unable to parse line: %s' % line)
1060      if match.group('creation_time'):
1061        timestamp = datetime.datetime.strptime(match.group('creation_time'),
1062                                               DATETIME_FORMAT)
1063      else:
1064        timestamp = None
1065
1066      ret.append(GSListResult(
1067          content_length=intify(match.group('content_length')),
1068          creation_time=timestamp,
1069          url=match.group('url'),
1070          generation=intify(match.group('generation')),
1071          metageneration=intify(match.group('metageneration'))))
1072
1073    return ret
1074
1075  def GetSize(self, path, **kwargs):
1076    """Returns size of a single object (local or GS)."""
1077    if not PathIsGs(path):
1078      return os.path.getsize(path)
1079    else:
1080      return self.Stat(path, **kwargs).content_length
1081
1082  def Move(self, src_path, dest_path, **kwargs):
1083    """Move/rename to/from GS bucket.
1084
1085    Args:
1086      src_path: Fully qualified local path or full gs:// path of the src file.
1087      dest_path: Fully qualified local path or full gs:// path of the dest file.
1088      kwargs: See options that DoCommand takes.
1089    """
1090    cmd = ['mv', '--', src_path, dest_path]
1091    return self.DoCommand(cmd, **kwargs)
1092
1093  def SetACL(self, upload_url, acl=None, **kwargs):
1094    """Set access on a file already in google storage.
1095
1096    Args:
1097      upload_url: gs:// url that will have acl applied to it.
1098      acl: An ACL permissions file or canned ACL.
1099      kwargs: See options that DoCommand takes.
1100    """
1101    if acl is None:
1102      if not self.acl:
1103        raise GSContextException(
1104            'SetAcl invoked w/out a specified acl, nor a default acl.')
1105      acl = self.acl
1106
1107    self.DoCommand(['acl', 'set', acl, upload_url], **kwargs)
1108
1109  def ChangeACL(self, upload_url, acl_args_file=None, acl_args=None, **kwargs):
1110    """Change access on a file already in google storage with "acl ch".
1111
1112    Args:
1113      upload_url: gs:// url that will have acl applied to it.
1114      acl_args_file: A file with arguments to the gsutil acl ch command. The
1115                     arguments can be spread across multiple lines. Comments
1116                     start with a # character and extend to the end of the
1117                     line. Exactly one of this argument or acl_args must be
1118                     set.
1119      acl_args: A list of arguments for the gsutil acl ch command. Exactly
1120                one of this argument or acl_args must be set.
1121      kwargs: See options that DoCommand takes.
1122    """
1123    if acl_args_file and acl_args:
1124      raise GSContextException(
1125          'ChangeACL invoked with both acl_args and acl_args set.')
1126    if not acl_args_file and not acl_args:
1127      raise GSContextException(
1128          'ChangeACL invoked with neither acl_args nor acl_args set.')
1129
1130    if acl_args_file:
1131      lines = osutils.ReadFile(acl_args_file).splitlines()
1132      # Strip out comments.
1133      lines = [x.split('#', 1)[0].strip() for x in lines]
1134      acl_args = ' '.join([x for x in lines if x]).split()
1135
1136    # Some versions of gsutil bubble up precondition failures even when we
1137    # didn't request it due to how ACL changes happen internally to gsutil.
1138    # https://crbug.com/763450
1139    # We keep the retry limit a bit low because DoCommand already has its
1140    # own level of retries.
1141    retry_util.RetryException(
1142        GSContextPreconditionFailed, 3, self.DoCommand,
1143        ['acl', 'ch'] + acl_args + [upload_url], **kwargs)
1144
1145  def Exists(self, path, **kwargs):
1146    """Checks whether the given object exists.
1147
1148    Args:
1149      path: Local path or gs:// url to check.
1150      kwargs: Flags to pass to DoCommand.
1151
1152    Returns:
1153      True if the path exists; otherwise returns False.
1154    """
1155    if not PathIsGs(path):
1156      return os.path.exists(path)
1157
1158    try:
1159      self.Stat(path, **kwargs)
1160    except GSNoSuchKey:
1161      return False
1162
1163    return True
1164
1165  def Remove(self, path, recursive=False, ignore_missing=False, **kwargs):
1166    """Remove the specified file.
1167
1168    Args:
1169      path: Full gs:// url of the file to delete.
1170      recursive: Remove recursively starting at path.
1171      ignore_missing: Whether to suppress errors about missing files.
1172      kwargs: Flags to pass to DoCommand.
1173    """
1174    cmd = ['rm']
1175    if 'recurse' in kwargs:
1176      raise TypeError('"recurse" has been renamed to "recursive"')
1177    if recursive:
1178      cmd.append('-R')
1179    cmd.append('--')
1180    cmd.append(path)
1181    try:
1182      self.DoCommand(cmd, **kwargs)
1183    except GSNoSuchKey:
1184      if not ignore_missing:
1185        raise
1186
1187  def GetGeneration(self, path):
1188    """Get the generation and metageneration of the given |path|.
1189
1190    Returns:
1191      A tuple of the generation and metageneration.
1192    """
1193    try:
1194      res = self.Stat(path)
1195    except GSNoSuchKey:
1196      return 0, 0
1197
1198    return res.generation, res.metageneration
1199
  def Stat(self, path, **kwargs):
    """Stat a GS file, and get detailed information.

    Args:
      path: A GS path for files to Stat. Wildcards are NOT supported.
      kwargs: Flags to pass to DoCommand.

    Returns:
      A GSStatResult object with all fields populated.

    Raises:
      GSNoSuchKey if the object does not exist; assorted GSContextException
      exceptions on unparseable output.
    """
    try:
      # 'stat' prints its details to stdout, so capture it for parsing below.
      res = self.DoCommand(['stat', '--', path], stdout=True, **kwargs)
    except GSCommandError as e:
      # Because the 'gsutil stat' command logs errors itself (instead of
      # raising errors internally like other commands), we have to look
      # for errors ourselves.  See the related bug report here:
      # https://github.com/GoogleCloudPlatform/gsutil/issues/288
      # Example line:
      # No URLs matched gs://bucket/file
      if e.result.error and e.result.error.startswith('No URLs matched'):
        raise GSNoSuchKey('Stat Error: No URLs matched %s.' % path)

      # No idea what this is, so just choke.
      raise

    # In dryrun mode, DoCommand doesn't return an object, so we need to fake
    # out the behavior ourselves.
    if self.dry_run:
      return GSStatResult(
          creation_time=datetime.datetime.now(),
          content_length=0,
          content_type='application/octet-stream',
          hash_crc32c='AAAAAA==',
          hash_md5='',
          etag='',
          generation=0,
          metageneration=0)

    # We expect Stat output like the following. However, the Content-Language
    # line appears to be optional based on how the file in question was
    # created.
    #
    # gs://bucket/path/file:
    #     Creation time:      Sat, 23 Aug 2014 06:53:20 GMT
    #     Content-Language:   en
    #     Content-Length:     74
    #     Content-Type:       application/octet-stream
    #     Hash (crc32c):      BBPMPA==
    #     Hash (md5):         ms+qSYvgI9SjXn8tW/5UpQ==
    #     ETag:               CNCgocbmqMACEAE=
    #     Generation:         1408776800850000
    #     Metageneration:     1

    # Sanity check: real stat output always begins with the object's URL.
    if not res.output.startswith('gs://'):
      raise GSContextException('Unexpected stat output: %s' % res.output)

    def _GetField(name, optional=False):
      # Extract the value of a "<name>: <value>" line from the stat output.
      m = re.search(r'%s:\s*(.+)' % re.escape(name), res.output)
      if m:
        return m.group(1)
      elif optional:
        return None
      else:
        raise GSContextException('Field "%s" missing in "%s"' %
                                 (name, res.output))

    return GSStatResult(
        creation_time=datetime.datetime.strptime(
            _GetField('Creation time'), '%a, %d %b %Y %H:%M:%S %Z'),
        content_length=int(_GetField('Content-Length')),
        content_type=_GetField('Content-Type'),
        hash_crc32c=_GetField('Hash (crc32c)'),
        # md5 is absent for composite objects, hence optional.
        hash_md5=_GetField('Hash (md5)', optional=True),
        etag=_GetField('ETag'),
        generation=int(_GetField('Generation')),
        metageneration=int(_GetField('Metageneration')))
1279
1280  def Counter(self, path):
1281    """Return a GSCounter object pointing at a |path| in Google Storage.
1282
1283    Args:
1284      path: The path to the counter in Google Storage.
1285    """
1286    return GSCounter(self, path)
1287
1288  def WaitForGsPaths(self, paths, timeout, period=10):
1289    """Wait until a list of files exist in GS.
1290
1291    Args:
1292      paths: The list of files to wait for.
1293      timeout: Max seconds to wait for file to appear.
1294      period: How often to check for files while waiting.
1295
1296    Raises:
1297      timeout_util.TimeoutError if the timeout is reached.
1298    """
1299    # Copy the list of URIs to wait for, so we don't modify the callers context.
1300    pending_paths = paths[:]
1301
1302    def _CheckForExistence():
1303      pending_paths[:] = [x for x in pending_paths if not self.Exists(x)]
1304
1305    def _Retry(_return_value):
1306      # Retry, if there are any pending paths left.
1307      return pending_paths
1308
1309    timeout_util.WaitForSuccess(_Retry, _CheckForExistence,
1310                                timeout=timeout, period=period)
1311
1312  def ContainsWildcard(self, url):
1313    """Checks whether url_string contains a wildcard.
1314
1315    Args:
1316      url: URL string to check.
1317
1318    Returns:
1319      True if |url| contains a wildcard.
1320    """
1321    return bool(WILDCARD_REGEX.search(url))
1322
1323  def GetGsNamesWithWait(self, pattern, url, timeout=600, period=10,
1324                         is_regex_pattern=False):
1325    """Returns the google storage names specified by the given pattern.
1326
1327    This method polls Google Storage until the target files specified by the
1328    pattern is available or until the timeout occurs. Because we may not know
1329    the exact name of the target files, the method accepts a filename pattern,
1330    to identify whether a file whose name matches the pattern exists
1331    (e.g. use pattern '*_full_*' to search for the full payload
1332    'chromeos_R17-1413.0.0-a1_x86-mario_full_dev.bin'). Returns the name only
1333    if found before the timeout.
1334
1335    Warning: GS listing are not perfect, and are eventually consistent. Doing a
1336    search for file existence is a 'best effort'. Calling code should be aware
1337    and ready to handle that.
1338
1339    Args:
1340      pattern: a path pattern (glob or regex) identifying the files we need.
1341      url: URL of the Google Storage bucket.
1342      timeout: how many seconds are we allowed to keep trying.
1343      period: how many seconds to wait between attempts.
1344      is_regex_pattern: Whether the pattern is a regex (otherwise a glob).
1345
1346    Returns:
1347      The list of files matching the pattern in Google Storage bucket or None
1348      if the files are not found and hit the timeout_util.TimeoutError.
1349    """
1350    def _GetGsName():
1351      uploaded_list = [os.path.basename(p.url) for p in self.List(url)]
1352
1353      if is_regex_pattern:
1354        filter_re = re.compile(pattern)
1355        matching_names = [f for f in uploaded_list
1356                          if filter_re.search(f) is not None]
1357      else:
1358        matching_names = fnmatch.filter(uploaded_list, pattern)
1359
1360      return matching_names
1361
1362    try:
1363      matching_names = None
1364      if not (is_regex_pattern or self.ContainsWildcard(pattern)):
1365        try:
1366          self.WaitForGsPaths(['%s/%s' % (url, pattern)], timeout)
1367          return [os.path.basename(pattern)]
1368        except GSCommandError:
1369          pass
1370
1371      if not matching_names:
1372        matching_names = timeout_util.WaitForSuccess(
1373            lambda x: not x, _GetGsName, timeout=timeout, period=period)
1374
1375      logging.debug('matching_names=%s, is_regex_pattern=%r',
1376                    matching_names, is_regex_pattern)
1377      return matching_names
1378    except timeout_util.TimeoutError:
1379      return None
1380
1381
1382def _FirstMatch(predicate, elems):
1383  """Returns the first element matching the given |predicate|.
1384
1385  Args:
1386    predicate: A function which takes an element and returns a bool
1387    elems: A sequence of elements.
1388  """
1389  matches = [x for x in elems if predicate(x)]
1390  return matches[0] if matches else None
1391
1392
1393def _FirstSubstring(superstring, haystack):
1394  """Returns the first elem of |haystack| which is a substring of |superstring|.
1395
1396  Args:
1397    superstring: A string to search for substrings of.
1398    haystack: A sequence of strings to search through.
1399  """
1400  return _FirstMatch(lambda s: s in superstring, haystack)
1401
1402
@contextlib.contextmanager
def TemporaryURL(prefix):
  """Context manager to generate a random URL.

  At the end, the URL will be deleted.
  """
  url = '%s/chromite-temp/%s/%s/%s' % (constants.TRASH_BUCKET, prefix,
                                       getpass.getuser(),
                                       cros_build_lib.GetRandomString())
  ctx = GSContext()
  # Clear out any stale leftovers before handing the URL to the caller.
  ctx.Remove(url, ignore_missing=True, recursive=True)
  try:
    yield url
  finally:
    # Always clean up, even if the caller's body raised.
    ctx.Remove(url, ignore_missing=True, recursive=True)
1418