# Copyright 2013 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import fnmatch import json import os import pipes import shlex import shutil import subprocess import sys import traceback def MakeDirectory(dir_path): try: os.makedirs(dir_path) except OSError: pass def DeleteDirectory(dir_path): if os.path.exists(dir_path): shutil.rmtree(dir_path) def Touch(path): MakeDirectory(os.path.dirname(path)) with open(path, 'a'): os.utime(path, None) def FindInDirectory(directory, filter): files = [] for root, dirnames, filenames in os.walk(directory): matched_files = fnmatch.filter(filenames, filter) files.extend((os.path.join(root, f) for f in matched_files)) return files def FindInDirectories(directories, filter): all_files = [] for directory in directories: all_files.extend(FindInDirectory(directory, filter)) return all_files def ParseGypList(gyp_string): # The ninja generator doesn't support $ in strings, so use ## to # represent $. # TODO(cjhopman): Remove when # https://code.google.com/p/gyp/issues/detail?id=327 # is addressed. gyp_string = gyp_string.replace('##', '$') return shlex.split(gyp_string) def CheckOptions(options, parser, required=[]): for option_name in required: if not getattr(options, option_name): parser.error('--%s is required' % option_name.replace('_', '-')) def WriteJson(obj, path, only_if_changed=False): old_dump = None if os.path.exists(path): with open(path, 'r') as oldfile: old_dump = oldfile.read() new_dump = json.dumps(obj) if not only_if_changed or old_dump != new_dump: with open(path, 'w') as outfile: outfile.write(new_dump) def ReadJson(path): with open(path, 'r') as jsonfile: return json.load(jsonfile) class CalledProcessError(Exception): """This exception is raised when the process run by CheckOutput exits with a non-zero exit code.""" def __init__(self, cwd, args, output): self.cwd = cwd self.args = args self.output = output def __str__(self): # A user should be able to simply copy and paste the command that failed # into their shell. copyable_command = '( cd {}; {} )'.format(os.path.abspath(self.cwd), ' '.join(map(pipes.quote, self.args))) return 'Command failed: {}\n{}'.format(copyable_command, self.output) # This can be used in most cases like subprocess.check_output(). The output, # particularly when the command fails, better highlights the command's failure. # If the command fails, raises a build_utils.CalledProcessError. def CheckOutput(args, cwd=None, print_stdout=False, print_stderr=True, fail_if_stderr=False): if not cwd: cwd = os.getcwd() child = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd) stdout, stderr = child.communicate() if child.returncode or (stderr and fail_if_stderr): raise CalledProcessError(cwd, args, stdout + stderr) if print_stdout: sys.stdout.write(stdout) if print_stderr: sys.stderr.write(stderr) return stdout def GetModifiedTime(path): # For a symlink, the modified time should be the greater of the link's # modified time and the modified time of the target. return max(os.lstat(path).st_mtime, os.stat(path).st_mtime) def IsTimeStale(output, inputs): if not os.path.exists(output): return True output_time = GetModifiedTime(output) for input in inputs: if GetModifiedTime(input) > output_time: return True return False def IsDeviceReady(): device_state = CheckOutput(['adb', 'get-state']) return device_state.strip() == 'device' def PrintWarning(message): print 'WARNING: ' + message def PrintBigWarning(message): print '***** ' * 8 PrintWarning(message) print '***** ' * 8