1# -*- coding: utf-8 -*- 2# Copyright 2015 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6"""Handle path inference and translation.""" 7 8from __future__ import print_function 9 10import collections 11import os 12import tempfile 13 14from autotest_lib.utils.frozen_chromite.lib import constants 15from autotest_lib.utils.frozen_chromite.lib import cros_build_lib 16from autotest_lib.utils.frozen_chromite.lib import git 17from autotest_lib.utils.frozen_chromite.lib import osutils 18from autotest_lib.utils.frozen_chromite.utils import memoize 19 20 21GENERAL_CACHE_DIR = '.cache' 22CHROME_CACHE_DIR = 'cros_cache' 23 24CHECKOUT_TYPE_UNKNOWN = 'unknown' 25CHECKOUT_TYPE_GCLIENT = 'gclient' 26CHECKOUT_TYPE_REPO = 'repo' 27 28CheckoutInfo = collections.namedtuple( 29 'CheckoutInfo', ['type', 'root', 'chrome_src_dir']) 30 31 32class ChrootPathResolver(object): 33 """Perform path resolution to/from the chroot. 34 35 Attributes: 36 source_path: Value to override default source root inference. 37 source_from_path_repo: Whether to infer the source root from the converted 38 path's repo parent during inbound translation; overrides |source_path|. 39 """ 40 41 # TODO(garnold) We currently infer the source root based on the path's own 42 # encapsulating repository. This is a heuristic catering to paths are being 43 # translated to be used in a chroot that's not associated with the currently 44 # executing code (for example, cbuildbot run on a build root or a foreign 45 # tree checkout). This approach might result in arbitrary repo-contained 46 # paths being translated to invalid chroot paths where they actually should 47 # not, and other valid source paths failing to translate because they are not 48 # repo-contained. Eventually we'll want to make this behavior explicit, by 49 # either passing a source_root value, or requesting to infer it from the path 50 # (source_from_path_repo=True), but otherwise defaulting to the executing 51 # code's source root in the normal case. When that happens, we'll be 52 # switching source_from_path_repo to False by default. See chromium:485746. 53 54 def __init__(self, source_path=None, source_from_path_repo=True): 55 self._inside_chroot = cros_build_lib.IsInsideChroot() 56 self._source_path = (constants.SOURCE_ROOT if source_path is None 57 else source_path) 58 self._source_from_path_repo = source_from_path_repo 59 60 # The following are only needed if outside the chroot. 61 if self._inside_chroot: 62 self._chroot_path = None 63 self._chroot_link = None 64 self._chroot_to_host_roots = None 65 else: 66 self._chroot_path = self._GetSourcePathChroot(self._source_path) 67 # The chroot link allows us to resolve paths when the chroot is symlinked 68 # to the default location. This is generally not used, but it is useful 69 # for CI for optimization purposes. We will trust them not to do something 70 # dumb, like symlink to /, but this doesn't enable that kind of behavior 71 # anyway, just allows resolving paths correctly from outside the chroot. 72 self._chroot_link = self._ReadChrootLink(self._chroot_path) 73 74 # Initialize mapping of known root bind mounts. 75 self._chroot_to_host_roots = ( 76 (constants.CHROOT_SOURCE_ROOT, self._source_path), 77 (constants.CHROOT_CACHE_ROOT, self._GetCachePath), 78 ) 79 80 @classmethod 81 @memoize.MemoizedSingleCall 82 def _GetCachePath(cls): 83 """Returns the cache directory.""" 84 return os.path.realpath(GetCacheDir()) 85 86 def _GetSourcePathChroot(self, source_path): 87 """Returns path to the chroot directory of a given source root.""" 88 if source_path is None: 89 return None 90 return os.path.join(source_path, constants.DEFAULT_CHROOT_DIR) 91 92 def _ReadChrootLink(self, path): 93 """Convert a chroot symlink to its absolute path. 94 95 This contains defaults/edge cases assumptions for chroot paths. Not 96 recommended for non-chroot paths. 97 98 Args: 99 path (str|None): The path to resolve. 100 101 Returns: 102 str|None: The resolved path if the provided path is a symlink, None 103 otherwise. 104 """ 105 # Mainly for the "if self._source_from_path_repo:" branch in _GetChrootPath. 106 # _GetSourcePathChroot can return None, so double check it here. 107 if not path: 108 return None 109 110 abs_path = os.path.abspath(path) 111 link = osutils.ResolveSymlink(abs_path) 112 113 # ResolveSymlink returns the passed path when the path isn't a symlink. We 114 # can skip some redundant work when its falling back on the link when the 115 # chroot is not a symlink. 116 if link == abs_path: 117 return None 118 119 return link 120 121 def _TranslatePath(self, path, src_root, dst_root_input): 122 """If |path| starts with |src_root|, replace it using |dst_root_input|. 123 124 Args: 125 path: An absolute path we want to convert to a destination equivalent. 126 src_root: The root that path needs to be contained in. 127 dst_root_input: The root we want to relocate the relative path into, or a 128 function returning this value. 129 130 Returns: 131 A translated path, or None if |src_root| is not a prefix of |path|. 132 133 Raises: 134 ValueError: If |src_root| is a prefix but |dst_root_input| yields None, 135 which means we don't have sufficient information to do the translation. 136 """ 137 if not path.startswith(os.path.join(src_root, '')) and path != src_root: 138 return None 139 dst_root = dst_root_input() if callable(dst_root_input) else dst_root_input 140 if dst_root is None: 141 raise ValueError('No target root to translate path to') 142 return os.path.join(dst_root, path[len(src_root):].lstrip(os.path.sep)) 143 144 def _GetChrootPath(self, path): 145 """Translates a fully-expanded host |path| into a chroot equivalent. 146 147 This checks path prefixes in order from the most to least "contained": the 148 chroot itself, then the cache directory, and finally the source tree. The 149 idea is to return the shortest possible chroot equivalent. 150 151 Args: 152 path: A host path to translate. 153 154 Returns: 155 An equivalent chroot path. 156 157 Raises: 158 ValueError: If |path| is not reachable from the chroot. 159 """ 160 new_path = None 161 162 # Preliminary: compute the actual source and chroot paths to use. These are 163 # generally the precomputed values, unless we're inferring the source root 164 # from the path itself. 165 source_path = self._source_path 166 chroot_path = self._chroot_path 167 chroot_link = self._chroot_link 168 169 if self._source_from_path_repo: 170 path_repo_dir = git.FindRepoDir(path) 171 if path_repo_dir is not None: 172 source_path = os.path.abspath(os.path.join(path_repo_dir, '..')) 173 chroot_path = self._GetSourcePathChroot(source_path) 174 chroot_link = self._ReadChrootLink(chroot_path) 175 176 # First, check if the path happens to be in the chroot already. 177 if chroot_path is not None: 178 new_path = self._TranslatePath(path, chroot_path, '/') 179 # Or in the symlinked dir. 180 if new_path is None and chroot_link is not None: 181 new_path = self._TranslatePath(path, chroot_link, '/') 182 183 # Second, check the cache directory. 184 if new_path is None: 185 new_path = self._TranslatePath(path, self._GetCachePath(), 186 constants.CHROOT_CACHE_ROOT) 187 188 # Finally, check the current SDK checkout tree. 189 if new_path is None and source_path is not None: 190 new_path = self._TranslatePath(path, source_path, 191 constants.CHROOT_SOURCE_ROOT) 192 193 if new_path is None: 194 raise ValueError('Path is not reachable from the chroot') 195 196 return new_path 197 198 def _GetHostPath(self, path): 199 """Translates a fully-expanded chroot |path| into a host equivalent. 200 201 We first attempt translation of known roots (source). If any is successful, 202 we check whether the result happens to point back to the chroot, in which 203 case we trim the chroot path prefix and recurse. If neither was successful, 204 just prepend the chroot path. 205 206 Args: 207 path: A chroot path to translate. 208 209 Returns: 210 An equivalent host path. 211 212 Raises: 213 ValueError: If |path| could not be mapped to a proper host destination. 214 """ 215 new_path = None 216 217 # Attempt resolution of known roots. 218 for src_root, dst_root in self._chroot_to_host_roots: 219 new_path = self._TranslatePath(path, src_root, dst_root) 220 if new_path is not None: 221 break 222 223 if new_path is None: 224 # If no known root was identified, just prepend the chroot path. 225 new_path = self._TranslatePath(path, '', self._chroot_path) 226 else: 227 # Check whether the resolved path happens to point back at the chroot, in 228 # which case trim the chroot path or link prefix and continue recursively. 229 path = self._TranslatePath(new_path, self._chroot_path, '/') 230 if path is None and self._chroot_link: 231 path = self._TranslatePath(new_path, self._chroot_link, '/') 232 233 if path is not None: 234 new_path = self._GetHostPath(path) 235 236 return new_path 237 238 def _ConvertPath(self, path, get_converted_path): 239 """Expands |path|; if outside the chroot, applies |get_converted_path|. 240 241 Args: 242 path: A path to be converted. 243 get_converted_path: A conversion function. 244 245 Returns: 246 An expanded and (if needed) converted path. 247 248 Raises: 249 ValueError: If path conversion failed. 250 """ 251 # NOTE: We do not want to expand wrapper script symlinks because this 252 # prevents them from working. Therefore, if the path points to a file we 253 # only resolve its dirname but leave the basename intact. This means our 254 # path resolution might return unusable results for file symlinks that 255 # point outside the reachable space. These are edge cases in which the user 256 # is expected to resolve the realpath themselves in advance. 257 expanded_path = os.path.expanduser(path) 258 if os.path.isfile(expanded_path): 259 expanded_path = os.path.join( 260 os.path.realpath(os.path.dirname(expanded_path)), 261 os.path.basename(expanded_path)) 262 else: 263 expanded_path = os.path.realpath(expanded_path) 264 265 if self._inside_chroot: 266 return expanded_path 267 268 try: 269 return get_converted_path(expanded_path) 270 except ValueError as e: 271 raise ValueError('%s: %s' % (e, path)) 272 273 def ToChroot(self, path): 274 """Resolves current environment |path| for use in the chroot.""" 275 return self._ConvertPath(path, self._GetChrootPath) 276 277 def FromChroot(self, path): 278 """Resolves chroot |path| for use in the current environment.""" 279 return self._ConvertPath(path, self._GetHostPath) 280 281 282def DetermineCheckout(cwd=None): 283 """Gather information on the checkout we are in. 284 285 There are several checkout types, as defined by CHECKOUT_TYPE_XXX variables. 286 This function determines what checkout type |cwd| is in, for example, if |cwd| 287 belongs to a `repo` checkout. 288 289 Returns: 290 A CheckoutInfo object with these attributes: 291 type: The type of checkout. Valid values are CHECKOUT_TYPE_*. 292 root: The root of the checkout. 293 chrome_src_dir: If the checkout is a Chrome checkout, the path to the 294 Chrome src/ directory. 295 """ 296 checkout_type = CHECKOUT_TYPE_UNKNOWN 297 root, path = None, None 298 299 cwd = cwd or os.getcwd() 300 for path in osutils.IteratePathParents(cwd): 301 gclient_file = os.path.join(path, '.gclient') 302 if os.path.exists(gclient_file): 303 checkout_type = CHECKOUT_TYPE_GCLIENT 304 break 305 repo_dir = os.path.join(path, '.repo') 306 if os.path.isdir(repo_dir): 307 checkout_type = CHECKOUT_TYPE_REPO 308 break 309 310 if checkout_type != CHECKOUT_TYPE_UNKNOWN: 311 root = path 312 313 # Determine the chrome src directory. 314 chrome_src_dir = None 315 if checkout_type == CHECKOUT_TYPE_GCLIENT: 316 chrome_src_dir = os.path.join(root, 'src') 317 318 return CheckoutInfo(checkout_type, root, chrome_src_dir) 319 320 321def FindCacheDir(): 322 """Returns the cache directory location based on the checkout type.""" 323 checkout = DetermineCheckout() 324 if checkout.type == CHECKOUT_TYPE_REPO: 325 return os.path.join(checkout.root, GENERAL_CACHE_DIR) 326 elif checkout.type == CHECKOUT_TYPE_GCLIENT: 327 return os.path.join(checkout.chrome_src_dir, 'build', CHROME_CACHE_DIR) 328 elif checkout.type == CHECKOUT_TYPE_UNKNOWN: 329 return os.path.join(tempfile.gettempdir(), 'chromeos-cache') 330 else: 331 raise AssertionError('Unexpected type %s' % checkout.type) 332 333 334def GetCacheDir(): 335 """Returns the current cache dir.""" 336 return os.environ.get(constants.SHARED_CACHE_ENVVAR, FindCacheDir()) 337 338 339def ToChrootPath(path, source_path=None): 340 """Resolves current environment |path| for use in the chroot. 341 342 Args: 343 path: string path to translate into chroot namespace. 344 source_path: string path to root of source checkout with chroot in it. 345 346 Returns: 347 The same path converted to "inside chroot" namespace. 348 349 Raises: 350 ValueError: If the path references a location not available in the chroot. 351 """ 352 return ChrootPathResolver(source_path=source_path).ToChroot(path) 353 354 355def FromChrootPath(path, source_path=None): 356 """Resolves chroot |path| for use in the current environment. 357 358 Args: 359 path: string path to translate out of chroot namespace. 360 source_path: string path to root of source checkout with chroot in it. 361 362 Returns: 363 The same path converted to "outside chroot" namespace. 364 """ 365 return ChrootPathResolver(source_path=source_path).FromChroot(path) 366