1from test.support import (requires, _2G, _4G, gc_collect, cpython_only) 2from test.support.import_helper import import_module 3from test.support.os_helper import TESTFN, unlink 4import unittest 5import os 6import re 7import itertools 8import socket 9import sys 10import weakref 11 12# Skip test if we can't import mmap. 13mmap = import_module('mmap') 14 15PAGESIZE = mmap.PAGESIZE 16 17 18class MmapTests(unittest.TestCase): 19 20 def setUp(self): 21 if os.path.exists(TESTFN): 22 os.unlink(TESTFN) 23 24 def tearDown(self): 25 try: 26 os.unlink(TESTFN) 27 except OSError: 28 pass 29 30 def test_basic(self): 31 # Test mmap module on Unix systems and Windows 32 33 # Create a file to be mmap'ed. 34 f = open(TESTFN, 'bw+') 35 try: 36 # Write 2 pages worth of data to the file 37 f.write(b'\0'* PAGESIZE) 38 f.write(b'foo') 39 f.write(b'\0'* (PAGESIZE-3) ) 40 f.flush() 41 m = mmap.mmap(f.fileno(), 2 * PAGESIZE) 42 finally: 43 f.close() 44 45 # Simple sanity checks 46 47 tp = str(type(m)) # SF bug 128713: segfaulted on Linux 48 self.assertEqual(m.find(b'foo'), PAGESIZE) 49 50 self.assertEqual(len(m), 2*PAGESIZE) 51 52 self.assertEqual(m[0], 0) 53 self.assertEqual(m[0:3], b'\0\0\0') 54 55 # Shouldn't crash on boundary (Issue #5292) 56 self.assertRaises(IndexError, m.__getitem__, len(m)) 57 self.assertRaises(IndexError, m.__setitem__, len(m), b'\0') 58 59 # Modify the file's content 60 m[0] = b'3'[0] 61 m[PAGESIZE +3: PAGESIZE +3+3] = b'bar' 62 63 # Check that the modification worked 64 self.assertEqual(m[0], b'3'[0]) 65 self.assertEqual(m[0:3], b'3\0\0') 66 self.assertEqual(m[PAGESIZE-1 : PAGESIZE + 7], b'\0foobar\0') 67 68 m.flush() 69 70 # Test doing a regular expression match in an mmap'ed file 71 match = re.search(b'[A-Za-z]+', m) 72 if match is None: 73 self.fail('regex match on mmap failed!') 74 else: 75 start, end = match.span(0) 76 length = end - start 77 78 self.assertEqual(start, PAGESIZE) 79 self.assertEqual(end, PAGESIZE + 6) 80 81 # test seeking around (try to overflow the seek implementation) 82 m.seek(0,0) 83 self.assertEqual(m.tell(), 0) 84 m.seek(42,1) 85 self.assertEqual(m.tell(), 42) 86 m.seek(0,2) 87 self.assertEqual(m.tell(), len(m)) 88 89 # Try to seek to negative position... 90 self.assertRaises(ValueError, m.seek, -1) 91 92 # Try to seek beyond end of mmap... 93 self.assertRaises(ValueError, m.seek, 1, 2) 94 95 # Try to seek to negative position... 96 self.assertRaises(ValueError, m.seek, -len(m)-1, 2) 97 98 # Try resizing map 99 try: 100 m.resize(512) 101 except SystemError: 102 # resize() not supported 103 # No messages are printed, since the output of this test suite 104 # would then be different across platforms. 105 pass 106 else: 107 # resize() is supported 108 self.assertEqual(len(m), 512) 109 # Check that we can no longer seek beyond the new size. 110 self.assertRaises(ValueError, m.seek, 513, 0) 111 112 # Check that the underlying file is truncated too 113 # (bug #728515) 114 f = open(TESTFN, 'rb') 115 try: 116 f.seek(0, 2) 117 self.assertEqual(f.tell(), 512) 118 finally: 119 f.close() 120 self.assertEqual(m.size(), 512) 121 122 m.close() 123 124 def test_access_parameter(self): 125 # Test for "access" keyword parameter 126 mapsize = 10 127 with open(TESTFN, "wb") as fp: 128 fp.write(b"a"*mapsize) 129 with open(TESTFN, "rb") as f: 130 m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_READ) 131 self.assertEqual(m[:], b'a'*mapsize, "Readonly memory map data incorrect.") 132 133 # Ensuring that readonly mmap can't be slice assigned 134 try: 135 m[:] = b'b'*mapsize 136 except TypeError: 137 pass 138 else: 139 self.fail("Able to write to readonly memory map") 140 141 # Ensuring that readonly mmap can't be item assigned 142 try: 143 m[0] = b'b' 144 except TypeError: 145 pass 146 else: 147 self.fail("Able to write to readonly memory map") 148 149 # Ensuring that readonly mmap can't be write() to 150 try: 151 m.seek(0,0) 152 m.write(b'abc') 153 except TypeError: 154 pass 155 else: 156 self.fail("Able to write to readonly memory map") 157 158 # Ensuring that readonly mmap can't be write_byte() to 159 try: 160 m.seek(0,0) 161 m.write_byte(b'd') 162 except TypeError: 163 pass 164 else: 165 self.fail("Able to write to readonly memory map") 166 167 # Ensuring that readonly mmap can't be resized 168 try: 169 m.resize(2*mapsize) 170 except SystemError: # resize is not universally supported 171 pass 172 except TypeError: 173 pass 174 else: 175 self.fail("Able to resize readonly memory map") 176 with open(TESTFN, "rb") as fp: 177 self.assertEqual(fp.read(), b'a'*mapsize, 178 "Readonly memory map data file was modified") 179 180 # Opening mmap with size too big 181 with open(TESTFN, "r+b") as f: 182 try: 183 m = mmap.mmap(f.fileno(), mapsize+1) 184 except ValueError: 185 # we do not expect a ValueError on Windows 186 # CAUTION: This also changes the size of the file on disk, and 187 # later tests assume that the length hasn't changed. We need to 188 # repair that. 189 if sys.platform.startswith('win'): 190 self.fail("Opening mmap with size+1 should work on Windows.") 191 else: 192 # we expect a ValueError on Unix, but not on Windows 193 if not sys.platform.startswith('win'): 194 self.fail("Opening mmap with size+1 should raise ValueError.") 195 m.close() 196 if sys.platform.startswith('win'): 197 # Repair damage from the resizing test. 198 with open(TESTFN, 'r+b') as f: 199 f.truncate(mapsize) 200 201 # Opening mmap with access=ACCESS_WRITE 202 with open(TESTFN, "r+b") as f: 203 m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_WRITE) 204 # Modifying write-through memory map 205 m[:] = b'c'*mapsize 206 self.assertEqual(m[:], b'c'*mapsize, 207 "Write-through memory map memory not updated properly.") 208 m.flush() 209 m.close() 210 with open(TESTFN, 'rb') as f: 211 stuff = f.read() 212 self.assertEqual(stuff, b'c'*mapsize, 213 "Write-through memory map data file not updated properly.") 214 215 # Opening mmap with access=ACCESS_COPY 216 with open(TESTFN, "r+b") as f: 217 m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_COPY) 218 # Modifying copy-on-write memory map 219 m[:] = b'd'*mapsize 220 self.assertEqual(m[:], b'd' * mapsize, 221 "Copy-on-write memory map data not written correctly.") 222 m.flush() 223 with open(TESTFN, "rb") as fp: 224 self.assertEqual(fp.read(), b'c'*mapsize, 225 "Copy-on-write test data file should not be modified.") 226 # Ensuring copy-on-write maps cannot be resized 227 self.assertRaises(TypeError, m.resize, 2*mapsize) 228 m.close() 229 230 # Ensuring invalid access parameter raises exception 231 with open(TESTFN, "r+b") as f: 232 self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, access=4) 233 234 if os.name == "posix": 235 # Try incompatible flags, prot and access parameters. 236 with open(TESTFN, "r+b") as f: 237 self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, 238 flags=mmap.MAP_PRIVATE, 239 prot=mmap.PROT_READ, access=mmap.ACCESS_WRITE) 240 241 # Try writing with PROT_EXEC and without PROT_WRITE 242 prot = mmap.PROT_READ | getattr(mmap, 'PROT_EXEC', 0) 243 with open(TESTFN, "r+b") as f: 244 m = mmap.mmap(f.fileno(), mapsize, prot=prot) 245 self.assertRaises(TypeError, m.write, b"abcdef") 246 self.assertRaises(TypeError, m.write_byte, 0) 247 m.close() 248 249 def test_bad_file_desc(self): 250 # Try opening a bad file descriptor... 251 self.assertRaises(OSError, mmap.mmap, -2, 4096) 252 253 def test_tougher_find(self): 254 # Do a tougher .find() test. SF bug 515943 pointed out that, in 2.2, 255 # searching for data with embedded \0 bytes didn't work. 256 with open(TESTFN, 'wb+') as f: 257 258 data = b'aabaac\x00deef\x00\x00aa\x00' 259 n = len(data) 260 f.write(data) 261 f.flush() 262 m = mmap.mmap(f.fileno(), n) 263 264 for start in range(n+1): 265 for finish in range(start, n+1): 266 slice = data[start : finish] 267 self.assertEqual(m.find(slice), data.find(slice)) 268 self.assertEqual(m.find(slice + b'x'), -1) 269 m.close() 270 271 def test_find_end(self): 272 # test the new 'end' parameter works as expected 273 with open(TESTFN, 'wb+') as f: 274 data = b'one two ones' 275 n = len(data) 276 f.write(data) 277 f.flush() 278 m = mmap.mmap(f.fileno(), n) 279 280 self.assertEqual(m.find(b'one'), 0) 281 self.assertEqual(m.find(b'ones'), 8) 282 self.assertEqual(m.find(b'one', 0, -1), 0) 283 self.assertEqual(m.find(b'one', 1), 8) 284 self.assertEqual(m.find(b'one', 1, -1), 8) 285 self.assertEqual(m.find(b'one', 1, -2), -1) 286 self.assertEqual(m.find(bytearray(b'one')), 0) 287 288 289 def test_rfind(self): 290 # test the new 'end' parameter works as expected 291 with open(TESTFN, 'wb+') as f: 292 data = b'one two ones' 293 n = len(data) 294 f.write(data) 295 f.flush() 296 m = mmap.mmap(f.fileno(), n) 297 298 self.assertEqual(m.rfind(b'one'), 8) 299 self.assertEqual(m.rfind(b'one '), 0) 300 self.assertEqual(m.rfind(b'one', 0, -1), 8) 301 self.assertEqual(m.rfind(b'one', 0, -2), 0) 302 self.assertEqual(m.rfind(b'one', 1, -1), 8) 303 self.assertEqual(m.rfind(b'one', 1, -2), -1) 304 self.assertEqual(m.rfind(bytearray(b'one')), 8) 305 306 307 def test_double_close(self): 308 # make sure a double close doesn't crash on Solaris (Bug# 665913) 309 with open(TESTFN, 'wb+') as f: 310 f.write(2**16 * b'a') # Arbitrary character 311 312 with open(TESTFN, 'rb') as f: 313 mf = mmap.mmap(f.fileno(), 2**16, access=mmap.ACCESS_READ) 314 mf.close() 315 mf.close() 316 317 def test_entire_file(self): 318 # test mapping of entire file by passing 0 for map length 319 with open(TESTFN, "wb+") as f: 320 f.write(2**16 * b'm') # Arbitrary character 321 322 with open(TESTFN, "rb+") as f, \ 323 mmap.mmap(f.fileno(), 0) as mf: 324 self.assertEqual(len(mf), 2**16, "Map size should equal file size.") 325 self.assertEqual(mf.read(2**16), 2**16 * b"m") 326 327 def test_length_0_offset(self): 328 # Issue #10916: test mapping of remainder of file by passing 0 for 329 # map length with an offset doesn't cause a segfault. 330 # NOTE: allocation granularity is currently 65536 under Win64, 331 # and therefore the minimum offset alignment. 332 with open(TESTFN, "wb") as f: 333 f.write((65536 * 2) * b'm') # Arbitrary character 334 335 with open(TESTFN, "rb") as f: 336 with mmap.mmap(f.fileno(), 0, offset=65536, access=mmap.ACCESS_READ) as mf: 337 self.assertRaises(IndexError, mf.__getitem__, 80000) 338 339 def test_length_0_large_offset(self): 340 # Issue #10959: test mapping of a file by passing 0 for 341 # map length with a large offset doesn't cause a segfault. 342 with open(TESTFN, "wb") as f: 343 f.write(115699 * b'm') # Arbitrary character 344 345 with open(TESTFN, "w+b") as f: 346 self.assertRaises(ValueError, mmap.mmap, f.fileno(), 0, 347 offset=2147418112) 348 349 def test_move(self): 350 # make move works everywhere (64-bit format problem earlier) 351 with open(TESTFN, 'wb+') as f: 352 353 f.write(b"ABCDEabcde") # Arbitrary character 354 f.flush() 355 356 mf = mmap.mmap(f.fileno(), 10) 357 mf.move(5, 0, 5) 358 self.assertEqual(mf[:], b"ABCDEABCDE", "Map move should have duplicated front 5") 359 mf.close() 360 361 # more excessive test 362 data = b"0123456789" 363 for dest in range(len(data)): 364 for src in range(len(data)): 365 for count in range(len(data) - max(dest, src)): 366 expected = data[:dest] + data[src:src+count] + data[dest+count:] 367 m = mmap.mmap(-1, len(data)) 368 m[:] = data 369 m.move(dest, src, count) 370 self.assertEqual(m[:], expected) 371 m.close() 372 373 # segfault test (Issue 5387) 374 m = mmap.mmap(-1, 100) 375 offsets = [-100, -1, 0, 1, 100] 376 for source, dest, size in itertools.product(offsets, offsets, offsets): 377 try: 378 m.move(source, dest, size) 379 except ValueError: 380 pass 381 382 offsets = [(-1, -1, -1), (-1, -1, 0), (-1, 0, -1), (0, -1, -1), 383 (-1, 0, 0), (0, -1, 0), (0, 0, -1)] 384 for source, dest, size in offsets: 385 self.assertRaises(ValueError, m.move, source, dest, size) 386 387 m.close() 388 389 m = mmap.mmap(-1, 1) # single byte 390 self.assertRaises(ValueError, m.move, 0, 0, 2) 391 self.assertRaises(ValueError, m.move, 1, 0, 1) 392 self.assertRaises(ValueError, m.move, 0, 1, 1) 393 m.move(0, 0, 1) 394 m.move(0, 0, 0) 395 396 397 def test_anonymous(self): 398 # anonymous mmap.mmap(-1, PAGE) 399 m = mmap.mmap(-1, PAGESIZE) 400 for x in range(PAGESIZE): 401 self.assertEqual(m[x], 0, 402 "anonymously mmap'ed contents should be zero") 403 404 for x in range(PAGESIZE): 405 b = x & 0xff 406 m[x] = b 407 self.assertEqual(m[x], b) 408 409 def test_read_all(self): 410 m = mmap.mmap(-1, 16) 411 self.addCleanup(m.close) 412 413 # With no parameters, or None or a negative argument, reads all 414 m.write(bytes(range(16))) 415 m.seek(0) 416 self.assertEqual(m.read(), bytes(range(16))) 417 m.seek(8) 418 self.assertEqual(m.read(), bytes(range(8, 16))) 419 m.seek(16) 420 self.assertEqual(m.read(), b'') 421 m.seek(3) 422 self.assertEqual(m.read(None), bytes(range(3, 16))) 423 m.seek(4) 424 self.assertEqual(m.read(-1), bytes(range(4, 16))) 425 m.seek(5) 426 self.assertEqual(m.read(-2), bytes(range(5, 16))) 427 m.seek(9) 428 self.assertEqual(m.read(-42), bytes(range(9, 16))) 429 430 def test_read_invalid_arg(self): 431 m = mmap.mmap(-1, 16) 432 self.addCleanup(m.close) 433 434 self.assertRaises(TypeError, m.read, 'foo') 435 self.assertRaises(TypeError, m.read, 5.5) 436 self.assertRaises(TypeError, m.read, [1, 2, 3]) 437 438 def test_extended_getslice(self): 439 # Test extended slicing by comparing with list slicing. 440 s = bytes(reversed(range(256))) 441 m = mmap.mmap(-1, len(s)) 442 m[:] = s 443 self.assertEqual(m[:], s) 444 indices = (0, None, 1, 3, 19, 300, sys.maxsize, -1, -2, -31, -300) 445 for start in indices: 446 for stop in indices: 447 # Skip step 0 (invalid) 448 for step in indices[1:]: 449 self.assertEqual(m[start:stop:step], 450 s[start:stop:step]) 451 452 def test_extended_set_del_slice(self): 453 # Test extended slicing by comparing with list slicing. 454 s = bytes(reversed(range(256))) 455 m = mmap.mmap(-1, len(s)) 456 indices = (0, None, 1, 3, 19, 300, sys.maxsize, -1, -2, -31, -300) 457 for start in indices: 458 for stop in indices: 459 # Skip invalid step 0 460 for step in indices[1:]: 461 m[:] = s 462 self.assertEqual(m[:], s) 463 L = list(s) 464 # Make sure we have a slice of exactly the right length, 465 # but with different data. 466 data = L[start:stop:step] 467 data = bytes(reversed(data)) 468 L[start:stop:step] = data 469 m[start:stop:step] = data 470 self.assertEqual(m[:], bytes(L)) 471 472 def make_mmap_file (self, f, halfsize): 473 # Write 2 pages worth of data to the file 474 f.write (b'\0' * halfsize) 475 f.write (b'foo') 476 f.write (b'\0' * (halfsize - 3)) 477 f.flush () 478 return mmap.mmap (f.fileno(), 0) 479 480 def test_empty_file (self): 481 f = open (TESTFN, 'w+b') 482 f.close() 483 with open(TESTFN, "rb") as f : 484 self.assertRaisesRegex(ValueError, 485 "cannot mmap an empty file", 486 mmap.mmap, f.fileno(), 0, 487 access=mmap.ACCESS_READ) 488 489 def test_offset (self): 490 f = open (TESTFN, 'w+b') 491 492 try: # unlink TESTFN no matter what 493 halfsize = mmap.ALLOCATIONGRANULARITY 494 m = self.make_mmap_file (f, halfsize) 495 m.close () 496 f.close () 497 498 mapsize = halfsize * 2 499 # Try invalid offset 500 f = open(TESTFN, "r+b") 501 for offset in [-2, -1, None]: 502 try: 503 m = mmap.mmap(f.fileno(), mapsize, offset=offset) 504 self.assertEqual(0, 1) 505 except (ValueError, TypeError, OverflowError): 506 pass 507 else: 508 self.assertEqual(0, 0) 509 f.close() 510 511 # Try valid offset, hopefully 8192 works on all OSes 512 f = open(TESTFN, "r+b") 513 m = mmap.mmap(f.fileno(), mapsize - halfsize, offset=halfsize) 514 self.assertEqual(m[0:3], b'foo') 515 f.close() 516 517 # Try resizing map 518 try: 519 m.resize(512) 520 except SystemError: 521 pass 522 else: 523 # resize() is supported 524 self.assertEqual(len(m), 512) 525 # Check that we can no longer seek beyond the new size. 526 self.assertRaises(ValueError, m.seek, 513, 0) 527 # Check that the content is not changed 528 self.assertEqual(m[0:3], b'foo') 529 530 # Check that the underlying file is truncated too 531 f = open(TESTFN, 'rb') 532 f.seek(0, 2) 533 self.assertEqual(f.tell(), halfsize + 512) 534 f.close() 535 self.assertEqual(m.size(), halfsize + 512) 536 537 m.close() 538 539 finally: 540 f.close() 541 try: 542 os.unlink(TESTFN) 543 except OSError: 544 pass 545 546 def test_subclass(self): 547 class anon_mmap(mmap.mmap): 548 def __new__(klass, *args, **kwargs): 549 return mmap.mmap.__new__(klass, -1, *args, **kwargs) 550 anon_mmap(PAGESIZE) 551 552 @unittest.skipUnless(hasattr(mmap, 'PROT_READ'), "needs mmap.PROT_READ") 553 def test_prot_readonly(self): 554 mapsize = 10 555 with open(TESTFN, "wb") as fp: 556 fp.write(b"a"*mapsize) 557 with open(TESTFN, "rb") as f: 558 m = mmap.mmap(f.fileno(), mapsize, prot=mmap.PROT_READ) 559 self.assertRaises(TypeError, m.write, "foo") 560 561 def test_error(self): 562 self.assertIs(mmap.error, OSError) 563 564 def test_io_methods(self): 565 data = b"0123456789" 566 with open(TESTFN, "wb") as fp: 567 fp.write(b"x"*len(data)) 568 with open(TESTFN, "r+b") as f: 569 m = mmap.mmap(f.fileno(), len(data)) 570 # Test write_byte() 571 for i in range(len(data)): 572 self.assertEqual(m.tell(), i) 573 m.write_byte(data[i]) 574 self.assertEqual(m.tell(), i+1) 575 self.assertRaises(ValueError, m.write_byte, b"x"[0]) 576 self.assertEqual(m[:], data) 577 # Test read_byte() 578 m.seek(0) 579 for i in range(len(data)): 580 self.assertEqual(m.tell(), i) 581 self.assertEqual(m.read_byte(), data[i]) 582 self.assertEqual(m.tell(), i+1) 583 self.assertRaises(ValueError, m.read_byte) 584 # Test read() 585 m.seek(3) 586 self.assertEqual(m.read(3), b"345") 587 self.assertEqual(m.tell(), 6) 588 # Test write() 589 m.seek(3) 590 m.write(b"bar") 591 self.assertEqual(m.tell(), 6) 592 self.assertEqual(m[:], b"012bar6789") 593 m.write(bytearray(b"baz")) 594 self.assertEqual(m.tell(), 9) 595 self.assertEqual(m[:], b"012barbaz9") 596 self.assertRaises(ValueError, m.write, b"ba") 597 598 def test_non_ascii_byte(self): 599 for b in (129, 200, 255): # > 128 600 m = mmap.mmap(-1, 1) 601 m.write_byte(b) 602 self.assertEqual(m[0], b) 603 m.seek(0) 604 self.assertEqual(m.read_byte(), b) 605 m.close() 606 607 @unittest.skipUnless(os.name == 'nt', 'requires Windows') 608 def test_tagname(self): 609 data1 = b"0123456789" 610 data2 = b"abcdefghij" 611 assert len(data1) == len(data2) 612 613 # Test same tag 614 m1 = mmap.mmap(-1, len(data1), tagname="foo") 615 m1[:] = data1 616 m2 = mmap.mmap(-1, len(data2), tagname="foo") 617 m2[:] = data2 618 self.assertEqual(m1[:], data2) 619 self.assertEqual(m2[:], data2) 620 m2.close() 621 m1.close() 622 623 # Test different tag 624 m1 = mmap.mmap(-1, len(data1), tagname="foo") 625 m1[:] = data1 626 m2 = mmap.mmap(-1, len(data2), tagname="boo") 627 m2[:] = data2 628 self.assertEqual(m1[:], data1) 629 self.assertEqual(m2[:], data2) 630 m2.close() 631 m1.close() 632 633 @cpython_only 634 @unittest.skipUnless(os.name == 'nt', 'requires Windows') 635 def test_sizeof(self): 636 m1 = mmap.mmap(-1, 100) 637 tagname = "foo" 638 m2 = mmap.mmap(-1, 100, tagname=tagname) 639 self.assertEqual(sys.getsizeof(m2), 640 sys.getsizeof(m1) + len(tagname) + 1) 641 642 @unittest.skipUnless(os.name == 'nt', 'requires Windows') 643 def test_crasher_on_windows(self): 644 # Should not crash (Issue 1733986) 645 m = mmap.mmap(-1, 1000, tagname="foo") 646 try: 647 mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size 648 except: 649 pass 650 m.close() 651 652 # Should not crash (Issue 5385) 653 with open(TESTFN, "wb") as fp: 654 fp.write(b"x"*10) 655 f = open(TESTFN, "r+b") 656 m = mmap.mmap(f.fileno(), 0) 657 f.close() 658 try: 659 m.resize(0) # will raise OSError 660 except: 661 pass 662 try: 663 m[:] 664 except: 665 pass 666 m.close() 667 668 @unittest.skipUnless(os.name == 'nt', 'requires Windows') 669 def test_invalid_descriptor(self): 670 # socket file descriptors are valid, but out of range 671 # for _get_osfhandle, causing a crash when validating the 672 # parameters to _get_osfhandle. 673 s = socket.socket() 674 try: 675 with self.assertRaises(OSError): 676 m = mmap.mmap(s.fileno(), 10) 677 finally: 678 s.close() 679 680 def test_context_manager(self): 681 with mmap.mmap(-1, 10) as m: 682 self.assertFalse(m.closed) 683 self.assertTrue(m.closed) 684 685 def test_context_manager_exception(self): 686 # Test that the OSError gets passed through 687 with self.assertRaises(Exception) as exc: 688 with mmap.mmap(-1, 10) as m: 689 raise OSError 690 self.assertIsInstance(exc.exception, OSError, 691 "wrong exception raised in context manager") 692 self.assertTrue(m.closed, "context manager failed") 693 694 def test_weakref(self): 695 # Check mmap objects are weakrefable 696 mm = mmap.mmap(-1, 16) 697 wr = weakref.ref(mm) 698 self.assertIs(wr(), mm) 699 del mm 700 gc_collect() 701 self.assertIs(wr(), None) 702 703 def test_write_returning_the_number_of_bytes_written(self): 704 mm = mmap.mmap(-1, 16) 705 self.assertEqual(mm.write(b""), 0) 706 self.assertEqual(mm.write(b"x"), 1) 707 self.assertEqual(mm.write(b"yz"), 2) 708 self.assertEqual(mm.write(b"python"), 6) 709 710 @unittest.skipIf(os.name == 'nt', 'cannot resize anonymous mmaps on Windows') 711 def test_resize_past_pos(self): 712 m = mmap.mmap(-1, 8192) 713 self.addCleanup(m.close) 714 m.read(5000) 715 try: 716 m.resize(4096) 717 except SystemError: 718 self.skipTest("resizing not supported") 719 self.assertEqual(m.read(14), b'') 720 self.assertRaises(ValueError, m.read_byte) 721 self.assertRaises(ValueError, m.write_byte, 42) 722 self.assertRaises(ValueError, m.write, b'abc') 723 724 def test_concat_repeat_exception(self): 725 m = mmap.mmap(-1, 16) 726 with self.assertRaises(TypeError): 727 m + m 728 with self.assertRaises(TypeError): 729 m * 2 730 731 def test_flush_return_value(self): 732 # mm.flush() should return None on success, raise an 733 # exception on error under all platforms. 734 mm = mmap.mmap(-1, 16) 735 self.addCleanup(mm.close) 736 mm.write(b'python') 737 result = mm.flush() 738 self.assertIsNone(result) 739 if sys.platform.startswith('linux'): 740 # 'offset' must be a multiple of mmap.PAGESIZE on Linux. 741 # See bpo-34754 for details. 742 self.assertRaises(OSError, mm.flush, 1, len(b'python')) 743 744 def test_repr(self): 745 open_mmap_repr_pat = re.compile( 746 r"<mmap.mmap closed=False, " 747 r"access=(?P<access>\S+), " 748 r"length=(?P<length>\d+), " 749 r"pos=(?P<pos>\d+), " 750 r"offset=(?P<offset>\d+)>") 751 closed_mmap_repr_pat = re.compile(r"<mmap.mmap closed=True>") 752 mapsizes = (50, 100, 1_000, 1_000_000, 10_000_000) 753 offsets = tuple((mapsize // 2 // mmap.ALLOCATIONGRANULARITY) 754 * mmap.ALLOCATIONGRANULARITY for mapsize in mapsizes) 755 for offset, mapsize in zip(offsets, mapsizes): 756 data = b'a' * mapsize 757 length = mapsize - offset 758 accesses = ('ACCESS_DEFAULT', 'ACCESS_READ', 759 'ACCESS_COPY', 'ACCESS_WRITE') 760 positions = (0, length//10, length//5, length//4) 761 with open(TESTFN, "wb+") as fp: 762 fp.write(data) 763 fp.flush() 764 for access, pos in itertools.product(accesses, positions): 765 accint = getattr(mmap, access) 766 with mmap.mmap(fp.fileno(), 767 length, 768 access=accint, 769 offset=offset) as mm: 770 mm.seek(pos) 771 match = open_mmap_repr_pat.match(repr(mm)) 772 self.assertIsNotNone(match) 773 self.assertEqual(match.group('access'), access) 774 self.assertEqual(match.group('length'), str(length)) 775 self.assertEqual(match.group('pos'), str(pos)) 776 self.assertEqual(match.group('offset'), str(offset)) 777 match = closed_mmap_repr_pat.match(repr(mm)) 778 self.assertIsNotNone(match) 779 780 @unittest.skipUnless(hasattr(mmap.mmap, 'madvise'), 'needs madvise') 781 def test_madvise(self): 782 size = 2 * PAGESIZE 783 m = mmap.mmap(-1, size) 784 785 with self.assertRaisesRegex(ValueError, "madvise start out of bounds"): 786 m.madvise(mmap.MADV_NORMAL, size) 787 with self.assertRaisesRegex(ValueError, "madvise start out of bounds"): 788 m.madvise(mmap.MADV_NORMAL, -1) 789 with self.assertRaisesRegex(ValueError, "madvise length invalid"): 790 m.madvise(mmap.MADV_NORMAL, 0, -1) 791 with self.assertRaisesRegex(OverflowError, "madvise length too large"): 792 m.madvise(mmap.MADV_NORMAL, PAGESIZE, sys.maxsize) 793 self.assertEqual(m.madvise(mmap.MADV_NORMAL), None) 794 self.assertEqual(m.madvise(mmap.MADV_NORMAL, PAGESIZE), None) 795 self.assertEqual(m.madvise(mmap.MADV_NORMAL, PAGESIZE, size), None) 796 self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, 2), None) 797 self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, size), None) 798 799 800class LargeMmapTests(unittest.TestCase): 801 802 def setUp(self): 803 unlink(TESTFN) 804 805 def tearDown(self): 806 unlink(TESTFN) 807 808 def _make_test_file(self, num_zeroes, tail): 809 if sys.platform[:3] == 'win' or sys.platform == 'darwin': 810 requires('largefile', 811 'test requires %s bytes and a long time to run' % str(0x180000000)) 812 f = open(TESTFN, 'w+b') 813 try: 814 f.seek(num_zeroes) 815 f.write(tail) 816 f.flush() 817 except (OSError, OverflowError, ValueError): 818 try: 819 f.close() 820 except (OSError, OverflowError): 821 pass 822 raise unittest.SkipTest("filesystem does not have largefile support") 823 return f 824 825 def test_large_offset(self): 826 with self._make_test_file(0x14FFFFFFF, b" ") as f: 827 with mmap.mmap(f.fileno(), 0, offset=0x140000000, access=mmap.ACCESS_READ) as m: 828 self.assertEqual(m[0xFFFFFFF], 32) 829 830 def test_large_filesize(self): 831 with self._make_test_file(0x17FFFFFFF, b" ") as f: 832 if sys.maxsize < 0x180000000: 833 # On 32 bit platforms the file is larger than sys.maxsize so 834 # mapping the whole file should fail -- Issue #16743 835 with self.assertRaises(OverflowError): 836 mmap.mmap(f.fileno(), 0x180000000, access=mmap.ACCESS_READ) 837 with self.assertRaises(ValueError): 838 mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) 839 with mmap.mmap(f.fileno(), 0x10000, access=mmap.ACCESS_READ) as m: 840 self.assertEqual(m.size(), 0x180000000) 841 842 # Issue 11277: mmap() with large (~4 GiB) sparse files crashes on OS X. 843 844 def _test_around_boundary(self, boundary): 845 tail = b' DEARdear ' 846 start = boundary - len(tail) // 2 847 end = start + len(tail) 848 with self._make_test_file(start, tail) as f: 849 with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m: 850 self.assertEqual(m[start:end], tail) 851 852 @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems") 853 def test_around_2GB(self): 854 self._test_around_boundary(_2G) 855 856 @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems") 857 def test_around_4GB(self): 858 self._test_around_boundary(_4G) 859 860 861if __name__ == '__main__': 862 unittest.main() 863