1# Copyright (C) 2003 Python Software Foundation 2 3import unittest 4import unittest.mock 5import shutil 6import tempfile 7import sys 8import stat 9import os 10import os.path 11import errno 12import functools 13import pathlib 14import subprocess 15import random 16import string 17import contextlib 18import io 19from shutil import (make_archive, 20 register_archive_format, unregister_archive_format, 21 get_archive_formats, Error, unpack_archive, 22 register_unpack_format, RegistryError, 23 unregister_unpack_format, get_unpack_formats, 24 SameFileError, _GiveupOnFastCopy) 25import tarfile 26import zipfile 27try: 28 import posix 29except ImportError: 30 posix = None 31 32from test import support 33from test.support import TESTFN, FakePath 34 35TESTFN2 = TESTFN + "2" 36MACOS = sys.platform.startswith("darwin") 37AIX = sys.platform[:3] == 'aix' 38try: 39 import grp 40 import pwd 41 UID_GID_SUPPORT = True 42except ImportError: 43 UID_GID_SUPPORT = False 44 45try: 46 import _winapi 47except ImportError: 48 _winapi = None 49 50def _fake_rename(*args, **kwargs): 51 # Pretend the destination path is on a different filesystem. 52 raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link") 53 54def mock_rename(func): 55 @functools.wraps(func) 56 def wrap(*args, **kwargs): 57 try: 58 builtin_rename = os.rename 59 os.rename = _fake_rename 60 return func(*args, **kwargs) 61 finally: 62 os.rename = builtin_rename 63 return wrap 64 65def write_file(path, content, binary=False): 66 """Write *content* to a file located at *path*. 67 68 If *path* is a tuple instead of a string, os.path.join will be used to 69 make a path. If *binary* is true, the file will be opened in binary 70 mode. 71 """ 72 if isinstance(path, tuple): 73 path = os.path.join(*path) 74 with open(path, 'wb' if binary else 'w') as fp: 75 fp.write(content) 76 77def write_test_file(path, size): 78 """Create a test file with an arbitrary size and random text content.""" 79 def chunks(total, step): 80 assert total >= step 81 while total > step: 82 yield step 83 total -= step 84 if total: 85 yield total 86 87 bufsize = min(size, 8192) 88 chunk = b"".join([random.choice(string.ascii_letters).encode() 89 for i in range(bufsize)]) 90 with open(path, 'wb') as f: 91 for csize in chunks(size, bufsize): 92 f.write(chunk) 93 assert os.path.getsize(path) == size 94 95def read_file(path, binary=False): 96 """Return contents from a file located at *path*. 97 98 If *path* is a tuple instead of a string, os.path.join will be used to 99 make a path. If *binary* is true, the file will be opened in binary 100 mode. 101 """ 102 if isinstance(path, tuple): 103 path = os.path.join(*path) 104 with open(path, 'rb' if binary else 'r') as fp: 105 return fp.read() 106 107def rlistdir(path): 108 res = [] 109 for name in sorted(os.listdir(path)): 110 p = os.path.join(path, name) 111 if os.path.isdir(p) and not os.path.islink(p): 112 res.append(name + '/') 113 for n in rlistdir(p): 114 res.append(name + '/' + n) 115 else: 116 res.append(name) 117 return res 118 119def supports_file2file_sendfile(): 120 # ...apparently Linux and Solaris are the only ones 121 if not hasattr(os, "sendfile"): 122 return False 123 srcname = None 124 dstname = None 125 try: 126 with tempfile.NamedTemporaryFile("wb", delete=False) as f: 127 srcname = f.name 128 f.write(b"0123456789") 129 130 with open(srcname, "rb") as src: 131 with tempfile.NamedTemporaryFile("wb", delete=False) as dst: 132 dstname = dst.name 133 infd = src.fileno() 134 outfd = dst.fileno() 135 try: 136 os.sendfile(outfd, infd, 0, 2) 137 except OSError: 138 return False 139 else: 140 return True 141 finally: 142 if srcname is not None: 143 support.unlink(srcname) 144 if dstname is not None: 145 support.unlink(dstname) 146 147 148SUPPORTS_SENDFILE = supports_file2file_sendfile() 149 150# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler test 151# The AIX command 'dump -o program' gives XCOFF header information 152# The second word of the last line in the maxdata value 153# when 32-bit maxdata must be greater than 0x1000000 for the xz test to succeed 154def _maxdataOK(): 155 if AIX and sys.maxsize == 2147483647: 156 hdrs=subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable) 157 maxdata=hdrs.split("\n")[-1].split()[1] 158 return int(maxdata,16) >= 0x20000000 159 else: 160 return True 161 162class TestShutil(unittest.TestCase): 163 164 def setUp(self): 165 super(TestShutil, self).setUp() 166 self.tempdirs = [] 167 168 def tearDown(self): 169 super(TestShutil, self).tearDown() 170 while self.tempdirs: 171 d = self.tempdirs.pop() 172 shutil.rmtree(d, os.name in ('nt', 'cygwin')) 173 174 175 def mkdtemp(self): 176 """Create a temporary directory that will be cleaned up. 177 178 Returns the path of the directory. 179 """ 180 basedir = None 181 if sys.platform == "win32": 182 basedir = os.path.realpath(os.getcwd()) 183 d = tempfile.mkdtemp(dir=basedir) 184 self.tempdirs.append(d) 185 return d 186 187 def test_rmtree_works_on_bytes(self): 188 tmp = self.mkdtemp() 189 victim = os.path.join(tmp, 'killme') 190 os.mkdir(victim) 191 write_file(os.path.join(victim, 'somefile'), 'foo') 192 victim = os.fsencode(victim) 193 self.assertIsInstance(victim, bytes) 194 shutil.rmtree(victim) 195 196 @support.skip_unless_symlink 197 def test_rmtree_fails_on_symlink(self): 198 tmp = self.mkdtemp() 199 dir_ = os.path.join(tmp, 'dir') 200 os.mkdir(dir_) 201 link = os.path.join(tmp, 'link') 202 os.symlink(dir_, link) 203 self.assertRaises(OSError, shutil.rmtree, link) 204 self.assertTrue(os.path.exists(dir_)) 205 self.assertTrue(os.path.lexists(link)) 206 errors = [] 207 def onerror(*args): 208 errors.append(args) 209 shutil.rmtree(link, onerror=onerror) 210 self.assertEqual(len(errors), 1) 211 self.assertIs(errors[0][0], os.path.islink) 212 self.assertEqual(errors[0][1], link) 213 self.assertIsInstance(errors[0][2][1], OSError) 214 215 @support.skip_unless_symlink 216 def test_rmtree_works_on_symlinks(self): 217 tmp = self.mkdtemp() 218 dir1 = os.path.join(tmp, 'dir1') 219 dir2 = os.path.join(dir1, 'dir2') 220 dir3 = os.path.join(tmp, 'dir3') 221 for d in dir1, dir2, dir3: 222 os.mkdir(d) 223 file1 = os.path.join(tmp, 'file1') 224 write_file(file1, 'foo') 225 link1 = os.path.join(dir1, 'link1') 226 os.symlink(dir2, link1) 227 link2 = os.path.join(dir1, 'link2') 228 os.symlink(dir3, link2) 229 link3 = os.path.join(dir1, 'link3') 230 os.symlink(file1, link3) 231 # make sure symlinks are removed but not followed 232 shutil.rmtree(dir1) 233 self.assertFalse(os.path.exists(dir1)) 234 self.assertTrue(os.path.exists(dir3)) 235 self.assertTrue(os.path.exists(file1)) 236 237 @unittest.skipUnless(_winapi, 'only relevant on Windows') 238 def test_rmtree_fails_on_junctions(self): 239 tmp = self.mkdtemp() 240 dir_ = os.path.join(tmp, 'dir') 241 os.mkdir(dir_) 242 link = os.path.join(tmp, 'link') 243 _winapi.CreateJunction(dir_, link) 244 self.assertRaises(OSError, shutil.rmtree, link) 245 self.assertTrue(os.path.exists(dir_)) 246 self.assertTrue(os.path.lexists(link)) 247 errors = [] 248 def onerror(*args): 249 errors.append(args) 250 shutil.rmtree(link, onerror=onerror) 251 self.assertEqual(len(errors), 1) 252 self.assertIs(errors[0][0], os.path.islink) 253 self.assertEqual(errors[0][1], link) 254 self.assertIsInstance(errors[0][2][1], OSError) 255 256 @unittest.skipUnless(_winapi, 'only relevant on Windows') 257 def test_rmtree_works_on_junctions(self): 258 tmp = self.mkdtemp() 259 dir1 = os.path.join(tmp, 'dir1') 260 dir2 = os.path.join(dir1, 'dir2') 261 dir3 = os.path.join(tmp, 'dir3') 262 for d in dir1, dir2, dir3: 263 os.mkdir(d) 264 file1 = os.path.join(tmp, 'file1') 265 write_file(file1, 'foo') 266 link1 = os.path.join(dir1, 'link1') 267 _winapi.CreateJunction(dir2, link1) 268 link2 = os.path.join(dir1, 'link2') 269 _winapi.CreateJunction(dir3, link2) 270 link3 = os.path.join(dir1, 'link3') 271 _winapi.CreateJunction(file1, link3) 272 # make sure junctions are removed but not followed 273 shutil.rmtree(dir1) 274 self.assertFalse(os.path.exists(dir1)) 275 self.assertTrue(os.path.exists(dir3)) 276 self.assertTrue(os.path.exists(file1)) 277 278 def test_rmtree_errors(self): 279 # filename is guaranteed not to exist 280 filename = tempfile.mktemp() 281 self.assertRaises(FileNotFoundError, shutil.rmtree, filename) 282 # test that ignore_errors option is honored 283 shutil.rmtree(filename, ignore_errors=True) 284 285 # existing file 286 tmpdir = self.mkdtemp() 287 write_file((tmpdir, "tstfile"), "") 288 filename = os.path.join(tmpdir, "tstfile") 289 with self.assertRaises(NotADirectoryError) as cm: 290 shutil.rmtree(filename) 291 # The reason for this rather odd construct is that Windows sprinkles 292 # a \*.* at the end of file names. But only sometimes on some buildbots 293 possible_args = [filename, os.path.join(filename, '*.*')] 294 self.assertIn(cm.exception.filename, possible_args) 295 self.assertTrue(os.path.exists(filename)) 296 # test that ignore_errors option is honored 297 shutil.rmtree(filename, ignore_errors=True) 298 self.assertTrue(os.path.exists(filename)) 299 errors = [] 300 def onerror(*args): 301 errors.append(args) 302 shutil.rmtree(filename, onerror=onerror) 303 self.assertEqual(len(errors), 2) 304 self.assertIs(errors[0][0], os.scandir) 305 self.assertEqual(errors[0][1], filename) 306 self.assertIsInstance(errors[0][2][1], NotADirectoryError) 307 self.assertIn(errors[0][2][1].filename, possible_args) 308 self.assertIs(errors[1][0], os.rmdir) 309 self.assertEqual(errors[1][1], filename) 310 self.assertIsInstance(errors[1][2][1], NotADirectoryError) 311 self.assertIn(errors[1][2][1].filename, possible_args) 312 313 314 @unittest.skipIf(sys.platform[:6] == 'cygwin', 315 "This test can't be run on Cygwin (issue #1071513).") 316 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, 317 "This test can't be run reliably as root (issue #1076467).") 318 def test_on_error(self): 319 self.errorState = 0 320 os.mkdir(TESTFN) 321 self.addCleanup(shutil.rmtree, TESTFN) 322 323 self.child_file_path = os.path.join(TESTFN, 'a') 324 self.child_dir_path = os.path.join(TESTFN, 'b') 325 support.create_empty_file(self.child_file_path) 326 os.mkdir(self.child_dir_path) 327 old_dir_mode = os.stat(TESTFN).st_mode 328 old_child_file_mode = os.stat(self.child_file_path).st_mode 329 old_child_dir_mode = os.stat(self.child_dir_path).st_mode 330 # Make unwritable. 331 new_mode = stat.S_IREAD|stat.S_IEXEC 332 os.chmod(self.child_file_path, new_mode) 333 os.chmod(self.child_dir_path, new_mode) 334 os.chmod(TESTFN, new_mode) 335 336 self.addCleanup(os.chmod, TESTFN, old_dir_mode) 337 self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode) 338 self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode) 339 340 shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror) 341 # Test whether onerror has actually been called. 342 self.assertEqual(self.errorState, 3, 343 "Expected call to onerror function did not happen.") 344 345 def check_args_to_onerror(self, func, arg, exc): 346 # test_rmtree_errors deliberately runs rmtree 347 # on a directory that is chmod 500, which will fail. 348 # This function is run when shutil.rmtree fails. 349 # 99.9% of the time it initially fails to remove 350 # a file in the directory, so the first time through 351 # func is os.remove. 352 # However, some Linux machines running ZFS on 353 # FUSE experienced a failure earlier in the process 354 # at os.listdir. The first failure may legally 355 # be either. 356 if self.errorState < 2: 357 if func is os.unlink: 358 self.assertEqual(arg, self.child_file_path) 359 elif func is os.rmdir: 360 self.assertEqual(arg, self.child_dir_path) 361 else: 362 self.assertIs(func, os.listdir) 363 self.assertIn(arg, [TESTFN, self.child_dir_path]) 364 self.assertTrue(issubclass(exc[0], OSError)) 365 self.errorState += 1 366 else: 367 self.assertEqual(func, os.rmdir) 368 self.assertEqual(arg, TESTFN) 369 self.assertTrue(issubclass(exc[0], OSError)) 370 self.errorState = 3 371 372 def test_rmtree_does_not_choke_on_failing_lstat(self): 373 try: 374 orig_lstat = os.lstat 375 def raiser(fn, *args, **kwargs): 376 if fn != TESTFN: 377 raise OSError() 378 else: 379 return orig_lstat(fn) 380 os.lstat = raiser 381 382 os.mkdir(TESTFN) 383 write_file((TESTFN, 'foo'), 'foo') 384 shutil.rmtree(TESTFN) 385 finally: 386 os.lstat = orig_lstat 387 388 @support.skip_unless_symlink 389 def test_copymode_follow_symlinks(self): 390 tmp_dir = self.mkdtemp() 391 src = os.path.join(tmp_dir, 'foo') 392 dst = os.path.join(tmp_dir, 'bar') 393 src_link = os.path.join(tmp_dir, 'baz') 394 dst_link = os.path.join(tmp_dir, 'quux') 395 write_file(src, 'foo') 396 write_file(dst, 'foo') 397 os.symlink(src, src_link) 398 os.symlink(dst, dst_link) 399 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG) 400 # file to file 401 os.chmod(dst, stat.S_IRWXO) 402 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 403 shutil.copymode(src, dst) 404 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 405 # On Windows, os.chmod does not follow symlinks (issue #15411) 406 if os.name != 'nt': 407 # follow src link 408 os.chmod(dst, stat.S_IRWXO) 409 shutil.copymode(src_link, dst) 410 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 411 # follow dst link 412 os.chmod(dst, stat.S_IRWXO) 413 shutil.copymode(src, dst_link) 414 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 415 # follow both links 416 os.chmod(dst, stat.S_IRWXO) 417 shutil.copymode(src_link, dst_link) 418 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 419 420 @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod') 421 @support.skip_unless_symlink 422 def test_copymode_symlink_to_symlink(self): 423 tmp_dir = self.mkdtemp() 424 src = os.path.join(tmp_dir, 'foo') 425 dst = os.path.join(tmp_dir, 'bar') 426 src_link = os.path.join(tmp_dir, 'baz') 427 dst_link = os.path.join(tmp_dir, 'quux') 428 write_file(src, 'foo') 429 write_file(dst, 'foo') 430 os.symlink(src, src_link) 431 os.symlink(dst, dst_link) 432 os.chmod(src, stat.S_IRWXU|stat.S_IRWXG) 433 os.chmod(dst, stat.S_IRWXU) 434 os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG) 435 # link to link 436 os.lchmod(dst_link, stat.S_IRWXO) 437 shutil.copymode(src_link, dst_link, follow_symlinks=False) 438 self.assertEqual(os.lstat(src_link).st_mode, 439 os.lstat(dst_link).st_mode) 440 self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 441 # src link - use chmod 442 os.lchmod(dst_link, stat.S_IRWXO) 443 shutil.copymode(src_link, dst, follow_symlinks=False) 444 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 445 # dst link - use chmod 446 os.lchmod(dst_link, stat.S_IRWXO) 447 shutil.copymode(src, dst_link, follow_symlinks=False) 448 self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 449 450 @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing') 451 @support.skip_unless_symlink 452 def test_copymode_symlink_to_symlink_wo_lchmod(self): 453 tmp_dir = self.mkdtemp() 454 src = os.path.join(tmp_dir, 'foo') 455 dst = os.path.join(tmp_dir, 'bar') 456 src_link = os.path.join(tmp_dir, 'baz') 457 dst_link = os.path.join(tmp_dir, 'quux') 458 write_file(src, 'foo') 459 write_file(dst, 'foo') 460 os.symlink(src, src_link) 461 os.symlink(dst, dst_link) 462 shutil.copymode(src_link, dst_link, follow_symlinks=False) # silent fail 463 464 @support.skip_unless_symlink 465 def test_copystat_symlinks(self): 466 tmp_dir = self.mkdtemp() 467 src = os.path.join(tmp_dir, 'foo') 468 dst = os.path.join(tmp_dir, 'bar') 469 src_link = os.path.join(tmp_dir, 'baz') 470 dst_link = os.path.join(tmp_dir, 'qux') 471 write_file(src, 'foo') 472 src_stat = os.stat(src) 473 os.utime(src, (src_stat.st_atime, 474 src_stat.st_mtime - 42.0)) # ensure different mtimes 475 write_file(dst, 'bar') 476 self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime) 477 os.symlink(src, src_link) 478 os.symlink(dst, dst_link) 479 if hasattr(os, 'lchmod'): 480 os.lchmod(src_link, stat.S_IRWXO) 481 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): 482 os.lchflags(src_link, stat.UF_NODUMP) 483 src_link_stat = os.lstat(src_link) 484 # follow 485 if hasattr(os, 'lchmod'): 486 shutil.copystat(src_link, dst_link, follow_symlinks=True) 487 self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode) 488 # don't follow 489 shutil.copystat(src_link, dst_link, follow_symlinks=False) 490 dst_link_stat = os.lstat(dst_link) 491 if os.utime in os.supports_follow_symlinks: 492 for attr in 'st_atime', 'st_mtime': 493 # The modification times may be truncated in the new file. 494 self.assertLessEqual(getattr(src_link_stat, attr), 495 getattr(dst_link_stat, attr) + 1) 496 if hasattr(os, 'lchmod'): 497 self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode) 498 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'): 499 self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags) 500 # tell to follow but dst is not a link 501 shutil.copystat(src_link, dst, follow_symlinks=False) 502 self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) < 503 00000.1) 504 505 @unittest.skipUnless(hasattr(os, 'chflags') and 506 hasattr(errno, 'EOPNOTSUPP') and 507 hasattr(errno, 'ENOTSUP'), 508 "requires os.chflags, EOPNOTSUPP & ENOTSUP") 509 def test_copystat_handles_harmless_chflags_errors(self): 510 tmpdir = self.mkdtemp() 511 file1 = os.path.join(tmpdir, 'file1') 512 file2 = os.path.join(tmpdir, 'file2') 513 write_file(file1, 'xxx') 514 write_file(file2, 'xxx') 515 516 def make_chflags_raiser(err): 517 ex = OSError() 518 519 def _chflags_raiser(path, flags, *, follow_symlinks=True): 520 ex.errno = err 521 raise ex 522 return _chflags_raiser 523 old_chflags = os.chflags 524 try: 525 for err in errno.EOPNOTSUPP, errno.ENOTSUP: 526 os.chflags = make_chflags_raiser(err) 527 shutil.copystat(file1, file2) 528 # assert others errors break it 529 os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP) 530 self.assertRaises(OSError, shutil.copystat, file1, file2) 531 finally: 532 os.chflags = old_chflags 533 534 @support.skip_unless_xattr 535 def test_copyxattr(self): 536 tmp_dir = self.mkdtemp() 537 src = os.path.join(tmp_dir, 'foo') 538 write_file(src, 'foo') 539 dst = os.path.join(tmp_dir, 'bar') 540 write_file(dst, 'bar') 541 542 # no xattr == no problem 543 shutil._copyxattr(src, dst) 544 # common case 545 os.setxattr(src, 'user.foo', b'42') 546 os.setxattr(src, 'user.bar', b'43') 547 shutil._copyxattr(src, dst) 548 self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst))) 549 self.assertEqual( 550 os.getxattr(src, 'user.foo'), 551 os.getxattr(dst, 'user.foo')) 552 # check errors don't affect other attrs 553 os.remove(dst) 554 write_file(dst, 'bar') 555 os_error = OSError(errno.EPERM, 'EPERM') 556 557 def _raise_on_user_foo(fname, attr, val, **kwargs): 558 if attr == 'user.foo': 559 raise os_error 560 else: 561 orig_setxattr(fname, attr, val, **kwargs) 562 try: 563 orig_setxattr = os.setxattr 564 os.setxattr = _raise_on_user_foo 565 shutil._copyxattr(src, dst) 566 self.assertIn('user.bar', os.listxattr(dst)) 567 finally: 568 os.setxattr = orig_setxattr 569 # the source filesystem not supporting xattrs should be ok, too. 570 def _raise_on_src(fname, *, follow_symlinks=True): 571 if fname == src: 572 raise OSError(errno.ENOTSUP, 'Operation not supported') 573 return orig_listxattr(fname, follow_symlinks=follow_symlinks) 574 try: 575 orig_listxattr = os.listxattr 576 os.listxattr = _raise_on_src 577 shutil._copyxattr(src, dst) 578 finally: 579 os.listxattr = orig_listxattr 580 581 # test that shutil.copystat copies xattrs 582 src = os.path.join(tmp_dir, 'the_original') 583 srcro = os.path.join(tmp_dir, 'the_original_ro') 584 write_file(src, src) 585 write_file(srcro, srcro) 586 os.setxattr(src, 'user.the_value', b'fiddly') 587 os.setxattr(srcro, 'user.the_value', b'fiddly') 588 os.chmod(srcro, 0o444) 589 dst = os.path.join(tmp_dir, 'the_copy') 590 dstro = os.path.join(tmp_dir, 'the_copy_ro') 591 write_file(dst, dst) 592 write_file(dstro, dstro) 593 shutil.copystat(src, dst) 594 shutil.copystat(srcro, dstro) 595 self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly') 596 self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly') 597 598 @support.skip_unless_symlink 599 @support.skip_unless_xattr 600 @unittest.skipUnless(hasattr(os, 'geteuid') and os.geteuid() == 0, 601 'root privileges required') 602 def test_copyxattr_symlinks(self): 603 # On Linux, it's only possible to access non-user xattr for symlinks; 604 # which in turn require root privileges. This test should be expanded 605 # as soon as other platforms gain support for extended attributes. 606 tmp_dir = self.mkdtemp() 607 src = os.path.join(tmp_dir, 'foo') 608 src_link = os.path.join(tmp_dir, 'baz') 609 write_file(src, 'foo') 610 os.symlink(src, src_link) 611 os.setxattr(src, 'trusted.foo', b'42') 612 os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False) 613 dst = os.path.join(tmp_dir, 'bar') 614 dst_link = os.path.join(tmp_dir, 'qux') 615 write_file(dst, 'bar') 616 os.symlink(dst, dst_link) 617 shutil._copyxattr(src_link, dst_link, follow_symlinks=False) 618 self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43') 619 self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo') 620 shutil._copyxattr(src_link, dst, follow_symlinks=False) 621 self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43') 622 623 @support.skip_unless_symlink 624 def test_copy_symlinks(self): 625 tmp_dir = self.mkdtemp() 626 src = os.path.join(tmp_dir, 'foo') 627 dst = os.path.join(tmp_dir, 'bar') 628 src_link = os.path.join(tmp_dir, 'baz') 629 write_file(src, 'foo') 630 os.symlink(src, src_link) 631 if hasattr(os, 'lchmod'): 632 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) 633 # don't follow 634 shutil.copy(src_link, dst, follow_symlinks=True) 635 self.assertFalse(os.path.islink(dst)) 636 self.assertEqual(read_file(src), read_file(dst)) 637 os.remove(dst) 638 # follow 639 shutil.copy(src_link, dst, follow_symlinks=False) 640 self.assertTrue(os.path.islink(dst)) 641 self.assertEqual(os.readlink(dst), os.readlink(src_link)) 642 if hasattr(os, 'lchmod'): 643 self.assertEqual(os.lstat(src_link).st_mode, 644 os.lstat(dst).st_mode) 645 646 @support.skip_unless_symlink 647 def test_copy2_symlinks(self): 648 tmp_dir = self.mkdtemp() 649 src = os.path.join(tmp_dir, 'foo') 650 dst = os.path.join(tmp_dir, 'bar') 651 src_link = os.path.join(tmp_dir, 'baz') 652 write_file(src, 'foo') 653 os.symlink(src, src_link) 654 if hasattr(os, 'lchmod'): 655 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) 656 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): 657 os.lchflags(src_link, stat.UF_NODUMP) 658 src_stat = os.stat(src) 659 src_link_stat = os.lstat(src_link) 660 # follow 661 shutil.copy2(src_link, dst, follow_symlinks=True) 662 self.assertFalse(os.path.islink(dst)) 663 self.assertEqual(read_file(src), read_file(dst)) 664 os.remove(dst) 665 # don't follow 666 shutil.copy2(src_link, dst, follow_symlinks=False) 667 self.assertTrue(os.path.islink(dst)) 668 self.assertEqual(os.readlink(dst), os.readlink(src_link)) 669 dst_stat = os.lstat(dst) 670 if os.utime in os.supports_follow_symlinks: 671 for attr in 'st_atime', 'st_mtime': 672 # The modification times may be truncated in the new file. 673 self.assertLessEqual(getattr(src_link_stat, attr), 674 getattr(dst_stat, attr) + 1) 675 if hasattr(os, 'lchmod'): 676 self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode) 677 self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode) 678 if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'): 679 self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags) 680 681 @support.skip_unless_xattr 682 def test_copy2_xattr(self): 683 tmp_dir = self.mkdtemp() 684 src = os.path.join(tmp_dir, 'foo') 685 dst = os.path.join(tmp_dir, 'bar') 686 write_file(src, 'foo') 687 os.setxattr(src, 'user.foo', b'42') 688 shutil.copy2(src, dst) 689 self.assertEqual( 690 os.getxattr(src, 'user.foo'), 691 os.getxattr(dst, 'user.foo')) 692 os.remove(dst) 693 694 @support.skip_unless_symlink 695 def test_copyfile_symlinks(self): 696 tmp_dir = self.mkdtemp() 697 src = os.path.join(tmp_dir, 'src') 698 dst = os.path.join(tmp_dir, 'dst') 699 dst_link = os.path.join(tmp_dir, 'dst_link') 700 link = os.path.join(tmp_dir, 'link') 701 write_file(src, 'foo') 702 os.symlink(src, link) 703 # don't follow 704 shutil.copyfile(link, dst_link, follow_symlinks=False) 705 self.assertTrue(os.path.islink(dst_link)) 706 self.assertEqual(os.readlink(link), os.readlink(dst_link)) 707 # follow 708 shutil.copyfile(link, dst) 709 self.assertFalse(os.path.islink(dst)) 710 711 def test_rmtree_uses_safe_fd_version_if_available(self): 712 _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <= 713 os.supports_dir_fd and 714 os.listdir in os.supports_fd and 715 os.stat in os.supports_follow_symlinks) 716 if _use_fd_functions: 717 self.assertTrue(shutil._use_fd_functions) 718 self.assertTrue(shutil.rmtree.avoids_symlink_attacks) 719 tmp_dir = self.mkdtemp() 720 d = os.path.join(tmp_dir, 'a') 721 os.mkdir(d) 722 try: 723 real_rmtree = shutil._rmtree_safe_fd 724 class Called(Exception): pass 725 def _raiser(*args, **kwargs): 726 raise Called 727 shutil._rmtree_safe_fd = _raiser 728 self.assertRaises(Called, shutil.rmtree, d) 729 finally: 730 shutil._rmtree_safe_fd = real_rmtree 731 else: 732 self.assertFalse(shutil._use_fd_functions) 733 self.assertFalse(shutil.rmtree.avoids_symlink_attacks) 734 735 def test_rmtree_dont_delete_file(self): 736 # When called on a file instead of a directory, don't delete it. 737 handle, path = tempfile.mkstemp() 738 os.close(handle) 739 self.assertRaises(NotADirectoryError, shutil.rmtree, path) 740 os.remove(path) 741 742 def test_copytree_simple(self): 743 src_dir = tempfile.mkdtemp() 744 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination') 745 self.addCleanup(shutil.rmtree, src_dir) 746 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir)) 747 write_file((src_dir, 'test.txt'), '123') 748 os.mkdir(os.path.join(src_dir, 'test_dir')) 749 write_file((src_dir, 'test_dir', 'test.txt'), '456') 750 751 shutil.copytree(src_dir, dst_dir) 752 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt'))) 753 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir'))) 754 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir', 755 'test.txt'))) 756 actual = read_file((dst_dir, 'test.txt')) 757 self.assertEqual(actual, '123') 758 actual = read_file((dst_dir, 'test_dir', 'test.txt')) 759 self.assertEqual(actual, '456') 760 761 def test_copytree_dirs_exist_ok(self): 762 src_dir = tempfile.mkdtemp() 763 dst_dir = tempfile.mkdtemp() 764 self.addCleanup(shutil.rmtree, src_dir) 765 self.addCleanup(shutil.rmtree, dst_dir) 766 767 write_file((src_dir, 'nonexisting.txt'), '123') 768 os.mkdir(os.path.join(src_dir, 'existing_dir')) 769 os.mkdir(os.path.join(dst_dir, 'existing_dir')) 770 write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced') 771 write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced') 772 773 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True) 774 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt'))) 775 self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir'))) 776 self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir', 777 'existing.txt'))) 778 actual = read_file((dst_dir, 'nonexisting.txt')) 779 self.assertEqual(actual, '123') 780 actual = read_file((dst_dir, 'existing_dir', 'existing.txt')) 781 self.assertEqual(actual, 'has been replaced') 782 783 with self.assertRaises(FileExistsError): 784 shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False) 785 786 @support.skip_unless_symlink 787 def test_copytree_symlinks(self): 788 tmp_dir = self.mkdtemp() 789 src_dir = os.path.join(tmp_dir, 'src') 790 dst_dir = os.path.join(tmp_dir, 'dst') 791 sub_dir = os.path.join(src_dir, 'sub') 792 os.mkdir(src_dir) 793 os.mkdir(sub_dir) 794 write_file((src_dir, 'file.txt'), 'foo') 795 src_link = os.path.join(sub_dir, 'link') 796 dst_link = os.path.join(dst_dir, 'sub/link') 797 os.symlink(os.path.join(src_dir, 'file.txt'), 798 src_link) 799 if hasattr(os, 'lchmod'): 800 os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) 801 if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): 802 os.lchflags(src_link, stat.UF_NODUMP) 803 src_stat = os.lstat(src_link) 804 shutil.copytree(src_dir, dst_dir, symlinks=True) 805 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link'))) 806 actual = os.readlink(os.path.join(dst_dir, 'sub', 'link')) 807 # Bad practice to blindly strip the prefix as it may be required to 808 # correctly refer to the file, but we're only comparing paths here. 809 if os.name == 'nt' and actual.startswith('\\\\?\\'): 810 actual = actual[4:] 811 self.assertEqual(actual, os.path.join(src_dir, 'file.txt')) 812 dst_stat = os.lstat(dst_link) 813 if hasattr(os, 'lchmod'): 814 self.assertEqual(dst_stat.st_mode, src_stat.st_mode) 815 if hasattr(os, 'lchflags'): 816 self.assertEqual(dst_stat.st_flags, src_stat.st_flags) 817 818 def test_copytree_with_exclude(self): 819 # creating data 820 join = os.path.join 821 exists = os.path.exists 822 src_dir = tempfile.mkdtemp() 823 try: 824 dst_dir = join(tempfile.mkdtemp(), 'destination') 825 write_file((src_dir, 'test.txt'), '123') 826 write_file((src_dir, 'test.tmp'), '123') 827 os.mkdir(join(src_dir, 'test_dir')) 828 write_file((src_dir, 'test_dir', 'test.txt'), '456') 829 os.mkdir(join(src_dir, 'test_dir2')) 830 write_file((src_dir, 'test_dir2', 'test.txt'), '456') 831 os.mkdir(join(src_dir, 'test_dir2', 'subdir')) 832 os.mkdir(join(src_dir, 'test_dir2', 'subdir2')) 833 write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456') 834 write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456') 835 836 # testing glob-like patterns 837 try: 838 patterns = shutil.ignore_patterns('*.tmp', 'test_dir2') 839 shutil.copytree(src_dir, dst_dir, ignore=patterns) 840 # checking the result: some elements should not be copied 841 self.assertTrue(exists(join(dst_dir, 'test.txt'))) 842 self.assertFalse(exists(join(dst_dir, 'test.tmp'))) 843 self.assertFalse(exists(join(dst_dir, 'test_dir2'))) 844 finally: 845 shutil.rmtree(dst_dir) 846 try: 847 patterns = shutil.ignore_patterns('*.tmp', 'subdir*') 848 shutil.copytree(src_dir, dst_dir, ignore=patterns) 849 # checking the result: some elements should not be copied 850 self.assertFalse(exists(join(dst_dir, 'test.tmp'))) 851 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2'))) 852 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir'))) 853 finally: 854 shutil.rmtree(dst_dir) 855 856 # testing callable-style 857 try: 858 def _filter(src, names): 859 res = [] 860 for name in names: 861 path = os.path.join(src, name) 862 863 if (os.path.isdir(path) and 864 path.split()[-1] == 'subdir'): 865 res.append(name) 866 elif os.path.splitext(path)[-1] in ('.py'): 867 res.append(name) 868 return res 869 870 shutil.copytree(src_dir, dst_dir, ignore=_filter) 871 872 # checking the result: some elements should not be copied 873 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2', 874 'test.py'))) 875 self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir'))) 876 877 finally: 878 shutil.rmtree(dst_dir) 879 finally: 880 shutil.rmtree(src_dir) 881 shutil.rmtree(os.path.dirname(dst_dir)) 882 883 def test_copytree_arg_types_of_ignore(self): 884 join = os.path.join 885 exists = os.path.exists 886 887 tmp_dir = self.mkdtemp() 888 src_dir = join(tmp_dir, "source") 889 890 os.mkdir(join(src_dir)) 891 os.mkdir(join(src_dir, 'test_dir')) 892 os.mkdir(os.path.join(src_dir, 'test_dir', 'subdir')) 893 write_file((src_dir, 'test_dir', 'subdir', 'test.txt'), '456') 894 895 invokations = [] 896 897 def _ignore(src, names): 898 invokations.append(src) 899 self.assertIsInstance(src, str) 900 self.assertIsInstance(names, list) 901 self.assertEqual(len(names), len(set(names))) 902 for name in names: 903 self.assertIsInstance(name, str) 904 return [] 905 906 dst_dir = join(self.mkdtemp(), 'destination') 907 shutil.copytree(src_dir, dst_dir, ignore=_ignore) 908 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir', 909 'test.txt'))) 910 911 dst_dir = join(self.mkdtemp(), 'destination') 912 shutil.copytree(pathlib.Path(src_dir), dst_dir, ignore=_ignore) 913 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir', 914 'test.txt'))) 915 916 dst_dir = join(self.mkdtemp(), 'destination') 917 src_dir_entry = list(os.scandir(tmp_dir))[0] 918 self.assertIsInstance(src_dir_entry, os.DirEntry) 919 shutil.copytree(src_dir_entry, dst_dir, ignore=_ignore) 920 self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir', 921 'test.txt'))) 922 923 self.assertEqual(len(invokations), 9) 924 925 def test_copytree_retains_permissions(self): 926 tmp_dir = tempfile.mkdtemp() 927 src_dir = os.path.join(tmp_dir, 'source') 928 os.mkdir(src_dir) 929 dst_dir = os.path.join(tmp_dir, 'destination') 930 self.addCleanup(shutil.rmtree, tmp_dir) 931 932 os.chmod(src_dir, 0o777) 933 write_file((src_dir, 'permissive.txt'), '123') 934 os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777) 935 write_file((src_dir, 'restrictive.txt'), '456') 936 os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600) 937 restrictive_subdir = tempfile.mkdtemp(dir=src_dir) 938 os.chmod(restrictive_subdir, 0o600) 939 940 shutil.copytree(src_dir, dst_dir) 941 self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode) 942 self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode, 943 os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode) 944 self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode, 945 os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode) 946 restrictive_subdir_dst = os.path.join(dst_dir, 947 os.path.split(restrictive_subdir)[1]) 948 self.assertEqual(os.stat(restrictive_subdir).st_mode, 949 os.stat(restrictive_subdir_dst).st_mode) 950 951 @unittest.mock.patch('os.chmod') 952 def test_copytree_winerror(self, mock_patch): 953 # When copying to VFAT, copystat() raises OSError. On Windows, the 954 # exception object has a meaningful 'winerror' attribute, but not 955 # on other operating systems. Do not assume 'winerror' is set. 956 src_dir = tempfile.mkdtemp() 957 dst_dir = os.path.join(tempfile.mkdtemp(), 'destination') 958 self.addCleanup(shutil.rmtree, src_dir) 959 self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir)) 960 961 mock_patch.side_effect = PermissionError('ka-boom') 962 with self.assertRaises(shutil.Error): 963 shutil.copytree(src_dir, dst_dir) 964 965 def test_copytree_custom_copy_function(self): 966 # See: https://bugs.python.org/issue35648 967 def custom_cpfun(a, b): 968 flag.append(None) 969 self.assertIsInstance(a, str) 970 self.assertIsInstance(b, str) 971 self.assertEqual(a, os.path.join(src, 'foo')) 972 self.assertEqual(b, os.path.join(dst, 'foo')) 973 974 flag = [] 975 src = tempfile.mkdtemp() 976 self.addCleanup(support.rmtree, src) 977 dst = tempfile.mktemp() 978 self.addCleanup(support.rmtree, dst) 979 with open(os.path.join(src, 'foo'), 'w') as f: 980 f.close() 981 shutil.copytree(src, dst, copy_function=custom_cpfun) 982 self.assertEqual(len(flag), 1) 983 984 @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link') 985 def test_dont_copy_file_onto_link_to_itself(self): 986 # bug 851123. 987 os.mkdir(TESTFN) 988 src = os.path.join(TESTFN, 'cheese') 989 dst = os.path.join(TESTFN, 'shop') 990 try: 991 with open(src, 'w') as f: 992 f.write('cheddar') 993 try: 994 os.link(src, dst) 995 except PermissionError as e: 996 self.skipTest('os.link(): %s' % e) 997 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst) 998 with open(src, 'r') as f: 999 self.assertEqual(f.read(), 'cheddar') 1000 os.remove(dst) 1001 finally: 1002 shutil.rmtree(TESTFN, ignore_errors=True) 1003 1004 @support.skip_unless_symlink 1005 def test_dont_copy_file_onto_symlink_to_itself(self): 1006 # bug 851123. 1007 os.mkdir(TESTFN) 1008 src = os.path.join(TESTFN, 'cheese') 1009 dst = os.path.join(TESTFN, 'shop') 1010 try: 1011 with open(src, 'w') as f: 1012 f.write('cheddar') 1013 # Using `src` here would mean we end up with a symlink pointing 1014 # to TESTFN/TESTFN/cheese, while it should point at 1015 # TESTFN/cheese. 1016 os.symlink('cheese', dst) 1017 self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst) 1018 with open(src, 'r') as f: 1019 self.assertEqual(f.read(), 'cheddar') 1020 os.remove(dst) 1021 finally: 1022 shutil.rmtree(TESTFN, ignore_errors=True) 1023 1024 @support.skip_unless_symlink 1025 def test_rmtree_on_symlink(self): 1026 # bug 1669. 1027 os.mkdir(TESTFN) 1028 try: 1029 src = os.path.join(TESTFN, 'cheese') 1030 dst = os.path.join(TESTFN, 'shop') 1031 os.mkdir(src) 1032 os.symlink(src, dst) 1033 self.assertRaises(OSError, shutil.rmtree, dst) 1034 shutil.rmtree(dst, ignore_errors=True) 1035 finally: 1036 shutil.rmtree(TESTFN, ignore_errors=True) 1037 1038 @unittest.skipUnless(_winapi, 'only relevant on Windows') 1039 def test_rmtree_on_junction(self): 1040 os.mkdir(TESTFN) 1041 try: 1042 src = os.path.join(TESTFN, 'cheese') 1043 dst = os.path.join(TESTFN, 'shop') 1044 os.mkdir(src) 1045 open(os.path.join(src, 'spam'), 'wb').close() 1046 _winapi.CreateJunction(src, dst) 1047 self.assertRaises(OSError, shutil.rmtree, dst) 1048 shutil.rmtree(dst, ignore_errors=True) 1049 finally: 1050 shutil.rmtree(TESTFN, ignore_errors=True) 1051 1052 # Issue #3002: copyfile and copytree block indefinitely on named pipes 1053 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') 1054 def test_copyfile_named_pipe(self): 1055 try: 1056 os.mkfifo(TESTFN) 1057 except PermissionError as e: 1058 self.skipTest('os.mkfifo(): %s' % e) 1059 try: 1060 self.assertRaises(shutil.SpecialFileError, 1061 shutil.copyfile, TESTFN, TESTFN2) 1062 self.assertRaises(shutil.SpecialFileError, 1063 shutil.copyfile, __file__, TESTFN) 1064 finally: 1065 os.remove(TESTFN) 1066 1067 @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') 1068 @support.skip_unless_symlink 1069 def test_copytree_named_pipe(self): 1070 os.mkdir(TESTFN) 1071 try: 1072 subdir = os.path.join(TESTFN, "subdir") 1073 os.mkdir(subdir) 1074 pipe = os.path.join(subdir, "mypipe") 1075 try: 1076 os.mkfifo(pipe) 1077 except PermissionError as e: 1078 self.skipTest('os.mkfifo(): %s' % e) 1079 try: 1080 shutil.copytree(TESTFN, TESTFN2) 1081 except shutil.Error as e: 1082 errors = e.args[0] 1083 self.assertEqual(len(errors), 1) 1084 src, dst, error_msg = errors[0] 1085 self.assertEqual("`%s` is a named pipe" % pipe, error_msg) 1086 else: 1087 self.fail("shutil.Error should have been raised") 1088 finally: 1089 shutil.rmtree(TESTFN, ignore_errors=True) 1090 shutil.rmtree(TESTFN2, ignore_errors=True) 1091 1092 def test_copytree_special_func(self): 1093 1094 src_dir = self.mkdtemp() 1095 dst_dir = os.path.join(self.mkdtemp(), 'destination') 1096 write_file((src_dir, 'test.txt'), '123') 1097 os.mkdir(os.path.join(src_dir, 'test_dir')) 1098 write_file((src_dir, 'test_dir', 'test.txt'), '456') 1099 1100 copied = [] 1101 def _copy(src, dst): 1102 copied.append((src, dst)) 1103 1104 shutil.copytree(src_dir, dst_dir, copy_function=_copy) 1105 self.assertEqual(len(copied), 2) 1106 1107 @support.skip_unless_symlink 1108 def test_copytree_dangling_symlinks(self): 1109 1110 # a dangling symlink raises an error at the end 1111 src_dir = self.mkdtemp() 1112 dst_dir = os.path.join(self.mkdtemp(), 'destination') 1113 os.symlink('IDONTEXIST', os.path.join(src_dir, 'test.txt')) 1114 os.mkdir(os.path.join(src_dir, 'test_dir')) 1115 write_file((src_dir, 'test_dir', 'test.txt'), '456') 1116 self.assertRaises(Error, shutil.copytree, src_dir, dst_dir) 1117 1118 # a dangling symlink is ignored with the proper flag 1119 dst_dir = os.path.join(self.mkdtemp(), 'destination2') 1120 shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True) 1121 self.assertNotIn('test.txt', os.listdir(dst_dir)) 1122 1123 # a dangling symlink is copied if symlinks=True 1124 dst_dir = os.path.join(self.mkdtemp(), 'destination3') 1125 shutil.copytree(src_dir, dst_dir, symlinks=True) 1126 self.assertIn('test.txt', os.listdir(dst_dir)) 1127 1128 @support.skip_unless_symlink 1129 def test_copytree_symlink_dir(self): 1130 src_dir = self.mkdtemp() 1131 dst_dir = os.path.join(self.mkdtemp(), 'destination') 1132 os.mkdir(os.path.join(src_dir, 'real_dir')) 1133 with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'w'): 1134 pass 1135 os.symlink(os.path.join(src_dir, 'real_dir'), 1136 os.path.join(src_dir, 'link_to_dir'), 1137 target_is_directory=True) 1138 1139 shutil.copytree(src_dir, dst_dir, symlinks=False) 1140 self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir'))) 1141 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir'))) 1142 1143 dst_dir = os.path.join(self.mkdtemp(), 'destination2') 1144 shutil.copytree(src_dir, dst_dir, symlinks=True) 1145 self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir'))) 1146 self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir'))) 1147 1148 def _copy_file(self, method): 1149 fname = 'test.txt' 1150 tmpdir = self.mkdtemp() 1151 write_file((tmpdir, fname), 'xxx') 1152 file1 = os.path.join(tmpdir, fname) 1153 tmpdir2 = self.mkdtemp() 1154 method(file1, tmpdir2) 1155 file2 = os.path.join(tmpdir2, fname) 1156 return (file1, file2) 1157 1158 def test_copy(self): 1159 # Ensure that the copied file exists and has the same mode bits. 1160 file1, file2 = self._copy_file(shutil.copy) 1161 self.assertTrue(os.path.exists(file2)) 1162 self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode) 1163 1164 @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime') 1165 def test_copy2(self): 1166 # Ensure that the copied file exists and has the same mode and 1167 # modification time bits. 1168 file1, file2 = self._copy_file(shutil.copy2) 1169 self.assertTrue(os.path.exists(file2)) 1170 file1_stat = os.stat(file1) 1171 file2_stat = os.stat(file2) 1172 self.assertEqual(file1_stat.st_mode, file2_stat.st_mode) 1173 for attr in 'st_atime', 'st_mtime': 1174 # The modification times may be truncated in the new file. 1175 self.assertLessEqual(getattr(file1_stat, attr), 1176 getattr(file2_stat, attr) + 1) 1177 if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'): 1178 self.assertEqual(getattr(file1_stat, 'st_flags'), 1179 getattr(file2_stat, 'st_flags')) 1180 1181 @support.requires_zlib 1182 def test_make_tarball(self): 1183 # creating something to tar 1184 root_dir, base_dir = self._create_files('') 1185 1186 tmpdir2 = self.mkdtemp() 1187 # force shutil to create the directory 1188 os.rmdir(tmpdir2) 1189 # working with relative paths 1190 work_dir = os.path.dirname(tmpdir2) 1191 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive') 1192 1193 with support.change_cwd(work_dir): 1194 base_name = os.path.abspath(rel_base_name) 1195 tarball = make_archive(rel_base_name, 'gztar', root_dir, '.') 1196 1197 # check if the compressed tarball was created 1198 self.assertEqual(tarball, base_name + '.tar.gz') 1199 self.assertTrue(os.path.isfile(tarball)) 1200 self.assertTrue(tarfile.is_tarfile(tarball)) 1201 with tarfile.open(tarball, 'r:gz') as tf: 1202 self.assertCountEqual(tf.getnames(), 1203 ['.', './sub', './sub2', 1204 './file1', './file2', './sub/file3']) 1205 1206 # trying an uncompressed one 1207 with support.change_cwd(work_dir): 1208 tarball = make_archive(rel_base_name, 'tar', root_dir, '.') 1209 self.assertEqual(tarball, base_name + '.tar') 1210 self.assertTrue(os.path.isfile(tarball)) 1211 self.assertTrue(tarfile.is_tarfile(tarball)) 1212 with tarfile.open(tarball, 'r') as tf: 1213 self.assertCountEqual(tf.getnames(), 1214 ['.', './sub', './sub2', 1215 './file1', './file2', './sub/file3']) 1216 1217 def _tarinfo(self, path): 1218 with tarfile.open(path) as tar: 1219 names = tar.getnames() 1220 names.sort() 1221 return tuple(names) 1222 1223 def _create_files(self, base_dir='dist'): 1224 # creating something to tar 1225 root_dir = self.mkdtemp() 1226 dist = os.path.join(root_dir, base_dir) 1227 os.makedirs(dist, exist_ok=True) 1228 write_file((dist, 'file1'), 'xxx') 1229 write_file((dist, 'file2'), 'xxx') 1230 os.mkdir(os.path.join(dist, 'sub')) 1231 write_file((dist, 'sub', 'file3'), 'xxx') 1232 os.mkdir(os.path.join(dist, 'sub2')) 1233 if base_dir: 1234 write_file((root_dir, 'outer'), 'xxx') 1235 return root_dir, base_dir 1236 1237 @support.requires_zlib 1238 @unittest.skipUnless(shutil.which('tar'), 1239 'Need the tar command to run') 1240 def test_tarfile_vs_tar(self): 1241 root_dir, base_dir = self._create_files() 1242 base_name = os.path.join(self.mkdtemp(), 'archive') 1243 tarball = make_archive(base_name, 'gztar', root_dir, base_dir) 1244 1245 # check if the compressed tarball was created 1246 self.assertEqual(tarball, base_name + '.tar.gz') 1247 self.assertTrue(os.path.isfile(tarball)) 1248 1249 # now create another tarball using `tar` 1250 tarball2 = os.path.join(root_dir, 'archive2.tar') 1251 tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir] 1252 subprocess.check_call(tar_cmd, cwd=root_dir, 1253 stdout=subprocess.DEVNULL) 1254 1255 self.assertTrue(os.path.isfile(tarball2)) 1256 # let's compare both tarballs 1257 self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2)) 1258 1259 # trying an uncompressed one 1260 tarball = make_archive(base_name, 'tar', root_dir, base_dir) 1261 self.assertEqual(tarball, base_name + '.tar') 1262 self.assertTrue(os.path.isfile(tarball)) 1263 1264 # now for a dry_run 1265 tarball = make_archive(base_name, 'tar', root_dir, base_dir, 1266 dry_run=True) 1267 self.assertEqual(tarball, base_name + '.tar') 1268 self.assertTrue(os.path.isfile(tarball)) 1269 1270 @support.requires_zlib 1271 def test_make_zipfile(self): 1272 # creating something to zip 1273 root_dir, base_dir = self._create_files() 1274 1275 tmpdir2 = self.mkdtemp() 1276 # force shutil to create the directory 1277 os.rmdir(tmpdir2) 1278 # working with relative paths 1279 work_dir = os.path.dirname(tmpdir2) 1280 rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive') 1281 1282 with support.change_cwd(work_dir): 1283 base_name = os.path.abspath(rel_base_name) 1284 res = make_archive(rel_base_name, 'zip', root_dir) 1285 1286 self.assertEqual(res, base_name + '.zip') 1287 self.assertTrue(os.path.isfile(res)) 1288 self.assertTrue(zipfile.is_zipfile(res)) 1289 with zipfile.ZipFile(res) as zf: 1290 self.assertCountEqual(zf.namelist(), 1291 ['dist/', 'dist/sub/', 'dist/sub2/', 1292 'dist/file1', 'dist/file2', 'dist/sub/file3', 1293 'outer']) 1294 1295 with support.change_cwd(work_dir): 1296 base_name = os.path.abspath(rel_base_name) 1297 res = make_archive(rel_base_name, 'zip', root_dir, base_dir) 1298 1299 self.assertEqual(res, base_name + '.zip') 1300 self.assertTrue(os.path.isfile(res)) 1301 self.assertTrue(zipfile.is_zipfile(res)) 1302 with zipfile.ZipFile(res) as zf: 1303 self.assertCountEqual(zf.namelist(), 1304 ['dist/', 'dist/sub/', 'dist/sub2/', 1305 'dist/file1', 'dist/file2', 'dist/sub/file3']) 1306 1307 @support.requires_zlib 1308 @unittest.skipUnless(shutil.which('zip'), 1309 'Need the zip command to run') 1310 def test_zipfile_vs_zip(self): 1311 root_dir, base_dir = self._create_files() 1312 base_name = os.path.join(self.mkdtemp(), 'archive') 1313 archive = make_archive(base_name, 'zip', root_dir, base_dir) 1314 1315 # check if ZIP file was created 1316 self.assertEqual(archive, base_name + '.zip') 1317 self.assertTrue(os.path.isfile(archive)) 1318 1319 # now create another ZIP file using `zip` 1320 archive2 = os.path.join(root_dir, 'archive2.zip') 1321 zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir] 1322 subprocess.check_call(zip_cmd, cwd=root_dir, 1323 stdout=subprocess.DEVNULL) 1324 1325 self.assertTrue(os.path.isfile(archive2)) 1326 # let's compare both ZIP files 1327 with zipfile.ZipFile(archive) as zf: 1328 names = zf.namelist() 1329 with zipfile.ZipFile(archive2) as zf: 1330 names2 = zf.namelist() 1331 self.assertEqual(sorted(names), sorted(names2)) 1332 1333 @support.requires_zlib 1334 @unittest.skipUnless(shutil.which('unzip'), 1335 'Need the unzip command to run') 1336 def test_unzip_zipfile(self): 1337 root_dir, base_dir = self._create_files() 1338 base_name = os.path.join(self.mkdtemp(), 'archive') 1339 archive = make_archive(base_name, 'zip', root_dir, base_dir) 1340 1341 # check if ZIP file was created 1342 self.assertEqual(archive, base_name + '.zip') 1343 self.assertTrue(os.path.isfile(archive)) 1344 1345 # now check the ZIP file using `unzip -t` 1346 zip_cmd = ['unzip', '-t', archive] 1347 with support.change_cwd(root_dir): 1348 try: 1349 subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT) 1350 except subprocess.CalledProcessError as exc: 1351 details = exc.output.decode(errors="replace") 1352 if 'unrecognized option: t' in details: 1353 self.skipTest("unzip doesn't support -t") 1354 msg = "{}\n\n**Unzip Output**\n{}" 1355 self.fail(msg.format(exc, details)) 1356 1357 def test_make_archive(self): 1358 tmpdir = self.mkdtemp() 1359 base_name = os.path.join(tmpdir, 'archive') 1360 self.assertRaises(ValueError, make_archive, base_name, 'xxx') 1361 1362 @support.requires_zlib 1363 def test_make_archive_owner_group(self): 1364 # testing make_archive with owner and group, with various combinations 1365 # this works even if there's not gid/uid support 1366 if UID_GID_SUPPORT: 1367 group = grp.getgrgid(0)[0] 1368 owner = pwd.getpwuid(0)[0] 1369 else: 1370 group = owner = 'root' 1371 1372 root_dir, base_dir = self._create_files() 1373 base_name = os.path.join(self.mkdtemp(), 'archive') 1374 res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner, 1375 group=group) 1376 self.assertTrue(os.path.isfile(res)) 1377 1378 res = make_archive(base_name, 'zip', root_dir, base_dir) 1379 self.assertTrue(os.path.isfile(res)) 1380 1381 res = make_archive(base_name, 'tar', root_dir, base_dir, 1382 owner=owner, group=group) 1383 self.assertTrue(os.path.isfile(res)) 1384 1385 res = make_archive(base_name, 'tar', root_dir, base_dir, 1386 owner='kjhkjhkjg', group='oihohoh') 1387 self.assertTrue(os.path.isfile(res)) 1388 1389 1390 @support.requires_zlib 1391 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support") 1392 def test_tarfile_root_owner(self): 1393 root_dir, base_dir = self._create_files() 1394 base_name = os.path.join(self.mkdtemp(), 'archive') 1395 group = grp.getgrgid(0)[0] 1396 owner = pwd.getpwuid(0)[0] 1397 with support.change_cwd(root_dir): 1398 archive_name = make_archive(base_name, 'gztar', root_dir, 'dist', 1399 owner=owner, group=group) 1400 1401 # check if the compressed tarball was created 1402 self.assertTrue(os.path.isfile(archive_name)) 1403 1404 # now checks the rights 1405 archive = tarfile.open(archive_name) 1406 try: 1407 for member in archive.getmembers(): 1408 self.assertEqual(member.uid, 0) 1409 self.assertEqual(member.gid, 0) 1410 finally: 1411 archive.close() 1412 1413 def test_make_archive_cwd(self): 1414 current_dir = os.getcwd() 1415 def _breaks(*args, **kw): 1416 raise RuntimeError() 1417 1418 register_archive_format('xxx', _breaks, [], 'xxx file') 1419 try: 1420 try: 1421 make_archive('xxx', 'xxx', root_dir=self.mkdtemp()) 1422 except Exception: 1423 pass 1424 self.assertEqual(os.getcwd(), current_dir) 1425 finally: 1426 unregister_archive_format('xxx') 1427 1428 def test_make_tarfile_in_curdir(self): 1429 # Issue #21280 1430 root_dir = self.mkdtemp() 1431 with support.change_cwd(root_dir): 1432 self.assertEqual(make_archive('test', 'tar'), 'test.tar') 1433 self.assertTrue(os.path.isfile('test.tar')) 1434 1435 @support.requires_zlib 1436 def test_make_zipfile_in_curdir(self): 1437 # Issue #21280 1438 root_dir = self.mkdtemp() 1439 with support.change_cwd(root_dir): 1440 self.assertEqual(make_archive('test', 'zip'), 'test.zip') 1441 self.assertTrue(os.path.isfile('test.zip')) 1442 1443 def test_register_archive_format(self): 1444 1445 self.assertRaises(TypeError, register_archive_format, 'xxx', 1) 1446 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x, 1447 1) 1448 self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x, 1449 [(1, 2), (1, 2, 3)]) 1450 1451 register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file') 1452 formats = [name for name, params in get_archive_formats()] 1453 self.assertIn('xxx', formats) 1454 1455 unregister_archive_format('xxx') 1456 formats = [name for name, params in get_archive_formats()] 1457 self.assertNotIn('xxx', formats) 1458 1459 def check_unpack_archive(self, format): 1460 self.check_unpack_archive_with_converter(format, lambda path: path) 1461 self.check_unpack_archive_with_converter(format, pathlib.Path) 1462 self.check_unpack_archive_with_converter(format, FakePath) 1463 1464 def check_unpack_archive_with_converter(self, format, converter): 1465 root_dir, base_dir = self._create_files() 1466 expected = rlistdir(root_dir) 1467 expected.remove('outer') 1468 1469 base_name = os.path.join(self.mkdtemp(), 'archive') 1470 filename = make_archive(base_name, format, root_dir, base_dir) 1471 1472 # let's try to unpack it now 1473 tmpdir2 = self.mkdtemp() 1474 unpack_archive(converter(filename), converter(tmpdir2)) 1475 self.assertEqual(rlistdir(tmpdir2), expected) 1476 1477 # and again, this time with the format specified 1478 tmpdir3 = self.mkdtemp() 1479 unpack_archive(converter(filename), converter(tmpdir3), format=format) 1480 self.assertEqual(rlistdir(tmpdir3), expected) 1481 1482 self.assertRaises(shutil.ReadError, unpack_archive, converter(TESTFN)) 1483 self.assertRaises(ValueError, unpack_archive, converter(TESTFN), format='xxx') 1484 1485 def test_unpack_archive_tar(self): 1486 self.check_unpack_archive('tar') 1487 1488 @support.requires_zlib 1489 def test_unpack_archive_gztar(self): 1490 self.check_unpack_archive('gztar') 1491 1492 @support.requires_bz2 1493 def test_unpack_archive_bztar(self): 1494 self.check_unpack_archive('bztar') 1495 1496 @support.requires_lzma 1497 @unittest.skipIf(AIX and not _maxdataOK(), "AIX MAXDATA must be 0x20000000 or larger") 1498 def test_unpack_archive_xztar(self): 1499 self.check_unpack_archive('xztar') 1500 1501 @support.requires_zlib 1502 def test_unpack_archive_zip(self): 1503 self.check_unpack_archive('zip') 1504 1505 def test_unpack_registry(self): 1506 1507 formats = get_unpack_formats() 1508 1509 def _boo(filename, extract_dir, extra): 1510 self.assertEqual(extra, 1) 1511 self.assertEqual(filename, 'stuff.boo') 1512 self.assertEqual(extract_dir, 'xx') 1513 1514 register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)]) 1515 unpack_archive('stuff.boo', 'xx') 1516 1517 # trying to register a .boo unpacker again 1518 self.assertRaises(RegistryError, register_unpack_format, 'Boo2', 1519 ['.boo'], _boo) 1520 1521 # should work now 1522 unregister_unpack_format('Boo') 1523 register_unpack_format('Boo2', ['.boo'], _boo) 1524 self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats()) 1525 self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats()) 1526 1527 # let's leave a clean state 1528 unregister_unpack_format('Boo2') 1529 self.assertEqual(get_unpack_formats(), formats) 1530 1531 @unittest.skipUnless(hasattr(shutil, 'disk_usage'), 1532 "disk_usage not available on this platform") 1533 def test_disk_usage(self): 1534 usage = shutil.disk_usage(os.path.dirname(__file__)) 1535 for attr in ('total', 'used', 'free'): 1536 self.assertIsInstance(getattr(usage, attr), int) 1537 self.assertGreater(usage.total, 0) 1538 self.assertGreater(usage.used, 0) 1539 self.assertGreaterEqual(usage.free, 0) 1540 self.assertGreaterEqual(usage.total, usage.used) 1541 self.assertGreater(usage.total, usage.free) 1542 1543 # bpo-32557: Check that disk_usage() also accepts a filename 1544 shutil.disk_usage(__file__) 1545 1546 @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support") 1547 @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown') 1548 def test_chown(self): 1549 1550 # cleaned-up automatically by TestShutil.tearDown method 1551 dirname = self.mkdtemp() 1552 filename = tempfile.mktemp(dir=dirname) 1553 write_file(filename, 'testing chown function') 1554 1555 with self.assertRaises(ValueError): 1556 shutil.chown(filename) 1557 1558 with self.assertRaises(LookupError): 1559 shutil.chown(filename, user='non-existing username') 1560 1561 with self.assertRaises(LookupError): 1562 shutil.chown(filename, group='non-existing groupname') 1563 1564 with self.assertRaises(TypeError): 1565 shutil.chown(filename, b'spam') 1566 1567 with self.assertRaises(TypeError): 1568 shutil.chown(filename, 3.14) 1569 1570 uid = os.getuid() 1571 gid = os.getgid() 1572 1573 def check_chown(path, uid=None, gid=None): 1574 s = os.stat(filename) 1575 if uid is not None: 1576 self.assertEqual(uid, s.st_uid) 1577 if gid is not None: 1578 self.assertEqual(gid, s.st_gid) 1579 1580 shutil.chown(filename, uid, gid) 1581 check_chown(filename, uid, gid) 1582 shutil.chown(filename, uid) 1583 check_chown(filename, uid) 1584 shutil.chown(filename, user=uid) 1585 check_chown(filename, uid) 1586 shutil.chown(filename, group=gid) 1587 check_chown(filename, gid=gid) 1588 1589 shutil.chown(dirname, uid, gid) 1590 check_chown(dirname, uid, gid) 1591 shutil.chown(dirname, uid) 1592 check_chown(dirname, uid) 1593 shutil.chown(dirname, user=uid) 1594 check_chown(dirname, uid) 1595 shutil.chown(dirname, group=gid) 1596 check_chown(dirname, gid=gid) 1597 1598 user = pwd.getpwuid(uid)[0] 1599 group = grp.getgrgid(gid)[0] 1600 shutil.chown(filename, user, group) 1601 check_chown(filename, uid, gid) 1602 shutil.chown(dirname, user, group) 1603 check_chown(dirname, uid, gid) 1604 1605 def test_copy_return_value(self): 1606 # copy and copy2 both return their destination path. 1607 for fn in (shutil.copy, shutil.copy2): 1608 src_dir = self.mkdtemp() 1609 dst_dir = self.mkdtemp() 1610 src = os.path.join(src_dir, 'foo') 1611 write_file(src, 'foo') 1612 rv = fn(src, dst_dir) 1613 self.assertEqual(rv, os.path.join(dst_dir, 'foo')) 1614 rv = fn(src, os.path.join(dst_dir, 'bar')) 1615 self.assertEqual(rv, os.path.join(dst_dir, 'bar')) 1616 1617 def test_copyfile_return_value(self): 1618 # copytree returns its destination path. 1619 src_dir = self.mkdtemp() 1620 dst_dir = self.mkdtemp() 1621 dst_file = os.path.join(dst_dir, 'bar') 1622 src_file = os.path.join(src_dir, 'foo') 1623 write_file(src_file, 'foo') 1624 rv = shutil.copyfile(src_file, dst_file) 1625 self.assertTrue(os.path.exists(rv)) 1626 self.assertEqual(read_file(src_file), read_file(dst_file)) 1627 1628 def test_copyfile_same_file(self): 1629 # copyfile() should raise SameFileError if the source and destination 1630 # are the same. 1631 src_dir = self.mkdtemp() 1632 src_file = os.path.join(src_dir, 'foo') 1633 write_file(src_file, 'foo') 1634 self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file) 1635 # But Error should work too, to stay backward compatible. 1636 self.assertRaises(Error, shutil.copyfile, src_file, src_file) 1637 # Make sure file is not corrupted. 1638 self.assertEqual(read_file(src_file), 'foo') 1639 1640 def test_copytree_return_value(self): 1641 # copytree returns its destination path. 1642 src_dir = self.mkdtemp() 1643 dst_dir = src_dir + "dest" 1644 self.addCleanup(shutil.rmtree, dst_dir, True) 1645 src = os.path.join(src_dir, 'foo') 1646 write_file(src, 'foo') 1647 rv = shutil.copytree(src_dir, dst_dir) 1648 self.assertEqual(['foo'], os.listdir(rv)) 1649 1650 def test_copytree_subdirectory(self): 1651 # copytree where dst is a subdirectory of src, see Issue 38688 1652 base_dir = self.mkdtemp() 1653 self.addCleanup(shutil.rmtree, base_dir, ignore_errors=True) 1654 src_dir = os.path.join(base_dir, "t", "pg") 1655 dst_dir = os.path.join(src_dir, "somevendor", "1.0") 1656 os.makedirs(src_dir) 1657 src = os.path.join(src_dir, 'pol') 1658 write_file(src, 'pol') 1659 rv = shutil.copytree(src_dir, dst_dir) 1660 self.assertEqual(['pol'], os.listdir(rv)) 1661 1662 1663class TestWhich(unittest.TestCase): 1664 1665 def setUp(self): 1666 self.temp_dir = tempfile.mkdtemp(prefix="Tmp") 1667 self.addCleanup(shutil.rmtree, self.temp_dir, True) 1668 # Give the temp_file an ".exe" suffix for all. 1669 # It's needed on Windows and not harmful on other platforms. 1670 self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir, 1671 prefix="Tmp", 1672 suffix=".Exe") 1673 os.chmod(self.temp_file.name, stat.S_IXUSR) 1674 self.addCleanup(self.temp_file.close) 1675 self.dir, self.file = os.path.split(self.temp_file.name) 1676 self.env_path = self.dir 1677 self.curdir = os.curdir 1678 self.ext = ".EXE" 1679 1680 def test_basic(self): 1681 # Given an EXE in a directory, it should be returned. 1682 rv = shutil.which(self.file, path=self.dir) 1683 self.assertEqual(rv, self.temp_file.name) 1684 1685 def test_absolute_cmd(self): 1686 # When given the fully qualified path to an executable that exists, 1687 # it should be returned. 1688 rv = shutil.which(self.temp_file.name, path=self.temp_dir) 1689 self.assertEqual(rv, self.temp_file.name) 1690 1691 def test_relative_cmd(self): 1692 # When given the relative path with a directory part to an executable 1693 # that exists, it should be returned. 1694 base_dir, tail_dir = os.path.split(self.dir) 1695 relpath = os.path.join(tail_dir, self.file) 1696 with support.change_cwd(path=base_dir): 1697 rv = shutil.which(relpath, path=self.temp_dir) 1698 self.assertEqual(rv, relpath) 1699 # But it shouldn't be searched in PATH directories (issue #16957). 1700 with support.change_cwd(path=self.dir): 1701 rv = shutil.which(relpath, path=base_dir) 1702 self.assertIsNone(rv) 1703 1704 def test_cwd(self): 1705 # Issue #16957 1706 base_dir = os.path.dirname(self.dir) 1707 with support.change_cwd(path=self.dir): 1708 rv = shutil.which(self.file, path=base_dir) 1709 if sys.platform == "win32": 1710 # Windows: current directory implicitly on PATH 1711 self.assertEqual(rv, os.path.join(self.curdir, self.file)) 1712 else: 1713 # Other platforms: shouldn't match in the current directory. 1714 self.assertIsNone(rv) 1715 1716 @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, 1717 'non-root user required') 1718 def test_non_matching_mode(self): 1719 # Set the file read-only and ask for writeable files. 1720 os.chmod(self.temp_file.name, stat.S_IREAD) 1721 if os.access(self.temp_file.name, os.W_OK): 1722 self.skipTest("can't set the file read-only") 1723 rv = shutil.which(self.file, path=self.dir, mode=os.W_OK) 1724 self.assertIsNone(rv) 1725 1726 def test_relative_path(self): 1727 base_dir, tail_dir = os.path.split(self.dir) 1728 with support.change_cwd(path=base_dir): 1729 rv = shutil.which(self.file, path=tail_dir) 1730 self.assertEqual(rv, os.path.join(tail_dir, self.file)) 1731 1732 def test_nonexistent_file(self): 1733 # Return None when no matching executable file is found on the path. 1734 rv = shutil.which("foo.exe", path=self.dir) 1735 self.assertIsNone(rv) 1736 1737 @unittest.skipUnless(sys.platform == "win32", 1738 "pathext check is Windows-only") 1739 def test_pathext_checking(self): 1740 # Ask for the file without the ".exe" extension, then ensure that 1741 # it gets found properly with the extension. 1742 rv = shutil.which(self.file[:-4], path=self.dir) 1743 self.assertEqual(rv, self.temp_file.name[:-4] + self.ext) 1744 1745 def test_environ_path(self): 1746 with support.EnvironmentVarGuard() as env: 1747 env['PATH'] = self.env_path 1748 rv = shutil.which(self.file) 1749 self.assertEqual(rv, self.temp_file.name) 1750 1751 def test_environ_path_empty(self): 1752 # PATH='': no match 1753 with support.EnvironmentVarGuard() as env: 1754 env['PATH'] = '' 1755 with unittest.mock.patch('os.confstr', return_value=self.dir, \ 1756 create=True), \ 1757 support.swap_attr(os, 'defpath', self.dir), \ 1758 support.change_cwd(self.dir): 1759 rv = shutil.which(self.file) 1760 self.assertIsNone(rv) 1761 1762 def test_environ_path_cwd(self): 1763 expected_cwd = os.path.basename(self.temp_file.name) 1764 if sys.platform == "win32": 1765 curdir = os.curdir 1766 if isinstance(expected_cwd, bytes): 1767 curdir = os.fsencode(curdir) 1768 expected_cwd = os.path.join(curdir, expected_cwd) 1769 1770 # PATH=':': explicitly looks in the current directory 1771 with support.EnvironmentVarGuard() as env: 1772 env['PATH'] = os.pathsep 1773 with unittest.mock.patch('os.confstr', return_value=self.dir, \ 1774 create=True), \ 1775 support.swap_attr(os, 'defpath', self.dir): 1776 rv = shutil.which(self.file) 1777 self.assertIsNone(rv) 1778 1779 # look in current directory 1780 with support.change_cwd(self.dir): 1781 rv = shutil.which(self.file) 1782 self.assertEqual(rv, expected_cwd) 1783 1784 def test_environ_path_missing(self): 1785 with support.EnvironmentVarGuard() as env: 1786 env.pop('PATH', None) 1787 1788 # without confstr 1789 with unittest.mock.patch('os.confstr', side_effect=ValueError, \ 1790 create=True), \ 1791 support.swap_attr(os, 'defpath', self.dir): 1792 rv = shutil.which(self.file) 1793 self.assertEqual(rv, self.temp_file.name) 1794 1795 # with confstr 1796 with unittest.mock.patch('os.confstr', return_value=self.dir, \ 1797 create=True), \ 1798 support.swap_attr(os, 'defpath', ''): 1799 rv = shutil.which(self.file) 1800 self.assertEqual(rv, self.temp_file.name) 1801 1802 def test_empty_path(self): 1803 base_dir = os.path.dirname(self.dir) 1804 with support.change_cwd(path=self.dir), \ 1805 support.EnvironmentVarGuard() as env: 1806 env['PATH'] = self.env_path 1807 rv = shutil.which(self.file, path='') 1808 self.assertIsNone(rv) 1809 1810 def test_empty_path_no_PATH(self): 1811 with support.EnvironmentVarGuard() as env: 1812 env.pop('PATH', None) 1813 rv = shutil.which(self.file) 1814 self.assertIsNone(rv) 1815 1816 @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows') 1817 def test_pathext(self): 1818 ext = ".xyz" 1819 temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir, 1820 prefix="Tmp2", suffix=ext) 1821 os.chmod(temp_filexyz.name, stat.S_IXUSR) 1822 self.addCleanup(temp_filexyz.close) 1823 1824 # strip path and extension 1825 program = os.path.basename(temp_filexyz.name) 1826 program = os.path.splitext(program)[0] 1827 1828 with support.EnvironmentVarGuard() as env: 1829 env['PATHEXT'] = ext 1830 rv = shutil.which(program, path=self.temp_dir) 1831 self.assertEqual(rv, temp_filexyz.name) 1832 1833 1834class TestWhichBytes(TestWhich): 1835 def setUp(self): 1836 TestWhich.setUp(self) 1837 self.dir = os.fsencode(self.dir) 1838 self.file = os.fsencode(self.file) 1839 self.temp_file.name = os.fsencode(self.temp_file.name) 1840 self.curdir = os.fsencode(self.curdir) 1841 self.ext = os.fsencode(self.ext) 1842 1843 1844class TestMove(unittest.TestCase): 1845 1846 def setUp(self): 1847 filename = "foo" 1848 basedir = None 1849 if sys.platform == "win32": 1850 basedir = os.path.realpath(os.getcwd()) 1851 self.src_dir = tempfile.mkdtemp(dir=basedir) 1852 self.dst_dir = tempfile.mkdtemp(dir=basedir) 1853 self.src_file = os.path.join(self.src_dir, filename) 1854 self.dst_file = os.path.join(self.dst_dir, filename) 1855 with open(self.src_file, "wb") as f: 1856 f.write(b"spam") 1857 1858 def tearDown(self): 1859 for d in (self.src_dir, self.dst_dir): 1860 try: 1861 if d: 1862 shutil.rmtree(d) 1863 except: 1864 pass 1865 1866 def _check_move_file(self, src, dst, real_dst): 1867 with open(src, "rb") as f: 1868 contents = f.read() 1869 shutil.move(src, dst) 1870 with open(real_dst, "rb") as f: 1871 self.assertEqual(contents, f.read()) 1872 self.assertFalse(os.path.exists(src)) 1873 1874 def _check_move_dir(self, src, dst, real_dst): 1875 contents = sorted(os.listdir(src)) 1876 shutil.move(src, dst) 1877 self.assertEqual(contents, sorted(os.listdir(real_dst))) 1878 self.assertFalse(os.path.exists(src)) 1879 1880 def test_move_file(self): 1881 # Move a file to another location on the same filesystem. 1882 self._check_move_file(self.src_file, self.dst_file, self.dst_file) 1883 1884 def test_move_file_to_dir(self): 1885 # Move a file inside an existing dir on the same filesystem. 1886 self._check_move_file(self.src_file, self.dst_dir, self.dst_file) 1887 1888 @mock_rename 1889 def test_move_file_other_fs(self): 1890 # Move a file to an existing dir on another filesystem. 1891 self.test_move_file() 1892 1893 @mock_rename 1894 def test_move_file_to_dir_other_fs(self): 1895 # Move a file to another location on another filesystem. 1896 self.test_move_file_to_dir() 1897 1898 def test_move_dir(self): 1899 # Move a dir to another location on the same filesystem. 1900 dst_dir = tempfile.mktemp() 1901 try: 1902 self._check_move_dir(self.src_dir, dst_dir, dst_dir) 1903 finally: 1904 try: 1905 shutil.rmtree(dst_dir) 1906 except: 1907 pass 1908 1909 @mock_rename 1910 def test_move_dir_other_fs(self): 1911 # Move a dir to another location on another filesystem. 1912 self.test_move_dir() 1913 1914 def test_move_dir_to_dir(self): 1915 # Move a dir inside an existing dir on the same filesystem. 1916 self._check_move_dir(self.src_dir, self.dst_dir, 1917 os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 1918 1919 @mock_rename 1920 def test_move_dir_to_dir_other_fs(self): 1921 # Move a dir inside an existing dir on another filesystem. 1922 self.test_move_dir_to_dir() 1923 1924 def test_move_dir_sep_to_dir(self): 1925 self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir, 1926 os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 1927 1928 @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep') 1929 def test_move_dir_altsep_to_dir(self): 1930 self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir, 1931 os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 1932 1933 def test_existing_file_inside_dest_dir(self): 1934 # A file with the same name inside the destination dir already exists. 1935 with open(self.dst_file, "wb"): 1936 pass 1937 self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir) 1938 1939 def test_dont_move_dir_in_itself(self): 1940 # Moving a dir inside itself raises an Error. 1941 dst = os.path.join(self.src_dir, "bar") 1942 self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst) 1943 1944 def test_destinsrc_false_negative(self): 1945 os.mkdir(TESTFN) 1946 try: 1947 for src, dst in [('srcdir', 'srcdir/dest')]: 1948 src = os.path.join(TESTFN, src) 1949 dst = os.path.join(TESTFN, dst) 1950 self.assertTrue(shutil._destinsrc(src, dst), 1951 msg='_destinsrc() wrongly concluded that ' 1952 'dst (%s) is not in src (%s)' % (dst, src)) 1953 finally: 1954 shutil.rmtree(TESTFN, ignore_errors=True) 1955 1956 def test_destinsrc_false_positive(self): 1957 os.mkdir(TESTFN) 1958 try: 1959 for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]: 1960 src = os.path.join(TESTFN, src) 1961 dst = os.path.join(TESTFN, dst) 1962 self.assertFalse(shutil._destinsrc(src, dst), 1963 msg='_destinsrc() wrongly concluded that ' 1964 'dst (%s) is in src (%s)' % (dst, src)) 1965 finally: 1966 shutil.rmtree(TESTFN, ignore_errors=True) 1967 1968 @support.skip_unless_symlink 1969 @mock_rename 1970 def test_move_file_symlink(self): 1971 dst = os.path.join(self.src_dir, 'bar') 1972 os.symlink(self.src_file, dst) 1973 shutil.move(dst, self.dst_file) 1974 self.assertTrue(os.path.islink(self.dst_file)) 1975 self.assertTrue(os.path.samefile(self.src_file, self.dst_file)) 1976 1977 @support.skip_unless_symlink 1978 @mock_rename 1979 def test_move_file_symlink_to_dir(self): 1980 filename = "bar" 1981 dst = os.path.join(self.src_dir, filename) 1982 os.symlink(self.src_file, dst) 1983 shutil.move(dst, self.dst_dir) 1984 final_link = os.path.join(self.dst_dir, filename) 1985 self.assertTrue(os.path.islink(final_link)) 1986 self.assertTrue(os.path.samefile(self.src_file, final_link)) 1987 1988 @support.skip_unless_symlink 1989 @mock_rename 1990 def test_move_dangling_symlink(self): 1991 src = os.path.join(self.src_dir, 'baz') 1992 dst = os.path.join(self.src_dir, 'bar') 1993 os.symlink(src, dst) 1994 dst_link = os.path.join(self.dst_dir, 'quux') 1995 shutil.move(dst, dst_link) 1996 self.assertTrue(os.path.islink(dst_link)) 1997 self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link)) 1998 1999 @support.skip_unless_symlink 2000 @mock_rename 2001 def test_move_dir_symlink(self): 2002 src = os.path.join(self.src_dir, 'baz') 2003 dst = os.path.join(self.src_dir, 'bar') 2004 os.mkdir(src) 2005 os.symlink(src, dst) 2006 dst_link = os.path.join(self.dst_dir, 'quux') 2007 shutil.move(dst, dst_link) 2008 self.assertTrue(os.path.islink(dst_link)) 2009 self.assertTrue(os.path.samefile(src, dst_link)) 2010 2011 def test_move_return_value(self): 2012 rv = shutil.move(self.src_file, self.dst_dir) 2013 self.assertEqual(rv, 2014 os.path.join(self.dst_dir, os.path.basename(self.src_file))) 2015 2016 def test_move_as_rename_return_value(self): 2017 rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar')) 2018 self.assertEqual(rv, os.path.join(self.dst_dir, 'bar')) 2019 2020 @mock_rename 2021 def test_move_file_special_function(self): 2022 moved = [] 2023 def _copy(src, dst): 2024 moved.append((src, dst)) 2025 shutil.move(self.src_file, self.dst_dir, copy_function=_copy) 2026 self.assertEqual(len(moved), 1) 2027 2028 @mock_rename 2029 def test_move_dir_special_function(self): 2030 moved = [] 2031 def _copy(src, dst): 2032 moved.append((src, dst)) 2033 support.create_empty_file(os.path.join(self.src_dir, 'child')) 2034 support.create_empty_file(os.path.join(self.src_dir, 'child1')) 2035 shutil.move(self.src_dir, self.dst_dir, copy_function=_copy) 2036 self.assertEqual(len(moved), 3) 2037 2038 2039class TestCopyFile(unittest.TestCase): 2040 2041 _delete = False 2042 2043 class Faux(object): 2044 _entered = False 2045 _exited_with = None 2046 _raised = False 2047 def __init__(self, raise_in_exit=False, suppress_at_exit=True): 2048 self._raise_in_exit = raise_in_exit 2049 self._suppress_at_exit = suppress_at_exit 2050 def read(self, *args): 2051 return '' 2052 def __enter__(self): 2053 self._entered = True 2054 def __exit__(self, exc_type, exc_val, exc_tb): 2055 self._exited_with = exc_type, exc_val, exc_tb 2056 if self._raise_in_exit: 2057 self._raised = True 2058 raise OSError("Cannot close") 2059 return self._suppress_at_exit 2060 2061 def tearDown(self): 2062 if self._delete: 2063 del shutil.open 2064 2065 def _set_shutil_open(self, func): 2066 shutil.open = func 2067 self._delete = True 2068 2069 def test_w_source_open_fails(self): 2070 def _open(filename, mode='r'): 2071 if filename == 'srcfile': 2072 raise OSError('Cannot open "srcfile"') 2073 assert 0 # shouldn't reach here. 2074 2075 self._set_shutil_open(_open) 2076 2077 self.assertRaises(OSError, shutil.copyfile, 'srcfile', 'destfile') 2078 2079 @unittest.skipIf(MACOS, "skipped on macOS") 2080 def test_w_dest_open_fails(self): 2081 2082 srcfile = self.Faux() 2083 2084 def _open(filename, mode='r'): 2085 if filename == 'srcfile': 2086 return srcfile 2087 if filename == 'destfile': 2088 raise OSError('Cannot open "destfile"') 2089 assert 0 # shouldn't reach here. 2090 2091 self._set_shutil_open(_open) 2092 2093 shutil.copyfile('srcfile', 'destfile') 2094 self.assertTrue(srcfile._entered) 2095 self.assertTrue(srcfile._exited_with[0] is OSError) 2096 self.assertEqual(srcfile._exited_with[1].args, 2097 ('Cannot open "destfile"',)) 2098 2099 @unittest.skipIf(MACOS, "skipped on macOS") 2100 def test_w_dest_close_fails(self): 2101 2102 srcfile = self.Faux() 2103 destfile = self.Faux(True) 2104 2105 def _open(filename, mode='r'): 2106 if filename == 'srcfile': 2107 return srcfile 2108 if filename == 'destfile': 2109 return destfile 2110 assert 0 # shouldn't reach here. 2111 2112 self._set_shutil_open(_open) 2113 2114 shutil.copyfile('srcfile', 'destfile') 2115 self.assertTrue(srcfile._entered) 2116 self.assertTrue(destfile._entered) 2117 self.assertTrue(destfile._raised) 2118 self.assertTrue(srcfile._exited_with[0] is OSError) 2119 self.assertEqual(srcfile._exited_with[1].args, 2120 ('Cannot close',)) 2121 2122 @unittest.skipIf(MACOS, "skipped on macOS") 2123 def test_w_source_close_fails(self): 2124 2125 srcfile = self.Faux(True) 2126 destfile = self.Faux() 2127 2128 def _open(filename, mode='r'): 2129 if filename == 'srcfile': 2130 return srcfile 2131 if filename == 'destfile': 2132 return destfile 2133 assert 0 # shouldn't reach here. 2134 2135 self._set_shutil_open(_open) 2136 2137 self.assertRaises(OSError, 2138 shutil.copyfile, 'srcfile', 'destfile') 2139 self.assertTrue(srcfile._entered) 2140 self.assertTrue(destfile._entered) 2141 self.assertFalse(destfile._raised) 2142 self.assertTrue(srcfile._exited_with[0] is None) 2143 self.assertTrue(srcfile._raised) 2144 2145 def test_move_dir_caseinsensitive(self): 2146 # Renames a folder to the same name 2147 # but a different case. 2148 2149 self.src_dir = tempfile.mkdtemp() 2150 self.addCleanup(shutil.rmtree, self.src_dir, True) 2151 dst_dir = os.path.join( 2152 os.path.dirname(self.src_dir), 2153 os.path.basename(self.src_dir).upper()) 2154 self.assertNotEqual(self.src_dir, dst_dir) 2155 2156 try: 2157 shutil.move(self.src_dir, dst_dir) 2158 self.assertTrue(os.path.isdir(dst_dir)) 2159 finally: 2160 os.rmdir(dst_dir) 2161 2162 2163class TestCopyFileObj(unittest.TestCase): 2164 FILESIZE = 2 * 1024 * 1024 2165 2166 @classmethod 2167 def setUpClass(cls): 2168 write_test_file(TESTFN, cls.FILESIZE) 2169 2170 @classmethod 2171 def tearDownClass(cls): 2172 support.unlink(TESTFN) 2173 support.unlink(TESTFN2) 2174 2175 def tearDown(self): 2176 support.unlink(TESTFN2) 2177 2178 @contextlib.contextmanager 2179 def get_files(self): 2180 with open(TESTFN, "rb") as src: 2181 with open(TESTFN2, "wb") as dst: 2182 yield (src, dst) 2183 2184 def assert_files_eq(self, src, dst): 2185 with open(src, 'rb') as fsrc: 2186 with open(dst, 'rb') as fdst: 2187 self.assertEqual(fsrc.read(), fdst.read()) 2188 2189 def test_content(self): 2190 with self.get_files() as (src, dst): 2191 shutil.copyfileobj(src, dst) 2192 self.assert_files_eq(TESTFN, TESTFN2) 2193 2194 def test_file_not_closed(self): 2195 with self.get_files() as (src, dst): 2196 shutil.copyfileobj(src, dst) 2197 assert not src.closed 2198 assert not dst.closed 2199 2200 def test_file_offset(self): 2201 with self.get_files() as (src, dst): 2202 shutil.copyfileobj(src, dst) 2203 self.assertEqual(src.tell(), self.FILESIZE) 2204 self.assertEqual(dst.tell(), self.FILESIZE) 2205 2206 @unittest.skipIf(os.name != 'nt', "Windows only") 2207 def test_win_impl(self): 2208 # Make sure alternate Windows implementation is called. 2209 with unittest.mock.patch("shutil._copyfileobj_readinto") as m: 2210 shutil.copyfile(TESTFN, TESTFN2) 2211 assert m.called 2212 2213 # File size is 2 MiB but max buf size should be 1 MiB. 2214 self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024) 2215 2216 # If file size < 1 MiB memoryview() length must be equal to 2217 # the actual file size. 2218 with tempfile.NamedTemporaryFile(delete=False) as f: 2219 f.write(b'foo') 2220 fname = f.name 2221 self.addCleanup(support.unlink, fname) 2222 with unittest.mock.patch("shutil._copyfileobj_readinto") as m: 2223 shutil.copyfile(fname, TESTFN2) 2224 self.assertEqual(m.call_args[0][2], 3) 2225 2226 # Empty files should not rely on readinto() variant. 2227 with tempfile.NamedTemporaryFile(delete=False) as f: 2228 pass 2229 fname = f.name 2230 self.addCleanup(support.unlink, fname) 2231 with unittest.mock.patch("shutil._copyfileobj_readinto") as m: 2232 shutil.copyfile(fname, TESTFN2) 2233 assert not m.called 2234 self.assert_files_eq(fname, TESTFN2) 2235 2236 2237class _ZeroCopyFileTest(object): 2238 """Tests common to all zero-copy APIs.""" 2239 FILESIZE = (10 * 1024 * 1024) # 10 MiB 2240 FILEDATA = b"" 2241 PATCHPOINT = "" 2242 2243 @classmethod 2244 def setUpClass(cls): 2245 write_test_file(TESTFN, cls.FILESIZE) 2246 with open(TESTFN, 'rb') as f: 2247 cls.FILEDATA = f.read() 2248 assert len(cls.FILEDATA) == cls.FILESIZE 2249 2250 @classmethod 2251 def tearDownClass(cls): 2252 support.unlink(TESTFN) 2253 2254 def tearDown(self): 2255 support.unlink(TESTFN2) 2256 2257 @contextlib.contextmanager 2258 def get_files(self): 2259 with open(TESTFN, "rb") as src: 2260 with open(TESTFN2, "wb") as dst: 2261 yield (src, dst) 2262 2263 def zerocopy_fun(self, *args, **kwargs): 2264 raise NotImplementedError("must be implemented in subclass") 2265 2266 def reset(self): 2267 self.tearDown() 2268 self.tearDownClass() 2269 self.setUpClass() 2270 self.setUp() 2271 2272 # --- 2273 2274 def test_regular_copy(self): 2275 with self.get_files() as (src, dst): 2276 self.zerocopy_fun(src, dst) 2277 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 2278 # Make sure the fallback function is not called. 2279 with self.get_files() as (src, dst): 2280 with unittest.mock.patch('shutil.copyfileobj') as m: 2281 shutil.copyfile(TESTFN, TESTFN2) 2282 assert not m.called 2283 2284 def test_same_file(self): 2285 self.addCleanup(self.reset) 2286 with self.get_files() as (src, dst): 2287 with self.assertRaises(Exception): 2288 self.zerocopy_fun(src, src) 2289 # Make sure src file is not corrupted. 2290 self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA) 2291 2292 def test_non_existent_src(self): 2293 name = tempfile.mktemp() 2294 with self.assertRaises(FileNotFoundError) as cm: 2295 shutil.copyfile(name, "new") 2296 self.assertEqual(cm.exception.filename, name) 2297 2298 def test_empty_file(self): 2299 srcname = TESTFN + 'src' 2300 dstname = TESTFN + 'dst' 2301 self.addCleanup(lambda: support.unlink(srcname)) 2302 self.addCleanup(lambda: support.unlink(dstname)) 2303 with open(srcname, "wb"): 2304 pass 2305 2306 with open(srcname, "rb") as src: 2307 with open(dstname, "wb") as dst: 2308 self.zerocopy_fun(src, dst) 2309 2310 self.assertEqual(read_file(dstname, binary=True), b"") 2311 2312 def test_unhandled_exception(self): 2313 with unittest.mock.patch(self.PATCHPOINT, 2314 side_effect=ZeroDivisionError): 2315 self.assertRaises(ZeroDivisionError, 2316 shutil.copyfile, TESTFN, TESTFN2) 2317 2318 def test_exception_on_first_call(self): 2319 # Emulate a case where the first call to the zero-copy 2320 # function raises an exception in which case the function is 2321 # supposed to give up immediately. 2322 with unittest.mock.patch(self.PATCHPOINT, 2323 side_effect=OSError(errno.EINVAL, "yo")): 2324 with self.get_files() as (src, dst): 2325 with self.assertRaises(_GiveupOnFastCopy): 2326 self.zerocopy_fun(src, dst) 2327 2328 def test_filesystem_full(self): 2329 # Emulate a case where filesystem is full and sendfile() fails 2330 # on first call. 2331 with unittest.mock.patch(self.PATCHPOINT, 2332 side_effect=OSError(errno.ENOSPC, "yo")): 2333 with self.get_files() as (src, dst): 2334 self.assertRaises(OSError, self.zerocopy_fun, src, dst) 2335 2336 2337@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported') 2338class TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase): 2339 PATCHPOINT = "os.sendfile" 2340 2341 def zerocopy_fun(self, fsrc, fdst): 2342 return shutil._fastcopy_sendfile(fsrc, fdst) 2343 2344 def test_non_regular_file_src(self): 2345 with io.BytesIO(self.FILEDATA) as src: 2346 with open(TESTFN2, "wb") as dst: 2347 with self.assertRaises(_GiveupOnFastCopy): 2348 self.zerocopy_fun(src, dst) 2349 shutil.copyfileobj(src, dst) 2350 2351 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 2352 2353 def test_non_regular_file_dst(self): 2354 with open(TESTFN, "rb") as src: 2355 with io.BytesIO() as dst: 2356 with self.assertRaises(_GiveupOnFastCopy): 2357 self.zerocopy_fun(src, dst) 2358 shutil.copyfileobj(src, dst) 2359 dst.seek(0) 2360 self.assertEqual(dst.read(), self.FILEDATA) 2361 2362 def test_exception_on_second_call(self): 2363 def sendfile(*args, **kwargs): 2364 if not flag: 2365 flag.append(None) 2366 return orig_sendfile(*args, **kwargs) 2367 else: 2368 raise OSError(errno.EBADF, "yo") 2369 2370 flag = [] 2371 orig_sendfile = os.sendfile 2372 with unittest.mock.patch('os.sendfile', create=True, 2373 side_effect=sendfile): 2374 with self.get_files() as (src, dst): 2375 with self.assertRaises(OSError) as cm: 2376 shutil._fastcopy_sendfile(src, dst) 2377 assert flag 2378 self.assertEqual(cm.exception.errno, errno.EBADF) 2379 2380 def test_cant_get_size(self): 2381 # Emulate a case where src file size cannot be determined. 2382 # Internally bufsize will be set to a small value and 2383 # sendfile() will be called repeatedly. 2384 with unittest.mock.patch('os.fstat', side_effect=OSError) as m: 2385 with self.get_files() as (src, dst): 2386 shutil._fastcopy_sendfile(src, dst) 2387 assert m.called 2388 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 2389 2390 def test_small_chunks(self): 2391 # Force internal file size detection to be smaller than the 2392 # actual file size. We want to force sendfile() to be called 2393 # multiple times, also in order to emulate a src fd which gets 2394 # bigger while it is being copied. 2395 mock = unittest.mock.Mock() 2396 mock.st_size = 65536 + 1 2397 with unittest.mock.patch('os.fstat', return_value=mock) as m: 2398 with self.get_files() as (src, dst): 2399 shutil._fastcopy_sendfile(src, dst) 2400 assert m.called 2401 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 2402 2403 def test_big_chunk(self): 2404 # Force internal file size detection to be +100MB bigger than 2405 # the actual file size. Make sure sendfile() does not rely on 2406 # file size value except for (maybe) a better throughput / 2407 # performance. 2408 mock = unittest.mock.Mock() 2409 mock.st_size = self.FILESIZE + (100 * 1024 * 1024) 2410 with unittest.mock.patch('os.fstat', return_value=mock) as m: 2411 with self.get_files() as (src, dst): 2412 shutil._fastcopy_sendfile(src, dst) 2413 assert m.called 2414 self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 2415 2416 def test_blocksize_arg(self): 2417 with unittest.mock.patch('os.sendfile', 2418 side_effect=ZeroDivisionError) as m: 2419 self.assertRaises(ZeroDivisionError, 2420 shutil.copyfile, TESTFN, TESTFN2) 2421 blocksize = m.call_args[0][3] 2422 # Make sure file size and the block size arg passed to 2423 # sendfile() are the same. 2424 self.assertEqual(blocksize, os.path.getsize(TESTFN)) 2425 # ...unless we're dealing with a small file. 2426 support.unlink(TESTFN2) 2427 write_file(TESTFN2, b"hello", binary=True) 2428 self.addCleanup(support.unlink, TESTFN2 + '3') 2429 self.assertRaises(ZeroDivisionError, 2430 shutil.copyfile, TESTFN2, TESTFN2 + '3') 2431 blocksize = m.call_args[0][3] 2432 self.assertEqual(blocksize, 2 ** 23) 2433 2434 def test_file2file_not_supported(self): 2435 # Emulate a case where sendfile() only support file->socket 2436 # fds. In such a case copyfile() is supposed to skip the 2437 # fast-copy attempt from then on. 2438 assert shutil._USE_CP_SENDFILE 2439 try: 2440 with unittest.mock.patch( 2441 self.PATCHPOINT, 2442 side_effect=OSError(errno.ENOTSOCK, "yo")) as m: 2443 with self.get_files() as (src, dst): 2444 with self.assertRaises(_GiveupOnFastCopy): 2445 shutil._fastcopy_sendfile(src, dst) 2446 assert m.called 2447 assert not shutil._USE_CP_SENDFILE 2448 2449 with unittest.mock.patch(self.PATCHPOINT) as m: 2450 shutil.copyfile(TESTFN, TESTFN2) 2451 assert not m.called 2452 finally: 2453 shutil._USE_CP_SENDFILE = True 2454 2455 2456@unittest.skipIf(not MACOS, 'macOS only') 2457class TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase): 2458 PATCHPOINT = "posix._fcopyfile" 2459 2460 def zerocopy_fun(self, src, dst): 2461 return shutil._fastcopy_fcopyfile(src, dst, posix._COPYFILE_DATA) 2462 2463 2464class TermsizeTests(unittest.TestCase): 2465 def test_does_not_crash(self): 2466 """Check if get_terminal_size() returns a meaningful value. 2467 2468 There's no easy portable way to actually check the size of the 2469 terminal, so let's check if it returns something sensible instead. 2470 """ 2471 size = shutil.get_terminal_size() 2472 self.assertGreaterEqual(size.columns, 0) 2473 self.assertGreaterEqual(size.lines, 0) 2474 2475 def test_os_environ_first(self): 2476 "Check if environment variables have precedence" 2477 2478 with support.EnvironmentVarGuard() as env: 2479 env['COLUMNS'] = '777' 2480 del env['LINES'] 2481 size = shutil.get_terminal_size() 2482 self.assertEqual(size.columns, 777) 2483 2484 with support.EnvironmentVarGuard() as env: 2485 del env['COLUMNS'] 2486 env['LINES'] = '888' 2487 size = shutil.get_terminal_size() 2488 self.assertEqual(size.lines, 888) 2489 2490 def test_bad_environ(self): 2491 with support.EnvironmentVarGuard() as env: 2492 env['COLUMNS'] = 'xxx' 2493 env['LINES'] = 'yyy' 2494 size = shutil.get_terminal_size() 2495 self.assertGreaterEqual(size.columns, 0) 2496 self.assertGreaterEqual(size.lines, 0) 2497 2498 @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty") 2499 @unittest.skipUnless(hasattr(os, 'get_terminal_size'), 2500 'need os.get_terminal_size()') 2501 def test_stty_match(self): 2502 """Check if stty returns the same results ignoring env 2503 2504 This test will fail if stdin and stdout are connected to 2505 different terminals with different sizes. Nevertheless, such 2506 situations should be pretty rare. 2507 """ 2508 try: 2509 size = subprocess.check_output(['stty', 'size']).decode().split() 2510 except (FileNotFoundError, PermissionError, 2511 subprocess.CalledProcessError): 2512 self.skipTest("stty invocation failed") 2513 expected = (int(size[1]), int(size[0])) # reversed order 2514 2515 with support.EnvironmentVarGuard() as env: 2516 del env['LINES'] 2517 del env['COLUMNS'] 2518 actual = shutil.get_terminal_size() 2519 2520 self.assertEqual(expected, actual) 2521 2522 def test_fallback(self): 2523 with support.EnvironmentVarGuard() as env: 2524 del env['LINES'] 2525 del env['COLUMNS'] 2526 2527 # sys.__stdout__ has no fileno() 2528 with support.swap_attr(sys, '__stdout__', None): 2529 size = shutil.get_terminal_size(fallback=(10, 20)) 2530 self.assertEqual(size.columns, 10) 2531 self.assertEqual(size.lines, 20) 2532 2533 # sys.__stdout__ is not a terminal on Unix 2534 # or fileno() not in (0, 1, 2) on Windows 2535 with open(os.devnull, 'w') as f, \ 2536 support.swap_attr(sys, '__stdout__', f): 2537 size = shutil.get_terminal_size(fallback=(30, 40)) 2538 self.assertEqual(size.columns, 30) 2539 self.assertEqual(size.lines, 40) 2540 2541 2542class PublicAPITests(unittest.TestCase): 2543 """Ensures that the correct values are exposed in the public API.""" 2544 2545 def test_module_all_attribute(self): 2546 self.assertTrue(hasattr(shutil, '__all__')) 2547 target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat', 2548 'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error', 2549 'SpecialFileError', 'ExecError', 'make_archive', 2550 'get_archive_formats', 'register_archive_format', 2551 'unregister_archive_format', 'get_unpack_formats', 2552 'register_unpack_format', 'unregister_unpack_format', 2553 'unpack_archive', 'ignore_patterns', 'chown', 'which', 2554 'get_terminal_size', 'SameFileError'] 2555 if hasattr(os, 'statvfs') or os.name == 'nt': 2556 target_api.append('disk_usage') 2557 self.assertEqual(set(shutil.__all__), set(target_api)) 2558 2559 2560if __name__ == '__main__': 2561 unittest.main() 2562