"""Regression tests for the :mod:`mmap` module.

``MmapTests`` exercises ordinary file-backed and anonymous mappings;
``LargeMmapTests`` exercises mappings of sparse files larger than 2/4 GiB.
"""

from test.support import (TESTFN, import_module, unlink,
                          requires, _2G, _4G, gc_collect, cpython_only)
import unittest
import os
import re
import itertools
import socket
import sys
import weakref

# Skip test if we can't import mmap.
mmap = import_module('mmap')

PAGESIZE = mmap.PAGESIZE


class MmapTests(unittest.TestCase):
    """Functional tests for mmap objects backed by TESTFN or anonymous memory."""

    def setUp(self):
        # Start every test without a stale TESTFN lying around.
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

    def tearDown(self):
        try:
            os.unlink(TESTFN)
        except OSError:
            pass

    def test_basic(self):
        # Test mmap module on Unix systems and Windows

        # Create a file to be mmap'ed.
        with open(TESTFN, 'bw+') as f:
            # Write 2 pages worth of data to the file
            f.write(b'\0' * PAGESIZE)
            f.write(b'foo')
            f.write(b'\0' * (PAGESIZE - 3))
            f.flush()
            m = mmap.mmap(f.fileno(), 2 * PAGESIZE)

        # Simple sanity checks

        str(type(m))  # SF bug 128713:  segfaulted on Linux
        self.assertEqual(m.find(b'foo'), PAGESIZE)

        self.assertEqual(len(m), 2 * PAGESIZE)

        self.assertEqual(m[0], 0)
        self.assertEqual(m[0:3], b'\0\0\0')

        # Shouldn't crash on boundary (Issue #5292)
        self.assertRaises(IndexError, m.__getitem__, len(m))
        self.assertRaises(IndexError, m.__setitem__, len(m), b'\0')

        # Modify the file's content
        m[0] = b'3'[0]
        m[PAGESIZE + 3: PAGESIZE + 3 + 3] = b'bar'

        # Check that the modification worked
        self.assertEqual(m[0], b'3'[0])
        self.assertEqual(m[0:3], b'3\0\0')
        self.assertEqual(m[PAGESIZE - 1: PAGESIZE + 7], b'\0foobar\0')

        m.flush()

        # Test doing a regular expression match in an mmap'ed file
        match = re.search(b'[A-Za-z]+', m)
        self.assertIsNotNone(match, 'regex match on mmap failed!')
        start, end = match.span(0)

        self.assertEqual(start, PAGESIZE)
        self.assertEqual(end, PAGESIZE + 6)

        # test seeking around (try to overflow the seek implementation)
        m.seek(0, 0)
        self.assertEqual(m.tell(), 0)
        m.seek(42, 1)
        self.assertEqual(m.tell(), 42)
        m.seek(0, 2)
        self.assertEqual(m.tell(), len(m))

        # Try to seek to negative position...
        self.assertRaises(ValueError, m.seek, -1)

        # Try to seek beyond end of mmap...
        self.assertRaises(ValueError, m.seek, 1, 2)

        # Try to seek to negative position...
        self.assertRaises(ValueError, m.seek, -len(m) - 1, 2)

        # Try resizing map
        try:
            m.resize(512)
        except SystemError:
            # resize() not supported
            # No messages are printed, since the output of this test suite
            # would then be different across platforms.
            pass
        else:
            # resize() is supported
            self.assertEqual(len(m), 512)
            # Check that we can no longer seek beyond the new size.
            self.assertRaises(ValueError, m.seek, 513, 0)

            # Check that the underlying file is truncated too
            # (bug #728515)
            with open(TESTFN, 'rb') as f:
                f.seek(0, 2)
                self.assertEqual(f.tell(), 512)
            self.assertEqual(m.size(), 512)

        m.close()

    def test_access_parameter(self):
        # Test for "access" keyword parameter
        mapsize = 10
        with open(TESTFN, "wb") as fp:
            fp.write(b"a" * mapsize)
        with open(TESTFN, "rb") as f:
            m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_READ)
            self.assertEqual(m[:], b'a' * mapsize,
                             "Readonly memory map data incorrect.")

            # Ensuring that readonly mmap can't be slice assigned
            with self.assertRaises(
                    TypeError, msg="Able to write to readonly memory map"):
                m[:] = b'b' * mapsize

            # Ensuring that readonly mmap can't be item assigned.
            # The value must be an int: a bytes value is rejected with
            # TypeError even on a writable map, which would make this check
            # pass without testing the read-only protection at all.
            with self.assertRaises(
                    TypeError, msg="Able to write to readonly memory map"):
                m[0] = ord(b'b')

            # Ensuring that readonly mmap can't be write() to
            m.seek(0, 0)
            with self.assertRaises(
                    TypeError, msg="Able to write to readonly memory map"):
                m.write(b'abc')

            # Ensuring that readonly mmap can't be write_byte() to
            # (int argument, for the same reason as the item assignment).
            m.seek(0, 0)
            with self.assertRaises(
                    TypeError, msg="Able to write to readonly memory map"):
                m.write_byte(ord(b'd'))

            # Ensuring that readonly mmap can't be resized
            # (SystemError: resize is not universally supported)
            with self.assertRaises(
                    (SystemError, TypeError),
                    msg="Able to resize readonly memory map"):
                m.resize(2 * mapsize)
            with open(TESTFN, "rb") as fp:
                self.assertEqual(fp.read(), b'a' * mapsize,
                                 "Readonly memory map data file was modified")
            # Release the read-only mapping explicitly instead of relying on
            # reference counting when ``m`` is rebound below; an open mapping
            # would block the truncate() repair step on Windows.
            m.close()

        # Opening mmap with size too big
        with open(TESTFN, "r+b") as f:
            try:
                m = mmap.mmap(f.fileno(), mapsize + 1)
            except ValueError:
                # we do not expect a ValueError on Windows
                # CAUTION:  This also changes the size of the file on disk, and
                # later tests assume that the length hasn't changed.  We need to
                # repair that.
                if sys.platform.startswith('win'):
                    self.fail("Opening mmap with size+1 should work on Windows.")
            else:
                # we expect a ValueError on Unix, but not on Windows
                if not sys.platform.startswith('win'):
                    self.fail("Opening mmap with size+1 should raise ValueError.")
                m.close()
            if sys.platform.startswith('win'):
                # Repair damage from the resizing test.
                with open(TESTFN, 'r+b') as f:
                    f.truncate(mapsize)

        # Opening mmap with access=ACCESS_WRITE
        with open(TESTFN, "r+b") as f:
            m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_WRITE)
            # Modifying write-through memory map
            m[:] = b'c' * mapsize
            self.assertEqual(m[:], b'c' * mapsize,
                   "Write-through memory map memory not updated properly.")
            m.flush()
            m.close()
        with open(TESTFN, 'rb') as f:
            stuff = f.read()
        self.assertEqual(stuff, b'c' * mapsize,
               "Write-through memory map data file not updated properly.")

        # Opening mmap with access=ACCESS_COPY
        with open(TESTFN, "r+b") as f:
            m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_COPY)
            # Modifying copy-on-write memory map
            m[:] = b'd' * mapsize
            self.assertEqual(m[:], b'd' * mapsize,
                             "Copy-on-write memory map data not written correctly.")
            m.flush()
            with open(TESTFN, "rb") as fp:
                self.assertEqual(fp.read(), b'c' * mapsize,
                                 "Copy-on-write test data file should not be modified.")
            # Ensuring copy-on-write maps cannot be resized
            self.assertRaises(TypeError, m.resize, 2 * mapsize)
            m.close()

        # Ensuring invalid access parameter raises exception
        with open(TESTFN, "r+b") as f:
            self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, access=4)

        if os.name == "posix":
            # Try incompatible flags, prot and access parameters.
            with open(TESTFN, "r+b") as f:
                self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize,
                                  flags=mmap.MAP_PRIVATE,
                                  prot=mmap.PROT_READ, access=mmap.ACCESS_WRITE)

            # Try writing with PROT_EXEC and without PROT_WRITE
            prot = mmap.PROT_READ | getattr(mmap, 'PROT_EXEC', 0)
            with open(TESTFN, "r+b") as f:
                m = mmap.mmap(f.fileno(), mapsize, prot=prot)
                self.assertRaises(TypeError, m.write, b"abcdef")
                self.assertRaises(TypeError, m.write_byte, 0)
                m.close()

    def test_bad_file_desc(self):
        # Try opening a bad file descriptor...
        self.assertRaises(OSError, mmap.mmap, -2, 4096)

    def test_tougher_find(self):
        # Do a tougher .find() test.  SF bug 515943 pointed out that, in 2.2,
        # searching for data with embedded \0 bytes didn't work.
        with open(TESTFN, 'wb+') as f:

            data = b'aabaac\x00deef\x00\x00aa\x00'
            n = len(data)
            f.write(data)
            f.flush()
            m = mmap.mmap(f.fileno(), n)

        for start in range(n + 1):
            for finish in range(start, n + 1):
                slice = data[start:finish]
                self.assertEqual(m.find(slice), data.find(slice))
                self.assertEqual(m.find(slice + b'x'), -1)
        m.close()

    def test_find_end(self):
        # test the new 'end' parameter works as expected
        with open(TESTFN, 'wb+') as f:
            data = b'one two ones'
            n = len(data)
            f.write(data)
            f.flush()
            m = mmap.mmap(f.fileno(), n)

        self.assertEqual(m.find(b'one'), 0)
        self.assertEqual(m.find(b'ones'), 8)
        self.assertEqual(m.find(b'one', 0, -1), 0)
        self.assertEqual(m.find(b'one', 1), 8)
        self.assertEqual(m.find(b'one', 1, -1), 8)
        self.assertEqual(m.find(b'one', 1, -2), -1)
        self.assertEqual(m.find(bytearray(b'one')), 0)
        # Close the file-backed map so tearDown can unlink TESTFN.
        m.close()

    def test_rfind(self):
        # test the new 'end' parameter works as expected
        with open(TESTFN, 'wb+') as f:
            data = b'one two ones'
            n = len(data)
            f.write(data)
            f.flush()
            m = mmap.mmap(f.fileno(), n)

        self.assertEqual(m.rfind(b'one'), 8)
        self.assertEqual(m.rfind(b'one '), 0)
        self.assertEqual(m.rfind(b'one', 0, -1), 8)
        self.assertEqual(m.rfind(b'one', 0, -2), 0)
        self.assertEqual(m.rfind(b'one', 1, -1), 8)
        self.assertEqual(m.rfind(b'one', 1, -2), -1)
        self.assertEqual(m.rfind(bytearray(b'one')), 8)
        # Close the file-backed map so tearDown can unlink TESTFN.
        m.close()

    def test_double_close(self):
        # make sure a double close doesn't crash on Solaris (Bug# 665913)
        with open(TESTFN, 'wb+') as f:
            f.write(2**16 * b'a')  # Arbitrary character

        with open(TESTFN, 'rb') as f:
            mf = mmap.mmap(f.fileno(), 2**16, access=mmap.ACCESS_READ)
            mf.close()
            mf.close()

    def test_entire_file(self):
        # test mapping of entire file by passing 0 for map length
        with open(TESTFN, "wb+") as f:
            f.write(2**16 * b'm')  # Arbitrary character

        with open(TESTFN, "rb+") as f, \
             mmap.mmap(f.fileno(), 0) as mf:
            self.assertEqual(len(mf), 2**16, "Map size should equal file size.")
            self.assertEqual(mf.read(2**16), 2**16 * b"m")

    def test_length_0_offset(self):
        # Issue #10916: test mapping of remainder of file by passing 0 for
        # map length with an offset doesn't cause a segfault.
        # NOTE: allocation granularity is currently 65536 under Win64,
        # and therefore the minimum offset alignment.
        with open(TESTFN, "wb") as f:
            f.write((65536 * 2) * b'm')  # Arbitrary character

        with open(TESTFN, "rb") as f:
            with mmap.mmap(f.fileno(), 0, offset=65536, access=mmap.ACCESS_READ) as mf:
                self.assertRaises(IndexError, mf.__getitem__, 80000)

    def test_length_0_large_offset(self):
        # Issue #10959: test mapping of a file by passing 0 for
        # map length with a large offset doesn't cause a segfault.
        with open(TESTFN, "wb") as f:
            f.write(115699 * b'm')  # Arbitrary character

        with open(TESTFN, "w+b") as f:
            self.assertRaises(ValueError, mmap.mmap, f.fileno(), 0,
                              offset=2147418112)

    def test_move(self):
        # make move works everywhere (64-bit format problem earlier)
        with open(TESTFN, 'wb+') as f:

            f.write(b"ABCDEabcde")  # Arbitrary character
            f.flush()

            mf = mmap.mmap(f.fileno(), 10)
            mf.move(5, 0, 5)
            self.assertEqual(mf[:], b"ABCDEABCDE", "Map move should have duplicated front 5")
            mf.close()

        # more excessive test
        data = b"0123456789"
        for dest in range(len(data)):
            for src in range(len(data)):
                for count in range(len(data) - max(dest, src)):
                    expected = data[:dest] + data[src:src + count] + data[dest + count:]
                    m = mmap.mmap(-1, len(data))
                    m[:] = data
                    m.move(dest, src, count)
                    self.assertEqual(m[:], expected)
                    m.close()

        # segfault test (Issue 5387)
        m = mmap.mmap(-1, 100)
        offsets = [-100, -1, 0, 1, 100]
        for source, dest, size in itertools.product(offsets, offsets, offsets):
            try:
                m.move(source, dest, size)
            except ValueError:
                pass

        offsets = [(-1, -1, -1), (-1, -1, 0), (-1, 0, -1), (0, -1, -1),
                   (-1, 0, 0), (0, -1, 0), (0, 0, -1)]
        for source, dest, size in offsets:
            self.assertRaises(ValueError, m.move, source, dest, size)

        m.close()

        m = mmap.mmap(-1, 1)  # single byte
        self.addCleanup(m.close)
        self.assertRaises(ValueError, m.move, 0, 0, 2)
        self.assertRaises(ValueError, m.move, 1, 0, 1)
        self.assertRaises(ValueError, m.move, 0, 1, 1)
        m.move(0, 0, 1)
        m.move(0, 0, 0)

    def test_anonymous(self):
        # anonymous mmap.mmap(-1, PAGE)
        m = mmap.mmap(-1, PAGESIZE)
        self.addCleanup(m.close)
        for x in range(PAGESIZE):
            self.assertEqual(m[x], 0,
                             "anonymously mmap'ed contents should be zero")

        for x in range(PAGESIZE):
            b = x & 0xff
            m[x] = b
            self.assertEqual(m[x], b)

    def test_read_all(self):
        m = mmap.mmap(-1, 16)
        self.addCleanup(m.close)

        # With no parameters, or None or a negative argument, reads all
        m.write(bytes(range(16)))
        m.seek(0)
        self.assertEqual(m.read(), bytes(range(16)))
        m.seek(8)
        self.assertEqual(m.read(), bytes(range(8, 16)))
        m.seek(16)
        self.assertEqual(m.read(), b'')
        m.seek(3)
        self.assertEqual(m.read(None), bytes(range(3, 16)))
        m.seek(4)
        self.assertEqual(m.read(-1), bytes(range(4, 16)))
        m.seek(5)
        self.assertEqual(m.read(-2), bytes(range(5, 16)))
        m.seek(9)
        self.assertEqual(m.read(-42), bytes(range(9, 16)))

    def test_read_invalid_arg(self):
        m = mmap.mmap(-1, 16)
        self.addCleanup(m.close)

        self.assertRaises(TypeError, m.read, 'foo')
        self.assertRaises(TypeError, m.read, 5.5)
        self.assertRaises(TypeError, m.read, [1, 2, 3])

    def test_extended_getslice(self):
        # Test extended slicing by comparing with list slicing.
        s = bytes(reversed(range(256)))
        m = mmap.mmap(-1, len(s))
        self.addCleanup(m.close)
        m[:] = s
        self.assertEqual(m[:], s)
        indices = (0, None, 1, 3, 19, 300, sys.maxsize, -1, -2, -31, -300)
        for start in indices:
            for stop in indices:
                # Skip step 0 (invalid)
                for step in indices[1:]:
                    self.assertEqual(m[start:stop:step],
                                     s[start:stop:step])

    def test_extended_set_del_slice(self):
        # Test extended slicing by comparing with list slicing.
        s = bytes(reversed(range(256)))
        m = mmap.mmap(-1, len(s))
        self.addCleanup(m.close)
        indices = (0, None, 1, 3, 19, 300, sys.maxsize, -1, -2, -31, -300)
        for start in indices:
            for stop in indices:
                # Skip invalid step 0
                for step in indices[1:]:
                    m[:] = s
                    self.assertEqual(m[:], s)
                    L = list(s)
                    # Make sure we have a slice of exactly the right length,
                    # but with different data.
                    data = L[start:stop:step]
                    data = bytes(reversed(data))
                    L[start:stop:step] = data
                    m[start:stop:step] = data
                    self.assertEqual(m[:], bytes(L))

    def make_mmap_file(self, f, halfsize):
        """Fill *f* with 2 * *halfsize* bytes and return a map of the whole file.

        The data is all zero bytes except for b'foo' at offset *halfsize*.
        """
        f.write(b'\0' * halfsize)
        f.write(b'foo')
        f.write(b'\0' * (halfsize - 3))
        f.flush()
        return mmap.mmap(f.fileno(), 0)

    def test_empty_file(self):
        f = open(TESTFN, 'w+b')
        f.close()
        with open(TESTFN, "rb") as f:
            self.assertRaisesRegex(ValueError,
                                   "cannot mmap an empty file",
                                   mmap.mmap, f.fileno(), 0,
                                   access=mmap.ACCESS_READ)

    def test_offset(self):
        f = open(TESTFN, 'w+b')

        try:  # unlink TESTFN no matter what
            halfsize = mmap.ALLOCATIONGRANULARITY
            m = self.make_mmap_file(f, halfsize)
            m.close()
            f.close()

            mapsize = halfsize * 2
            # Try invalid offset
            f = open(TESTFN, "r+b")
            for offset in [-2, -1, None]:
                with self.assertRaises((ValueError, TypeError, OverflowError)):
                    mmap.mmap(f.fileno(), mapsize, offset=offset)
            f.close()

            # Try valid offset; the allocation granularity is a valid
            # offset alignment on every platform.
            f = open(TESTFN, "r+b")
            m = mmap.mmap(f.fileno(), mapsize - halfsize, offset=halfsize)
            self.assertEqual(m[0:3], b'foo')
            f.close()

            # Try resizing map
            try:
                m.resize(512)
            except SystemError:
                pass
            else:
                # resize() is supported
                self.assertEqual(len(m), 512)
                # Check that we can no longer seek beyond the new size.
                self.assertRaises(ValueError, m.seek, 513, 0)
                # Check that the content is not changed
                self.assertEqual(m[0:3], b'foo')

                # Check that the underlying file is truncated too
                with open(TESTFN, 'rb') as f:
                    f.seek(0, 2)
                    self.assertEqual(f.tell(), halfsize + 512)
                self.assertEqual(m.size(), halfsize + 512)

            m.close()

        finally:
            f.close()
            try:
                os.unlink(TESTFN)
            except OSError:
                pass

    def test_subclass(self):
        class anon_mmap(mmap.mmap):
            def __new__(klass, *args, **kwargs):
                return mmap.mmap.__new__(klass, -1, *args, **kwargs)
        anon_mmap(PAGESIZE).close()

    @unittest.skipUnless(hasattr(mmap, 'PROT_READ'), "needs mmap.PROT_READ")
    def test_prot_readonly(self):
        mapsize = 10
        with open(TESTFN, "wb") as fp:
            fp.write(b"a" * mapsize)
        with open(TESTFN, "rb") as f:
            m = mmap.mmap(f.fileno(), mapsize, prot=mmap.PROT_READ)
            # Write bytes, not str: a str argument raises TypeError on any
            # map, so it would not prove the mapping is read-only.
            self.assertRaises(TypeError, m.write, b"foo")
            m.close()

    def test_error(self):
        self.assertIs(mmap.error, OSError)

    def test_io_methods(self):
        data = b"0123456789"
        with open(TESTFN, "wb") as fp:
            fp.write(b"x" * len(data))
        with open(TESTFN, "r+b") as f:
            m = mmap.mmap(f.fileno(), len(data))
        # Test write_byte()
        for i in range(len(data)):
            self.assertEqual(m.tell(), i)
            m.write_byte(data[i])
            self.assertEqual(m.tell(), i + 1)
        self.assertRaises(ValueError, m.write_byte, b"x"[0])
        self.assertEqual(m[:], data)
        # Test read_byte()
        m.seek(0)
        for i in range(len(data)):
            self.assertEqual(m.tell(), i)
            self.assertEqual(m.read_byte(), data[i])
            self.assertEqual(m.tell(), i + 1)
        self.assertRaises(ValueError, m.read_byte)
        # Test read()
        m.seek(3)
        self.assertEqual(m.read(3), b"345")
        self.assertEqual(m.tell(), 6)
        # Test write()
        m.seek(3)
        m.write(b"bar")
        self.assertEqual(m.tell(), 6)
        self.assertEqual(m[:], b"012bar6789")
        m.write(bytearray(b"baz"))
        self.assertEqual(m.tell(), 9)
        self.assertEqual(m[:], b"012barbaz9")
        self.assertRaises(ValueError, m.write, b"ba")
        # Close the file-backed map so tearDown can unlink TESTFN.
        m.close()

    def test_non_ascii_byte(self):
        for b in (129, 200, 255):  # > 128
            m = mmap.mmap(-1, 1)
            m.write_byte(b)
            self.assertEqual(m[0], b)
            m.seek(0)
            self.assertEqual(m.read_byte(), b)
            m.close()

    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
    def test_tagname(self):
        data1 = b"0123456789"
        data2 = b"abcdefghij"
        # Not a bare assert: those are stripped when running under -O.
        self.assertEqual(len(data1), len(data2))

        # Test same tag
        m1 = mmap.mmap(-1, len(data1), tagname="foo")
        m1[:] = data1
        m2 = mmap.mmap(-1, len(data2), tagname="foo")
        m2[:] = data2
        self.assertEqual(m1[:], data2)
        self.assertEqual(m2[:], data2)
        m2.close()
        m1.close()

        # Test different tag
        m1 = mmap.mmap(-1, len(data1), tagname="foo")
        m1[:] = data1
        m2 = mmap.mmap(-1, len(data2), tagname="boo")
        m2[:] = data2
        self.assertEqual(m1[:], data1)
        self.assertEqual(m2[:], data2)
        m2.close()
        m1.close()

    @cpython_only
    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
    def test_sizeof(self):
        m1 = mmap.mmap(-1, 100)
        self.addCleanup(m1.close)
        tagname = "foo"
        m2 = mmap.mmap(-1, 100, tagname=tagname)
        self.addCleanup(m2.close)
        self.assertEqual(sys.getsizeof(m2),
                         sys.getsizeof(m1) + len(tagname) + 1)

    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
    def test_crasher_on_windows(self):
        # Should not crash (Issue 1733986)
        m = mmap.mmap(-1, 1000, tagname="foo")
        try:
            mmap.mmap(-1, 5000, tagname="foo")[:]  # same tagname, but larger size
        except Exception:
            pass
        m.close()

        # Should not crash (Issue 5385)
        with open(TESTFN, "wb") as fp:
            fp.write(b"x" * 10)
        f = open(TESTFN, "r+b")
        m = mmap.mmap(f.fileno(), 0)
        f.close()
        try:
            m.resize(0)  # will raise OSError
        except Exception:
            pass
        try:
            m[:]
        except Exception:
            pass
        m.close()

    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
    def test_invalid_descriptor(self):
        # socket file descriptors are valid, but out of range
        # for _get_osfhandle, causing a crash when validating the
        # parameters to _get_osfhandle.
        s = socket.socket()
        try:
            with self.assertRaises(OSError):
                m = mmap.mmap(s.fileno(), 10)
        finally:
            s.close()

    def test_context_manager(self):
        with mmap.mmap(-1, 10) as m:
            self.assertFalse(m.closed)
        self.assertTrue(m.closed)

    def test_context_manager_exception(self):
        # Test that the OSError gets passed through
        with self.assertRaises(Exception) as exc:
            with mmap.mmap(-1, 10) as m:
                raise OSError
        self.assertIsInstance(exc.exception, OSError,
                              "wrong exception raised in context manager")
        self.assertTrue(m.closed, "context manager failed")

    def test_weakref(self):
        # Check mmap objects are weakrefable
        # (no addCleanup here: it would keep the map alive past ``del mm``)
        mm = mmap.mmap(-1, 16)
        wr = weakref.ref(mm)
        self.assertIs(wr(), mm)
        del mm
        gc_collect()
        self.assertIs(wr(), None)

    def test_write_returning_the_number_of_bytes_written(self):
        mm = mmap.mmap(-1, 16)
        self.addCleanup(mm.close)
        self.assertEqual(mm.write(b""), 0)
        self.assertEqual(mm.write(b"x"), 1)
        self.assertEqual(mm.write(b"yz"), 2)
        self.assertEqual(mm.write(b"python"), 6)

    @unittest.skipIf(os.name == 'nt', 'cannot resize anonymous mmaps on Windows')
    def test_resize_past_pos(self):
        m = mmap.mmap(-1, 8192)
        self.addCleanup(m.close)
        m.read(5000)
        try:
            m.resize(4096)
        except SystemError:
            self.skipTest("resizing not supported")
        self.assertEqual(m.read(14), b'')
        self.assertRaises(ValueError, m.read_byte)
        self.assertRaises(ValueError, m.write_byte, 42)
        self.assertRaises(ValueError, m.write, b'abc')

    def test_concat_repeat_exception(self):
        m = mmap.mmap(-1, 16)
        self.addCleanup(m.close)
        with self.assertRaises(TypeError):
            m + m
        with self.assertRaises(TypeError):
            m * 2

    def test_flush_return_value(self):
        # mm.flush() should return None on success, raise an
        # exception on error under all platforms.
        mm = mmap.mmap(-1, 16)
        self.addCleanup(mm.close)
        mm.write(b'python')
        result = mm.flush()
        self.assertIsNone(result)
        if sys.platform.startswith('linux'):
            # 'offset' must be a multiple of mmap.PAGESIZE on Linux.
            # See bpo-34754 for details.
            self.assertRaises(OSError, mm.flush, 1, len(b'python'))

    @unittest.skipUnless(hasattr(mmap.mmap, 'madvise'), 'needs madvise')
    def test_madvise(self):
        size = 2 * PAGESIZE
        m = mmap.mmap(-1, size)
        self.addCleanup(m.close)

        with self.assertRaisesRegex(ValueError, "madvise start out of bounds"):
            m.madvise(mmap.MADV_NORMAL, size)
        with self.assertRaisesRegex(ValueError, "madvise start out of bounds"):
            m.madvise(mmap.MADV_NORMAL, -1)
        with self.assertRaisesRegex(ValueError, "madvise length invalid"):
            m.madvise(mmap.MADV_NORMAL, 0, -1)
        with self.assertRaisesRegex(OverflowError, "madvise length too large"):
            m.madvise(mmap.MADV_NORMAL, PAGESIZE, sys.maxsize)
        self.assertEqual(m.madvise(mmap.MADV_NORMAL), None)
        self.assertEqual(m.madvise(mmap.MADV_NORMAL, PAGESIZE), None)
        self.assertEqual(m.madvise(mmap.MADV_NORMAL, PAGESIZE, size), None)
        self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, 2), None)
        self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, size), None)


