1# Copyright (C) 2003 Python Software Foundation 2 3import unittest 4import unittest.mock 5import shutil 6import tempfile 7import sys 8import stat 9import os 10import os.path 11import errno 12import functools 13import pathlib 14import subprocess 15import random 16import string 17import contextlib 18import io 19from shutil import (make_archive, 20 register_archive_format, unregister_archive_format, 21 get_archive_formats, Error, unpack_archive, 22 register_unpack_format, RegistryError, 23 unregister_unpack_format, get_unpack_formats, 24 SameFileError, _GiveupOnFastCopy) 25import tarfile 26import zipfile 27try: 28 import posix 29except ImportError: 30 posix = None 31 32from test import support 33from test.support import TESTFN, FakePath 34 35TESTFN2 = TESTFN + "2" 36MACOS = sys.platform.startswith("darwin") 37AIX = sys.platform[:3] == 'aix' 38try: 39 import grp 40 import pwd 41 UID_GID_SUPPORT = True 42except ImportError: 43 UID_GID_SUPPORT = False 44 45try: 46 import _winapi 47except ImportError: 48 _winapi = None 49 50def _fake_rename(*args, **kwargs): 51 # Pretend the destination path is on a different filesystem. 52 raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link") 53 54def mock_rename(func): 55 @functools.wraps(func) 56 def wrap(*args, **kwargs): 57 try: 58 builtin_rename = os.rename 59 os.rename = _fake_rename 60 return func(*args, **kwargs) 61 finally: 62 os.rename = builtin_rename 63 return wrap 64 65def write_file(path, content, binary=False): 66 """Write *content* to a file located at *path*. 67 68 If *path* is a tuple instead of a string, os.path.join will be used to 69 make a path. If *binary* is true, the file will be opened in binary 70 mode. 71 """ 72 if isinstance(path, tuple): 73 path = os.path.join(*path) 74 with open(path, 'wb' if binary else 'w') as fp: 75 fp.write(content) 76 77def write_test_file(path, size): 78 """Create a test file with an arbitrary size and random text content.""" 79 def chunks(total, step): 80 assert total >= step 81 while total > step: 82 yield step 83 total -= step 84 if total: 85 yield total 86 87 bufsize = min(size, 8192) 88 chunk = b"".join([random.choice(string.ascii_letters).encode() 89 for i in range(bufsize)]) 90 with open(path, 'wb') as f: 91 for csize in chunks(size, bufsize): 92 f.write(chunk) 93 assert os.path.getsize(path) == size 94 95def read_file(path, binary=False): 96 """Return contents from a file located at *path*. 97 98 If *path* is a tuple instead of a string, os.path.join will be used to 99 make a path. If *binary* is true, the file will be opened in binary 100 mode. 101 """ 102 if isinstance(path, tuple): 103 path = os.path.join(*path) 104 with open(path, 'rb' if binary else 'r') as fp: 105 return fp.read() 106 107def rlistdir(path): 108 res = [] 109 for name in sorted(os.listdir(path)): 110 p = os.path.join(path, name) 111 if os.path.isdir(p) and not os.path.islink(p): 112 res.append(name + '/') 113 for n in rlistdir(p): 114 res.append(name + '/' + n) 115 else: 116 res.append(name) 117 return res 118 119def supports_file2file_sendfile(): 120 # ...apparently Linux and Solaris are the only ones 121 if not hasattr(os, "sendfile"): 122 return False 123 srcname = None 124 dstname = None 125 try: 126 with tempfile.NamedTemporaryFile("wb", delete=False) as f: 127 srcname = f.name 128 f.write(b"0123456789") 129 130 with open(srcname, "rb") as src: 131 with tempfile.NamedTemporaryFile("wb", delete=False) as dst: 132 dstname = dst.name 133 infd = src.fileno() 134 outfd = dst.fileno() 135 try: 136 os.sendfile(outfd, infd, 0, 2) 137 except OSError: 138 return False 139 else: 140 return True 141 finally: 142 if srcname is not None: 143 support.unlink(srcname) 144 if dstname is not None: 145 support.unlink(dstname) 146 147 148SUPPORTS_SENDFILE = supports_file2file_sendfile() 149 150# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler test 151# The AIX command 'dump -o program' gives XCOFF header information 152# The second word of the last line in the maxdata value 153# when 32-bit maxdata must be greater than 0x1000000 for the xz test to succeed 154def _maxdataOK(): 155 if AIX and sys.maxsize == 2147483647: 156 hdrs=subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable) 157 maxdata=hdrs.split("\n")[-1].split()[1] 158 return int(maxdata,16) >= 0x20000000 159 else: 160 return True 161 162class TestShutil(unittest.TestCase): 163 164 def setUp(self): 165 super(TestShutil, self).setUp() 166 self.tempdirs = [] 167 168 def tearDown(self): 169 super(TestShutil, self).tearDown() 170 while self.tempdirs: 171 d = self.tempdirs.pop() 172 shutil.rmtree(d, os.name in ('nt', 'cygwin')) 173 174 175 def mkdtemp(self): 176 """Create a temporary directory that will be cleaned up. 177 178 Returns the path of the directory. 179 """ 180 basedir = None 181 if sys.platform == "win32": 182 basedir = os.path.realpath(os.getcwd()) 183 d = tempfile.mkdtemp(dir=basedir) 184 self.tempdirs.append(d) 185 return d 186 187 def test_rmtree_works_on_bytes(self): 188 tmp = self.mkdtemp() 189 victim = os.path.join(tmp, 'killme') 190 os.mkdir(victim) 191 write_file(os.path.join(victim, 'somefile'), 'foo') 192 victim = os.fsencode(victim) 193 self.assertIsInstance(victim, bytes) 194 shutil.rmtree(victim) 195 196 @support.skip_unless_symlink 197 def test_rmtree_fails_on_symlink(self): 198 tmp = self.mkdtemp() 199 dir_ = os.path.join(tmp, 'dir') 200 os.mkdir(dir_) 201 link = os.path.join(tmp, 'link') 202 os.symlink(dir_, link) 203 self.assertRaises(OSError, shutil.rmtree, link) 204 self.assertTrue(os.path.exists(dir_)) 205 self.assertTrue(os.path.lexists(link)) 206 errors = [] 207 def onerror(*args): 208 errors.append(args) 209 shutil.rmtree(link, onerror=onerror) 210 self.assertEqual(len(errors), 1) 211 self.assertIs(errors[0][0], os.path.islink) 212 self.assertEqual(errors[0][1], link) 213 self.assertIsInstance(errors[0][2][1], OSError) 214 215 @support.skip_unless_symlink 216 def test_rmtree_works_on_symlinks(self): 217 tmp = self.mkdtemp() 218 dir1 = os.path.join(tmp, 'dir1') 219 dir2 = os.path.join(dir1, 'dir2') 220 dir3 = os.path.join(tmp, 'dir3') 221 for d in dir1, dir2, dir3: 222 os.mkdir(d) 223 file1 = os.path.join(tmp, 'file1') 224 write_file(file1, 'foo') 225 link1 = os.path.join(dir1, 'link1') 226 os.symlink(dir2, link1) 227 link2 = os.path.join(dir1, 'link2') 228 os.symlink(dir3, link2) 229 link3 = os.path.join(dir1, 'link3') 230 os.symlink(file1, link3) 231 # make sure symlinks are removed but not followed 232 shutil.rmtree(dir1) 233 self.assertFalse(os.path.exists(dir1)) 234 self.assertTrue(os.path.exists(dir3)) 235 self.assertTrue(os.path.exists(file1)) 236 237 @unittest.skipUnless(_winapi, 'only relevant on Windows') 238 def test_rmtree_fails_on_junctions(self): 239 tmp = self.mkdtemp() 240 dir_ = os.path.join(tmp, 'dir') 241 os.mkdir(dir_) 242 link = os.path.join(tmp, 'link') 243 _winapi.CreateJunction(dir_, link) 244 self.assertRaises(OSError, shutil.rmtree, link) 245 self.assertTrue(os.path.exists(dir_)) 246 self.assertTrue(os.path.lexists(link)) 247 errors = [] 248 def onerror(*args): 249 errors.append(args) 250 shutil.rmtree(link, onerror=onerror) 251 self.assertEqual(len(errors), 1) 252 self.assertIs(errors[0][0], os.path.islink) 253 self.assertEqual(errors[0][1], link) 254 self.assertIsInstance(errors[0][2][1], OSError) 255 256 @unittest.skipUnless(_winapi, 'only relevant on Windows') 257 def test_rmtree_works_on_junctions(self): 258 tmp = self.mkdtemp() 259 dir1 = os.path.join(tmp, 'dir1') 260 dir2 = os.path.join(dir1, 'dir2') 261 dir3 = os.path.join(tmp, 'dir3') 262 for d in dir1, dir2, dir3: 263 os.mkdir(d) 264 file1 = os.path.join(tmp, 'file1') 265 write_file(file1, 'foo') 266 link1 = os.path.join(dir1, 'link1') 267 _winapi.CreateJunction(dir2, link1) 268 link2 = os.path.join(dir1, 'link2') 269 _winapi.CreateJunction(dir3, link2) 270 link3 = os.path.join(dir1, 'link3') 271 _winapi.CreateJunction(file1, link3) 272 # make sure junctions are removed but not followed 273 shutil.rmtree(dir1) 274 self.assertFalse(os.path.exists(dir1)) 275 self.assertTrue(os.path.exists(dir3)) 276 self.assertTrue(os.path.exists(file1)) 277 278 def test_rmtree_errors(self): 279 # filename is guaranteed not to exist 280 filename = tempfile.mktemp() 281 self.assertRaises(FileNotFoundError, shutil.rmtree, filename) 282 # test that ignore_errors option is honored 283 shutil.rmtree(filename, ignore_errors=True) 284 285 # existing file 286 tmpdir = self.mkdtemp() 287 write_file((tmpdir, "tstfile"), "") 288 filename = os.path.join(tmpdir, "tstfile") 289 with self.assertRaises(NotADirectoryError) as cm: 290 shutil.rmtree(filename) 291 # The reason for this rather odd construct is that Windows sprinkles 292 # a \*.* at the end of file names. But only sometimes on some buildbots 293 possible_args = [filename, os.path.join(filename, '*.*')] 294 self.assertIn(cm.exception.filename, possible_args) 295 self.assertTrue(os.path.exists(filename)) 296 # test that ignore_errors option is honored 297 shutil.rmtree(filename, ignore_errors=True) 298 self.assertTrue(os.path.exists(filename)) 299 errors = [] 300 def onerror(*args): 301 errors.append(args) 302 shutil.rmtree(filename, onerror=onerror) 303 self.assertEqual(len(errors), 2) 304 self.assertIs(errors[0][0], os.scandir) 305 self.assertEqual(errors[0][1], filename) 306 self.assertIsInstance(errors[0][2][1], NotADirectoryError) 307 self.assertIn(errors[0][2][1].filename, possible_args) 308 self.assertIs(errors[1][0], os.rmdir) 309 self.assertEqual(errors[1][1], filename) 310 self.assertIsInstance(errors[1][2][1], NotADirectoryError) 311 self.assertIn(errors[1][2][1].filename, possible_args) 312 313 314 @unittest.skipIf(sys.platform[:6] == 'cygwin', 315 "This test can't be run on Cygwin (issue #1071513).") 316 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, 317 "This test can't be run reliably as root (issue #1076467).") 318 def test_on_error(self): 319 self.errorState = 0 320 os.mkdir(TESTFN) 321 self.addCleanup(shutil.rmtree, TESTFN) 322 323 self.child_file_path = os.path.join(TESTFN, 'a') 324 self.child_dir_path = os.path.join(TESTFN, 'b') 325 support.create_empty_file(self.child_file_path) 326 os.mkdir(self.child_dir_path) 327 old_dir_mode = os.stat(TESTFN).st_mode 328 old_child_file_mode = os.stat(self.child_file_path).st_mode 329 old_child_dir_mode = os.stat(self.child_dir_path).st_mode 330 # Make unwritable. 331 new_mode = stat.S_IREAD|stat.S_IEXEC 332 os.chmod(self.child_file_path, new_mode) 333 os.chmod(self.child_dir_path, new_mode) 334 os.chmod(TESTFN, new_mode) 335 336 self.addCleanup(os.chmod, TESTFN, old_dir_mode) 337 self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode) 338 self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode) 339 340 shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror) 341 # Test whether onerror has actually been called. 342 self.assertEqual(self.errorState, 3, 343 "Expected call to onerror function did not happen.") 344 345 def check_args_to_onerror(self, func, arg, exc): 346 # test_rmtree_errors deliberately runs rmtree 347 # on a directory that is chmod 500, which will fail. 348 # This function is run when shutil.rmtree fails. 349 # 99.9% of the time it initially fails to remove 350 # a file in the directory, so the first time through 351 # func is os.remove. 352 # However, some Linux machines running ZFS on 353 # FUSE experienced a failure earlier in the process 354 # at os.listdir. The first failure may legally 355 # be either. 356 if self.errorState < 2: 357 if func is os.unlink: 358 self.assertEqual(arg, self.child_file_path) 359 elif func is os.rmdir: 360 self.assertEqual(arg, self.child_dir_path) 361 else: 362 self.assertIs(func, os.listdir) 363 self.assertIn(arg, [TESTFN, self.child_dir_path]) 364 self.assertTrue(issubclass(exc[0], OSError)) 365 self.errorState += 1 366 else: 367 self.assertEqual(func, os.rmdir) 368 self.assertEqual(arg, TESTFN) 369 self.assertTrue(issubclass(exc[0], OSError)) 370 self.errorState = 3 371 372 def test_rmtree_does_not_choke_on_failing_lstat(self): 373 try: 374 orig_lstat = os.lstat 375 def raiser(fn, *args, **kwargs): 376 if fn != TESTFN: 377 raise OSError() 378 else: 379 return orig_lstat(fn) 380 os.lstat = raiser 381 382 os.mkdir(TESTFN) 383 write_file((TESTFN, 'foo'), 'foo') 384 shutil.rmtree(TESTFN) 385 finally: 386 os.lstat = orig_lstat 387 388 @support.skip_unless_symlink 389 def test_copymode_follow_symlinks(self): 390 tmp_dir = self.mkdtemp() 391 src = os.path.join(tmp_dir, 'foo') 392 dst = os.path.join(tmp_dir, 'bar') 393 src_link = os.path.join(tmp_dir, 'baz') 394 dst_link = os.path.join(tmp_dir, 'quux') 395 write_file(src, 'foo') 396 write_file(dst, 'foo') 397 os.symlink(src, src_link) 398 os.symlink(dst, dst_link) 399 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG) 400 # file to file 401 os.chmod(dst, stat.S_IRWXO) 402 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 403 shutil.copymode(src, dst) 404 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 405 # On Windows, os.chmod does not follow symlinks (issue #15411) 406 if os.name != 'nt': 407 # follow src link 408 os.chmod(dst, stat.S_IRWXO) 409 shutil.copymode(src_link, dst) 410 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 411 # follow dst link 412 os.chmod(dst, stat.S_IRWXO) 413 shutil.copymode(src, dst_link) 414 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 415 # follow both links 416 os.chmod(dst, stat.S_IRWXO) 417 shutil.copymode(src_link, dst_link) 418 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 419 420 @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod') 421 @support.skip_unless_symlink 422 def test_copymode_symlink_to_symlink(self): 423 tmp_dir = self.mkdtemp() 424 src = os.path.join(tmp_dir, 'foo') 425 dst = os.path.join(tmp_dir, 'bar') 426 src_link = os.path.join(tmp_dir, 'baz') 427 dst_link = os.path.join(tmp_dir, 'quux') 428 write_file(src, 'foo') 429 write_file(dst, 'foo') 430 os.symlink(src, src_link) 431 os.symlink(dst, dst_link) 432 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG) 433 os.chmod(dst, stat.S_IRWXU) 434 os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG) 435 # link to link 436 os.lchmod(dst_link, stat.S_IRWXO) 437 shutil.copymode(src_link, dst_link, follow_symlinks=False) 438 self.assertEqual(os.lstat(src_link).st_mode, 439 os.lstat(dst_link).st_mode) 440 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 441 # src link - use chmod 442 os.lchmod(dst_link, stat.S_IRWXO) 443 shutil.copymode(src_link, dst, follow_symlinks=False) 444 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 445 # dst link - use chmod 446 os.lchmod(dst_link, stat.S_IRWXO) 447 shutil.copymode(src, dst_link, follow_symlinks=False) 448 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 449 450 @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing') 451 @support.skip_unless_symlink 452 def test_copymode_symlink_to_symlink_wo_lchmod(self): 453 tmp_dir = self.mkdtemp() 454 src = os.path.join(tmp_dir, 'foo') 455 dst = os.path.join(tmp_dir, 'bar') 456 src_link = os.path.join(tmp_dir, 'baz') 457 dst_link = os.path.join(tmp_dir, 'quux') 458 write_file(src, 'foo') 459 write_file(dst, 'foo') 460 os.symlink(src, src_link) 461 os.symlink(dst, dst_link) 462 shutil.copymode(src_link, dst_link, follow_symlinks=False) # silent fail 463 464 @support.skip_unless_symlink 465 def test_copystat_symlinks(self): 466 tmp_dir = self.mkdtemp() 467 src = os.path.join(tmp_dir, 'foo') 468 dst = os.path.join(tmp_dir, 'bar') 469 src_link = os.path.join(tmp_dir, 'baz') 470 dst_link = os.path.join(tmp_dir, 'qux') 471 write_file(src, 'foo') 472 src_stat = os.stat(src) 473 os.utime(src, (src_stat.st_atime, 474 src_stat.st_mtime - 42.0)) # ensure different mtimes 475 write_file(dst, 'bar') 476 self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime) 477 os.symlink(src, src_link) 478 os.symlink(dst, dst_link) 479 if hasattr(os, 'lchmod'): 480 os.lchmod(src_link, stat.S_IRWXO) 481 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): 482 os.lchflags(src_link, stat.UF_NODUMP) 483 src_link_stat = os.lstat(src_link) 484 # follow 485 if hasattr(os, 'lchmod'): 486 shutil.copystat(src_link, dst_link, follow_symlinks=True) 487 self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode) 488 # don't follow 489 shutil.copystat(src_link, dst_link, follow_symlinks=False) 490 dst_link_stat = os.lstat(dst_link) 491 if os.utime in os.supports_follow_symlinks: 492 for attr in 'st_atime', 'st_mtime': 493 # The modification times may be truncated in the new file. 494 self.assertLessEqual(getattr(src_link_stat, attr), 495 getattr(dst_link_stat, attr) + 1) 496 if hasattr(os, 'lchmod'): 497 self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode) 498 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'): 499 self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags) 500 # tell to follow but dst is not a link 501 shutil.copystat(src_link, dst, follow_symlinks=False) 502 self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) < 503 00000.1) 504 505 @unittest.skipUnless(hasattr(os, 'chflags') and 506 hasattr(errno, 'EOPNOTSUPP') and 507 hasattr(errno, 'ENOTSUP'), 508 "requires os.chflags, EOPNOTSUPP & ENOTSUP") 509 def test_copystat_handles_harmless_chflags_errors(self): 510 tmpdir = self.mkdtemp() 511 file1 = os.path.join(tmpdir, 'file1') 512 file2 = os.path.join(tmpdir, 'file2') 513 write_file(file1, 'xxx') 514 write_file(file2, 'xxx') 515 516 def make_chflags_raiser(err): 517 ex = OSError() 518 519 def _chflags_raiser(path, flags, *, follow_symlinks=True): 520 ex.errno = err 521 raise ex 522 return _chflags_raiser 523 old_chflags = os.chflags 524 try: 525 for err in errno.EOPNOTSUPP, errno.ENOTSUP: 526 os.chflags = make_chflags_raiser(err) 527 shutil.copystat(file1, file2) 528 # assert others errors break it 529 os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP) 530 self.assertRaises(OSError, shutil.copystat, file1, file2) 531 finally: 532 os.chflags = old_chflags 533 534 @support.skip_unless_xattr 535 def test_copyxattr(self): 536 tmp_dir = self.mkdtemp() 537 src = os.path.join(tmp_dir, 'foo') 538 write_file(src, 'foo') 539 dst = os.path.join(tmp_dir, 'bar') 540 write_file(dst, 'bar') 541 542 # no xattr == no problem 543 shutil._copyxattr(src, dst) 544 # common case 545 os.setxattr(src, 'user.foo', b'42') 546 os.setxattr(src, 'user.bar', b'43') 547 shutil._copyxattr(src, dst) 548 self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst))) 549 self.assertEqual( 550 os.getxattr(src, 'user.foo'), 551 os.getxattr(dst, 'user.foo')) 552 # check errors don't affect other attrs 553 os.remove(dst) 554 write_file(dst, 'bar') 555 os_error = OSError(errno.EPERM, 'EPERM') 556 557 def _raise_on_user_foo(fname, attr, val, **kwargs): 558 if attr == 'user.foo': 559 raise os_error 560 else: 561 orig_setxattr(fname, attr, val, **kwargs) 562 try: 563 orig_setxattr = os.setxattr 564 os.setxattr = _raise_on_user_foo 565 shutil._copyxattr(src, dst) 566 self.assertIn('user.bar', os.listxattr(dst)) 567 finally: 568 os.setxattr = orig_setxattr 569 # the source filesystem not supporting xattrs should be ok, too. 570 def _raise_on_src(fname, *, follow_symlinks=True): 571 if fname == src: 572 raise OSError(errno.ENOTSUP, 'Operation not supported') 573 return orig_listxattr(fname, follow_symlinks=follow_symlinks) 574 try: 575 orig_listxattr = os.listxattr 576 os.listxattr = _raise_on_src 577 shutil._copyxattr(src, dst) 578 finally: 579 os.listxattr = orig_listxattr 580 581 # test that shutil.copystat copies xattrs 582 src = os.path.join(tmp_dir, 'the_original') 583 srcro = os.path.join(tmp_dir, 'the_original_ro') 584 write_file(src, src) 585 write_file(srcro, srcro) 586 os.setxattr(src, 'user.the_value', b'fiddly') 587 os.setxattr(srcro, 'user.the_value', b'fiddly') 588 os.chmod(srcro, 0o444) 589 dst = os.path.join(tmp_dir, 'the_copy') 590 dstro = os.path.join(tmp_dir, 'the_copy_ro') 591 write_file(dst, dst) 592 write_file(dstro, dstro) 593 shutil.copystat(src, dst) 594 shutil.copystat(srcro, dstro) 595 self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly') 596 self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly') 597 598 @support.skip_unless_symlink 599 @support.skip_unless_xattr 600 @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0, 601 'root privileges required') 602 def test_copyxattr_symlinks(self): 603 # On Linux, it's only possible to access non-user xattr for symlinks; 604 # which in turn require root privileges. This test should be expanded 605 # as soon as other platforms gain support for extended attributes. 606 tmp_dir = self.mkdtemp() 607 src = os.path.join(tmp_dir, 'foo') 608 src_link = os.path.join(tmp_dir, 'baz') 609 write_file(src, 'foo') 610 os.symlink(src, src_link) 611 os.setxattr(src, 'trusted.foo', b'42') 612 os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False) 613 dst = os.path.join(tmp_dir, 'bar') 614 dst_link = os.path.join(tmp_dir, 'qux') 615 write_file(dst, 'bar') 616 os.symlink(dst, dst_link) 617 shutil._copyxattr(src_link, dst_link, follow_symlinks=False) 618 self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43') 619 self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo') 620 shutil._copyxattr(src_link, dst, follow_symlinks=False) 621 self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43') 622 623 @support.skip_unless_symlink 624 def test_copy_symlinks(self): 625 tmp_dir = self.mkdtemp() 626 src = os.path.join(tmp_dir, 'foo') 627 dst = os.path.join(tmp_dir, 'bar') 628 src_link = os.path.join(tmp_dir, 'baz') 629 write_file(src, 'foo') 630 os.symlink(src, src_link) 631 if hasattr(os, 'lchmod'): 632 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) 633 # don't follow 634 shutil.copy(src_link, dst, follow_symlinks=True) 635 self.assertFalse(os.path.islink(dst)) 636 self.assertEqual(read_file(src), read_file(dst)) 637 os.remove(dst) 638 # follow 639 shutil.copy(src_link, dst, follow_symlinks=False) 640 self.assertTrue(os.path.islink(dst)) 641 self.assertEqual(os.readlink(dst), os.readlink(src_link)) 642 if hasattr(os, 'lchmod'): 643 self.assertEqual(os.lstat(src_link).st_mode, 644 os.lstat(dst).st_mode) 645 646 @support.skip_unless_symlink 647 def test_copy2_symlinks(self): 648 tmp_dir = self.mkdtemp() 649 src = os.path.join(tmp_dir, 'foo') 650 dst = os.path.join(tmp_dir, 'bar') 651 src_link = os.path.join(tmp_dir, 'baz') 652 write_file(src, 'foo') 653 os.symlink(src, src_link) 654 if hasattr(os, 'lchmod'): 655 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) 656 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): 657 os.lchflags(src_link, stat.UF_NODUMP) 658 src_stat = os.stat(src) 659 src_link_stat = os.lstat(src_link) 660 # follow 661 shutil.copy2(src_link, dst, follow_symlinks=True) 662 self.assertFalse(os.path.islink(dst)) 663 self.assertEqual(read_file(src), read_file(dst)) 664 os.remove(dst) 665 # don't follow 666 shutil.copy2(src_link, dst, follow_symlinks=False) 667 self.assertTrue(os.path.islink(dst)) 668 self.assertEqual(os.readlink(dst), os.readlink(src_link)) 669 dst_stat = os.lstat(dst) 670 if os.utime in os.supports_follow_symlinks: 671 for attr in 'st_atime', 'st_mtime': 672 # The modification times may be truncated in the new file. 673 self.assertLessEqual(getattr(src_link_stat, attr), 674 getattr(dst_stat, attr) + 1) 675 if hasattr(os, 'lchmod'): 676 self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode) 677 self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode) 678 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'): 679 self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags) 680 681 @support.skip_unless_xattr 682 def test_copy2_xattr(self): 683 tmp_dir = self.mkdtemp() 684 src = os.path.join(tmp_dir, 'foo') 685 dst = os.path.join(tmp_dir, 'bar') 686 write_file(src, 'foo') 687 os.setxattr(src, 'user.foo', b'42') 688 shutil.copy2(src, dst) 689 self.assertEqual( 690 os.getxattr(src, 'user.foo'), 691 os.getxattr(dst, 'user.foo')) 692 os.remove(dst) 693 694 @support.skip_unless_symlink 695 def test_copyfile_symlinks(self): 696 tmp_dir = self.mkdtemp() 697 src = os.path.join(tmp_dir, 'src') 698 dst = os.path.join(tmp_dir, 'dst') 699 dst_link = os.path.join(tmp_dir, 'dst_link') 700 link = os.path.join(tmp_dir, 'link') 701 write_file(src, 'foo') 702 os.symlink(src, link) 703 # don't follow 704 shutil.copyfile(link, dst_link, follow_symlinks=False) 705 self.assertTrue(os.path.islink(dst_link)) 706 self.assertEqual(os.readlink(link), os.readlink(dst_link)) 707 # follow 708 shutil.copyfile(link, dst) 709 self.assertFalse(os.path.islink(dst)) 710 711 def test_rmtree_uses_safe_fd_version_if_available(self): 712 _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <= 713 os.supports_dir_fd and 714 os.listdir in os.supports_fd and 715 os.stat in os.supports_follow_symlinks) 716 if _use_fd_functions: 717 self.assertTrue(shutil._use_fd_functions) 718 self.assertTrue(shutil.rmtree.avoids_symlink_attacks) 719 tmp_dir = self.mkdtemp() 720 d = os.path.join(tmp_dir, 'a') 721 os.mkdir(d) 722 try: 723 real_rmtree = shutil._rmtree_safe_fd 724 class Called(Exception): pass 725 def _raiser(*args, **kwargs): 726 raise Called 727 shutil._rmtree_safe_fd = _raiser 728 self.assertRaises(Called, shutil.rmtree, d) 729 finally: 730 shutil._rmtree_safe_fd = real_rmtree 731 else: 732 self.assertFalse(shutil._use_fd_functions) 733 self.assertFalse(shutil.rmtree.avoids_symlink_attacks) 734 735 def test_rmtree_dont_delete_file(self): 736 # When called on a file instead of a directory, don't delete it. 737 handle, path = tempfile.mkstemp() 738 os.close(handle) 739 self.assertRaises(NotADirectoryError, shutil.rmtree, path) 740 os.remove(path) 741 742 def test_copytree_simple(self): 743 src_dir = tempfile.mkdtemp() 744 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination') 745 self.addCleanup(shutil.rmtree, src_dir) 746 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir)) 747 write_file((src_dir, 'test.txt'), '123') 748 os.mkdir(os.path.join(src_dir, 'test_dir')) 749 write_file((src_dir, 'test_dir', 'test.txt'), '456') 750 751 shutil.copytree(src_dir, dst_dir) 752 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt'))) 753 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir'))) 754 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir', 755 'test.txt'))) 756 actual = read_file((dst_dir, 'test.txt')) 757 self.assertEqual(actual, '123') 758 actual = read_file((dst_dir, 'test_dir', 'test.txt')) 759 self.assertEqual(actual, '456') 760 761 def test_copytree_dirs_exist_ok(self): 762 src_dir = tempfile.mkdtemp() 763 dst_dir = tempfile.mkdtemp() 764 self.addCleanup(shutil.rmtree, src_dir) 765 self.addCleanup(shutil.rmtree, dst_dir) 766 767 write_file((src_dir, 'nonexisting.txt'), '123') 768 os.mkdir(os.path.join(src_dir, 'existing_dir')) 769 os.mkdir(os.path.join(dst_dir, 'existing_dir')) 770 write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced') 771 write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced') 772 773 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True) 774 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt'))) 775 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir'))) 776 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir', 777 'existing.txt'))) 778 actual = read_file((dst_dir, 'nonexisting.txt')) 779 self.assertEqual(actual, '123') 780 actual = read_file((dst_dir, 'existing_dir', 'existing.txt')) 781 self.assertEqual(actual, 'has been replaced') 782 783 with self.assertRaises(FileExistsError): 784 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False) 785 786 @support.skip_unless_symlink 787 def test_copytree_symlinks(self): 788 tmp_dir = self.mkdtemp() 789 src_dir = os.path.join(tmp_dir, 'src') 790 dst_dir = os.path.join(tmp_dir, 'dst') 791 sub_dir = os.path.join(src_dir, 'sub') 792 os.mkdir(src_dir) 793 os.mkdir(sub_dir) 794 write_file((src_dir, 'file.txt'), 'foo') 795 src_link = os.path.join(sub_dir, 'link') 796 dst_link = os.path.join(dst_dir, 'sub/link') 797 os.symlink(os.path.join(src_dir, 'file.txt'), 798 src_link) 799 if hasattr(os, 'lchmod'): 800 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) 801 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): 802 os.lchflags(src_link, stat.UF_NODUMP) 803 src_stat = os.lstat(src_link) 804 shutil.copytree(src_dir, dst_dir, symlinks=True) 805 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link'))) 806 actual = os.readlink(os.path.join(dst_dir, 'sub', 'link')) 807 # Bad practice to blindly strip the prefix as it may be required to 808 # correctly refer to the file, but we're only comparing paths here. 809 if os.name == 'nt' and actual.startswith('\\\\?\\'): 810 actual = actual[4:] 811 self.assertEqual(actual, os.path.join(src_dir, 'file.txt')) 812 dst_stat = os.lstat(dst_link) 813 if hasattr(os, 'lchmod'): 814 self.assertEqual(dst_stat.st_mode, src_stat.st_mode) 815 if hasattr(os, 'lchflags'): 816 self.assertEqual(dst_stat.st_flags, src_stat.st_flags) 817 818 def test_copytree_with_exclude(self): 819 # creating data 820 join = os.path.join 821 exists = os.path.exists 822 src_dir = tempfile.mkdtemp() 823 try: 824 dst_dir = join(tempfile.mkdtemp(), 'destination') 825 write_file((src_dir, 'test.txt'), '123') 826 write_file((src_dir, 'test.tmp'), '123') 827 os.mkdir(join(src_dir, 'test_dir')) 828 write_file((src_dir, 'test_dir', 'test.txt'), '456') 829 os.mkdir(join(src_dir, 'test_dir2')) 830 write_file((src_dir, 'test_dir2', 'test.txt'), '456') 831 os.mkdir(join(src_dir, 'test_dir2', 'subdir')) 832 os.mkdir(join(src_dir, 'test_dir2', 'subdir2')) 833 write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456') 834 write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456') 835 836 # testing glob-like patterns 837 try: 838 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2') 839 shutil.copytree(src_dir, dst_dir, ignore=patterns) 840 # checking the result: some elements should not be copied 841 self.assertTrue(exists(join(dst_dir, 'test.txt'))) 842 self.assertFalse(exists(join(dst_dir, 'test.tmp'))) 843 self.assertFalse(exists(join(dst_dir, 'test_dir2'))) 844 finally: 845 shutil.rmtree(dst_dir) 846 try: 847 patterns = shutil.ignore_patterns('*.tmp', 'subdir*') 848 shutil.copytree(src_dir, dst_dir, ignore=patterns) 849 # checking the result: some elements should not be copied 850 self.assertFalse(exists(join(dst_dir, 'test.tmp'))) 851 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2'))) 852 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir'))) 853 finally: 854 shutil.rmtree(dst_dir) 855 856 # testing callable-style 857 try: 858 def _filter(src, names): 859 res = [] 860 for name in names: 861 path = os.path.join(src, name) 862 863 if (os.path.isdir(path) and 864 path.split()[-1] == 'subdir'): 865 res.append(name) 866 elif os.path.splitext(path)[-1] in ('.py'): 867 res.append(name) 868 return res 869 870 shutil.copytree(src_dir, dst_dir, ignore=_filter) 871 872 # checking the result: some elements should not be copied 873 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2', 874 'test.py'))) 875 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir'))) 876 877 finally: 878 shutil.rmtree(dst_dir) 879 finally: 880 shutil.rmtree(src_dir) 881 shutil.rmtree(os.path.dirname(dst_dir)) 882 883 def test_copytree_retains_permissions(self): 884 tmp_dir = tempfile.mkdtemp() 885 src_dir = os.path.join(tmp_dir, 'source') 886 os.mkdir(src_dir) 887 dst_dir = os.path.join(tmp_dir, 'destination') 888 self.addCleanup(shutil.rmtree, tmp_dir) 889 890 os.chmod(src_dir, 0o777) 891 write_file((src_dir, 'permissive.txt'), '123') 892 os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777) 893 write_file((src_dir, 'restrictive.txt'), '456') 894 os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600) 895 restrictive_subdir = tempfile.mkdtemp(dir=src_dir) 896 os.chmod(restrictive_subdir, 0o600) 897 898 shutil.copytree(src_dir, dst_dir) 899 self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode) 900 self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode, 901 os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode) 902 self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode, 903 os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode) 904 restrictive_subdir_dst = os.path.join(dst_dir, 905 os.path.split(restrictive_subdir)[1]) 906 self.assertEqual(os.stat(restrictive_subdir).st_mode, 907 os.stat(restrictive_subdir_dst).st_mode) 908 909 @unittest.mock.patch('os.chmod') 910 def test_copytree_winerror(self, mock_patch): 911 # When copying to VFAT, copystat() raises OSError. On Windows, the 912 # exception object has a meaningful 'winerror' attribute, but not 913 # on other operating systems. Do not assume 'winerror' is set. 914 src_dir = tempfile.mkdtemp() 915 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination') 916 self.addCleanup(shutil.rmtree, src_dir) 917 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir)) 918 919 mock_patch.side_effect = PermissionError('ka-boom') 920 with self.assertRaises(shutil.Error): 921 shutil.copytree(src_dir, dst_dir) 922 923 def test_copytree_custom_copy_function(self): 924 # See: https://bugs.python.org/issue35648 925 def custom_cpfun(a, b): 926 flag.append(None) 927 self.assertIsInstance(a, str) 928 self.assertIsInstance(b, str) 929 self.assertEqual(a, os.path.join(src, 'foo')) 930 self.assertEqual(b, os.path.join(dst, 'foo')) 931 932 flag = [] 933 src = tempfile.mkdtemp() 934 self.addCleanup(support.rmtree, src) 935 dst = tempfile.mktemp() 936 self.addCleanup(support.rmtree, dst) 937 with open(os.path.join(src, 'foo'), 'w') as f: 938 f.close() 939 shutil.copytree(src, dst, copy_function=custom_cpfun) 940 self.assertEqual(len(flag), 1) 941 942 @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link') 943 def test_dont_copy_file_onto_link_to_itself(self): 944 # bug 851123. 945 os.mkdir(TESTFN) 946 src = os.path.join(TESTFN, 'cheese') 947 dst = os.path.join(TESTFN, 'shop') 948 try: 949 with open(src, 'w') as f: 950 f.write('cheddar') 951 try: 952 os.link(src, dst) 953 except PermissionError as e: 954 self.skipTest('os.link(): %s' % e) 955 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst) 956 with open(src, 'r') as f: 957 self.assertEqual(f.read(), 'cheddar') 958 os.remove(dst) 959 finally: 960 shutil.rmtree(TESTFN, ignore_errors=True) 961 962 @support.skip_unless_symlink 963 def test_dont_copy_file_onto_symlink_to_itself(self): 964 # bug 851123. 965 os.mkdir(TESTFN) 966 src = os.path.join(TESTFN, 'cheese') 967 dst = os.path.join(TESTFN, 'shop') 968 try: 969 with open(src, 'w') as f: 970 f.write('cheddar') 971 # Using `src` here would mean we end up with a symlink pointing 972 # to TESTFN/TESTFN/cheese, while it should point at 973 # TESTFN/cheese. 974 os.symlink('cheese', dst) 975 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst) 976 with open(src, 'r') as f: 977 self.assertEqual(f.read(), 'cheddar') 978 os.remove(dst) 979 finally: 980 shutil.rmtree(TESTFN, ignore_errors=True) 981 982 @support.skip_unless_symlink 983 def test_rmtree_on_symlink(self): 984 # bug 1669. 985 os.mkdir(TESTFN) 986 try: 987 src = os.path.join(TESTFN, 'cheese') 988 dst = os.path.join(TESTFN, 'shop') 989 os.mkdir(src) 990 os.symlink(src, dst) 991 self.assertRaises(OSError, shutil.rmtree, dst) 992 shutil.rmtree(dst, ignore_errors=True) 993 finally: 994 shutil.rmtree(TESTFN, ignore_errors=True) 995 996 @unittest.skipUnless(_winapi, 'only relevant on Windows') 997 def test_rmtree_on_junction(self): 998 os.mkdir(TESTFN) 999 try: 1000 src = os.path.join(TESTFN, 'cheese') 1001 dst = os.path.join(TESTFN, 'shop') 1002 os.mkdir(src) 1003 open(os.path.join(src, 'spam'), 'wb').close() 1004 _winapi.CreateJunction(src, dst) 1005 self.assertRaises(OSError, shutil.rmtree, dst) 1006 shutil.rmtree(dst, ignore_errors=True) 1007 finally: 1008 shutil.rmtree(TESTFN, ignore_errors=True) 1009 1010 # Issue #3002: copyfile and copytree block indefinitely on named pipes 1011 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') 1012 def test_copyfile_named_pipe(self): 1013 try: 1014 os.mkfifo(TESTFN) 1015 except PermissionError as e: 1016 self.skipTest('os.mkfifo(): %s' % e) 1017 try: 1018 self.assertRaises(shutil.SpecialFileError, 1019 shutil.copyfile, TESTFN, TESTFN2) 1020 self.assertRaises(shutil.SpecialFileError, 1021 shutil.copyfile, __file__, TESTFN) 1022 finally: 1023 os.remove(TESTFN) 1024 1025 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') 1026 @support.skip_unless_symlink 1027 def test_copytree_named_pipe(self): 1028 os.mkdir(TESTFN) 1029 try: 1030 subdir = os.path.join(TESTFN, "subdir") 1031 os.mkdir(subdir) 1032 pipe = os.path.join(subdir, "mypipe") 1033 try: 1034 os.mkfifo(pipe) 1035 except PermissionError as e: 1036 self.skipTest('os.mkfifo(): %s' % e) 1037 try: 1038 shutil.copytree(TESTFN, TESTFN2) 1039 except shutil.Error as e: 1040 errors = e.args[0] 1041 self.assertEqual(len(errors), 1) 1042 src, dst, error_msg = errors[0] 1043 self.assertEqual("`%s` is a named pipe" % pipe, error_msg) 1044 else: 1045 self.fail("shutil.Error should have been raised") 1046 finally: 1047 shutil.rmtree(TESTFN, ignore_errors=True) 1048 shutil.rmtree(TESTFN2, ignore_errors=True) 1049 1050 def test_copytree_special_func(self): 1051 1052 src_dir = self.mkdtemp() 1053 dst_dir = os.path.join(self.mkdtemp(), 'destination') 1054 write_file((src_dir, 'test.txt'), '123') 1055 os.mkdir(os.path.join(src_dir, 'test_dir')) 1056 write_file((src_dir, 'test_dir', 'test.txt'), '456') 1057 1058 copied = [] 1059 def _copy(src, dst): 1060 copied.append((src, dst)) 1061 1062 shutil.copytree(src_dir, dst_dir, copy_function=_copy) 1063 self.assertEqual(len(copied), 2) 1064 1065 @support.skip_unless_symlink 1066 def test_copytree_dangling_symlinks(self): 1067 1068 # a dangling symlink raises an error at the end 1069 src_dir = self.mkdtemp() 1070 dst_dir = os.path.join(self.mkdtemp(), 'destination') 1071 os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt')) 1072 os.mkdir(os.path.join(src_dir, 'test_dir')) 1073 write_file((src_dir, 'test_dir', 'test.txt'), '456') 1074 self.assertRaises(Error, shutil.copytree, src_dir, dst_dir) 1075 1076 # a dangling symlink is ignored with the proper flag 1077 dst_dir = os.path.join(self.mkdtemp(), 'destination2') 1078 shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True) 1079 self.assertNotIn('test.txt', os.listdir(dst_dir)) 1080 1081 # a dangling symlink is copied if symlinks=True 1082 dst_dir = os.path.join(self.mkdtemp(), 'destination3') 1083 shutil.copytree(src_dir, dst_dir, symlinks=True) 1084 self.assertIn('test.txt', os.listdir(dst_dir)) 1085 1086 @support.skip_unless_symlink 1087 def test_copytree_symlink_dir(self): 1088 src_dir = self.mkdtemp() 1089 dst_dir = os.path.join(self.mkdtemp(), 'destination') 1090 os.mkdir(os.path.join(src_dir, 'real_dir')) 1091 with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'w'): 1092 pass 1093 os.symlink(os.path.join(src_dir, 'real_dir'), 1094 os.path.join(src_dir, 'link_to_dir'), 1095 target_is_directory=True) 1096 1097 shutil.copytree(src_dir, dst_dir, symlinks=False) 1098 self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir'))) 1099 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir'))) 1100 1101 dst_dir = os.path.join(self.mkdtemp(), 'destination2') 1102 shutil.copytree(src_dir, dst_dir, symlinks=True) 1103 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir'))) 1104 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir'))) 1105 1106 def _copy_file(self, method): 1107 fname = 'test.txt' 1108 tmpdir = self.mkdtemp() 1109 write_file((tmpdir, fname), 'xxx') 1110 file1 = os.path.join(tmpdir, fname) 1111 tmpdir2 = self.mkdtemp() 1112 method(file1, tmpdir2) 1113 file2 = os.path.join(tmpdir2, fname) 1114 return (file1, file2) 1115 1116 def test_copy(self): 1117 # Ensure that the copied file exists and has the same mode bits. 1118 file1, file2 = self._copy_file(shutil.copy) 1119 self.assertTrue(os.path.exists(file2)) 1120 self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode) 1121 1122 @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime') 1123 def test_copy2(self): 1124 # Ensure that the copied file exists and has the same mode and 1125 # modification time bits. 1126 file1, file2 = self._copy_file(shutil.copy2) 1127 self.assertTrue(os.path.exists(file2)) 1128 file1_stat = os.stat(file1) 1129 file2_stat = os.stat(file2) 1130 self.assertEqual(file1_stat.st_mode, file2_stat.st_mode) 1131 for attr in 'st_atime', 'st_mtime': 1132 # The modification times may be truncated in the new file. 1133 self.assertLessEqual(getattr(file1_stat, attr), 1134 getattr(file2_stat, attr) + 1) 1135 if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'): 1136 self.assertEqual(getattr(file1_stat, 'st_flags'), 1137 getattr(file2_stat, 'st_flags')) 1138 1139 @support.requires_zlib 1140 def test_make_tarball(self): 1141 # creating something to tar 1142 root_dir, base_dir = self._create_files('') 1143 1144 tmpdir2 = self.mkdtemp() 1145 # force shutil to create the directory 1146 os.rmdir(tmpdir2) 1147 # working with relative paths 1148 work_dir = os.path.dirname(tmpdir2) 1149 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive') 1150 1151 with support.change_cwd(work_dir): 1152 base_name = os.path.abspath(rel_base_name) 1153 tarball = make_archive(rel_base_name, 'gztar', root_dir, '.') 1154 1155 # check if the compressed tarball was created 1156 self.assertEqual(tarball, base_name + '.tar.gz') 1157 self.assertTrue(os.path.isfile(tarball)) 1158 self.assertTrue(tarfile.is_tarfile(tarball)) 1159 with tarfile.open(tarball, 'r:gz') as tf: 1160 self.assertCountEqual(tf.getnames(), 1161 ['.', './sub', './sub2', 1162 './file1', './file2', './sub/file3']) 1163 1164 # trying an uncompressed one 1165 with support.change_cwd(work_dir): 1166 tarball = make_archive(rel_base_name, 'tar', root_dir, '.') 1167 self.assertEqual(tarball, base_name + '.tar') 1168 self.assertTrue(os.path.isfile(tarball)) 1169 self.assertTrue(tarfile.is_tarfile(tarball)) 1170 with tarfile.open(tarball, 'r') as tf: 1171 self.assertCountEqual(tf.getnames(), 1172 ['.', './sub', './sub2', 1173 './file1', './file2', './sub/file3']) 1174 1175 def _tarinfo(self, path): 1176 with tarfile.open(path) as tar: 1177 names = tar.getnames() 1178 names.sort() 1179 return tuple(names) 1180 1181 def _create_files(self, base_dir='dist'): 1182 # creating something to tar 1183 root_dir = self.mkdtemp() 1184 dist = os.path.join(root_dir, base_dir) 1185 os.makedirs(dist, exist_ok=True) 1186 write_file((dist, 'file1'), 'xxx') 1187 write_file((dist, 'file2'), 'xxx') 1188 os.mkdir(os.path.join(dist, 'sub')) 1189 write_file((dist, 'sub', 'file3'), 'xxx') 1190 os.mkdir(os.path.join(dist, 'sub2')) 1191 if base_dir: 1192 write_file((root_dir, 'outer'), 'xxx') 1193 return root_dir, base_dir 1194 1195 @support.requires_zlib 1196 @unittest.skipUnless(shutil.which('tar'), 1197 'Need the tar command to run') 1198 def test_tarfile_vs_tar(self): 1199 root_dir, base_dir = self._create_files() 1200 base_name = os.path.join(self.mkdtemp(), 'archive') 1201 tarball = make_archive(base_name, 'gztar', root_dir, base_dir) 1202 1203 # check if the compressed tarball was created 1204 self.assertEqual(tarball, base_name + '.tar.gz') 1205 self.assertTrue(os.path.isfile(tarball)) 1206 1207 # now create another tarball using `tar` 1208 tarball2 = os.path.join(root_dir, 'archive2.tar') 1209 tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir] 1210 subprocess.check_call(tar_cmd, cwd=root_dir, 1211 stdout=subprocess.DEVNULL) 1212 1213 self.assertTrue(os.path.isfile(tarball2)) 1214 # let's compare both tarballs 1215 self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2)) 1216 1217 # trying an uncompressed one 1218 tarball = make_archive(base_name, 'tar', root_dir, base_dir) 1219 self.assertEqual(tarball, base_name + '.tar') 1220 self.assertTrue(os.path.isfile(tarball)) 1221 1222 # now for a dry_run 1223 tarball = make_archive(base_name, 'tar', root_dir, base_dir, 1224 dry_run=True) 1225 self.assertEqual(tarball, base_name + '.tar') 1226 self.assertTrue(os.path.isfile(tarball)) 1227 1228 @support.requires_zlib 1229 def test_make_zipfile(self): 1230 # creating something to zip 1231 root_dir, base_dir = self._create_files() 1232 1233 tmpdir2 = self.mkdtemp() 1234 # force shutil to create the directory 1235 os.rmdir(tmpdir2) 1236 # working with relative paths 1237 work_dir = os.path.dirname(tmpdir2) 1238 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive') 1239 1240 with support.change_cwd(work_dir): 1241 base_name = os.path.abspath(rel_base_name) 1242 res = make_archive(rel_base_name, 'zip', root_dir) 1243 1244 self.assertEqual(res, base_name + '.zip') 1245 self.assertTrue(os.path.isfile(res)) 1246 self.assertTrue(zipfile.is_zipfile(res)) 1247 with zipfile.ZipFile(res) as zf: 1248 self.assertCountEqual(zf.namelist(), 1249 ['dist/', 'dist/sub/', 'dist/sub2/', 1250 'dist/file1', 'dist/file2', 'dist/sub/file3', 1251 'outer']) 1252 1253 with support.change_cwd(work_dir): 1254 base_name = os.path.abspath(rel_base_name) 1255 res = make_archive(rel_base_name, 'zip', root_dir, base_dir) 1256 1257 self.assertEqual(res, base_name + '.zip') 1258 self.assertTrue(os.path.isfile(res)) 1259 self.assertTrue(zipfile.is_zipfile(res)) 1260 with zipfile.ZipFile(res) as zf: 1261 self.assertCountEqual(zf.namelist(), 1262 ['dist/', 'dist/sub/', 'dist/sub2/', 1263 'dist/file1', 'dist/file2', 'dist/sub/file3']) 1264 1265 @support.requires_zlib 1266 @unittest.skipUnless(shutil.which('zip'), 1267 'Need the zip command to run') 1268 def test_zipfile_vs_zip(self): 1269 root_dir, base_dir = self._create_files() 1270 base_name = os.path.join(self.mkdtemp(), 'archive') 1271 archive = make_archive(base_name, 'zip', root_dir, base_dir) 1272 1273 # check if ZIP file was created 1274 self.assertEqual(archive, base_name + '.zip') 1275 self.assertTrue(os.path.isfile(archive)) 1276 1277 # now create another ZIP file using `zip` 1278 archive2 = os.path.join(root_dir, 'archive2.zip') 1279 zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir] 1280 subprocess.check_call(zip_cmd, cwd=root_dir, 1281 stdout=subprocess.DEVNULL) 1282 1283 self.assertTrue(os.path.isfile(archive2)) 1284 # let's compare both ZIP files 1285 with zipfile.ZipFile(archive) as zf: 1286 names = zf.namelist() 1287 with zipfile.ZipFile(archive2) as zf: 1288 names2 = zf.namelist() 1289 self.assertEqual(sorted(names), sorted(names2)) 1290 1291 @support.requires_zlib 1292 @unittest.skipUnless(shutil.which('unzip'), 1293 'Need the unzip command to run') 1294 def test_unzip_zipfile(self): 1295 root_dir, base_dir = self._create_files() 1296 base_name = os.path.join(self.mkdtemp(), 'archive') 1297 archive = make_archive(base_name, 'zip', root_dir, base_dir) 1298 1299 # check if ZIP file was created 1300 self.assertEqual(archive, base_name + '.zip') 1301 self.assertTrue(os.path.isfile(archive)) 1302 1303 # now check the ZIP file using `unzip -t` 1304 zip_cmd = ['unzip', '-t', archive] 1305 with support.change_cwd(root_dir): 1306 try: 1307 subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT) 1308 except subprocess.CalledProcessError as exc: 1309 details = exc.output.decode(errors="replace") 1310 if 'unrecognized option: t' in details: 1311 self.skipTest("unzip doesn't support -t") 1312 msg = "{}\n\n**Unzip Output**\n{}" 1313 self.fail(msg.format(exc, details)) 1314 1315 def test_make_archive(self): 1316 tmpdir = self.mkdtemp() 1317 base_name = os.path.join(tmpdir, 'archive') 1318 self.assertRaises(ValueError, make_archive, base_name, 'xxx') 1319 1320 @support.requires_zlib 1321 def test_make_archive_owner_group(self): 1322 # testing make_archive with owner and group, with various combinations 1323 # this works even if there's not gid/uid support 1324 if UID_GID_SUPPORT: 1325 group = grp.getgrgid(0)[0] 1326 owner = pwd.getpwuid(0)[0] 1327 else: 1328 group = owner = 'root' 1329 1330 root_dir, base_dir = self._create_files() 1331 base_name = os.path.join(self.mkdtemp(), 'archive') 1332 res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner, 1333 group=group) 1334 self.assertTrue(os.path.isfile(res)) 1335 1336 res = make_archive(base_name, 'zip', root_dir, base_dir) 1337 self.assertTrue(os.path.isfile(res)) 1338 1339 res = make_archive(base_name, 'tar', root_dir, base_dir, 1340 owner=owner, group=group) 1341 self.assertTrue(os.path.isfile(res)) 1342 1343 res = make_archive(base_name, 'tar', root_dir, base_dir, 1344 owner='kjhkjhkjg', group='oihohoh') 1345 self.assertTrue(os.path.isfile(res)) 1346 1347 1348 @support.requires_zlib 1349 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support") 1350 def test_tarfile_root_owner(self): 1351 root_dir, base_dir = self._create_files() 1352 base_name = os.path.join(self.mkdtemp(), 'archive') 1353 group = grp.getgrgid(0)[0] 1354 owner = pwd.getpwuid(0)[0] 1355 with support.change_cwd(root_dir): 1356 archive_name = make_archive(base_name, 'gztar', root_dir, 'dist', 1357 owner=owner, group=group) 1358 1359 # check if the compressed tarball was created 1360 self.assertTrue(os.path.isfile(archive_name)) 1361 1362 # now checks the rights 1363 archive = tarfile.open(archive_name) 1364 try: 1365 for member in archive.getmembers(): 1366 self.assertEqual(member.uid, 0) 1367 self.assertEqual(member.gid, 0) 1368 finally: 1369 archive.close() 1370 1371 def test_make_archive_cwd(self): 1372 current_dir = os.getcwd() 1373 def _breaks(*args, **kw): 1374 raise RuntimeError() 1375 1376 register_archive_format('xxx', _breaks, [], 'xxx file') 1377 try: 1378 try: 1379 make_archive('xxx', 'xxx', root_dir=self.mkdtemp()) 1380 except Exception: 1381 pass 1382 self.assertEqual(os.getcwd(), current_dir) 1383 finally: 1384 unregister_archive_format('xxx') 1385 1386 def test_make_tarfile_in_curdir(self): 1387 # Issue #21280 1388 root_dir = self.mkdtemp() 1389 with support.change_cwd(root_dir): 1390 self.assertEqual(make_archive('test', 'tar'), 'test.tar') 1391 self.assertTrue(os.path.isfile('test.tar')) 1392 1393 @support.requires_zlib 1394 def test_make_zipfile_in_curdir(self): 1395 # Issue #21280 1396 root_dir = self.mkdtemp() 1397 with support.change_cwd(root_dir): 1398 self.assertEqual(make_archive('test', 'zip'), 'test.zip') 1399 self.assertTrue(os.path.isfile('test.zip')) 1400 1401 def test_register_archive_format(self): 1402 1403 self.assertRaises(TypeError, register_archive_format, 'xxx', 1) 1404 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x, 1405 1) 1406 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x, 1407 [(1, 2), (1, 2, 3)]) 1408 1409 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file') 1410 formats = [name for name, params in get_archive_formats()] 1411 self.assertIn('xxx', formats) 1412 1413 unregister_archive_format('xxx') 1414 formats = [name for name, params in get_archive_formats()] 1415 self.assertNotIn('xxx', formats) 1416 1417 def check_unpack_archive(self, format): 1418 self.check_unpack_archive_with_converter(format, lambda path: path) 1419 self.check_unpack_archive_with_converter(format, pathlib.Path) 1420 self.check_unpack_archive_with_converter(format, FakePath) 1421 1422 def check_unpack_archive_with_converter(self, format, converter): 1423 root_dir, base_dir = self._create_files() 1424 expected = rlistdir(root_dir) 1425 expected.remove('outer') 1426 1427 base_name = os.path.join(self.mkdtemp(), 'archive') 1428 filename = make_archive(base_name, format, root_dir, base_dir) 1429 1430 # let's try to unpack it now 1431 tmpdir2 = self.mkdtemp() 1432 unpack_archive(converter(filename), converter(tmpdir2)) 1433 self.assertEqual(rlistdir(tmpdir2), expected) 1434 1435 # and again, this time with the format specified 1436 tmpdir3 = self.mkdtemp() 1437 unpack_archive(converter(filename), converter(tmpdir3), format=format) 1438 self.assertEqual(rlistdir(tmpdir3), expected) 1439 1440 self.assertRaises(shutil.ReadError, unpack_archive, converter(TESTFN)) 1441 self.assertRaises(ValueError, unpack_archive, converter(TESTFN), format='xxx') 1442 1443 def test_unpack_archive_tar(self): 1444 self.check_unpack_archive('tar') 1445 1446 @support.requires_zlib 1447 def test_unpack_archive_gztar(self): 1448 self.check_unpack_archive('gztar') 1449 1450 @support.requires_bz2 1451 def test_unpack_archive_bztar(self): 1452 self.check_unpack_archive('bztar') 1453 1454 @support.requires_lzma 1455 @unittest.skipIf(AIX and not _maxdataOK(), "AIX MAXDATA must be 0x20000000 or larger") 1456 def test_unpack_archive_xztar(self): 1457 self.check_unpack_archive('xztar') 1458 1459 @support.requires_zlib 1460 def test_unpack_archive_zip(self): 1461 self.check_unpack_archive('zip') 1462 1463 def test_unpack_registry(self): 1464 1465 formats = get_unpack_formats() 1466 1467 def _boo(filename, extract_dir, extra): 1468 self.assertEqual(extra, 1) 1469 self.assertEqual(filename, 'stuff.boo') 1470 self.assertEqual(extract_dir, 'xx') 1471 1472 register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)]) 1473 unpack_archive('stuff.boo', 'xx') 1474 1475 # trying to register a .boo unpacker again 1476 self.assertRaises(RegistryError, register_unpack_format, 'Boo2', 1477 ['.boo'], _boo) 1478 1479 # should work now 1480 unregister_unpack_format('Boo') 1481 register_unpack_format('Boo2', ['.boo'], _boo) 1482 self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats()) 1483 self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats()) 1484 1485 # let's leave a clean state 1486 unregister_unpack_format('Boo2') 1487 self.assertEqual(get_unpack_formats(), formats) 1488 1489 @unittest.skipUnless(hasattr(shutil, 'disk_usage'), 1490 "disk_usage not available on this platform") 1491 def test_disk_usage(self): 1492 usage = shutil.disk_usage(os.path.dirname(__file__)) 1493 for attr in ('total', 'used', 'free'): 1494 self.assertIsInstance(getattr(usage, attr), int) 1495 self.assertGreater(usage.total, 0) 1496 self.assertGreater(usage.used, 0) 1497 self.assertGreaterEqual(usage.free, 0) 1498 self.assertGreaterEqual(usage.total, usage.used) 1499 self.assertGreater(usage.total, usage.free) 1500 1501 # bpo-32557: Check that disk_usage() also accepts a filename 1502 shutil.disk_usage(__file__) 1503 1504 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support") 1505 @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown') 1506 def test_chown(self): 1507 1508 # cleaned-up automatically by TestShutil.tearDown method 1509 dirname = self.mkdtemp() 1510 filename = tempfile.mktemp(dir=dirname) 1511 write_file(filename, 'testing chown function') 1512 1513 with self.assertRaises(ValueError): 1514 shutil.chown(filename) 1515 1516 with self.assertRaises(LookupError): 1517 shutil.chown(filename, user='non-existing username') 1518 1519 with self.assertRaises(LookupError): 1520 shutil.chown(filename, group='non-existing groupname') 1521 1522 with self.assertRaises(TypeError): 1523 shutil.chown(filename, b'spam') 1524 1525 with self.assertRaises(TypeError): 1526 shutil.chown(filename, 3.14) 1527 1528 uid = os.getuid() 1529 gid = os.getgid() 1530 1531 def check_chown(path, uid=None, gid=None): 1532 s = os.stat(filename) 1533 if uid is not None: 1534 self.assertEqual(uid, s.st_uid) 1535 if gid is not None: 1536 self.assertEqual(gid, s.st_gid) 1537 1538 shutil.chown(filename, uid, gid) 1539 check_chown(filename, uid, gid) 1540 shutil.chown(filename, uid) 1541 check_chown(filename, uid) 1542 shutil.chown(filename, user=uid) 1543 check_chown(filename, uid) 1544 shutil.chown(filename, group=gid) 1545 check_chown(filename, gid=gid) 1546 1547 shutil.chown(dirname, uid, gid) 1548 check_chown(dirname, uid, gid) 1549 shutil.chown(dirname, uid) 1550 check_chown(dirname, uid) 1551 shutil.chown(dirname, user=uid) 1552 check_chown(dirname, uid) 1553 shutil.chown(dirname, group=gid) 1554 check_chown(dirname, gid=gid) 1555 1556 user = pwd.getpwuid(uid)[0] 1557 group = grp.getgrgid(gid)[0] 1558 shutil.chown(filename, user, group) 1559 check_chown(filename, uid, gid) 1560 shutil.chown(dirname, user, group) 1561 check_chown(dirname, uid, gid) 1562 1563 def test_copy_return_value(self): 1564 # copy and copy2 both return their destination path. 1565 for fn in (shutil.copy, shutil.copy2): 1566 src_dir = self.mkdtemp() 1567 dst_dir = self.mkdtemp() 1568 src = os.path.join(src_dir, 'foo') 1569 write_file(src, 'foo') 1570 rv = fn(src, dst_dir) 1571 self.assertEqual(rv, os.path.join(dst_dir, 'foo')) 1572 rv = fn(src, os.path.join(dst_dir, 'bar')) 1573 self.assertEqual(rv, os.path.join(dst_dir, 'bar')) 1574 1575 def test_copyfile_return_value(self): 1576 # copytree returns its destination path. 1577 src_dir = self.mkdtemp() 1578 dst_dir = self.mkdtemp() 1579 dst_file = os.path.join(dst_dir, 'bar') 1580 src_file = os.path.join(src_dir, 'foo') 1581 write_file(src_file, 'foo') 1582 rv = shutil.copyfile(src_file, dst_file) 1583 self.assertTrue(os.path.exists(rv)) 1584 self.assertEqual(read_file(src_file), read_file(dst_file)) 1585 1586 def test_copyfile_same_file(self): 1587 # copyfile() should raise SameFileError if the source and destination 1588 # are the same. 1589 src_dir = self.mkdtemp() 1590 src_file = os.path.join(src_dir, 'foo') 1591 write_file(src_file, 'foo') 1592 self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file) 1593 # But Error should work too, to stay backward compatible. 1594 self.assertRaises(Error, shutil.copyfile, src_file, src_file) 1595 # Make sure file is not corrupted. 1596 self.assertEqual(read_file(src_file), 'foo') 1597 1598 def test_copytree_return_value(self): 1599 # copytree returns its destination path. 1600 src_dir = self.mkdtemp() 1601 dst_dir = src_dir + "dest" 1602 self.addCleanup(shutil.rmtree, dst_dir, True) 1603 src = os.path.join(src_dir, 'foo') 1604 write_file(src, 'foo') 1605 rv = shutil.copytree(src_dir, dst_dir) 1606 self.assertEqual(['foo'], os.listdir(rv)) 1607 1608 def test_copytree_subdirectory(self): 1609 # copytree where dst is a subdirectory of src, see Issue 38688 1610 base_dir = self.mkdtemp() 1611 self.addCleanup(shutil.rmtree, base_dir, ignore_errors=True) 1612 src_dir = os.path.join(base_dir, "t", "pg") 1613 dst_dir = os.path.join(src_dir, "somevendor", "1.0") 1614 os.makedirs(src_dir) 1615 src = os.path.join(src_dir, 'pol') 1616 write_file(src, 'pol') 1617 rv = shutil.copytree(src_dir, dst_dir) 1618 self.assertEqual(['pol'], os.listdir(rv)) 1619 1620 1621class TestWhich(unittest.TestCase): 1622 1623 def setUp(self): 1624 self.temp_dir = tempfile.mkdtemp(prefix="Tmp") 1625 self.addCleanup(shutil.rmtree, self.temp_dir, True) 1626 # Give the temp_file an ".exe" suffix for all. 1627 # It's needed on Windows and not harmful on other platforms. 1628 self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir, 1629 prefix="Tmp", 1630 suffix=".Exe") 1631 os.chmod(self.temp_file.name, stat.S_IXUSR) 1632 self.addCleanup(self.temp_file.close) 1633 self.dir, self.file = os.path.split(self.temp_file.name) 1634 self.env_path = self.dir 1635 self.curdir = os.curdir 1636 self.ext = ".EXE" 1637 1638 def test_basic(self): 1639 # Given an EXE in a directory, it should be returned. 1640 rv = shutil.which(self.file, path=self.dir) 1641 self.assertEqual(rv, self.temp_file.name) 1642 1643 def test_absolute_cmd(self): 1644 # When given the fully qualified path to an executable that exists, 1645 # it should be returned. 1646 rv = shutil.which(self.temp_file.name, path=self.temp_dir) 1647 self.assertEqual(rv, self.temp_file.name) 1648 1649 def test_relative_cmd(self): 1650 # When given the relative path with a directory part to an executable 1651 # that exists, it should be returned. 1652 base_dir, tail_dir = os.path.split(self.dir) 1653 relpath = os.path.join(tail_dir, self.file) 1654 with support.change_cwd(path=base_dir): 1655 rv = shutil.which(relpath, path=self.temp_dir) 1656 self.assertEqual(rv, relpath) 1657 # But it shouldn't be searched in PATH directories (issue #16957). 1658 with support.change_cwd(path=self.dir): 1659 rv = shutil.which(relpath, path=base_dir) 1660 self.assertIsNone(rv) 1661 1662 def test_cwd(self): 1663 # Issue #16957 1664 base_dir = os.path.dirname(self.dir) 1665 with support.change_cwd(path=self.dir): 1666 rv = shutil.which(self.file, path=base_dir) 1667 if sys.platform == "win32": 1668 # Windows: current directory implicitly on PATH 1669 self.assertEqual(rv, os.path.join(self.curdir, self.file)) 1670 else: 1671 # Other platforms: shouldn't match in the current directory. 1672 self.assertIsNone(rv) 1673 1674 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, 1675 'non-root user required') 1676 def test_non_matching_mode(self): 1677 # Set the file read-only and ask for writeable files. 1678 os.chmod(self.temp_file.name, stat.S_IREAD) 1679 if os.access(self.temp_file.name, os.W_OK): 1680 self.skipTest("can't set the file read-only") 1681 rv = shutil.which(self.file, path=self.dir, mode=os.W_OK) 1682 self.assertIsNone(rv) 1683 1684 def test_relative_path(self): 1685 base_dir, tail_dir = os.path.split(self.dir) 1686 with support.change_cwd(path=base_dir): 1687 rv = shutil.which(self.file, path=tail_dir) 1688 self.assertEqual(rv, os.path.join(tail_dir, self.file)) 1689 1690 def test_nonexistent_file(self): 1691 # Return None when no matching executable file is found on the path. 1692 rv = shutil.which("foo.exe", path=self.dir) 1693 self.assertIsNone(rv) 1694 1695 @unittest.skipUnless(sys.platform == "win32", 1696 "pathext check is Windows-only") 1697 def test_pathext_checking(self): 1698 # Ask for the file without the ".exe" extension, then ensure that 1699 # it gets found properly with the extension. 1700 rv = shutil.which(self.file[:-4], path=self.dir) 1701 self.assertEqual(rv, self.temp_file.name[:-4] + self.ext) 1702 1703 def test_environ_path(self): 1704 with support.EnvironmentVarGuard() as env: 1705 env['PATH'] = self.env_path 1706 rv = shutil.which(self.file) 1707 self.assertEqual(rv, self.temp_file.name) 1708 1709 def test_environ_path_empty(self): 1710 # PATH='': no match 1711 with support.EnvironmentVarGuard() as env: 1712 env['PATH'] = '' 1713 with unittest.mock.patch('os.confstr', return_value=self.dir, \ 1714 create=True), \ 1715 support.swap_attr(os, 'defpath', self.dir), \ 1716 support.change_cwd(self.dir): 1717 rv = shutil.which(self.file) 1718 self.assertIsNone(rv) 1719 1720 def test_environ_path_cwd(self): 1721 expected_cwd = os.path.basename(self.temp_file.name) 1722 if sys.platform == "win32": 1723 curdir = os.curdir 1724 if isinstance(expected_cwd, bytes): 1725 curdir = os.fsencode(curdir) 1726 expected_cwd = os.path.join(curdir, expected_cwd) 1727 1728 # PATH=':': explicitly looks in the current directory 1729 with support.EnvironmentVarGuard() as env: 1730 env['PATH'] = os.pathsep 1731 with unittest.mock.patch('os.confstr', return_value=self.dir, \ 1732 create=True), \ 1733 support.swap_attr(os, 'defpath', self.dir): 1734 rv = shutil.which(self.file) 1735 self.assertIsNone(rv) 1736 1737 # look in current directory 1738 with support.change_cwd(self.dir): 1739 rv = shutil.which(self.file) 1740 self.assertEqual(rv, expected_cwd) 1741 1742 def test_environ_path_missing(self): 1743 with support.EnvironmentVarGuard() as env: 1744 env.pop('PATH', None) 1745 1746 # without confstr 1747 with unittest.mock.patch('os.confstr', side_effect=ValueError, \ 1748 create=True), \ 1749 support.swap_attr(os, 'defpath', self.dir): 1750 rv = shutil.which(self.file) 1751 self.assertEqual(rv, self.temp_file.name) 1752 1753 # with confstr 1754 with unittest.mock.patch('os.confstr', return_value=self.dir, \ 1755 create=True), \ 1756 support.swap_attr(os, 'defpath', ''): 1757 rv = shutil.which(self.file) 1758 self.assertEqual(rv, self.temp_file.name) 1759 1760 def test_empty_path(self): 1761 base_dir = os.path.dirname(self.dir) 1762 with support.change_cwd(path=self.dir), \ 1763 support.EnvironmentVarGuard() as env: 1764 env['PATH'] = self.env_path 1765 rv = shutil.which(self.file, path='') 1766 self.assertIsNone(rv) 1767 1768 def test_empty_path_no_PATH(self): 1769 with support.EnvironmentVarGuard() as env: 1770 env.pop('PATH', None) 1771 rv = shutil.which(self.file) 1772 self.assertIsNone(rv) 1773 1774 @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows') 1775 def test_pathext(self): 1776 ext = ".xyz" 1777 temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir, 1778 prefix="Tmp2", suffix=ext) 1779 os.chmod(temp_filexyz.name, stat.S_IXUSR) 1780 self.addCleanup(temp_filexyz.close) 1781 1782 # strip path and extension 1783 program = os.path.basename(temp_filexyz.name) 1784 program = os.path.splitext(program)[0] 1785 1786 with support.EnvironmentVarGuard() as env: 1787 env['PATHEXT'] = ext 1788 rv = shutil.which(program, path=self.temp_dir) 1789 self.assertEqual(rv, temp_filexyz.name) 1790 1791 1792class TestWhichBytes(TestWhich): 1793 def setUp(self): 1794 TestWhich.setUp(self) 1795 self.dir = os.fsencode(self.dir) 1796 self.file = os.fsencode(self.file) 1797 self.temp_file.name = os.fsencode(self.temp_file.name) 1798 self.curdir = os.fsencode(self.curdir) 1799 self.ext = os.fsencode(self.ext) 1800 1801 1802class TestMove(unittest.TestCase): 1803 1804 def setUp(self): 1805 filename = "foo" 1806 basedir = None 1807 if sys.platform == "win32": 1808 basedir = os.path.realpath(os.getcwd()) 1809 self.src_dir = tempfile.mkdtemp(dir=basedir) 1810 self.dst_dir = tempfile.mkdtemp(dir=basedir) 1811 self.src_file = os.path.join(self.src_dir, filename) 1812 self.dst_file = os.path.join(self.dst_dir, filename) 1813 with open(self.src_file, "wb") as f: 1814 f.write(b"spam") 1815 1816 def tearDown(self): 1817 for d in (self.src_dir, self.dst_dir): 1818 try: 1819 if d: 1820 shutil.rmtree(d) 1821 except: 1822 pass 1823 1824 def _check_move_file(self, src, dst, real_dst): 1825 with open(src, "rb") as f: 1826 contents = f.read() 1827 shutil.move(src, dst) 1828 with open(real_dst, "rb") as f: 1829 self.assertEqual(contents, f.read()) 1830 self.assertFalse(os.path.exists(src)) 1831 1832 def _check_move_dir(self, src, dst, real_dst): 1833 contents = sorted(os.listdir(src)) 1834 shutil.move(src, dst) 1835 self.assertEqual(contents, sorted(os.listdir(real_dst))) 1836 self.assertFalse(os.path.exists(src)) 1837 1838 def test_move_file(self): 1839 # Move a file to another location on the same filesystem. 1840 self._check_move_file(self.src_file, self.dst_file, self.dst_file) 1841 1842 def test_move_file_to_dir(self): 1843 # Move a file inside an existing dir on the same filesystem. 1844 self._check_move_file(self.src_file, self.dst_dir, self.dst_file) 1845 1846 @mock_rename 1847 def test_move_file_other_fs(self): 1848 # Move a file to an existing dir on another filesystem. 1849 self.test_move_file() 1850 1851 @mock_rename 1852 def test_move_file_to_dir_other_fs(self): 1853 # Move a file to another location on another filesystem. 1854 self.test_move_file_to_dir() 1855 1856 def test_move_dir(self): 1857 # Move a dir to another location on the same filesystem. 1858 dst_dir = tempfile.mktemp() 1859 try: 1860 self._check_move_dir(self.src_dir, dst_dir, dst_dir) 1861 finally: 1862 try: 1863 shutil.rmtree(dst_dir) 1864 except: 1865 pass 1866 1867 @mock_rename 1868 def test_move_dir_other_fs(self): 1869 # Move a dir to another location on another filesystem. 1870 self.test_move_dir() 1871 1872 def test_move_dir_to_dir(self): 1873 # Move a dir inside an existing dir on the same filesystem. 1874 self._check_move_dir(self.src_dir, self.dst_dir, 1875 os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 1876 1877 @mock_rename 1878 def test_move_dir_to_dir_other_fs(self): 1879 # Move a dir inside an existing dir on another filesystem. 1880 self.test_move_dir_to_dir() 1881 1882 def test_move_dir_sep_to_dir(self): 1883 self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir, 1884 os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 1885 1886 @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep') 1887 def test_move_dir_altsep_to_dir(self): 1888 self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir, 1889 os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 1890 1891 def test_existing_file_inside_dest_dir(self): 1892 # A file with the same name inside the destination dir already exists. 1893 with open(self.dst_file, "wb"): 1894 pass 1895 self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir) 1896 1897 def test_dont_move_dir_in_itself(self): 1898 # Moving a dir inside itself raises an Error. 1899 dst = os.path.join(self.src_dir, "bar") 1900 self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst) 1901 1902 def test_destinsrc_false_negative(self): 1903 os.mkdir(TESTFN) 1904 try: 1905 for src, dst in [('srcdir', 'srcdir/dest')]: 1906 src = os.path.join(TESTFN, src) 1907 dst = os.path.join(TESTFN, dst) 1908 self.assertTrue(shutil._destinsrc(src, dst), 1909 msg='_destinsrc() wrongly concluded that ' 1910 'dst (%s) is not in src (%s)' % (dst, src)) 1911 finally: 1912 shutil.rmtree(TESTFN, ignore_errors=True) 1913 1914 def test_destinsrc_false_positive(self): 1915 os.mkdir(TESTFN) 1916 try: 1917 for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]: 1918 src = os.path.join(TESTFN, src) 1919 dst = os.path.join(TESTFN, dst) 1920 self.assertFalse(shutil._destinsrc(src, dst), 1921 msg='_destinsrc() wrongly concluded that ' 1922 'dst (%s) is in src (%s)' % (dst, src)) 1923 finally: 1924 shutil.rmtree(TESTFN, ignore_errors=True) 1925 1926 @support.skip_unless_symlink 1927 @mock_rename 1928 def test_move_file_symlink(self): 1929 dst = os.path.join(self.src_dir, 'bar') 1930 os.symlink(self.src_file, dst) 1931 shutil.move(dst, self.dst_file) 1932 self.assertTrue(os.path.islink(self.dst_file)) 1933 self.assertTrue(os.path.samefile(self.src_file, self.dst_file)) 1934 1935 @support.skip_unless_symlink 1936 @mock_rename 1937 def test_move_file_symlink_to_dir(self): 1938 filename = "bar" 1939 dst = os.path.join(self.src_dir, filename) 1940 os.symlink(self.src_file, dst) 1941 shutil.move(dst, self.dst_dir) 1942 final_link = os.path.join(self.dst_dir, filename) 1943 self.assertTrue(os.path.islink(final_link)) 1944 self.assertTrue(os.path.samefile(self.src_file, final_link)) 1945 1946 @support.skip_unless_symlink 1947 @mock_rename 1948 def test_move_dangling_symlink(self): 1949 src = os.path.join(self.src_dir, 'baz') 1950 dst = os.path.join(self.src_dir, 'bar') 1951 os.symlink(src, dst) 1952 dst_link = os.path.join(self.dst_dir, 'quux') 1953 shutil.move(dst, dst_link) 1954 self.assertTrue(os.path.islink(dst_link)) 1955 self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link)) 1956 1957 @support.skip_unless_symlink 1958 @mock_rename 1959 def test_move_dir_symlink(self): 1960 src = os.path.join(self.src_dir, 'baz') 1961 dst = os.path.join(self.src_dir, 'bar') 1962 os.mkdir(src) 1963 os.symlink(src, dst) 1964 dst_link = os.path.join(self.dst_dir, 'quux') 1965 shutil.move(dst, dst_link) 1966 self.assertTrue(os.path.islink(dst_link)) 1967 self.assertTrue(os.path.samefile(src, dst_link)) 1968 1969 def test_move_return_value(self): 1970 rv = shutil.move(self.src_file, self.dst_dir) 1971 self.assertEqual(rv, 1972 os.path.join(self.dst_dir, os.path.basename(self.src_file))) 1973 1974 def test_move_as_rename_return_value(self): 1975 rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar')) 1976 self.assertEqual(rv, os.path.join(self.dst_dir, 'bar')) 1977 1978 @mock_rename 1979 def test_move_file_special_function(self): 1980 moved = [] 1981 def _copy(src, dst): 1982 moved.append((src, dst)) 1983 shutil.move(self.src_file, self.dst_dir, copy_function=_copy) 1984 self.assertEqual(len(moved), 1) 1985 1986 @mock_rename 1987 def test_move_dir_special_function(self): 1988 moved = [] 1989 def _copy(src, dst): 1990 moved.append((src, dst)) 1991 support.create_empty_file(os.path.join(self.src_dir, 'child')) 1992 support.create_empty_file(os.path.join(self.src_dir, 'child1')) 1993 shutil.move(self.src_dir, self.dst_dir, copy_function=_copy) 1994 self.assertEqual(len(moved), 3) 1995 1996 1997class TestCopyFile(unittest.TestCase): 1998 1999 _delete = False 2000 2001 class Faux(object): 2002 _entered = False 2003 _exited_with = None 2004 _raised = False 2005 def __init__(self, raise_in_exit=False, suppress_at_exit=True): 2006 self._raise_in_exit = raise_in_exit 2007 self._suppress_at_exit = suppress_at_exit 2008 def read(self, *args): 2009 return '' 2010 def __enter__(self): 2011 self._entered = True 2012 def __exit__(self, exc_type, exc_val, exc_tb): 2013 self._exited_with = exc_type, exc_val, exc_tb 2014 if self._raise_in_exit: 2015 self._raised = True 2016 raise OSError("Cannot close") 2017 return self._suppress_at_exit 2018 2019 def tearDown(self): 2020 if self._delete: 2021 del shutil.open 2022 2023 def _set_shutil_open(self, func): 2024 shutil.open = func 2025 self._delete = True 2026 2027 def test_w_source_open_fails(self): 2028 def _open(filename, mode='r'): 2029 if filename == 'srcfile': 2030 raise OSError('Cannot open "srcfile"') 2031 assert 0 # shouldn't reach here. 2032 2033 self._set_shutil_open(_open) 2034 2035 self.assertRaises(OSError, shutil.copyfile, 'srcfile', 'destfile') 2036 2037 @unittest.skipIf(MACOS, "skipped on macOS") 2038 def test_w_dest_open_fails(self): 2039 2040 srcfile = self.Faux() 2041 2042 def _open(filename, mode='r'): 2043 if filename == 'srcfile': 2044 return srcfile 2045 if filename == 'destfile': 2046 raise OSError('Cannot open "destfile"') 2047 assert 0 # shouldn't reach here. 2048 2049 self._set_shutil_open(_open) 2050 2051 shutil.copyfile('srcfile', 'destfile') 2052 self.assertTrue(srcfile._entered) 2053 self.assertTrue(srcfile._exited_with[0] is OSError) 2054 self.assertEqual(srcfile._exited_with[1].args, 2055 ('Cannot open "destfile"',)) 2056 2057 @unittest.skipIf(MACOS, "skipped on macOS") 2058 def test_w_dest_close_fails(self): 2059 2060 srcfile = self.Faux() 2061 destfile = self.Faux(True) 2062 2063 def _open(filename, mode='r'): 2064 if filename == 'srcfile': 2065 return srcfile 2066 if filename == 'destfile': 2067 return destfile 2068 assert 0 # shouldn't reach here. 2069 2070 self._set_shutil_open(_open) 2071 2072 shutil.copyfile('srcfile', 'destfile') 2073 self.assertTrue(srcfile._entered) 2074 self.assertTrue(destfile._entered) 2075 self.assertTrue(destfile._raised) 2076 self.assertTrue(srcfile._exited_with[0] is OSError) 2077 self.assertEqual(srcfile._exited_with[1].args, 2078 ('Cannot close',)) 2079 2080 @unittest.skipIf(MACOS, "skipped on macOS") 2081 def test_w_source_close_fails(self): 2082 2083 srcfile = self.Faux(True) 2084 destfile = self.Faux() 2085 2086 def _open(filename, mode='r'): 2087 if filename == 'srcfile': 2088 return srcfile 2089 if filename == 'destfile': 2090 return destfile 2091 assert 0 # shouldn't reach here. 2092 2093 self._set_shutil_open(_open) 2094 2095 self.assertRaises(OSError, 2096 shutil.copyfile, 'srcfile', 'destfile') 2097 self.assertTrue(srcfile._entered) 2098 self.assertTrue(destfile._entered) 2099 self.assertFalse(destfile._raised) 2100 self.assertTrue(srcfile._exited_with[0] is None) 2101 self.assertTrue(srcfile._raised) 2102 2103 def test_move_dir_caseinsensitive(self): 2104 # Renames a folder to the same name 2105 # but a different case. 2106 2107 self.src_dir = tempfile.mkdtemp() 2108 self.addCleanup(shutil.rmtree, self.src_dir, True) 2109 dst_dir = os.path.join( 2110 os.path.dirname(self.src_dir), 2111 os.path.basename(self.src_dir).upper()) 2112 self.assertNotEqual(self.src_dir, dst_dir) 2113 2114 try: 2115 shutil.move(self.src_dir, dst_dir) 2116 self.assertTrue(os.path.isdir(dst_dir)) 2117 finally: 2118 os.rmdir(dst_dir) 2119 2120 2121class TestCopyFileObj(unittest.TestCase): 2122 FILESIZE = 2 * 1024 * 1024 2123 2124 @classmethod 2125 def setUpClass(cls): 2126 write_test_file(TESTFN, cls.FILESIZE) 2127 2128 @classmethod 2129 def tearDownClass(cls): 2130 support.unlink(TESTFN) 2131 support.unlink(TESTFN2) 2132 2133 def tearDown(self): 2134 support.unlink(TESTFN2) 2135 2136 @contextlib.contextmanager 2137 def get_files(self): 2138 with open(TESTFN, "rb") as src: 2139 with open(TESTFN2, "wb") as dst: 2140 yield (src, dst) 2141 2142 def assert_files_eq(self, src, dst): 2143 with open(src, 'rb') as fsrc: 2144 with open(dst, 'rb') as fdst: 2145 self.assertEqual(fsrc.read(), fdst.read()) 2146 2147 def test_content(self): 2148 with self.get_files() as (src, dst): 2149 shutil.copyfileobj(src, dst) 2150 self.assert_files_eq(TESTFN, TESTFN2) 2151 2152 def test_file_not_closed(self): 2153 with self.get_files() as (src, dst): 2154 shutil.copyfileobj(src, dst) 2155 assert not src.closed 2156 assert not dst.closed 2157 2158 def test_file_offset(self): 2159 with self.get_files() as (src, dst): 2160 shutil.copyfileobj(src, dst) 2161 self.assertEqual(src.tell(), self.FILESIZE) 2162 self.assertEqual(dst.tell(), self.FILESIZE) 2163 2164 @unittest.skipIf(os.name != 'nt', "Windows only") 2165 def test_win_impl(self): 2166 # Make sure alternate Windows implementation is called. 2167 with unittest.mock.patch("shutil._copyfileobj_readinto") as m: 2168 shutil.copyfile(TESTFN, TESTFN2) 2169 assert m.called 2170 2171 # File size is 2 MiB but max buf size should be 1 MiB. 2172 self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024) 2173 2174 # If file size < 1 MiB memoryview() length must be equal to 2175 # the actual file size. 2176 with tempfile.NamedTemporaryFile(delete=False) as f: 2177 f.write(b'foo') 2178 fname = f.name 2179 self.addCleanup(support.unlink, fname) 2180 with unittest.mock.patch("shutil._copyfileobj_readinto") as m: 2181 shutil.copyfile(fname, TESTFN2) 2182 self.assertEqual(m.call_args[0][2], 3) 2183 2184 # Empty files should not rely on readinto() variant. 2185 with tempfile.NamedTemporaryFile(delete=False) as f: 2186 pass 2187 fname = f.name 2188 self.addCleanup(support.unlink, fname) 2189 with unittest.mock.patch("shutil._copyfileobj_readinto") as m: 2190 shutil.copyfile(fname, TESTFN2) 2191 assert not m.called 2192 self.assert_files_eq(fname, TESTFN2) 2193 2194 2195class _ZeroCopyFileTest(object): 2196 """Tests common to all zero-copy APIs.""" 2197 FILESIZE = (10 * 1024 * 1024) # 10 MiB 2198 FILEDATA = b"" 2199 PATCHPOINT = "" 2200 2201 @classmethod 2202 def setUpClass(cls): 2203 write_test_file(TESTFN, cls.FILESIZE) 2204 with open(TESTFN, 'rb') as f: 2205 cls.FILEDATA = f.read() 2206 assert len(cls.FILEDATA) == cls.FILESIZE 2207 2208 @classmethod 2209 def tearDownClass(cls): 2210 support.unlink(TESTFN) 2211 2212 def tearDown(self): 2213 support.unlink(TESTFN2) 2214 2215 @contextlib.contextmanager 2216 def get_files(self): 2217 with open(TESTFN, "rb") as src: 2218 with open(TESTFN2, "wb") as dst: 2219 yield (src, dst) 2220 2221 def zerocopy_fun(self, *args, **kwargs): 2222 raise NotImplementedError("must be implemented in subclass") 2223 2224 def reset(self): 2225 self.tearDown() 2226 self.tearDownClass() 2227 self.setUpClass() 2228 self.setUp() 2229 2230 # --- 2231 2232 def test_regular_copy(self): 2233 with self.get_files() as (src, dst): 2234 self.zerocopy_fun(src, dst) 2235 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 2236 # Make sure the fallback function is not called. 2237 with self.get_files() as (src, dst): 2238 with unittest.mock.patch('shutil.copyfileobj') as m: 2239 shutil.copyfile(TESTFN, TESTFN2) 2240 assert not m.called 2241 2242 def test_same_file(self): 2243 self.addCleanup(self.reset) 2244 with self.get_files() as (src, dst): 2245 with self.assertRaises(Exception): 2246 self.zerocopy_fun(src, src) 2247 # Make sure src file is not corrupted. 2248 self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA) 2249 2250 def test_non_existent_src(self): 2251 name = tempfile.mktemp() 2252 with self.assertRaises(FileNotFoundError) as cm: 2253 shutil.copyfile(name, "new") 2254 self.assertEqual(cm.exception.filename, name) 2255 2256 def test_empty_file(self): 2257 srcname = TESTFN + 'src' 2258 dstname = TESTFN + 'dst' 2259 self.addCleanup(lambda: support.unlink(srcname)) 2260 self.addCleanup(lambda: support.unlink(dstname)) 2261 with open(srcname, "wb"): 2262 pass 2263 2264 with open(srcname, "rb") as src: 2265 with open(dstname, "wb") as dst: 2266 self.zerocopy_fun(src, dst) 2267 2268 self.assertEqual(read_file(dstname, binary=True), b"") 2269 2270 def test_unhandled_exception(self): 2271 with unittest.mock.patch(self.PATCHPOINT, 2272 side_effect=ZeroDivisionError): 2273 self.assertRaises(ZeroDivisionError, 2274 shutil.copyfile, TESTFN, TESTFN2) 2275 2276 def test_exception_on_first_call(self): 2277 # Emulate a case where the first call to the zero-copy 2278 # function raises an exception in which case the function is 2279 # supposed to give up immediately. 2280 with unittest.mock.patch(self.PATCHPOINT, 2281 side_effect=OSError(errno.EINVAL, "yo")): 2282 with self.get_files() as (src, dst): 2283 with self.assertRaises(_GiveupOnFastCopy): 2284 self.zerocopy_fun(src, dst) 2285 2286 def test_filesystem_full(self): 2287 # Emulate a case where filesystem is full and sendfile() fails 2288 # on first call. 2289 with unittest.mock.patch(self.PATCHPOINT, 2290 side_effect=OSError(errno.ENOSPC, "yo")): 2291 with self.get_files() as (src, dst): 2292 self.assertRaises(OSError, self.zerocopy_fun, src, dst) 2293 2294 2295@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported') 2296class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase): 2297 PATCHPOINT = "os.sendfile" 2298 2299 def zerocopy_fun(self, fsrc, fdst): 2300 return shutil._fastcopy_sendfile(fsrc, fdst) 2301 2302 def test_non_regular_file_src(self): 2303 with io.BytesIO(self.FILEDATA) as src: 2304 with open(TESTFN2, "wb") as dst: 2305 with self.assertRaises(_GiveupOnFastCopy): 2306 self.zerocopy_fun(src, dst) 2307 shutil.copyfileobj(src, dst) 2308 2309 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 2310 2311 def test_non_regular_file_dst(self): 2312 with open(TESTFN, "rb") as src: 2313 with io.BytesIO() as dst: 2314 with self.assertRaises(_GiveupOnFastCopy): 2315 self.zerocopy_fun(src, dst) 2316 shutil.copyfileobj(src, dst) 2317 dst.seek(0) 2318 self.assertEqual(dst.read(), self.FILEDATA) 2319 2320 def test_exception_on_second_call(self): 2321 def sendfile(*args, **kwargs): 2322 if not flag: 2323 flag.append(None) 2324 return orig_sendfile(*args, **kwargs) 2325 else: 2326 raise OSError(errno.EBADF, "yo") 2327 2328 flag = [] 2329 orig_sendfile = os.sendfile 2330 with unittest.mock.patch('os.sendfile', create=True, 2331 side_effect=sendfile): 2332 with self.get_files() as (src, dst): 2333 with self.assertRaises(OSError) as cm: 2334 shutil._fastcopy_sendfile(src, dst) 2335 assert flag 2336 self.assertEqual(cm.exception.errno, errno.EBADF) 2337 2338 def test_cant_get_size(self): 2339 # Emulate a case where src file size cannot be determined. 2340 # Internally bufsize will be set to a small value and 2341 # sendfile() will be called repeatedly. 2342 with unittest.mock.patch('os.fstat', side_effect=OSError) as m: 2343 with self.get_files() as (src, dst): 2344 shutil._fastcopy_sendfile(src, dst) 2345 assert m.called 2346 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 2347 2348 def test_small_chunks(self): 2349 # Force internal file size detection to be smaller than the 2350 # actual file size. We want to force sendfile() to be called 2351 # multiple times, also in order to emulate a src fd which gets 2352 # bigger while it is being copied. 2353 mock = unittest.mock.Mock() 2354 mock.st_size = 65536 + 1 2355 with unittest.mock.patch('os.fstat', return_value=mock) as m: 2356 with self.get_files() as (src, dst): 2357 shutil._fastcopy_sendfile(src, dst) 2358 assert m.called 2359 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 2360 2361 def test_big_chunk(self): 2362 # Force internal file size detection to be +100MB bigger than 2363 # the actual file size. Make sure sendfile() does not rely on 2364 # file size value except for (maybe) a better throughput / 2365 # performance. 2366 mock = unittest.mock.Mock() 2367 mock.st_size = self.FILESIZE + (100 * 1024 * 1024) 2368 with unittest.mock.patch('os.fstat', return_value=mock) as m: 2369 with self.get_files() as (src, dst): 2370 shutil._fastcopy_sendfile(src, dst) 2371 assert m.called 2372 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 2373 2374 def test_blocksize_arg(self): 2375 with unittest.mock.patch('os.sendfile', 2376 side_effect=ZeroDivisionError) as m: 2377 self.assertRaises(ZeroDivisionError, 2378 shutil.copyfile, TESTFN, TESTFN2) 2379 blocksize = m.call_args[0][3] 2380 # Make sure file size and the block size arg passed to 2381 # sendfile() are the same. 2382 self.assertEqual(blocksize, os.path.getsize(TESTFN)) 2383 # ...unless we're dealing with a small file. 2384 support.unlink(TESTFN2) 2385 write_file(TESTFN2, b"hello", binary=True) 2386 self.addCleanup(support.unlink, TESTFN2 + '3') 2387 self.assertRaises(ZeroDivisionError, 2388 shutil.copyfile, TESTFN2, TESTFN2 + '3') 2389 blocksize = m.call_args[0][3] 2390 self.assertEqual(blocksize, 2 ** 23) 2391 2392 def test_file2file_not_supported(self): 2393 # Emulate a case where sendfile() only support file->socket 2394 # fds. In such a case copyfile() is supposed to skip the 2395 # fast-copy attempt from then on. 2396 assert shutil._USE_CP_SENDFILE 2397 try: 2398 with unittest.mock.patch( 2399 self.PATCHPOINT, 2400 side_effect=OSError(errno.ENOTSOCK, "yo")) as m: 2401 with self.get_files() as (src, dst): 2402 with self.assertRaises(_GiveupOnFastCopy): 2403 shutil._fastcopy_sendfile(src, dst) 2404 assert m.called 2405 assert not shutil._USE_CP_SENDFILE 2406 2407 with unittest.mock.patch(self.PATCHPOINT) as m: 2408 shutil.copyfile(TESTFN, TESTFN2) 2409 assert not m.called 2410 finally: 2411 shutil._USE_CP_SENDFILE = True 2412 2413 2414@unittest.skipIf(not MACOS, 'macOS only') 2415class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase): 2416 PATCHPOINT = "posix._fcopyfile" 2417 2418 def zerocopy_fun(self, src, dst): 2419 return shutil._fastcopy_fcopyfile(src, dst, posix._COPYFILE_DATA) 2420 2421 2422class TermsizeTests(unittest.TestCase): 2423 def test_does_not_crash(self): 2424 """Check if get_terminal_size() returns a meaningful value. 2425 2426 There's no easy portable way to actually check the size of the 2427 terminal, so let's check if it returns something sensible instead. 2428 """ 2429 size = shutil.get_terminal_size() 2430 self.assertGreaterEqual(size.columns, 0) 2431 self.assertGreaterEqual(size.lines, 0) 2432 2433 def test_os_environ_first(self): 2434 "Check if environment variables have precedence" 2435 2436 with support.EnvironmentVarGuard() as env: 2437 env['COLUMNS'] = '777' 2438 del env['LINES'] 2439 size = shutil.get_terminal_size() 2440 self.assertEqual(size.columns, 777) 2441 2442 with support.EnvironmentVarGuard() as env: 2443 del env['COLUMNS'] 2444 env['LINES'] = '888' 2445 size = shutil.get_terminal_size() 2446 self.assertEqual(size.lines, 888) 2447 2448 def test_bad_environ(self): 2449 with support.EnvironmentVarGuard() as env: 2450 env['COLUMNS'] = 'xxx' 2451 env['LINES'] = 'yyy' 2452 size = shutil.get_terminal_size() 2453 self.assertGreaterEqual(size.columns, 0) 2454 self.assertGreaterEqual(size.lines, 0) 2455 2456 @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty") 2457 @unittest.skipUnless(hasattr(os, 'get_terminal_size'), 2458 'need os.get_terminal_size()') 2459 def test_stty_match(self): 2460 """Check if stty returns the same results ignoring env 2461 2462 This test will fail if stdin and stdout are connected to 2463 different terminals with different sizes. Nevertheless, such 2464 situations should be pretty rare. 2465 """ 2466 try: 2467 size = subprocess.check_output(['stty', 'size']).decode().split() 2468 except (FileNotFoundError, PermissionError, 2469 subprocess.CalledProcessError): 2470 self.skipTest("stty invocation failed") 2471 expected = (int(size[1]), int(size[0])) # reversed order 2472 2473 with support.EnvironmentVarGuard() as env: 2474 del env['LINES'] 2475 del env['COLUMNS'] 2476 actual = shutil.get_terminal_size() 2477 2478 self.assertEqual(expected, actual) 2479 2480 def test_fallback(self): 2481 with support.EnvironmentVarGuard() as env: 2482 del env['LINES'] 2483 del env['COLUMNS'] 2484 2485 # sys.__stdout__ has no fileno() 2486 with support.swap_attr(sys, '__stdout__', None): 2487 size = shutil.get_terminal_size(fallback=(10, 20)) 2488 self.assertEqual(size.columns, 10) 2489 self.assertEqual(size.lines, 20) 2490 2491 # sys.__stdout__ is not a terminal on Unix 2492 # or fileno() not in (0, 1, 2) on Windows 2493 with open(os.devnull, 'w') as f, \ 2494 support.swap_attr(sys, '__stdout__', f): 2495 size = shutil.get_terminal_size(fallback=(30, 40)) 2496 self.assertEqual(size.columns, 30) 2497 self.assertEqual(size.lines, 40) 2498 2499 2500class PublicAPITests(unittest.TestCase): 2501 """Ensures that the correct values are exposed in the public API.""" 2502 2503 def test_module_all_attribute(self): 2504 self.assertTrue(hasattr(shutil, '__all__')) 2505 target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat', 2506 'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error', 2507 'SpecialFileError', 'ExecError', 'make_archive', 2508 'get_archive_formats', 'register_archive_format', 2509 'unregister_archive_format', 'get_unpack_formats', 2510 'register_unpack_format', 'unregister_unpack_format', 2511 'unpack_archive', 'ignore_patterns', 'chown', 'which', 2512 'get_terminal_size', 'SameFileError'] 2513 if hasattr(os, 'statvfs') or os.name == 'nt': 2514 target_api.append('disk_usage') 2515 self.assertEqual(set(shutil.__all__), set(target_api)) 2516 2517 2518if __name__ == '__main__': 2519 unittest.main() 2520