# Copyright (C) 2003 Python Software Foundation

"""Tests for shutil: rmtree, copytree, copyfile, move and archive helpers."""

import unittest
import shutil
import tempfile
import sys
import stat
import os
import os.path
from os.path import splitdrive
from distutils.spawn import find_executable, spawn
from shutil import (_make_tarball, _make_zipfile, make_archive,
                    register_archive_format, unregister_archive_format,
                    get_archive_formats)
import tarfile
import warnings

from test import test_support
from test.test_support import TESTFN, check_warnings, captured_stdout

TESTFN2 = TESTFN + "2"

try:
    import grp
    import pwd
    UID_GID_SUPPORT = True
except ImportError:
    UID_GID_SUPPORT = False

try:
    import zlib
except ImportError:
    zlib = None

try:
    import zipfile
    ZIP_SUPPORT = True
except ImportError:
    # Without the zipfile module, fall back to looking for a zip executable.
    ZIP_SUPPORT = find_executable('zip')


class TestShutil(unittest.TestCase):
    """Tests for rmtree, copytree, copyfile and the archive helpers."""

    def setUp(self):
        super(TestShutil, self).setUp()
        # Directories handed out by self.mkdtemp(); removed in tearDown().
        self.tempdirs = []

    def tearDown(self):
        super(TestShutil, self).tearDown()
        while self.tempdirs:
            d = self.tempdirs.pop()
            # Removal errors are ignored only when os.name is 'nt' or
            # 'cygwin'; elsewhere a failed cleanup is reported.
            shutil.rmtree(d, ignore_errors=os.name in ('nt', 'cygwin'))

    def write_file(self, path, content='xxx'):
        """Write *content* to the file at *path*.

        *path* can be a string or a sequence of path components; a
        sequence is joined with os.path.join().
        """
        if isinstance(path, (list, tuple)):
            path = os.path.join(*path)
        with open(path, 'w') as f:
            f.write(content)

    def read_file(self, path):
        """Return the whole content of the file at *path*."""
        with open(path) as f:
            return f.read()

    def mkdtemp(self):
        """Create a temporary directory that will be cleaned up.

        Returns the path of the directory.
        """
        d = tempfile.mkdtemp()
        self.tempdirs.append(d)
        return d

    def _make_tarball_from(self, cwd, *args, **kwargs):
        """Run shutil._make_tarball(*args, **kwargs) with *cwd* as the
        current directory and return its result.

        The previous working directory is always restored.
        """
        old_dir = os.getcwd()
        os.chdir(cwd)
        try:
            return _make_tarball(*args, **kwargs)
        finally:
            os.chdir(old_dir)

    def test_rmtree_errors(self):
        # filename is guaranteed not to exist
        filename = tempfile.mktemp()
        self.assertRaises(OSError, shutil.rmtree, filename)

    # See bug #1071513 for why we don't run this on cygwin
    # and bug #1076467 for why we don't run this as root.
    @unittest.skipUnless(hasattr(os, 'chmod'), "requires os.chmod")
    @unittest.skipIf(sys.platform[:6] == 'cygwin',
                     "not run on cygwin (bug #1071513)")
    @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
                     "not run as root (bug #1076467)")
    def test_on_error(self):
        # errorState counts the onerror callbacks seen so far; see
        # check_args_to_onerror().
        self.errorState = 0
        os.mkdir(TESTFN)
        self.childpath = os.path.join(TESTFN, 'a')
        f = open(self.childpath, 'w')
        f.close()
        old_dir_mode = os.stat(TESTFN).st_mode
        old_child_mode = os.stat(self.childpath).st_mode
        # Make unwritable.
        os.chmod(self.childpath, stat.S_IREAD)
        os.chmod(TESTFN, stat.S_IREAD)

        shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
        # Test whether onerror has actually been called.
        self.assertEqual(self.errorState, 2,
                         "Expected call to onerror function did not happen.")

        # Make writable again.
        os.chmod(TESTFN, old_dir_mode)
        os.chmod(self.childpath, old_child_mode)

        # Clean up.
        shutil.rmtree(TESTFN)

    def check_args_to_onerror(self, func, arg, exc):
        """onerror callback for test_on_error.

        Checks the (func, arg, exc) triple passed by shutil.rmtree and
        advances self.errorState from 0 to 1 on the first call and to 2
        on any later call.
        """
        # test_on_error deliberately runs rmtree
        # on a directory that is chmod 400, which will fail.
        # This function is run when shutil.rmtree fails.
        # 99.9% of the time it initially fails to remove
        # a file in the directory, so the first time through
        # func is os.remove.
        # However, some Linux machines running ZFS on
        # FUSE experienced a failure earlier in the process
        # at os.listdir.  The first failure may legally
        # be either.
        if self.errorState == 0:
            if func is os.remove:
                self.assertEqual(arg, self.childpath)
            else:
                self.assertIs(func, os.listdir,
                              "func must be either os.remove or os.listdir")
                self.assertEqual(arg, TESTFN)
            self.assertTrue(issubclass(exc[0], OSError))
            self.errorState = 1
        else:
            self.assertEqual(func, os.rmdir)
            self.assertEqual(arg, TESTFN)
            self.assertTrue(issubclass(exc[0], OSError))
            self.errorState = 2

    def test_rmtree_dont_delete_file(self):
        # When called on a file instead of a directory, don't delete it.
        handle, path = tempfile.mkstemp()
        os.fdopen(handle).close()
        self.assertRaises(OSError, shutil.rmtree, path)
        os.remove(path)

    def test_copytree_simple(self):
        # Both temporary directories are registered with self.mkdtemp(),
        # so tearDown() removes them (and everything copied into them)
        # even if the test fails part-way through.
        join = os.path.join
        src_dir = self.mkdtemp()
        dst_dir = join(self.mkdtemp(), 'destination')

        self.write_file(join(src_dir, 'test.txt'), '123')
        os.mkdir(join(src_dir, 'test_dir'))
        self.write_file(join(src_dir, 'test_dir', 'test.txt'), '456')

        shutil.copytree(src_dir, dst_dir)
        self.assertTrue(os.path.isfile(join(dst_dir, 'test.txt')))
        self.assertTrue(os.path.isdir(join(dst_dir, 'test_dir')))
        self.assertTrue(os.path.isfile(join(dst_dir, 'test_dir', 'test.txt')))
        self.assertEqual(self.read_file(join(dst_dir, 'test.txt')), '123')
        self.assertEqual(
            self.read_file(join(dst_dir, 'test_dir', 'test.txt')), '456')

    def test_copytree_with_exclude(self):
        # creating data
        join = os.path.join
        exists = os.path.exists
        src_dir = self.mkdtemp()
        dst_dir = join(self.mkdtemp(), 'destination')
        self.write_file(join(src_dir, 'test.txt'), '123')
        self.write_file(join(src_dir, 'test.tmp'), '123')
        os.mkdir(join(src_dir, 'test_dir'))
        self.write_file(join(src_dir, 'test_dir', 'test.txt'), '456')
        os.mkdir(join(src_dir, 'test_dir2'))
        self.write_file(join(src_dir, 'test_dir2', 'test.txt'), '456')
        os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
        os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
        self.write_file(join(src_dir, 'test_dir2', 'subdir', 'test.txt'),
                        '456')
        self.write_file(join(src_dir, 'test_dir2', 'subdir2', 'test.py'),
                        '456')

        # testing glob-like patterns
        try:
            patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
            shutil.copytree(src_dir, dst_dir, ignore=patterns)
            # checking the result: some elements should not be copied
            self.assertTrue(exists(join(dst_dir, 'test.txt')))
            self.assertFalse(exists(join(dst_dir, 'test.tmp')))
            self.assertFalse(exists(join(dst_dir, 'test_dir2')))
        finally:
            # copytree needs a non-existent destination for the next phase.
            if exists(dst_dir):
                shutil.rmtree(dst_dir)
        try:
            patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
            shutil.copytree(src_dir, dst_dir, ignore=patterns)
            # checking the result: some elements should not be copied
            self.assertFalse(exists(join(dst_dir, 'test.tmp')))
            self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2')))
            self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
        finally:
            if exists(dst_dir):
                shutil.rmtree(dst_dir)

        # testing callable-style
        try:
            def _filter(src, names):
                # Ignore directories named exactly 'subdir' and any entry
                # whose extension is exactly '.py'.  The comparisons must
                # be exact: a substring test against '.py' would also
                # match the empty extension and ignore every directory.
                res = []
                for name in names:
                    path = os.path.join(src, name)
                    if os.path.isdir(path) and name == 'subdir':
                        res.append(name)
                    elif os.path.splitext(name)[-1] == '.py':
                        res.append(name)
                return res

            shutil.copytree(src_dir, dst_dir, ignore=_filter)

            # checking the result: some elements should not be copied
            self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2',
                                         'test.py')))
            self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir')))
            # ... while everything the filter does not name is copied.
            self.assertTrue(exists(join(dst_dir, 'test.txt')))
            self.assertTrue(exists(join(dst_dir, 'test_dir2', 'subdir2')))
        finally:
            if exists(dst_dir):
                shutil.rmtree(dst_dir)

    @unittest.skipUnless(hasattr(os, "symlink"), "requires os.symlink")
    def test_dont_copy_file_onto_link_to_itself(self):
        # bug 851123.
        os.mkdir(TESTFN)
        src = os.path.join(TESTFN, 'cheese')
        dst = os.path.join(TESTFN, 'shop')
        try:
            self.write_file(src, 'cheddar')

            # Hard link: copying the file onto itself must fail and leave
            # the content untouched.
            os.link(src, dst)
            self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
            self.assertEqual(self.read_file(src), 'cheddar')
            os.remove(dst)

            # Using `src` here would mean we end up with a symlink pointing
            # to TESTFN/TESTFN/cheese, while it should point at
            # TESTFN/cheese.
            os.symlink('cheese', dst)
            self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
            self.assertEqual(self.read_file(src), 'cheddar')
            os.remove(dst)
        finally:
            try:
                shutil.rmtree(TESTFN)
            except OSError:
                pass

    @unittest.skipUnless(hasattr(os, "symlink"), "requires os.symlink")
    def test_rmtree_on_symlink(self):
        # bug 1669.
        os.mkdir(TESTFN)
        try:
            src = os.path.join(TESTFN, 'cheese')
            dst = os.path.join(TESTFN, 'shop')
            os.mkdir(src)
            os.symlink(src, dst)
            self.assertRaises(OSError, shutil.rmtree, dst)
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    # Issue #3002: copyfile and copytree block indefinitely on named pipes
    @unittest.skipUnless(hasattr(os, "mkfifo"), "requires os.mkfifo")
    def test_copyfile_named_pipe(self):
        os.mkfifo(TESTFN)
        try:
            # A named pipe is rejected both as source and as destination.
            self.assertRaises(shutil.SpecialFileError,
                              shutil.copyfile, TESTFN, TESTFN2)
            self.assertRaises(shutil.SpecialFileError,
                              shutil.copyfile, __file__, TESTFN)
        finally:
            os.remove(TESTFN)

    @unittest.skipUnless(hasattr(os, "mkfifo"), "requires os.mkfifo")
    def test_copytree_named_pipe(self):
        os.mkdir(TESTFN)
        try:
            subdir = os.path.join(TESTFN, "subdir")
            os.mkdir(subdir)
            pipe = os.path.join(subdir, "mypipe")
            os.mkfifo(pipe)
            try:
                shutil.copytree(TESTFN, TESTFN2)
            except shutil.Error as e:
                errors = e.args[0]
                self.assertEqual(len(errors), 1)
                src, dst, error_msg = errors[0]
                self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
            else:
                self.fail("shutil.Error should have been raised")
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)
            shutil.rmtree(TESTFN2, ignore_errors=True)

    @unittest.skipUnless(zlib, "requires zlib")
    def test_make_tarball(self):
        # creating something to tar
        tmpdir = self.mkdtemp()
        self.write_file([tmpdir, 'file1'], 'xxx')
        self.write_file([tmpdir, 'file2'], 'xxx')
        os.mkdir(os.path.join(tmpdir, 'sub'))
        self.write_file([tmpdir, 'sub', 'file3'], 'xxx')

        tmpdir2 = self.mkdtemp()
        # The drive is stripped from base_name below, so the archive only
        # lands in tmpdir2 when both directories share a drive.
        if splitdrive(tmpdir)[0] != splitdrive(tmpdir2)[0]:
            self.skipTest("source and target should be on same drive")

        base_name = os.path.join(tmpdir2, 'archive')

        # working with relative paths to avoid tar warnings
        self._make_tarball_from(tmpdir, splitdrive(base_name)[1], '.')

        # check if the compressed tarball was created
        tarball = base_name + '.tar.gz'
        self.assertTrue(os.path.exists(tarball))

        # trying an uncompressed one
        self._make_tarball_from(tmpdir, splitdrive(base_name)[1], '.',
                                compress=None)
        tarball = base_name + '.tar'
        self.assertTrue(os.path.exists(tarball))

    def _tarinfo(self, path):
        """Return the sorted member names of the tar archive at *path*
        as a tuple."""
        tar = tarfile.open(path)
        try:
            names = tar.getnames()
            names.sort()
            return tuple(names)
        finally:
            tar.close()

    def _create_files(self):
        """Create a small 'dist' tree to archive.

        Returns (tmpdir, tmpdir2, base_name): tmpdir contains the 'dist'
        directory, tmpdir2 is a second, empty temporary directory and
        base_name is the path 'archive' inside tmpdir2.
        """
        # creating something to tar
        tmpdir = self.mkdtemp()
        dist = os.path.join(tmpdir, 'dist')
        os.mkdir(dist)
        self.write_file([dist, 'file1'], 'xxx')
        self.write_file([dist, 'file2'], 'xxx')
        os.mkdir(os.path.join(dist, 'sub'))
        self.write_file([dist, 'sub', 'file3'], 'xxx')
        os.mkdir(os.path.join(dist, 'sub2'))
        tmpdir2 = self.mkdtemp()
        base_name = os.path.join(tmpdir2, 'archive')
        return tmpdir, tmpdir2, base_name

    @unittest.skipUnless(zlib, "Requires zlib")
    @unittest.skipUnless(find_executable('tar') and find_executable('gzip'),
                         'Need the tar and gzip commands to run')
    def test_tarfile_vs_tar(self):
        tmpdir, tmpdir2, base_name = self._create_files()
        self._make_tarball_from(tmpdir, base_name, 'dist')

        # check if the compressed tarball was created
        tarball = base_name + '.tar.gz'
        self.assertTrue(os.path.exists(tarball))

        # now create another tarball using `tar`
        tarball2 = os.path.join(tmpdir, 'archive2.tar.gz')
        tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist']
        gzip_cmd = ['gzip', '-f9', 'archive2.tar']
        old_dir = os.getcwd()
        os.chdir(tmpdir)
        try:
            with captured_stdout():
                spawn(tar_cmd)
                spawn(gzip_cmd)
        finally:
            os.chdir(old_dir)

        self.assertTrue(os.path.exists(tarball2))
        # let's compare both tarballs
        self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))

        # trying an uncompressed one
        self._make_tarball_from(tmpdir, base_name, 'dist', compress=None)
        tarball = base_name + '.tar'
        self.assertTrue(os.path.exists(tarball))

        # now for a dry_run
        # NOTE: the file checked below is the one left by the non-dry run
        # just above, so this only shows the dry run does not remove it.
        self._make_tarball_from(tmpdir, base_name, 'dist', compress=None,
                                dry_run=True)
        tarball = base_name + '.tar'
        self.assertTrue(os.path.exists(tarball))

    @unittest.skipUnless(zlib, "Requires zlib")
    @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
    def test_make_zipfile(self):
        # creating something to zip
        tmpdir = self.mkdtemp()
        self.write_file([tmpdir, 'file1'], 'xxx')
        self.write_file([tmpdir, 'file2'], 'xxx')

        tmpdir2 = self.mkdtemp()
        base_name = os.path.join(tmpdir2, 'archive')
        _make_zipfile(base_name, tmpdir)

        # check if the zip archive was created
        zip_path = base_name + '.zip'
        self.assertTrue(os.path.exists(zip_path))

    def test_make_archive(self):
        # An unknown archive format is rejected.
        tmpdir = self.mkdtemp()
        base_name = os.path.join(tmpdir, 'archive')
        self.assertRaises(ValueError, make_archive, base_name, 'xxx')

    @unittest.skipUnless(zlib, "Requires zlib")
    def test_make_archive_owner_group(self):
        # testing make_archive with owner and group, with various combinations
        # this works even if there's not gid/uid support
        if UID_GID_SUPPORT:
            group = grp.getgrgid(0)[0]
            owner = pwd.getpwuid(0)[0]
        else:
            group = owner = 'root'

        # base_dir is the directory holding 'dist'; root_dir is the second
        # (empty) temporary directory returned by _create_files().
        base_dir, root_dir, base_name = self._create_files()
        base_name = os.path.join(self.mkdtemp(), 'archive')
        res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
                           group=group)
        self.assertTrue(os.path.exists(res))

        res = make_archive(base_name, 'zip', root_dir, base_dir)
        self.assertTrue(os.path.exists(res))

        res = make_archive(base_name, 'tar', root_dir, base_dir,
                           owner=owner, group=group)
        self.assertTrue(os.path.exists(res))

        # Owner and group names that are not expected to exist.
        res = make_archive(base_name, 'tar', root_dir, base_dir,
                           owner='kjhkjhkjg', group='oihohoh')
        self.assertTrue(os.path.exists(res))

    @unittest.skipUnless(zlib, "Requires zlib")
    @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
    def test_tarfile_root_owner(self):
        tmpdir, tmpdir2, base_name = self._create_files()
        # Look the names up before changing directory so that a failing
        # lookup cannot leave the process in tmpdir.
        group = grp.getgrgid(0)[0]
        owner = pwd.getpwuid(0)[0]
        archive_name = self._make_tarball_from(tmpdir, base_name, 'dist',
                                               compress=None,
                                               owner=owner, group=group)

        # check if the tarball was created
        self.assertTrue(os.path.exists(archive_name))

        # now checks the rights
        archive = tarfile.open(archive_name)
        try:
            for member in archive.getmembers():
                self.assertEqual(member.uid, 0)
                self.assertEqual(member.gid, 0)
        finally:
            archive.close()

    def test_make_archive_cwd(self):
        # make_archive must restore the working directory even when the
        # archiver function raises.
        current_dir = os.getcwd()

        def _breaks(*args, **kw):
            raise RuntimeError()

        register_archive_format('xxx', _breaks, [], 'xxx file')
        try:
            try:
                make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
            except Exception:
                pass
            self.assertEqual(os.getcwd(), current_dir)
        finally:
            unregister_archive_format('xxx')

    def test_register_archive_format(self):
        # The lambdas below are never called, so the undefined name `x`
        # inside them is never evaluated.
        self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
        self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
                          1)
        self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
                          [(1, 2), (1, 2, 3)])

        register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
        formats = [name for name, params in get_archive_formats()]
        self.assertIn('xxx', formats)

        unregister_archive_format('xxx')
        formats = [name for name, params in get_archive_formats()]
        self.assertNotIn('xxx', formats)


