"""
A Path-like interface for zipfiles.

This codebase is shared between zipfile.Path in the stdlib
and zipp in PyPI. See
https://github.com/python/importlib_metadata/wiki/Development-Methodology
for more detail.
"""

import io
import posixpath
import zipfile
import itertools
import contextlib
import pathlib
import re
import stat
import sys

from .glob import Translator


__all__ = ['Path']


def _parents(path):
    """
    Given a path with elements separated by
    posixpath.sep, generate all parents of that path.

    >>> list(_parents('b/d'))
    ['b']
    >>> list(_parents('/b/d/'))
    ['/b']
    >>> list(_parents('b/d/f/'))
    ['b/d', 'b']
    >>> list(_parents('b'))
    []
    >>> list(_parents(''))
    []
    """
    # The first element of the ancestry is the path itself; skip it.
    return itertools.islice(_ancestry(path), 1, None)


def _ancestry(path):
    """
    Given a path with elements separated by
    posixpath.sep, generate all elements of that path.

    >>> list(_ancestry('b/d'))
    ['b/d', 'b']
    >>> list(_ancestry('/b/d/'))
    ['/b/d', '/b']
    >>> list(_ancestry('b/d/f/'))
    ['b/d/f', 'b/d', 'b']
    >>> list(_ancestry('b'))
    ['b']
    >>> list(_ancestry(''))
    []

    Multiple separators are treated like a single.

    >>> list(_ancestry('//b//d///f//'))
    ['//b//d///f', '//b//d', '//b']
    """
    path = path.rstrip(posixpath.sep)
    # Stop once only separators (or nothing) remain, so that a path
    # made up solely of separators terminates the loop.
    while path.rstrip(posixpath.sep):
        yield path
        path, tail = posixpath.split(path)


_dedupe = dict.fromkeys
"""Deduplicate an iterable in original order"""


def _difference(minuend, subtrahend):
    """
    Return items in minuend not in subtrahend, retaining order
    with O(1) lookup.
    """
    return itertools.filterfalse(set(subtrahend).__contains__, minuend)


class InitializedState:
    """
    Mix-in to save the initialization state for pickling.
    """

    def __init__(self, *args, **kwargs):
        """
        Record the constructor arguments, then delegate to the
        next ``__init__`` in the MRO.
        """
        self.__args = args
        self.__kwargs = kwargs
        super().__init__(*args, **kwargs)

    def __getstate__(self):
        """
        Return the saved ``(args, kwargs)`` pair as the pickle state.
        """
        return self.__args, self.__kwargs

    def __setstate__(self, state):
        """
        Rebuild the object by re-running the underlying initializer
        with the saved ``(args, kwargs)`` pair.
        """
        args, kwargs = state
        # Restore the saved arguments too; otherwise a restored
        # instance lacks them and __getstate__ fails with
        # AttributeError when the instance is pickled again.
        self.__args = args
        self.__kwargs = kwargs
        super().__init__(*args, **kwargs)


class CompleteDirs(InitializedState, zipfile.ZipFile):
    """
    A ZipFile subclass that ensures that implied directories
    are always included in the namelist.

    >>> list(CompleteDirs._implied_dirs(['foo/bar.txt', 'foo/bar/baz.txt']))
    ['foo/', 'foo/bar/']
    >>> list(CompleteDirs._implied_dirs(['foo/bar.txt', 'foo/bar/baz.txt', 'foo/bar/']))
    ['foo/']
    """

    @staticmethod
    def _implied_dirs(names):
        """
        Return the directory names (with trailing separator) that
        are parents of some entry in names but are not themselves
        present in names, deduplicated in first-seen order.
        """
        parents = itertools.chain.from_iterable(map(_parents, names))
        as_dirs = (p + posixpath.sep for p in parents)
        return _dedupe(_difference(as_dirs, names))

    def namelist(self):
        """
        Return the archive's names followed by any implied directories.
        """
        names = super().namelist()
        return names + list(self._implied_dirs(names))

    def _name_set(self):
        """
        Return the names from namelist() as a set for membership tests.
        """
        return set(self.namelist())

    def resolve_dir(self, name):
        """
        If the name represents a directory, return that name
        as a directory (with the trailing slash).
        """
        names = self._name_set()
        dirname = name + '/'
        # Only treat the name as a directory when it is not itself
        # an entry but its slash-terminated form is.
        dir_match = name not in names and dirname in names
        return dirname if dir_match else name

    def getinfo(self, name):
        """
        Supplement getinfo for implied dirs.
        """
        try:
            return super().getinfo(name)
        except KeyError:
            # Re-raise unless the name is a known (implied) directory.
            if not name.endswith('/') or name not in self._name_set():
                raise
            return zipfile.ZipInfo(filename=name)

    @classmethod
    def make(cls, source):
        """
        Given a source (filename or zipfile), return an
        appropriate CompleteDirs subclass.
        """
        if isinstance(source, CompleteDirs):
            return source

        if not isinstance(source, zipfile.ZipFile):
            return cls(source)

        # Only allow for FastLookup when supplied zipfile is read-only
        if 'r' not in source.mode:
            cls = CompleteDirs

        # Mutate the existing ZipFile in place rather than copying it.
        source.__class__ = cls
        return source

    @classmethod
    def inject(cls, zf: zipfile.ZipFile) -> zipfile.ZipFile:
        """
        Given a writable zip file zf, inject directory entries for
        any directories implied by the presence of children.
        """
        for name in cls._implied_dirs(zf.namelist()):
            zf.writestr(name, b"")
        return zf