class LargeMmapTests(unittest.TestCase):
    """Tests that map sparse files around and beyond the 2 GiB / 4 GiB marks."""

    def setUp(self):
        unlink(TESTFN)

    def tearDown(self):
        unlink(TESTFN)

    def _make_test_file(self, num_zeroes, tail):
        """Create TESTFN with *tail* written at offset *num_zeroes*.

        Returns the file object, still open in 'w+b' mode. Raises
        unittest.SkipTest when the seek/write/flush fails, which is taken to
        mean the filesystem lacks large-file support.
        """
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            requires('largefile',
                'test requires %s bytes and a long time to run' % str(0x180000000))
        f = open(TESTFN, 'w+b')
        try:
            f.seek(num_zeroes)
            f.write(tail)
            f.flush()
        except (OSError, OverflowError, ValueError):
            try:
                f.close()
            except (OSError, OverflowError):
                pass
            raise unittest.SkipTest("filesystem does not have largefile support")
        return f

    def test_large_offset(self):
        with self._make_test_file(0x14FFFFFFF, b" ") as f:
            with mmap.mmap(f.fileno(), 0, offset=0x140000000, access=mmap.ACCESS_READ) as m:
                self.assertEqual(m[0xFFFFFFF], 32)

    def test_large_filesize(self):
        with self._make_test_file(0x17FFFFFFF, b" ") as f:
            if sys.maxsize < 0x180000000:
                # On 32 bit platforms the file is larger than sys.maxsize so
                # mapping the whole file should fail -- Issue #16743
                with self.assertRaises(OverflowError):
                    mmap.mmap(f.fileno(), 0x180000000, access=mmap.ACCESS_READ)
                with self.assertRaises(ValueError):
                    mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
            with mmap.mmap(f.fileno(), 0x10000, access=mmap.ACCESS_READ) as m:
                self.assertEqual(m.size(), 0x180000000)

    # Issue 11277: mmap() with large (~4 GiB) sparse files crashes on OS X.

    def _test_around_boundary(self, boundary):
        """Map a sparse file whose only data straddles *boundary* and read it back."""
        tail = b'  DEARdear  '
        start = boundary - len(tail) // 2
        end = start + len(tail)
        with self._make_test_file(start, tail) as f:
            with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
                self.assertEqual(m[start:end], tail)

    @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems")
    def test_around_2GB(self):
        self._test_around_boundary(_2G)

    @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems")
    def test_around_4GB(self):
        self._test_around_boundary(_4G)


if __name__ == '__main__':
    unittest.main()