class TestMove(unittest.TestCase):
    """Tests for shutil.move and shutil._destinsrc."""

    def setUp(self):
        filename = "foo"
        self.src_dir = tempfile.mkdtemp()
        self.dst_dir = tempfile.mkdtemp()
        self.src_file = os.path.join(self.src_dir, filename)
        self.dst_file = os.path.join(self.dst_dir, filename)
        # Try to create a dir next to this test file, hoping that it is
        # not located on the same filesystem as the system tmp dir.
        try:
            self.dir_other_fs = tempfile.mkdtemp(
                dir=os.path.dirname(__file__))
            self.file_other_fs = os.path.join(self.dir_other_fs,
                                              filename)
        except OSError:
            # file_other_fs stays unset; tests must check dir_other_fs.
            self.dir_other_fs = None
        with open(self.src_file, "wb") as f:
            f.write(b"spam")

    def tearDown(self):
        # Best-effort cleanup: some directories have already been moved
        # or removed by the test itself.
        for d in (self.src_dir, self.dst_dir, self.dir_other_fs):
            if d:
                shutil.rmtree(d, ignore_errors=True)

    def _require_other_fs(self):
        """Skip the running test when setUp() could not create the
        directory next to the test file."""
        if not self.dir_other_fs:
            self.skipTest("could not create a temporary directory "
                          "next to the test file")

    def _check_move_file(self, src, dst, real_dst):
        """Move file *src* to *dst* and check that *real_dst* then holds
        the original content and *src* is gone."""
        with open(src, "rb") as f:
            contents = f.read()
        shutil.move(src, dst)
        with open(real_dst, "rb") as f:
            self.assertEqual(contents, f.read())
        self.assertFalse(os.path.exists(src))

    def _check_move_dir(self, src, dst, real_dst):
        """Move directory *src* to *dst* and check that *real_dst* then
        lists the same entries and *src* is gone."""
        contents = sorted(os.listdir(src))
        shutil.move(src, dst)
        self.assertEqual(contents, sorted(os.listdir(real_dst)))
        self.assertFalse(os.path.exists(src))

    def test_move_file(self):
        # Move a file to another location on the same filesystem.
        self._check_move_file(self.src_file, self.dst_file, self.dst_file)

    def test_move_file_to_dir(self):
        # Move a file inside an existing dir on the same filesystem.
        self._check_move_file(self.src_file, self.dst_dir, self.dst_file)

    def test_move_file_other_fs(self):
        # Move a file to another location on another filesystem.
        self._require_other_fs()
        self._check_move_file(self.src_file, self.file_other_fs,
                              self.file_other_fs)

    def test_move_file_to_dir_other_fs(self):
        # Move a file to an existing dir on another filesystem.
        self._require_other_fs()
        self._check_move_file(self.src_file, self.dir_other_fs,
                              self.file_other_fs)

    def test_move_dir(self):
        # Move a dir to another location on the same filesystem.
        dst_dir = tempfile.mktemp()
        try:
            self._check_move_dir(self.src_dir, dst_dir, dst_dir)
        finally:
            # dst_dir may not exist if the move failed.
            shutil.rmtree(dst_dir, ignore_errors=True)

    def test_move_dir_other_fs(self):
        # Move a dir to another location on another filesystem.
        self._require_other_fs()
        dst_dir = tempfile.mktemp(dir=self.dir_other_fs)
        try:
            self._check_move_dir(self.src_dir, dst_dir, dst_dir)
        finally:
            shutil.rmtree(dst_dir, ignore_errors=True)

    def test_move_dir_to_dir(self):
        # Move a dir inside an existing dir on the same filesystem.
        self._check_move_dir(self.src_dir, self.dst_dir,
            os.path.join(self.dst_dir, os.path.basename(self.src_dir)))

    def test_move_dir_to_dir_other_fs(self):
        # Move a dir inside an existing dir on another filesystem.
        self._require_other_fs()
        self._check_move_dir(self.src_dir, self.dir_other_fs,
            os.path.join(self.dir_other_fs, os.path.basename(self.src_dir)))

    def test_existing_file_inside_dest_dir(self):
        # A file with the same name inside the destination dir already exists.
        with open(self.dst_file, "wb"):
            pass
        self.assertRaises(shutil.Error, shutil.move, self.src_file,
                          self.dst_dir)

    def test_dont_move_dir_in_itself(self):
        # Moving a dir inside itself raises an Error.
        dst = os.path.join(self.src_dir, "bar")
        self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)

    def test_destinsrc_false_negative(self):
        os.mkdir(TESTFN)
        try:
            for src, dst in [('srcdir', 'srcdir/dest')]:
                src = os.path.join(TESTFN, src)
                dst = os.path.join(TESTFN, dst)
                self.assertTrue(shutil._destinsrc(src, dst),
                                msg='_destinsrc() wrongly concluded that '
                                'dst (%s) is not in src (%s)' % (dst, src))
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)

    def test_destinsrc_false_positive(self):
        os.mkdir(TESTFN)
        try:
            for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
                src = os.path.join(TESTFN, src)
                dst = os.path.join(TESTFN, dst)
                self.assertFalse(shutil._destinsrc(src, dst),
                                 msg='_destinsrc() wrongly concluded that '
                                 'dst (%s) is in src (%s)' % (dst, src))
        finally:
            shutil.rmtree(TESTFN, ignore_errors=True)


