# Copyright 2013 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import contextlib import fnmatch import json import os import pipes import shlex import shutil import subprocess import sys import tempfile import zipfile CHROMIUM_SRC = os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir, os.pardir) COLORAMA_ROOT = os.path.join(CHROMIUM_SRC, 'third_party', 'colorama', 'src') @contextlib.contextmanager def TempDir(): dirname = tempfile.mkdtemp() try: yield dirname finally: shutil.rmtree(dirname) def MakeDirectory(dir_path): try: os.makedirs(dir_path) except OSError: pass def DeleteDirectory(dir_path): if os.path.exists(dir_path): shutil.rmtree(dir_path) def Touch(path): MakeDirectory(os.path.dirname(path)) with open(path, 'a'): os.utime(path, None) def FindInDirectory(directory, filename_filter): files = [] for root, _dirnames, filenames in os.walk(directory): matched_files = fnmatch.filter(filenames, filename_filter) files.extend((os.path.join(root, f) for f in matched_files)) return files def FindInDirectories(directories, filename_filter): all_files = [] for directory in directories: all_files.extend(FindInDirectory(directory, filename_filter)) return all_files def ParseGypList(gyp_string): # The ninja generator doesn't support $ in strings, so use ## to # represent $. # TODO(cjhopman): Remove when # https://code.google.com/p/gyp/issues/detail?id=327 # is addressed. gyp_string = gyp_string.replace('##', '$') return shlex.split(gyp_string) def CheckOptions(options, parser, required=None): if not required: return for option_name in required: if getattr(options, option_name) is None: parser.error('--%s is required' % option_name.replace('_', '-')) def WriteJson(obj, path, only_if_changed=False): old_dump = None if os.path.exists(path): with open(path, 'r') as oldfile: old_dump = oldfile.read() new_dump = json.dumps(obj, sort_keys=True, indent=2, separators=(',', ': ')) if not only_if_changed or old_dump != new_dump: with open(path, 'w') as outfile: outfile.write(new_dump) def ReadJson(path): with open(path, 'r') as jsonfile: return json.load(jsonfile) class CalledProcessError(Exception): """This exception is raised when the process run by CheckOutput exits with a non-zero exit code.""" def __init__(self, cwd, args, output): super(CalledProcessError, self).__init__() self.cwd = cwd self.args = args self.output = output def __str__(self): # A user should be able to simply copy and paste the command that failed # into their shell. copyable_command = '( cd {}; {} )'.format(os.path.abspath(self.cwd), ' '.join(map(pipes.quote, self.args))) return 'Command failed: {}\n{}'.format(copyable_command, self.output) # This can be used in most cases like subprocess.check_output(). The output, # particularly when the command fails, better highlights the command's failure. # If the command fails, raises a build_utils.CalledProcessError. def CheckOutput(args, cwd=None, print_stdout=False, print_stderr=True, stdout_filter=None, stderr_filter=None, fail_func=lambda returncode, stderr: returncode != 0): if not cwd: cwd = os.getcwd() child = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd) stdout, stderr = child.communicate() if stdout_filter is not None: stdout = stdout_filter(stdout) if stderr_filter is not None: stderr = stderr_filter(stderr) if fail_func(child.returncode, stderr): raise CalledProcessError(cwd, args, stdout + stderr) if print_stdout: sys.stdout.write(stdout) if print_stderr: sys.stderr.write(stderr) return stdout def GetModifiedTime(path): # For a symlink, the modified time should be the greater of the link's # modified time and the modified time of the target. return max(os.lstat(path).st_mtime, os.stat(path).st_mtime) def IsTimeStale(output, inputs): if not os.path.exists(output): return True output_time = GetModifiedTime(output) for i in inputs: if GetModifiedTime(i) > output_time: return True return False def IsDeviceReady(): device_state = CheckOutput(['adb', 'get-state']) return device_state.strip() == 'device' def CheckZipPath(name): if os.path.normpath(name) != name: raise Exception('Non-canonical zip path: %s' % name) if os.path.isabs(name): raise Exception('Absolute zip path: %s' % name) def ExtractAll(zip_path, path=None, no_clobber=True): if path is None: path = os.getcwd() elif not os.path.exists(path): MakeDirectory(path) with zipfile.ZipFile(zip_path) as z: for name in z.namelist(): CheckZipPath(name) if no_clobber: output_path = os.path.join(path, name) if os.path.exists(output_path): raise Exception( 'Path already exists from zip: %s %s %s' % (zip_path, name, output_path)) z.extractall(path=path) def DoZip(inputs, output, base_dir): with zipfile.ZipFile(output, 'w') as outfile: for f in inputs: CheckZipPath(os.path.relpath(f, base_dir)) outfile.write(f, os.path.relpath(f, base_dir)) def PrintWarning(message): print 'WARNING: ' + message def PrintBigWarning(message): print '***** ' * 8 PrintWarning(message) print '***** ' * 8