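# NOTE: this file was captured from a web source viewer.  Its navigation
# links (Home / Line# / Scopes# / Navigate# / Raw / Download) are not part
# of the test module.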
import itertools
import os
import re
import socket
import sys
import unittest
import weakref
from test.support import (TESTFN, _2G, _4G, cpython_only, gc_collect,
                          import_module, requires, unlink)

# import_module() skips the whole test module when mmap cannot be imported.
mmap = import_module('mmap')

# Shorthand used throughout the tests below.
PAGESIZE = mmap.PAGESIZE


class MmapTests(unittest.TestCase):
    """Functional tests for mmap.mmap objects.

    File-backed tests map TESTFN, which is removed before and after every
    test.  Those maps are closed inside the test body (``with`` or
    ``try``/``finally``) rather than through addCleanup(), because cleanups
    only run after tearDown() has already tried to unlink TESTFN.  Anonymous
    maps have no backing file and are released with addCleanup().
    """

    def setUp(self):
        # Start every test without a stale TESTFN.
        if os.path.exists(TESTFN):
            os.unlink(TESTFN)

    def tearDown(self):
        # Best effort: the test may never have created the file.
        try:
            os.unlink(TESTFN)
        except OSError:
            pass

    def test_basic(self):
        # Test mmap module on Unix systems and Windows

        # Create a two-page file to be mmap'ed, with b'foo' at the start of
        # the second page.
        with open(TESTFN, 'bw+') as f:
            f.write(b'\0' * PAGESIZE)
            f.write(b'foo')
            f.write(b'\0' * (PAGESIZE - 3))
            f.flush()
            m = mmap.mmap(f.fileno(), 2 * PAGESIZE)

        with m:
            # Simple sanity checks

            str(type(m))  # SF bug 128713:  segfaulted on Linux
            self.assertEqual(m.find(b'foo'), PAGESIZE)

            self.assertEqual(len(m), 2 * PAGESIZE)

            self.assertEqual(m[0], 0)
            self.assertEqual(m[0:3], b'\0\0\0')

            # Shouldn't crash on boundary (Issue #5292)
            self.assertRaises(IndexError, m.__getitem__, len(m))
            self.assertRaises(IndexError, m.__setitem__, len(m), b'\0')

            # Modify the file's content
            m[0] = b'3'[0]
            m[PAGESIZE + 3:PAGESIZE + 3 + 3] = b'bar'

            # Check that the modification worked
            self.assertEqual(m[0], b'3'[0])
            self.assertEqual(m[0:3], b'3\0\0')
            self.assertEqual(m[PAGESIZE - 1:PAGESIZE + 7], b'\0foobar\0')

            m.flush()

            # Test doing a regular expression match in an mmap'ed file
            match = re.search(b'[A-Za-z]+', m)
            self.assertIsNotNone(match, 'regex match on mmap failed!')
            self.assertEqual(match.span(0), (PAGESIZE, PAGESIZE + 6))

            # test seeking around (try to overflow the seek implementation)
            m.seek(0, 0)
            self.assertEqual(m.tell(), 0)
            m.seek(42, 1)
            self.assertEqual(m.tell(), 42)
            m.seek(0, 2)
            self.assertEqual(m.tell(), len(m))

            # Try to seek to negative position...
            self.assertRaises(ValueError, m.seek, -1)

            # Try to seek beyond end of mmap...
            self.assertRaises(ValueError, m.seek, 1, 2)

            # Try to seek to negative position...
            self.assertRaises(ValueError, m.seek, -len(m) - 1, 2)

            # Try resizing map
            try:
                m.resize(512)
            except SystemError:
                # resize() not supported
                # No messages are printed, since the output of this test
                # suite would then be different across platforms.
                pass
            else:
                # resize() is supported
                self.assertEqual(len(m), 512)
                # Check that we can no longer seek beyond the new size.
                self.assertRaises(ValueError, m.seek, 513, 0)

                # Check that the underlying file is truncated too
                # (bug #728515)
                with open(TESTFN, 'rb') as f:
                    f.seek(0, 2)
                    self.assertEqual(f.tell(), 512)
                self.assertEqual(m.size(), 512)

    def test_access_parameter(self):
        # Test for "access" keyword parameter
        mapsize = 10
        with open(TESTFN, "wb") as fp:
            fp.write(b"a" * mapsize)

        with open(TESTFN, "rb") as f, \
             mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_READ) as m:
            self.assertEqual(m[:], b'a' * mapsize,
                             "Readonly memory map data incorrect.")

            write_msg = "Able to write to readonly memory map"

            # Every value passed below has a type the operation accepts, so
            # a TypeError can only come from the map being read-only.

            # Ensuring that readonly mmap can't be slice assigned
            with self.assertRaises(TypeError, msg=write_msg):
                m[:] = b'b' * mapsize

            # Ensuring that readonly mmap can't be item assigned
            with self.assertRaises(TypeError, msg=write_msg):
                m[0] = b'b'[0]

            # Ensuring that readonly mmap can't be write() to
            m.seek(0, 0)
            with self.assertRaises(TypeError, msg=write_msg):
                m.write(b'abc')

            # Ensuring that readonly mmap can't be write_byte() to
            m.seek(0, 0)
            with self.assertRaises(TypeError, msg=write_msg):
                m.write_byte(b'd'[0])

            # Ensuring that readonly mmap can't be resized
            # (SystemError: resize is not universally supported)
            with self.assertRaises((SystemError, TypeError),
                                   msg="Able to resize readonly memory map"):
                m.resize(2 * mapsize)

            with open(TESTFN, "rb") as fp:
                self.assertEqual(fp.read(), b'a' * mapsize,
                                 "Readonly memory map data file was modified")

        # Opening mmap with size too big
        on_windows = sys.platform.startswith('win')
        with open(TESTFN, "r+b") as f:
            try:
                m = mmap.mmap(f.fileno(), mapsize + 1)
            except ValueError:
                # we do not expect a ValueError on Windows
                if on_windows:
                    self.fail("Opening mmap with size+1 should work on Windows.")
            else:
                # Close first so a failure below does not leave the map open.
                m.close()
                # we expect a ValueError on Unix, but not on Windows
                if not on_windows:
                    self.fail("Opening mmap with size+1 should raise ValueError.")
            if on_windows:
                # CAUTION:  The oversized mapping also changed the size of
                # the file on disk, and the sections below assume that the
                # length hasn't changed.  Repair that.
                with open(TESTFN, 'r+b') as fp:
                    fp.truncate(mapsize)

        # Opening mmap with access=ACCESS_WRITE
        with open(TESTFN, "r+b") as f, \
             mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_WRITE) as m:
            # Modifying write-through memory map
            m[:] = b'c' * mapsize
            self.assertEqual(m[:], b'c' * mapsize,
                   "Write-through memory map memory not updated properly.")
            m.flush()
        with open(TESTFN, 'rb') as f:
            stuff = f.read()
        self.assertEqual(stuff, b'c' * mapsize,
               "Write-through memory map data file not updated properly.")

        # Opening mmap with access=ACCESS_COPY
        with open(TESTFN, "r+b") as f, \
             mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_COPY) as m:
            # Modifying copy-on-write memory map
            m[:] = b'd' * mapsize
            self.assertEqual(m[:], b'd' * mapsize,
                             "Copy-on-write memory map data not written correctly.")
            m.flush()
            with open(TESTFN, "rb") as fp:
                self.assertEqual(fp.read(), b'c' * mapsize,
                                 "Copy-on-write test data file should not be modified.")
            # Ensuring copy-on-write maps cannot be resized
            self.assertRaises(TypeError, m.resize, 2 * mapsize)

        # Ensuring invalid access parameter raises exception
        with open(TESTFN, "r+b") as f:
            self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, access=4)

        if os.name == "posix":
            # Try incompatible flags, prot and access parameters.
            with open(TESTFN, "r+b") as f:
                self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize,
                                  flags=mmap.MAP_PRIVATE,
                                  prot=mmap.PROT_READ, access=mmap.ACCESS_WRITE)

            # Try writing with PROT_EXEC and without PROT_WRITE
            prot = mmap.PROT_READ | getattr(mmap, 'PROT_EXEC', 0)
            with open(TESTFN, "r+b") as f, \
                 mmap.mmap(f.fileno(), mapsize, prot=prot) as m:
                self.assertRaises(TypeError, m.write, b"abcdef")
                self.assertRaises(TypeError, m.write_byte, 0)

    def test_bad_file_desc(self):
        # Try opening a bad file descriptor...
        self.assertRaises(OSError, mmap.mmap, -2, 4096)

    def test_tougher_find(self):
        # Do a tougher .find() test.  SF bug 515943 pointed out that, in 2.2,
        # searching for data with embedded \0 bytes didn't work.
        data = b'aabaac\x00deef\x00\x00aa\x00'
        n = len(data)
        with open(TESTFN, 'wb+') as f:
            f.write(data)
            f.flush()
            m = mmap.mmap(f.fileno(), n)

        with m:
            # Every substring of data must be found where bytes.find() finds
            # it; appending a byte that is absent from data must never match.
            for start in range(n + 1):
                for finish in range(start, n + 1):
                    needle = data[start:finish]
                    self.assertEqual(m.find(needle), data.find(needle))
                    self.assertEqual(m.find(needle + b'x'), -1)

    def test_find_end(self):
        # test the new 'end' parameter works as expected
        data = b'one two ones'
        with open(TESTFN, 'wb+') as f:
            f.write(data)
            f.flush()
            m = mmap.mmap(f.fileno(), len(data))

        with m:
            self.assertEqual(m.find(b'one'), 0)
            self.assertEqual(m.find(b'ones'), 8)
            self.assertEqual(m.find(b'one', 0, -1), 0)
            self.assertEqual(m.find(b'one', 1), 8)
            self.assertEqual(m.find(b'one', 1, -1), 8)
            self.assertEqual(m.find(b'one', 1, -2), -1)
            self.assertEqual(m.find(bytearray(b'one')), 0)

    def test_rfind(self):
        # test the new 'end' parameter works as expected
        data = b'one two ones'
        with open(TESTFN, 'wb+') as f:
            f.write(data)
            f.flush()
            m = mmap.mmap(f.fileno(), len(data))

        with m:
            self.assertEqual(m.rfind(b'one'), 8)
            self.assertEqual(m.rfind(b'one '), 0)
            self.assertEqual(m.rfind(b'one', 0, -1), 8)
            self.assertEqual(m.rfind(b'one', 0, -2), 0)
            self.assertEqual(m.rfind(b'one', 1, -1), 8)
            self.assertEqual(m.rfind(b'one', 1, -2), -1)
            self.assertEqual(m.rfind(bytearray(b'one')), 8)

    def test_double_close(self):
        # make sure a double close doesn't crash on Solaris (Bug# 665913)
        with open(TESTFN, 'wb+') as f:
            f.write(2**16 * b'a')  # Arbitrary character

        with open(TESTFN, 'rb') as f:
            mf = mmap.mmap(f.fileno(), 2**16, access=mmap.ACCESS_READ)
            mf.close()
            mf.close()

    def test_entire_file(self):
        # test mapping of entire file by passing 0 for map length
        with open(TESTFN, "wb+") as f:
            f.write(2**16 * b'm')  # Arbitrary character

        with open(TESTFN, "rb+") as f, \
             mmap.mmap(f.fileno(), 0) as mf:
            self.assertEqual(len(mf), 2**16, "Map size should equal file size.")
            self.assertEqual(mf.read(2**16), 2**16 * b"m")

    def test_length_0_offset(self):
        # Issue #10916: test mapping of remainder of file by passing 0 for
        # map length with an offset doesn't cause a segfault.
        # NOTE: allocation granularity is currently 65536 under Win64,
        # and therefore the minimum offset alignment.
        with open(TESTFN, "wb") as f:
            f.write((65536 * 2) * b'm')  # Arbitrary character

        with open(TESTFN, "rb") as f:
            with mmap.mmap(f.fileno(), 0, offset=65536, access=mmap.ACCESS_READ) as mf:
                self.assertRaises(IndexError, mf.__getitem__, 80000)

    def test_length_0_large_offset(self):
        # Issue #10959: test mapping of a file by passing 0 for
        # map length with a large offset doesn't cause a segfault.
        with open(TESTFN, "wb") as f:
            f.write(115699 * b'm')  # Arbitrary character

        with open(TESTFN, "w+b") as f:
            self.assertRaises(ValueError, mmap.mmap, f.fileno(), 0,
                              offset=2147418112)

    def test_move(self):
        # make move works everywhere (64-bit format problem earlier)
        with open(TESTFN, 'wb+') as f:
            f.write(b"ABCDEabcde")  # Arbitrary character
            f.flush()

            with mmap.mmap(f.fileno(), 10) as mf:
                mf.move(5, 0, 5)
                self.assertEqual(mf[:], b"ABCDEABCDE",
                                 "Map move should have duplicated front 5")

        # more excessive test
        data = b"0123456789"
        for dest in range(len(data)):
            for src in range(len(data)):
                for count in range(len(data) - max(dest, src)):
                    expected = data[:dest] + data[src:src+count] + data[dest+count:]
                    with mmap.mmap(-1, len(data)) as m:
                        m[:] = data
                        m.move(dest, src, count)
                        self.assertEqual(m[:], expected)

        # segfault test (Issue 5387)
        with mmap.mmap(-1, 100) as m:
            offsets = [-100, -1, 0, 1, 100]
            for dest, src, count in itertools.product(offsets, offsets, offsets):
                # Any outcome but a crash is fine here.
                try:
                    m.move(dest, src, count)
                except ValueError:
                    pass

            bad_args = [(-1, -1, -1), (-1, -1, 0), (-1, 0, -1), (0, -1, -1),
                        (-1, 0, 0), (0, -1, 0), (0, 0, -1)]
            for dest, src, count in bad_args:
                self.assertRaises(ValueError, m.move, dest, src, count)

        with mmap.mmap(-1, 1) as m:  # single byte
            self.assertRaises(ValueError, m.move, 0, 0, 2)
            self.assertRaises(ValueError, m.move, 1, 0, 1)
            self.assertRaises(ValueError, m.move, 0, 1, 1)
            m.move(0, 0, 1)
            m.move(0, 0, 0)

    def test_anonymous(self):
        # anonymous mmap.mmap(-1, PAGE)
        m = mmap.mmap(-1, PAGESIZE)
        self.addCleanup(m.close)
        for x in range(PAGESIZE):
            self.assertEqual(m[x], 0,
                             "anonymously mmap'ed contents should be zero")

        for x in range(PAGESIZE):
            b = x & 0xff
            m[x] = b
            self.assertEqual(m[x], b)

    def test_read_all(self):
        m = mmap.mmap(-1, 16)
        self.addCleanup(m.close)

        # With no parameters, or None or a negative argument, reads all
        m.write(bytes(range(16)))
        m.seek(0)
        self.assertEqual(m.read(), bytes(range(16)))
        m.seek(8)
        self.assertEqual(m.read(), bytes(range(8, 16)))
        m.seek(16)
        self.assertEqual(m.read(), b'')
        m.seek(3)
        self.assertEqual(m.read(None), bytes(range(3, 16)))
        m.seek(4)
        self.assertEqual(m.read(-1), bytes(range(4, 16)))
        m.seek(5)
        self.assertEqual(m.read(-2), bytes(range(5, 16)))
        m.seek(9)
        self.assertEqual(m.read(-42), bytes(range(9, 16)))

    def test_read_invalid_arg(self):
        m = mmap.mmap(-1, 16)
        self.addCleanup(m.close)

        self.assertRaises(TypeError, m.read, 'foo')
        self.assertRaises(TypeError, m.read, 5.5)
        self.assertRaises(TypeError, m.read, [1, 2, 3])

    def test_extended_getslice(self):
        # Test extended slicing by comparing with bytes slicing.
        s = bytes(reversed(range(256)))
        m = mmap.mmap(-1, len(s))
        self.addCleanup(m.close)
        m[:] = s
        self.assertEqual(m[:], s)
        indices = (0, None, 1, 3, 19, 300, sys.maxsize, -1, -2, -31, -300)
        for start in indices:
            for stop in indices:
                # Skip step 0 (invalid)
                for step in indices[1:]:
                    self.assertEqual(m[start:stop:step],
                                     s[start:stop:step])

    def test_extended_set_del_slice(self):
        # Test extended slice assignment by comparing with list slicing.
        s = bytes(reversed(range(256)))
        m = mmap.mmap(-1, len(s))
        self.addCleanup(m.close)
        indices = (0, None, 1, 3, 19, 300, sys.maxsize, -1, -2, -31, -300)
        for start in indices:
            for stop in indices:
                # Skip invalid step 0
                for step in indices[1:]:
                    m[:] = s
                    self.assertEqual(m[:], s)
                    expected = list(s)
                    # Make sure we have a slice of exactly the right length,
                    # but with different data.
                    data = expected[start:stop:step]
                    data = bytes(reversed(data))
                    expected[start:stop:step] = data
                    m[start:stop:step] = data
                    self.assertEqual(m[:], bytes(expected))

    def make_mmap_file(self, f, halfsize):
        # Write 2 * halfsize bytes to f, with b'foo' at offset halfsize, and
        # return a map of the whole file.  The caller closes the map.
        f.write(b'\0' * halfsize)
        f.write(b'foo')
        f.write(b'\0' * (halfsize - 3))
        f.flush()
        return mmap.mmap(f.fileno(), 0)

    def test_empty_file(self):
        # Create an empty TESTFN.
        with open(TESTFN, 'w+b'):
            pass
        with open(TESTFN, "rb") as f:
            self.assertRaisesRegex(ValueError,
                                   "cannot mmap an empty file",
                                   mmap.mmap, f.fileno(), 0,
                                   access=mmap.ACCESS_READ)

    def test_offset(self):
        halfsize = mmap.ALLOCATIONGRANULARITY
        mapsize = halfsize * 2
        with open(TESTFN, 'w+b') as f:
            self.make_mmap_file(f, halfsize).close()

        # Try invalid offset
        with open(TESTFN, "r+b") as f:
            for offset in (-2, -1, None):
                with self.subTest(offset=offset):
                    with self.assertRaises((ValueError, TypeError, OverflowError)):
                        mmap.mmap(f.fileno(), mapsize, offset=offset)

        # Try valid offset: one allocation granule into the file, which is
        # where make_mmap_file() put b'foo'.
        with open(TESTFN, "r+b") as f:
            m = mmap.mmap(f.fileno(), mapsize - halfsize, offset=halfsize)

        with m:
            self.assertEqual(m[0:3], b'foo')

            # Try resizing map
            try:
                m.resize(512)
            except SystemError:
                # resize() not supported
                pass
            else:
                # resize() is supported
                self.assertEqual(len(m), 512)
                # Check that we can no longer seek beyond the new size.
                self.assertRaises(ValueError, m.seek, 513, 0)
                # Check that the content is not changed
                self.assertEqual(m[0:3], b'foo')

                # Check that the underlying file is truncated too
                with open(TESTFN, 'rb') as f:
                    f.seek(0, 2)
                    self.assertEqual(f.tell(), halfsize + 512)
                self.assertEqual(m.size(), halfsize + 512)

    def test_subclass(self):
        class anon_mmap(mmap.mmap):
            def __new__(klass, *args, **kwargs):
                return mmap.mmap.__new__(klass, -1, *args, **kwargs)
        anon_mmap(PAGESIZE).close()

    @unittest.skipUnless(hasattr(mmap, 'PROT_READ'), "needs mmap.PROT_READ")
    def test_prot_readonly(self):
        mapsize = 10
        with open(TESTFN, "wb") as fp:
            fp.write(b"a" * mapsize)
        with open(TESTFN, "rb") as f, \
             mmap.mmap(f.fileno(), mapsize, prot=mmap.PROT_READ) as m:
            # Pass bytes: a str would raise TypeError whatever the map's
            # protection and so would not test PROT_READ at all.
            self.assertRaises(TypeError, m.write, b"foo")

    def test_error(self):
        self.assertIs(mmap.error, OSError)

    def test_io_methods(self):
        data = b"0123456789"
        with open(TESTFN, "wb") as fp:
            fp.write(b"x" * len(data))
        with open(TESTFN, "r+b") as f:
            m = mmap.mmap(f.fileno(), len(data))

        with m:
            # Test write_byte()
            for i, byte in enumerate(data):
                self.assertEqual(m.tell(), i)
                m.write_byte(byte)
                self.assertEqual(m.tell(), i + 1)
            self.assertRaises(ValueError, m.write_byte, b"x"[0])
            self.assertEqual(m[:], data)
            # Test read_byte()
            m.seek(0)
            for i, byte in enumerate(data):
                self.assertEqual(m.tell(), i)
                self.assertEqual(m.read_byte(), byte)
                self.assertEqual(m.tell(), i + 1)
            self.assertRaises(ValueError, m.read_byte)
            # Test read()
            m.seek(3)
            self.assertEqual(m.read(3), b"345")
            self.assertEqual(m.tell(), 6)
            # Test write()
            m.seek(3)
            m.write(b"bar")
            self.assertEqual(m.tell(), 6)
            self.assertEqual(m[:], b"012bar6789")
            m.write(bytearray(b"baz"))
            self.assertEqual(m.tell(), 9)
            self.assertEqual(m[:], b"012barbaz9")
            self.assertRaises(ValueError, m.write, b"ba")

    def test_non_ascii_byte(self):
        for b in (129, 200, 255):  # > 128
            with mmap.mmap(-1, 1) as m:
                m.write_byte(b)
                self.assertEqual(m[0], b)
                m.seek(0)
                self.assertEqual(m.read_byte(), b)

    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
    def test_tagname(self):
        data1 = b"0123456789"
        data2 = b"abcdefghij"
        # Precondition of the test itself; a bare assert would vanish
        # under -O.
        self.assertEqual(len(data1), len(data2))

        # Test same tag
        m1 = mmap.mmap(-1, len(data1), tagname="foo")
        m1[:] = data1
        m2 = mmap.mmap(-1, len(data2), tagname="foo")
        m2[:] = data2
        self.assertEqual(m1[:], data2)
        self.assertEqual(m2[:], data2)
        m2.close()
        m1.close()

        # Test different tag
        m1 = mmap.mmap(-1, len(data1), tagname="foo")
        m1[:] = data1
        m2 = mmap.mmap(-1, len(data2), tagname="boo")
        m2[:] = data2
        self.assertEqual(m1[:], data1)
        self.assertEqual(m2[:], data2)
        m2.close()
        m1.close()

    @cpython_only
    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
    def test_sizeof(self):
        m1 = mmap.mmap(-1, 100)
        self.addCleanup(m1.close)
        tagname = "foo"
        m2 = mmap.mmap(-1, 100, tagname=tagname)
        self.addCleanup(m2.close)
        self.assertEqual(sys.getsizeof(m2),
                         sys.getsizeof(m1) + len(tagname) + 1)

    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
    def test_crasher_on_windows(self):
        # The operations below only have to not crash the interpreter, so
        # ordinary exceptions are deliberately ignored.  KeyboardInterrupt
        # and SystemExit still propagate.

        # Should not crash (Issue 1733986)
        with mmap.mmap(-1, 1000, tagname="foo"):
            try:
                mmap.mmap(-1, 5000, tagname="foo")[:]  # same tagname, but larger size
            except Exception:
                pass

        # Should not crash (Issue 5385)
        with open(TESTFN, "wb") as fp:
            fp.write(b"x" * 10)
        with open(TESTFN, "r+b") as f:
            m = mmap.mmap(f.fileno(), 0)
        try:
            try:
                m.resize(0)  # will raise OSError
            except Exception:
                pass
            try:
                m[:]
            except Exception:
                pass
        finally:
            m.close()

    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
    def test_invalid_descriptor(self):
        # socket file descriptors are valid, but out of range
        # for _get_osfhandle, causing a crash when validating the
        # parameters to _get_osfhandle.
        with socket.socket() as s:
            with self.assertRaises(OSError):
                mmap.mmap(s.fileno(), 10)

    def test_context_manager(self):
        with mmap.mmap(-1, 10) as m:
            self.assertFalse(m.closed)
        self.assertTrue(m.closed)

    def test_context_manager_exception(self):
        # Test that the OSError gets passed through
        with self.assertRaises(Exception) as exc:
            with mmap.mmap(-1, 10) as m:
                raise OSError
        self.assertIsInstance(exc.exception, OSError,
                              "wrong exception raised in context manager")
        self.assertTrue(m.closed, "context manager failed")

    def test_weakref(self):
        # Check mmap objects are weakrefable
        # (No addCleanup(mm.close) here: the bound method would keep the
        # map alive and the weak reference would never be cleared.)
        mm = mmap.mmap(-1, 16)
        wr = weakref.ref(mm)
        self.assertIs(wr(), mm)
        del mm
        gc_collect()
        self.assertIs(wr(), None)

    def test_write_returning_the_number_of_bytes_written(self):
        mm = mmap.mmap(-1, 16)
        self.addCleanup(mm.close)
        self.assertEqual(mm.write(b""), 0)
        self.assertEqual(mm.write(b"x"), 1)
        self.assertEqual(mm.write(b"yz"), 2)
        self.assertEqual(mm.write(b"python"), 6)

    @unittest.skipIf(os.name == 'nt', 'cannot resize anonymous mmaps on Windows')
    def test_resize_past_pos(self):
        m = mmap.mmap(-1, 8192)
        self.addCleanup(m.close)
        m.read(5000)
        try:
            m.resize(4096)
        except SystemError:
            self.skipTest("resizing not supported")
        # The position (5000) now lies beyond the end of the map (4096).
        self.assertEqual(m.read(14), b'')
        self.assertRaises(ValueError, m.read_byte)
        self.assertRaises(ValueError, m.write_byte, 42)
        self.assertRaises(ValueError, m.write, b'abc')

    def test_concat_repeat_exception(self):
        m = mmap.mmap(-1, 16)
        self.addCleanup(m.close)
        with self.assertRaises(TypeError):
            m + m
        with self.assertRaises(TypeError):
            m * 2

    def test_flush_return_value(self):
        # mm.flush() should return None on success, raise an
        # exception on error under all platforms.
        mm = mmap.mmap(-1, 16)
        self.addCleanup(mm.close)
        mm.write(b'python')
        result = mm.flush()
        self.assertIsNone(result)
        if sys.platform.startswith('linux'):
            # 'offset' must be a multiple of mmap.PAGESIZE on Linux.
            # See bpo-34754 for details.
            self.assertRaises(OSError, mm.flush, 1, len(b'python'))

    @unittest.skipUnless(hasattr(mmap.mmap, 'madvise'), 'needs madvise')
    def test_madvise(self):
        size = 2 * PAGESIZE
        m = mmap.mmap(-1, size)
        self.addCleanup(m.close)

        with self.assertRaisesRegex(ValueError, "madvise start out of bounds"):
            m.madvise(mmap.MADV_NORMAL, size)
        with self.assertRaisesRegex(ValueError, "madvise start out of bounds"):
            m.madvise(mmap.MADV_NORMAL, -1)
        with self.assertRaisesRegex(ValueError, "madvise length invalid"):
            m.madvise(mmap.MADV_NORMAL, 0, -1)
        with self.assertRaisesRegex(OverflowError, "madvise length too large"):
            m.madvise(mmap.MADV_NORMAL, PAGESIZE, sys.maxsize)
        self.assertIsNone(m.madvise(mmap.MADV_NORMAL))
        self.assertIsNone(m.madvise(mmap.MADV_NORMAL, PAGESIZE))
        self.assertIsNone(m.madvise(mmap.MADV_NORMAL, PAGESIZE, size))
        self.assertIsNone(m.madvise(mmap.MADV_NORMAL, 0, 2))
        self.assertIsNone(m.madvise(mmap.MADV_NORMAL, 0, size))


class LargeMmapTests(unittest.TestCase):
    """Tests that map (mostly sparse) multi-gigabyte files."""

    def setUp(self):
        unlink(TESTFN)

    def tearDown(self):
        unlink(TESTFN)

    def _make_test_file(self, num_zeroes, tail):
        # Return TESTFN opened 'w+b' after seeking num_zeroes bytes in and
        # writing tail there.  The caller closes the returned file.  The test
        # is skipped when the seek/write/flush fails.
        if sys.platform.startswith('win') or sys.platform == 'darwin':
            requires('largefile',
                'test requires %s bytes and a long time to run' % str(0x180000000))
        f = open(TESTFN, 'w+b')
        try:
            f.seek(num_zeroes)
            f.write(tail)
            f.flush()
        except (OSError, OverflowError, ValueError):
            # Closing may fail for the same reason the write did.
            try:
                f.close()
            except (OSError, OverflowError):
                pass
            raise unittest.SkipTest("filesystem does not have largefile support")
        else:
            return f

    def test_large_offset(self):
        with self._make_test_file(0x14FFFFFFF, b" ") as f, \
             mmap.mmap(f.fileno(), 0, offset=0x140000000,
                       access=mmap.ACCESS_READ) as m:
            # The last byte of the file is the b" " tail (code point 32).
            self.assertEqual(m[0xFFFFFFF], 32)

    def test_large_filesize(self):
        file_size = 0x180000000
        with self._make_test_file(file_size - 1, b" ") as f:
            if sys.maxsize < file_size:
                # On 32 bit platforms the file is larger than sys.maxsize so
                # mapping the whole file should fail -- Issue #16743
                self.assertRaises(OverflowError, mmap.mmap, f.fileno(),
                                  file_size, access=mmap.ACCESS_READ)
                self.assertRaises(ValueError, mmap.mmap, f.fileno(), 0,
                                  access=mmap.ACCESS_READ)
            with mmap.mmap(f.fileno(), 0x10000, access=mmap.ACCESS_READ) as m:
                self.assertEqual(m.size(), file_size)

    # Issue 11277: mmap() with large (~4 GiB) sparse files crashes on OS X.

    def _test_around_boundary(self, boundary):
        # Place a marker so that it straddles `boundary`, then read it back
        # through a map of the whole file.
        marker = b'  DEARdear  '
        first = boundary - len(marker) // 2
        last = first + len(marker)
        with self._make_test_file(first, marker) as f, \
             mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
            self.assertEqual(m[first:last], marker)

    @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems")
    def test_around_2GB(self):
        self._test_around_boundary(_2G)

    @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems")
    def test_around_4GB(self):
        self._test_around_boundary(_4G)


# Run the mmap tests when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()