class TestCopyFile(unittest.TestCase):
    """Tests for shutil.copyfile error handling, using a fake open()."""

    # True once a test has installed a replacement shutil.open.
    _delete = False

    class Faux(object):
        """Stand-in for a file object used as a context manager.

        Records whether it was entered and what __exit__ received.
        raise_in_exit makes __exit__ raise IOError("Cannot close");
        otherwise __exit__ returns suppress_at_exit.
        """
        _entered = False
        _exited_with = None
        _raised = False

        def __init__(self, raise_in_exit=False, suppress_at_exit=True):
            self._raise_in_exit = raise_in_exit
            self._suppress_at_exit = suppress_at_exit

        def read(self, *args):
            return ''

        def __enter__(self):
            # Returns None, so `with ... as name` binds None.
            self._entered = True

        def __exit__(self, exc_type, exc_val, exc_tb):
            self._exited_with = exc_type, exc_val, exc_tb
            if self._raise_in_exit:
                self._raised = True
                raise IOError("Cannot close")
            return self._suppress_at_exit

    def tearDown(self):
        # Remove the module-level override so shutil uses the builtin
        # open() again.
        if self._delete:
            del shutil.open

    def _set_shutil_open(self, func):
        """Install *func* as shutil.open; tearDown() removes it."""
        shutil.open = func
        self._delete = True

    def test_w_source_open_fails(self):
        def _open(filename, mode='r'):
            if filename == 'srcfile':
                raise IOError('Cannot open "srcfile"')
            assert 0  # shouldn't reach here.

        self._set_shutil_open(_open)

        self.assertRaises(IOError, shutil.copyfile, 'srcfile', 'destfile')

    def test_w_dest_open_fails(self):

        srcfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                raise IOError('Cannot open "destfile"')
            assert 0  # shouldn't reach here.

        self._set_shutil_open(_open)

        # No exception escapes: srcfile.__exit__ receives the IOError and
        # suppresses it (suppress_at_exit defaults to True).
        shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(srcfile._exited_with[0] is IOError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot open "destfile"',))

    def test_w_dest_close_fails(self):

        srcfile = self.Faux()
        destfile = self.Faux(True)

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            assert 0  # shouldn't reach here.

        self._set_shutil_open(_open)

        # The IOError raised by destfile.__exit__ reaches srcfile.__exit__,
        # which suppresses it.
        shutil.copyfile('srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertTrue(destfile._raised)
        self.assertTrue(srcfile._exited_with[0] is IOError)
        self.assertEqual(srcfile._exited_with[1].args,
                         ('Cannot close',))

    def test_w_source_close_fails(self):

        srcfile = self.Faux(True)
        destfile = self.Faux()

        def _open(filename, mode='r'):
            if filename == 'srcfile':
                return srcfile
            if filename == 'destfile':
                return destfile
            assert 0  # shouldn't reach here.

        self._set_shutil_open(_open)

        # srcfile.__exit__ raises, and nothing is left to suppress it.
        self.assertRaises(IOError,
                          shutil.copyfile, 'srcfile', 'destfile')
        self.assertTrue(srcfile._entered)
        self.assertTrue(destfile._entered)
        self.assertFalse(destfile._raised)
        self.assertTrue(srcfile._exited_with[0] is None)
        self.assertTrue(srcfile._raised)

    def test_move_dir_caseinsensitive(self):
        # Renames a folder to the same name
        # but a different case.

        self.src_dir = tempfile.mkdtemp()
        dst_dir = os.path.join(
                os.path.dirname(self.src_dir),
                os.path.basename(self.src_dir).upper())
        self.assertNotEqual(self.src_dir, dst_dir)

        try:
            shutil.move(self.src_dir, dst_dir)
            self.assertTrue(os.path.isdir(dst_dir))
        finally:
            # Remove whichever name still exists: dst_dir after a
            # successful move, src_dir if the move failed.
            for path in (dst_dir, self.src_dir):
                if os.path.exists(path):
                    os.rmdir(path)


def test_main():
    """Run every test case defined in this module."""
    test_support.run_unittest(TestShutil, TestMove, TestCopyFile)

if __name__ == '__main__':
    test_main()