class FastLookup(CompleteDirs):
    """
    ZipFile subclass to ensure implicit
    dirs exist and are resolved rapidly.
    """

    def namelist(self):
        """
        Return the name list, computing it once and caching it
        on the instance.
        """
        with contextlib.suppress(AttributeError):
            return self.__names
        self.__names = super().namelist()
        return self.__names

    def _name_set(self):
        """
        Return the name set, computing it once and caching it
        on the instance.
        """
        with contextlib.suppress(AttributeError):
            return self.__lookup
        self.__lookup = super()._name_set()
        return self.__lookup


def _extract_text_encoding(encoding=None, *args, **kwargs):
    """
    Split off the encoding argument, pass it through
    io.text_encoding, and return ``(encoding, args, kwargs)``.
    """
    # compute stack level so that the caller of the caller sees any warning.
    is_pypy = sys.implementation.name == 'pypy'
    stack_level = 3 + is_pypy
    return io.text_encoding(encoding, stack_level), args, kwargs


class Path:
    """
    A :class:`importlib.resources.abc.Traversable` interface for zip files.

    Implements many of the features users enjoy from
    :class:`pathlib.Path`.

    Consider a zip file with this structure::

        .
        ├── a.txt
        └── b
            ├── c.txt
            └── d
                └── e.txt

    >>> data = io.BytesIO()
    >>> zf = zipfile.ZipFile(data, 'w')
    >>> zf.writestr('a.txt', 'content of a')
    >>> zf.writestr('b/c.txt', 'content of c')
    >>> zf.writestr('b/d/e.txt', 'content of e')
    >>> zf.filename = 'mem/abcde.zip'

    Path accepts the zipfile object itself or a filename

    >>> path = Path(zf)

    From there, several path operations are available.

    Directory iteration (including the zip file itself):

    >>> a, b = path.iterdir()
    >>> a
    Path('mem/abcde.zip', 'a.txt')
    >>> b
    Path('mem/abcde.zip', 'b/')

    name property:

    >>> b.name
    'b'

    join with divide operator:

    >>> c = b / 'c.txt'
    >>> c
    Path('mem/abcde.zip', 'b/c.txt')
    >>> c.name
    'c.txt'

    Read text:

    >>> c.read_text(encoding='utf-8')
    'content of c'

    existence:

    >>> c.exists()
    True
    >>> (b / 'missing.txt').exists()
    False

    Coercion to string:

    >>> import os
    >>> str(c).replace(os.sep, posixpath.sep)
    'mem/abcde.zip/b/c.txt'

    At the root, ``name``, ``filename``, and ``parent``
    resolve to the zipfile.

    >>> str(path)
    'mem/abcde.zip/'
    >>> path.name
    'abcde.zip'
    >>> path.filename == pathlib.Path('mem/abcde.zip')
    True
    >>> str(path.parent)
    'mem'

    If the zipfile has no filename, such attributes are not
    valid and accessing them will raise an Exception.

    >>> zf.filename = None
    >>> path.name
    Traceback (most recent call last):
    ...
    TypeError: ...

    >>> path.filename
    Traceback (most recent call last):
    ...
    TypeError: ...

    >>> path.parent
    Traceback (most recent call last):
    ...
    TypeError: ...

    # workaround python/cpython#106763
    >>> pass
    """

    __repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})"

    def __init__(self, root, at=""):
        """
        Construct a Path from a ZipFile or filename.

        Note: When the source is an existing ZipFile object,
        its type (__class__) will be mutated to a
        specialized type. If the caller wishes to retain the
        original type, the caller should either create a
        separate ZipFile object or pass a filename.
        """
        self.root = FastLookup.make(root)
        self.at = at

    def __eq__(self, other):
        """
        >>> Path(zipfile.ZipFile(io.BytesIO(), 'w')) == 'foo'
        False
        """
        if self.__class__ is not other.__class__:
            return NotImplemented
        return (self.root, self.at) == (other.root, other.at)

    def __hash__(self):
        """
        Hash on the same ``(root, at)`` pair that __eq__ compares.
        """
        return hash((self.root, self.at))

    def open(self, mode='r', *args, pwd=None, **kwargs):
        """
        Open this entry as text or binary following the semantics
        of ``pathlib.Path.open()`` by passing arguments through
        to io.TextIOWrapper().

        Raises IsADirectoryError if this path is a directory,
        FileNotFoundError if opened for reading and it does not
        exist, and ValueError if encoding arguments are supplied
        for a binary mode.
        """
        if self.is_dir():
            raise IsADirectoryError(self)
        # ZipFile.open accepts only the bare 'r' or 'w'.
        zip_mode = mode[0]
        if zip_mode == 'r' and not self.exists():
            raise FileNotFoundError(self)
        stream = self.root.open(self.at, zip_mode, pwd=pwd)
        if 'b' in mode:
            if args or kwargs:
                raise ValueError("encoding args invalid for binary operation")
            return stream
        # Text mode:
        encoding, args, kwargs = _extract_text_encoding(*args, **kwargs)
        return io.TextIOWrapper(stream, encoding, *args, **kwargs)

    def _base(self):
        """
        Return a PurePosixPath for ``at``, or for the zipfile's
        filename when ``at`` is empty (the root).
        """
        return pathlib.PurePosixPath(self.at or self.root.filename)

    @property
    def name(self):
        """The final component of the base path."""
        return self._base().name

    @property
    def suffix(self):
        """The suffix of the base path."""
        return self._base().suffix

    @property
    def suffixes(self):
        """The list of suffixes of the base path."""
        return self._base().suffixes

    @property
    def stem(self):
        """The stem of the base path."""
        return self._base().stem

    @property
    def filename(self):
        """The zipfile's filename joined with ``at`` as a pathlib.Path."""
        return pathlib.Path(self.root.filename).joinpath(self.at)

    def read_text(self, *args, **kwargs):
        """
        Open this entry in text mode and return its full contents.
        """
        encoding, args, kwargs = _extract_text_encoding(*args, **kwargs)
        with self.open('r', encoding, *args, **kwargs) as strm:
            return strm.read()

    def read_bytes(self):
        """
        Open this entry in binary mode and return its full contents.
        """
        with self.open('rb') as strm:
            return strm.read()

    def _is_child(self, path):
        """
        Return whether path sits directly beneath this path.
        """
        return posixpath.dirname(path.at.rstrip("/")) == self.at.rstrip("/")

    def _next(self, at):
        """
        Return a new path of the same class on the same root.
        """
        return self.__class__(self.root, at)

    def is_dir(self):
        """
        Return True for the root (empty ``at``) or when ``at``
        ends with a slash.
        """
        return not self.at or self.at.endswith("/")

    def is_file(self):
        """
        Return True when the path exists and is not a directory.
        """
        return self.exists() and not self.is_dir()

    def exists(self):
        """
        Return whether ``at`` is among the root's names.
        """
        return self.at in self.root._name_set()

    def iterdir(self):
        """
        Iterate over the immediate children of this directory.

        Raises ValueError if this path is not a directory.
        """
        if not self.is_dir():
            raise ValueError("Can't listdir a file")
        subs = map(self._next, self.root.namelist())
        return filter(self._is_child, subs)

    def match(self, path_pattern):
        """
        Match ``at`` against path_pattern using
        pathlib.PurePosixPath.match.
        """
        return pathlib.PurePosixPath(self.at).match(path_pattern)

    def is_symlink(self):
        """
        Return whether this path is a symlink.

        Returns False when the archive has no entry for this
        path (including the archive root).
        """
        try:
            info = self.root.getinfo(self.at)
        except KeyError:
            # No entry to inspect; like pathlib, report False
            # rather than propagate the lookup failure.
            return False
        # The file mode is held in the high 16 bits of external_attr.
        mode = info.external_attr >> 16
        return stat.S_ISLNK(mode)

    def glob(self, pattern):
        """
        Return paths for the names in the archive that fully match
        pattern, anchored at this path.

        Raises ValueError if pattern is empty.
        """
        if not pattern:
            raise ValueError(f"Unacceptable pattern: {pattern!r}")

        prefix = re.escape(self.at)
        tr = Translator(seps='/')
        matches = re.compile(prefix + tr.translate(pattern)).fullmatch
        return map(self._next, filter(matches, self.root.namelist()))

    def rglob(self, pattern):
        """
        Glob for pattern with ``**/`` prepended.
        """
        return self.glob(f'**/{pattern}')

    def relative_to(self, other, *extra):
        """
        Return, as a string, this path relative to other
        joined with extra.
        """
        return posixpath.relpath(str(self), str(other.joinpath(*extra)))

    def __str__(self):
        return posixpath.join(self.root.filename, self.at)

    def __repr__(self):
        return self.__repr.format(self=self)

    def joinpath(self, *other):
        """
        Join ``at`` with the given segments and return the
        resulting path, resolved to a directory name when the
        archive holds it as one.
        """
        joined = posixpath.join(self.at, *other)
        return self._next(self.root.resolve_dir(joined))

    __truediv__ = joinpath

    @property
    def parent(self):
        """
        The containing directory as a Path, or, at the root, the
        parent of the zipfile's filename as a pathlib.Path.
        """
        if not self.at:
            return self.filename.parent
        parent_at = posixpath.dirname(self.at.rstrip('/'))
        if parent_at:
            parent_at += '/'
        return self._next(parent_at)