# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Wrappers for gsutil, for basic interaction with Google Cloud Storage."""

import collections
import contextlib
import hashlib
import logging
import os
import shutil
import stat
import subprocess
import re
import sys
import tempfile

import py_utils
from py_utils import lock

# Do a no-op import here so that the cloud_storage_global_lock dep is picked up
# by https://cs.chromium.org/chromium/src/build/android/test_runner.pydeps.
# TODO(nedn, jbudorick): figure out a way to get rid of this ugly hack.
from py_utils import cloud_storage_global_lock  # pylint: disable=unused-import

logger = logging.getLogger(__name__)  # pylint: disable=invalid-name


PUBLIC_BUCKET = 'chromium-telemetry'
PARTNER_BUCKET = 'chrome-partner-telemetry'
INTERNAL_BUCKET = 'chrome-telemetry'
TELEMETRY_OUTPUT = 'chrome-telemetry-output'

# Use an ordered dict to make sure that the bucket key-value items are ordered
# from the most open to the most restrictive.
BUCKET_ALIASES = collections.OrderedDict((
    ('public', PUBLIC_BUCKET),
    ('partner', PARTNER_BUCKET),
    ('internal', INTERNAL_BUCKET),
    ('output', TELEMETRY_OUTPUT),
))

BUCKET_ALIAS_NAMES = BUCKET_ALIASES.keys()


_GSUTIL_PATH = os.path.join(py_utils.GetCatapultDir(), 'third_party', 'gsutil',
                            'gsutil')

# TODO(tbarzic): A workaround for http://crbug.com/386416 and
# http://crbug.com/359293. See |_RunCommand|.
_CROS_GSUTIL_HOME_WAR = '/home/chromeos-test/'


# If the environment variable DISABLE_CLOUD_STORAGE_IO is set to '1', any
# method call that would perform cloud storage network IO raises an exception.
DISABLE_CLOUD_STORAGE_IO = 'DISABLE_CLOUD_STORAGE_IO'

# The maximum number of seconds to wait to acquire the pseudo lock for a cloud
# storage file before raising an exception.
LOCK_ACQUISITION_TIMEOUT = 10


class CloudStorageError(Exception):

  @staticmethod
  def _GetConfigInstructions():
    command = _GSUTIL_PATH
    if py_utils.IsRunningOnCrosDevice():
      command = 'HOME=%s %s' % (_CROS_GSUTIL_HOME_WAR, _GSUTIL_PATH)
    return ('To configure your credentials:\n'
            '  1. Run "%s config" and follow its instructions.\n'
            '  2. If you have a @google.com account, use that account.\n'
            '  3. For the project-id, just enter 0.' % command)


class PermissionError(CloudStorageError):

  def __init__(self):
    super(PermissionError, self).__init__(
        'Attempted to access a file from Cloud Storage but you don\'t '
        'have permission. ' + self._GetConfigInstructions())


class CredentialsError(CloudStorageError):

  def __init__(self):
    super(CredentialsError, self).__init__(
        'Attempted to access a file from Cloud Storage but you have no '
        'configured credentials. ' + self._GetConfigInstructions())


class CloudStorageIODisabled(CloudStorageError):
  pass


class NotFoundError(CloudStorageError):
  pass


class ServerError(CloudStorageError):
  pass
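

# Usage sketch (illustrative, not part of the module API): resolving a bucket
# alias and handling the error hierarchy defined above. The remote and local
# paths below are hypothetical.
#
#   from py_utils import cloud_storage
#
#   bucket = cloud_storage.BUCKET_ALIASES['public']  # -> 'chromium-telemetry'
#   try:
#     cloud_storage.Get(bucket, 'some/remote/path', '/tmp/local_file')
#   except cloud_storage.CredentialsError:
#     pass  # No configured credentials; see _GetConfigInstructions().
#   except cloud_storage.NotFoundError:
#     pass  # The object does not exist in the bucket.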


# TODO(tonyg/dtu): Can this be replaced with distutils.spawn.find_executable()?
def _FindExecutableInPath(relative_executable_path, *extra_search_paths):
  search_paths = list(extra_search_paths) + os.environ['PATH'].split(os.pathsep)
  for search_path in search_paths:
    executable_path = os.path.join(search_path, relative_executable_path)
    if py_utils.IsExecutable(executable_path):
      return executable_path
  return None


def _EnsureExecutable(gsutil):
  """chmod +x if gsutil is not executable."""
  st = os.stat(gsutil)
  if not st.st_mode & stat.S_IEXEC:
    os.chmod(gsutil, st.st_mode | stat.S_IEXEC)


def _RunCommand(args):
  # On cros devices, telemetry runs as root, so HOME is set to /root/, which
  # is not writable. gsutil attempts to create a download tracker dir in the
  # home dir and fails. To avoid this, override HOME with something writable
  # when running on a cros device.
  #
  # TODO(tbarzic): Figure out a better way to handle gsutil on cros.
  # http://crbug.com/386416, http://crbug.com/359293.
  gsutil_env = None
  if py_utils.IsRunningOnCrosDevice():
    gsutil_env = os.environ.copy()
    gsutil_env['HOME'] = _CROS_GSUTIL_HOME_WAR

  if os.name == 'nt':
    # If Windows, prepend python. Python scripts aren't directly executable.
    args = [sys.executable, _GSUTIL_PATH] + args
  else:
    # Don't do it on POSIX, in case someone is using a shell script to redirect.
    args = [_GSUTIL_PATH] + args
    _EnsureExecutable(_GSUTIL_PATH)

  if args[0] not in ('help', 'hash', 'version') and not IsNetworkIOEnabled():
    raise CloudStorageIODisabled(
        'Environment variable DISABLE_CLOUD_STORAGE_IO is set to 1. '
        'Command %s is not allowed to run.' % args)

  gsutil = subprocess.Popen(args, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, env=gsutil_env)
  stdout, stderr = gsutil.communicate()

  if gsutil.returncode:
    raise GetErrorObjectForCloudStorageStderr(stderr)

  return stdout


def GetErrorObjectForCloudStorageStderr(stderr):
  if (stderr.startswith((
      'You are attempting to access protected data with no configured',
      'Failure: No handler was ready to authenticate.')) or
      re.match('.*401.*does not have .* access to .*', stderr)):
    return CredentialsError()
  if ('status=403' in stderr or 'status 403' in stderr or
      '403 Forbidden' in stderr or
      re.match('.*403.*does not have .* access to .*', stderr)):
    return PermissionError()
  if (stderr.startswith('InvalidUriError') or 'No such object' in stderr or
      'No URLs matched' in stderr or 'One or more URLs matched no' in stderr):
    return NotFoundError(stderr)
  if '500 Internal Server Error' in stderr:
    return ServerError(stderr)
  return CloudStorageError(stderr)


def IsNetworkIOEnabled():
  """Returns True if cloud storage network IO is enabled."""
  disable_cloud_storage_env_val = os.getenv(DISABLE_CLOUD_STORAGE_IO)

  if disable_cloud_storage_env_val and disable_cloud_storage_env_val != '1':
    logger.error(
        'Unsupported value of environment variable '
        'DISABLE_CLOUD_STORAGE_IO. Expected None or \'1\' but got %s.',
        disable_cloud_storage_env_val)

  return disable_cloud_storage_env_val != '1'
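

# Behavior sketch of the IO gate above (illustrative): with
# DISABLE_CLOUD_STORAGE_IO=1 in the environment, any gsutil command other than
# 'help', 'hash', or 'version' raises CloudStorageIODisabled before any
# network IO happens.
#
#   import os
#   os.environ[DISABLE_CLOUD_STORAGE_IO] = '1'
#   assert not IsNetworkIOEnabled()
#   _RunCommand(['ls', 'gs://chromium-telemetry/'])  # raises
#                                                    # CloudStorageIODisabled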


def List(bucket):
  query = 'gs://%s/' % bucket
  stdout = _RunCommand(['ls', query])
  return [url[len(query):] for url in stdout.splitlines()]


def Exists(bucket, remote_path):
  try:
    _RunCommand(['ls', 'gs://%s/%s' % (bucket, remote_path)])
    return True
  except NotFoundError:
    return False


def Move(bucket1, bucket2, remote_path):
  url1 = 'gs://%s/%s' % (bucket1, remote_path)
  url2 = 'gs://%s/%s' % (bucket2, remote_path)
  logger.info('Moving %s to %s', url1, url2)
  _RunCommand(['mv', url1, url2])


def Copy(bucket_from, bucket_to, remote_path_from, remote_path_to):
  """Copies a file from one location in Cloud Storage to another.

  The copy makes no local changes and does not modify the source file, but it
  will overwrite any existing file at the destination.

  Args:
    bucket_from: The cloud storage bucket where the file is currently located.
    bucket_to: The cloud storage bucket it is being copied to.
    remote_path_from: The file path where the file is located in bucket_from.
    remote_path_to: The file path it is being copied to in bucket_to.
  """
  url1 = 'gs://%s/%s' % (bucket_from, remote_path_from)
  url2 = 'gs://%s/%s' % (bucket_to, remote_path_to)
  logger.info('Copying %s to %s', url1, url2)
  _RunCommand(['cp', url1, url2])


def Delete(bucket, remote_path):
  url = 'gs://%s/%s' % (bucket, remote_path)
  logger.info('Deleting %s', url)
  _RunCommand(['rm', url])


def Get(bucket, remote_path, local_path):
  with _FileLock(local_path):
    _GetLocked(bucket, remote_path, local_path)


_CLOUD_STORAGE_GLOBAL_LOCK = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), 'cloud_storage_global_lock.py')


@contextlib.contextmanager
def _FileLock(base_path):
  pseudo_lock_path = '%s.pseudo_lock' % base_path
  _CreateDirectoryIfNecessary(os.path.dirname(pseudo_lock_path))

  # Make sure that we guard the creation, acquisition, release, and removal of
  # the pseudo lock all with the same guard (_CLOUD_STORAGE_GLOBAL_LOCK).
  # Otherwise, we can get nasty interleavings that result in multiple processes
  # thinking they have an exclusive lock, like:
  #
  # (Process 1) Create and acquire the pseudo lock
  # (Process 1) Release the pseudo lock
  # (Process 1) Release the file lock
  # (Process 2) Open and acquire the existing pseudo lock
  # (Process 1) Delete the (existing) pseudo lock
  # (Process 3) Create and acquire a new pseudo lock
  #
  # Using the same guard for creation and removal of the pseudo lock guarantees
  # that all processes are referring to the same lock.
  pseudo_lock_fd = None
  pseudo_lock_fd_return = []
  py_utils.WaitFor(lambda: _AttemptPseudoLockAcquisition(pseudo_lock_path,
                                                         pseudo_lock_fd_return),
                   LOCK_ACQUISITION_TIMEOUT)
  pseudo_lock_fd = pseudo_lock_fd_return[0]

  try:
    yield
  finally:
    py_utils.WaitFor(lambda: _AttemptPseudoLockRelease(pseudo_lock_fd),
                     LOCK_ACQUISITION_TIMEOUT)
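

# Locking sketch (illustrative): _FileLock serializes cloud storage operations
# that touch the same local path across processes, via a
# '<base_path>.pseudo_lock' file whose lifecycle is guarded by the global lock
# file above.
#
#   with _FileLock('/tmp/my_file'):
#     # Only one process at a time reaches this point for '/tmp/my_file';
#     # others block in WaitFor() for up to LOCK_ACQUISITION_TIMEOUT seconds.
#     _GetLocked('chromium-telemetry', 'some/remote/path', '/tmp/my_file')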


def _AttemptPseudoLockAcquisition(pseudo_lock_path, pseudo_lock_fd_return):
  """Tries to acquire the pseudo lock; returns whether the attempt succeeded.

  If the attempt was successful, pseudo_lock_fd_return, which should be an
  empty list, is modified to contain a single entry: the file descriptor of
  the (now acquired) lock file.

  This whole operation is guarded with the global cloud storage lock, which
  prevents race conditions that might otherwise cause multiple processes to
  believe they hold the same pseudo lock (see _FileLock for more details).
  """
  pseudo_lock_fd = None
  try:
    with open(_CLOUD_STORAGE_GLOBAL_LOCK) as global_file:
      with lock.FileLock(global_file, lock.LOCK_EX | lock.LOCK_NB):
        # Attempt to acquire the lock in a non-blocking manner. If we block,
        # then we'll cause deadlock because another process will be unable to
        # acquire the cloud storage global lock in order to release the pseudo
        # lock.
        pseudo_lock_fd = open(pseudo_lock_path, 'w')
        lock.AcquireFileLock(pseudo_lock_fd, lock.LOCK_EX | lock.LOCK_NB)
        pseudo_lock_fd_return.append(pseudo_lock_fd)
        return True
  except (lock.LockException, IOError):
    # We failed to acquire either the global cloud storage lock or the pseudo
    # lock.
    if pseudo_lock_fd:
      pseudo_lock_fd.close()
    return False


def _AttemptPseudoLockRelease(pseudo_lock_fd):
  """Tries to release the pseudo lock; returns whether the release succeeded.

  This whole operation is guarded with the global cloud storage lock, which
  prevents race conditions that might otherwise cause multiple processes to
  believe they hold the same pseudo lock (see _FileLock for more details).
  """
  pseudo_lock_path = pseudo_lock_fd.name
  try:
    with open(_CLOUD_STORAGE_GLOBAL_LOCK) as global_file:
      with lock.FileLock(global_file, lock.LOCK_EX | lock.LOCK_NB):
        lock.ReleaseFileLock(pseudo_lock_fd)
        pseudo_lock_fd.close()
        try:
          os.remove(pseudo_lock_path)
        except OSError:
          # We don't care if the pseudo lock gets removed elsewhere before
          # we have a chance to do so.
          pass
        return True
  except (lock.LockException, IOError):
    # We failed to acquire the global cloud storage lock and are thus unable
    # to release the pseudo lock.
    return False


def _CreateDirectoryIfNecessary(directory):
  if not os.path.exists(directory):
    os.makedirs(directory)


def _GetLocked(bucket, remote_path, local_path):
  url = 'gs://%s/%s' % (bucket, remote_path)
  logger.info('Downloading %s to %s', url, local_path)
  _CreateDirectoryIfNecessary(os.path.dirname(local_path))
  with tempfile.NamedTemporaryFile(
      dir=os.path.dirname(local_path),
      delete=False) as partial_download_path:
    try:
      # Windows won't download to an open file.
      partial_download_path.close()
      try:
        _RunCommand(['cp', url, partial_download_path.name])
      except ServerError:
        logger.info('Cloud Storage server error, retrying download')
        _RunCommand(['cp', url, partial_download_path.name])
      shutil.move(partial_download_path.name, local_path)
    finally:
      if os.path.exists(partial_download_path.name):
        os.remove(partial_download_path.name)
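

# Download sketch (illustrative): _GetLocked never exposes a partial file at
# local_path. It downloads into a NamedTemporaryFile in the destination
# directory (closed first, since Windows won't download to an open file),
# retries once on ServerError, and only then renames the temp file into place.
#
#   with _FileLock('/tmp/data/blob'):
#     _GetLocked('chromium-telemetry', 'blobs/blob', '/tmp/data/blob')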


def Insert(bucket, remote_path, local_path, publicly_readable=False):
  """Uploads the file in |local_path| to cloud storage.

  Args:
    bucket: the google cloud storage bucket name.
    remote_path: the remote file path in |bucket|.
    local_path: path of the local file to be uploaded.
    publicly_readable: whether the uploaded file has publicly readable
        permission.

  Returns:
    The url where the file is uploaded to.
  """
  url = 'gs://%s/%s' % (bucket, remote_path)
  command_and_args = ['cp']
  extra_info = ''
  if publicly_readable:
    command_and_args += ['-a', 'public-read']
    extra_info = ' (publicly readable)'
  command_and_args += [local_path, url]
  logger.info('Uploading %s to %s%s', local_path, url, extra_info)
  _RunCommand(command_and_args)
  return 'https://console.developers.google.com/m/cloudstorage/b/%s/o/%s' % (
      bucket, remote_path)


def GetIfHashChanged(cs_path, download_path, bucket, file_hash):
  """Downloads |cs_path| from |bucket| to |download_path| if |download_path|
  doesn't exist or its hash doesn't match |file_hash|.

  Returns:
    True if the binary was changed.
  Raises:
    CredentialsError if the user has no configured credentials.
    PermissionError if the user does not have permission to access the bucket.
    NotFoundError if the file is not in the given bucket in cloud_storage.
  """
  with _FileLock(download_path):
    if (os.path.exists(download_path) and
        CalculateHash(download_path) == file_hash):
      return False
    _GetLocked(bucket, cs_path, download_path)
    return True


def GetIfChanged(file_path, bucket):
  """Gets the file at file_path if it has a hash file that doesn't match or
  if there is no local copy of file_path, but there is a hash file for it.

  Returns:
    True if the binary was changed.
  Raises:
    CredentialsError if the user has no configured credentials.
    PermissionError if the user does not have permission to access the bucket.
    NotFoundError if the file is not in the given bucket in cloud_storage.
  """
  with _FileLock(file_path):
    hash_path = file_path + '.sha1'
    if not os.path.exists(hash_path):
      logger.warning('Hash file not found: %s', hash_path)
      return False

    expected_hash = ReadHash(hash_path)
    if os.path.exists(file_path) and CalculateHash(file_path) == expected_hash:
      return False
    _GetLocked(bucket, expected_hash, file_path)
    return True


def GetFilesInDirectoryIfChanged(directory, bucket):
  """Scans the directory for .sha1 files and downloads the corresponding files
  from the given bucket in cloud storage if the local and remote hashes don't
  match or there is no local copy.
  """
  if not os.path.isdir(directory):
    raise ValueError(
        '%s does not exist. Must provide a valid directory path.' % directory)
  # Don't allow the root directory to be a serving_dir.
  if directory == os.path.abspath(os.sep):
    raise ValueError('Trying to serve root directory from HTTP server.')
  for dirpath, _, filenames in os.walk(directory):
    for filename in filenames:
      path_name, extension = os.path.splitext(
          os.path.join(dirpath, filename))
      if extension != '.sha1':
        continue
      GetIfChanged(path_name, bucket)


def CalculateHash(file_path):
  """Calculates and returns the SHA-1 hash of the file at file_path."""
  sha1 = hashlib.sha1()
  with open(file_path, 'rb') as f:
    while True:
      # Read in 1mb chunks, so it doesn't all have to be loaded into memory.
      chunk = f.read(1024 * 1024)
      if not chunk:
        break
      sha1.update(chunk)
  return sha1.hexdigest()


def ReadHash(hash_path):
  with open(hash_path, 'rb') as f:
    return f.read(1024).rstrip()
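

# End-to-end sketch of the .sha1 workflow implemented above (illustrative;
# the bucket and file names are hypothetical):
#
#   1. Upload a file under its own SHA-1 and write the hash file beside it:
#        file_hash = CalculateHash('data.bin')
#        Insert('chromium-telemetry', file_hash, 'data.bin')
#        with open('data.bin.sha1', 'w') as f:
#          f.write(file_hash)
#   2. Later, on any machine, refresh the local copy only when stale:
#        GetIfChanged('data.bin', 'chromium-telemetry')
#      This reads data.bin.sha1, compares it against the hash of any existing
#      local data.bin, and downloads the object named by that hash only on a
#      mismatch.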