• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1from test.support import (requires, _2G, _4G, gc_collect, cpython_only)
2from test.support.import_helper import import_module
3from test.support.os_helper import TESTFN, unlink
4import unittest
5import os
6import re
7import itertools
8import socket
9import sys
10import weakref
11
12# Skip test if we can't import mmap.
13mmap = import_module('mmap')
14
15PAGESIZE = mmap.PAGESIZE
16
17
18class MmapTests(unittest.TestCase):
19
20    def setUp(self):
21        if os.path.exists(TESTFN):
22            os.unlink(TESTFN)
23
24    def tearDown(self):
25        try:
26            os.unlink(TESTFN)
27        except OSError:
28            pass
29
30    def test_basic(self):
31        # Test mmap module on Unix systems and Windows
32
33        # Create a file to be mmap'ed.
34        f = open(TESTFN, 'bw+')
35        try:
36            # Write 2 pages worth of data to the file
37            f.write(b'\0'* PAGESIZE)
38            f.write(b'foo')
39            f.write(b'\0'* (PAGESIZE-3) )
40            f.flush()
41            m = mmap.mmap(f.fileno(), 2 * PAGESIZE)
42        finally:
43            f.close()
44
45        # Simple sanity checks
46
47        tp = str(type(m))  # SF bug 128713:  segfaulted on Linux
48        self.assertEqual(m.find(b'foo'), PAGESIZE)
49
50        self.assertEqual(len(m), 2*PAGESIZE)
51
52        self.assertEqual(m[0], 0)
53        self.assertEqual(m[0:3], b'\0\0\0')
54
55        # Shouldn't crash on boundary (Issue #5292)
56        self.assertRaises(IndexError, m.__getitem__, len(m))
57        self.assertRaises(IndexError, m.__setitem__, len(m), b'\0')
58
59        # Modify the file's content
60        m[0] = b'3'[0]
61        m[PAGESIZE +3: PAGESIZE +3+3] = b'bar'
62
63        # Check that the modification worked
64        self.assertEqual(m[0], b'3'[0])
65        self.assertEqual(m[0:3], b'3\0\0')
66        self.assertEqual(m[PAGESIZE-1 : PAGESIZE + 7], b'\0foobar\0')
67
68        m.flush()
69
70        # Test doing a regular expression match in an mmap'ed file
71        match = re.search(b'[A-Za-z]+', m)
72        if match is None:
73            self.fail('regex match on mmap failed!')
74        else:
75            start, end = match.span(0)
76            length = end - start
77
78            self.assertEqual(start, PAGESIZE)
79            self.assertEqual(end, PAGESIZE + 6)
80
81        # test seeking around (try to overflow the seek implementation)
82        m.seek(0,0)
83        self.assertEqual(m.tell(), 0)
84        m.seek(42,1)
85        self.assertEqual(m.tell(), 42)
86        m.seek(0,2)
87        self.assertEqual(m.tell(), len(m))
88
89        # Try to seek to negative position...
90        self.assertRaises(ValueError, m.seek, -1)
91
92        # Try to seek beyond end of mmap...
93        self.assertRaises(ValueError, m.seek, 1, 2)
94
95        # Try to seek to negative position...
96        self.assertRaises(ValueError, m.seek, -len(m)-1, 2)
97
98        # Try resizing map
99        try:
100            m.resize(512)
101        except SystemError:
102            # resize() not supported
103            # No messages are printed, since the output of this test suite
104            # would then be different across platforms.
105            pass
106        else:
107            # resize() is supported
108            self.assertEqual(len(m), 512)
109            # Check that we can no longer seek beyond the new size.
110            self.assertRaises(ValueError, m.seek, 513, 0)
111
112            # Check that the underlying file is truncated too
113            # (bug #728515)
114            f = open(TESTFN, 'rb')
115            try:
116                f.seek(0, 2)
117                self.assertEqual(f.tell(), 512)
118            finally:
119                f.close()
120            self.assertEqual(m.size(), 512)
121
122        m.close()
123
124    def test_access_parameter(self):
125        # Test for "access" keyword parameter
126        mapsize = 10
127        with open(TESTFN, "wb") as fp:
128            fp.write(b"a"*mapsize)
129        with open(TESTFN, "rb") as f:
130            m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_READ)
131            self.assertEqual(m[:], b'a'*mapsize, "Readonly memory map data incorrect.")
132
133            # Ensuring that readonly mmap can't be slice assigned
134            try:
135                m[:] = b'b'*mapsize
136            except TypeError:
137                pass
138            else:
139                self.fail("Able to write to readonly memory map")
140
141            # Ensuring that readonly mmap can't be item assigned
142            try:
143                m[0] = b'b'
144            except TypeError:
145                pass
146            else:
147                self.fail("Able to write to readonly memory map")
148
149            # Ensuring that readonly mmap can't be write() to
150            try:
151                m.seek(0,0)
152                m.write(b'abc')
153            except TypeError:
154                pass
155            else:
156                self.fail("Able to write to readonly memory map")
157
158            # Ensuring that readonly mmap can't be write_byte() to
159            try:
160                m.seek(0,0)
161                m.write_byte(b'd')
162            except TypeError:
163                pass
164            else:
165                self.fail("Able to write to readonly memory map")
166
167            # Ensuring that readonly mmap can't be resized
168            try:
169                m.resize(2*mapsize)
170            except SystemError:   # resize is not universally supported
171                pass
172            except TypeError:
173                pass
174            else:
175                self.fail("Able to resize readonly memory map")
176            with open(TESTFN, "rb") as fp:
177                self.assertEqual(fp.read(), b'a'*mapsize,
178                                 "Readonly memory map data file was modified")
179
180        # Opening mmap with size too big
181        with open(TESTFN, "r+b") as f:
182            try:
183                m = mmap.mmap(f.fileno(), mapsize+1)
184            except ValueError:
185                # we do not expect a ValueError on Windows
186                # CAUTION:  This also changes the size of the file on disk, and
187                # later tests assume that the length hasn't changed.  We need to
188                # repair that.
189                if sys.platform.startswith('win'):
190                    self.fail("Opening mmap with size+1 should work on Windows.")
191            else:
192                # we expect a ValueError on Unix, but not on Windows
193                if not sys.platform.startswith('win'):
194                    self.fail("Opening mmap with size+1 should raise ValueError.")
195                m.close()
196            if sys.platform.startswith('win'):
197                # Repair damage from the resizing test.
198                with open(TESTFN, 'r+b') as f:
199                    f.truncate(mapsize)
200
201        # Opening mmap with access=ACCESS_WRITE
202        with open(TESTFN, "r+b") as f:
203            m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_WRITE)
204            # Modifying write-through memory map
205            m[:] = b'c'*mapsize
206            self.assertEqual(m[:], b'c'*mapsize,
207                   "Write-through memory map memory not updated properly.")
208            m.flush()
209            m.close()
210        with open(TESTFN, 'rb') as f:
211            stuff = f.read()
212        self.assertEqual(stuff, b'c'*mapsize,
213               "Write-through memory map data file not updated properly.")
214
215        # Opening mmap with access=ACCESS_COPY
216        with open(TESTFN, "r+b") as f:
217            m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_COPY)
218            # Modifying copy-on-write memory map
219            m[:] = b'd'*mapsize
220            self.assertEqual(m[:], b'd' * mapsize,
221                             "Copy-on-write memory map data not written correctly.")
222            m.flush()
223            with open(TESTFN, "rb") as fp:
224                self.assertEqual(fp.read(), b'c'*mapsize,
225                                 "Copy-on-write test data file should not be modified.")
226            # Ensuring copy-on-write maps cannot be resized
227            self.assertRaises(TypeError, m.resize, 2*mapsize)
228            m.close()
229
230        # Ensuring invalid access parameter raises exception
231        with open(TESTFN, "r+b") as f:
232            self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize, access=4)
233
234        if os.name == "posix":
235            # Try incompatible flags, prot and access parameters.
236            with open(TESTFN, "r+b") as f:
237                self.assertRaises(ValueError, mmap.mmap, f.fileno(), mapsize,
238                                  flags=mmap.MAP_PRIVATE,
239                                  prot=mmap.PROT_READ, access=mmap.ACCESS_WRITE)
240
241            # Try writing with PROT_EXEC and without PROT_WRITE
242            prot = mmap.PROT_READ | getattr(mmap, 'PROT_EXEC', 0)
243            with open(TESTFN, "r+b") as f:
244                m = mmap.mmap(f.fileno(), mapsize, prot=prot)
245                self.assertRaises(TypeError, m.write, b"abcdef")
246                self.assertRaises(TypeError, m.write_byte, 0)
247                m.close()
248
249    def test_bad_file_desc(self):
250        # Try opening a bad file descriptor...
251        self.assertRaises(OSError, mmap.mmap, -2, 4096)
252
253    def test_tougher_find(self):
254        # Do a tougher .find() test.  SF bug 515943 pointed out that, in 2.2,
255        # searching for data with embedded \0 bytes didn't work.
256        with open(TESTFN, 'wb+') as f:
257
258            data = b'aabaac\x00deef\x00\x00aa\x00'
259            n = len(data)
260            f.write(data)
261            f.flush()
262            m = mmap.mmap(f.fileno(), n)
263
264        for start in range(n+1):
265            for finish in range(start, n+1):
266                slice = data[start : finish]
267                self.assertEqual(m.find(slice), data.find(slice))
268                self.assertEqual(m.find(slice + b'x'), -1)
269        m.close()
270
271    def test_find_end(self):
272        # test the new 'end' parameter works as expected
273        with open(TESTFN, 'wb+') as f:
274            data = b'one two ones'
275            n = len(data)
276            f.write(data)
277            f.flush()
278            m = mmap.mmap(f.fileno(), n)
279
280        self.assertEqual(m.find(b'one'), 0)
281        self.assertEqual(m.find(b'ones'), 8)
282        self.assertEqual(m.find(b'one', 0, -1), 0)
283        self.assertEqual(m.find(b'one', 1), 8)
284        self.assertEqual(m.find(b'one', 1, -1), 8)
285        self.assertEqual(m.find(b'one', 1, -2), -1)
286        self.assertEqual(m.find(bytearray(b'one')), 0)
287
288
289    def test_rfind(self):
290        # test the new 'end' parameter works as expected
291        with open(TESTFN, 'wb+') as f:
292            data = b'one two ones'
293            n = len(data)
294            f.write(data)
295            f.flush()
296            m = mmap.mmap(f.fileno(), n)
297
298        self.assertEqual(m.rfind(b'one'), 8)
299        self.assertEqual(m.rfind(b'one '), 0)
300        self.assertEqual(m.rfind(b'one', 0, -1), 8)
301        self.assertEqual(m.rfind(b'one', 0, -2), 0)
302        self.assertEqual(m.rfind(b'one', 1, -1), 8)
303        self.assertEqual(m.rfind(b'one', 1, -2), -1)
304        self.assertEqual(m.rfind(bytearray(b'one')), 8)
305
306
307    def test_double_close(self):
308        # make sure a double close doesn't crash on Solaris (Bug# 665913)
309        with open(TESTFN, 'wb+') as f:
310            f.write(2**16 * b'a') # Arbitrary character
311
312        with open(TESTFN, 'rb') as f:
313            mf = mmap.mmap(f.fileno(), 2**16, access=mmap.ACCESS_READ)
314            mf.close()
315            mf.close()
316
317    def test_entire_file(self):
318        # test mapping of entire file by passing 0 for map length
319        with open(TESTFN, "wb+") as f:
320            f.write(2**16 * b'm') # Arbitrary character
321
322        with open(TESTFN, "rb+") as f, \
323             mmap.mmap(f.fileno(), 0) as mf:
324            self.assertEqual(len(mf), 2**16, "Map size should equal file size.")
325            self.assertEqual(mf.read(2**16), 2**16 * b"m")
326
327    def test_length_0_offset(self):
328        # Issue #10916: test mapping of remainder of file by passing 0 for
329        # map length with an offset doesn't cause a segfault.
330        # NOTE: allocation granularity is currently 65536 under Win64,
331        # and therefore the minimum offset alignment.
332        with open(TESTFN, "wb") as f:
333            f.write((65536 * 2) * b'm') # Arbitrary character
334
335        with open(TESTFN, "rb") as f:
336            with mmap.mmap(f.fileno(), 0, offset=65536, access=mmap.ACCESS_READ) as mf:
337                self.assertRaises(IndexError, mf.__getitem__, 80000)
338
339    def test_length_0_large_offset(self):
340        # Issue #10959: test mapping of a file by passing 0 for
341        # map length with a large offset doesn't cause a segfault.
342        with open(TESTFN, "wb") as f:
343            f.write(115699 * b'm') # Arbitrary character
344
345        with open(TESTFN, "w+b") as f:
346            self.assertRaises(ValueError, mmap.mmap, f.fileno(), 0,
347                              offset=2147418112)
348
349    def test_move(self):
350        # make move works everywhere (64-bit format problem earlier)
351        with open(TESTFN, 'wb+') as f:
352
353            f.write(b"ABCDEabcde") # Arbitrary character
354            f.flush()
355
356            mf = mmap.mmap(f.fileno(), 10)
357            mf.move(5, 0, 5)
358            self.assertEqual(mf[:], b"ABCDEABCDE", "Map move should have duplicated front 5")
359            mf.close()
360
361        # more excessive test
362        data = b"0123456789"
363        for dest in range(len(data)):
364            for src in range(len(data)):
365                for count in range(len(data) - max(dest, src)):
366                    expected = data[:dest] + data[src:src+count] + data[dest+count:]
367                    m = mmap.mmap(-1, len(data))
368                    m[:] = data
369                    m.move(dest, src, count)
370                    self.assertEqual(m[:], expected)
371                    m.close()
372
373        # segfault test (Issue 5387)
374        m = mmap.mmap(-1, 100)
375        offsets = [-100, -1, 0, 1, 100]
376        for source, dest, size in itertools.product(offsets, offsets, offsets):
377            try:
378                m.move(source, dest, size)
379            except ValueError:
380                pass
381
382        offsets = [(-1, -1, -1), (-1, -1, 0), (-1, 0, -1), (0, -1, -1),
383                   (-1, 0, 0), (0, -1, 0), (0, 0, -1)]
384        for source, dest, size in offsets:
385            self.assertRaises(ValueError, m.move, source, dest, size)
386
387        m.close()
388
389        m = mmap.mmap(-1, 1) # single byte
390        self.assertRaises(ValueError, m.move, 0, 0, 2)
391        self.assertRaises(ValueError, m.move, 1, 0, 1)
392        self.assertRaises(ValueError, m.move, 0, 1, 1)
393        m.move(0, 0, 1)
394        m.move(0, 0, 0)
395
396
397    def test_anonymous(self):
398        # anonymous mmap.mmap(-1, PAGE)
399        m = mmap.mmap(-1, PAGESIZE)
400        for x in range(PAGESIZE):
401            self.assertEqual(m[x], 0,
402                             "anonymously mmap'ed contents should be zero")
403
404        for x in range(PAGESIZE):
405            b = x & 0xff
406            m[x] = b
407            self.assertEqual(m[x], b)
408
409    def test_read_all(self):
410        m = mmap.mmap(-1, 16)
411        self.addCleanup(m.close)
412
413        # With no parameters, or None or a negative argument, reads all
414        m.write(bytes(range(16)))
415        m.seek(0)
416        self.assertEqual(m.read(), bytes(range(16)))
417        m.seek(8)
418        self.assertEqual(m.read(), bytes(range(8, 16)))
419        m.seek(16)
420        self.assertEqual(m.read(), b'')
421        m.seek(3)
422        self.assertEqual(m.read(None), bytes(range(3, 16)))
423        m.seek(4)
424        self.assertEqual(m.read(-1), bytes(range(4, 16)))
425        m.seek(5)
426        self.assertEqual(m.read(-2), bytes(range(5, 16)))
427        m.seek(9)
428        self.assertEqual(m.read(-42), bytes(range(9, 16)))
429
430    def test_read_invalid_arg(self):
431        m = mmap.mmap(-1, 16)
432        self.addCleanup(m.close)
433
434        self.assertRaises(TypeError, m.read, 'foo')
435        self.assertRaises(TypeError, m.read, 5.5)
436        self.assertRaises(TypeError, m.read, [1, 2, 3])
437
438    def test_extended_getslice(self):
439        # Test extended slicing by comparing with list slicing.
440        s = bytes(reversed(range(256)))
441        m = mmap.mmap(-1, len(s))
442        m[:] = s
443        self.assertEqual(m[:], s)
444        indices = (0, None, 1, 3, 19, 300, sys.maxsize, -1, -2, -31, -300)
445        for start in indices:
446            for stop in indices:
447                # Skip step 0 (invalid)
448                for step in indices[1:]:
449                    self.assertEqual(m[start:stop:step],
450                                     s[start:stop:step])
451
452    def test_extended_set_del_slice(self):
453        # Test extended slicing by comparing with list slicing.
454        s = bytes(reversed(range(256)))
455        m = mmap.mmap(-1, len(s))
456        indices = (0, None, 1, 3, 19, 300, sys.maxsize, -1, -2, -31, -300)
457        for start in indices:
458            for stop in indices:
459                # Skip invalid step 0
460                for step in indices[1:]:
461                    m[:] = s
462                    self.assertEqual(m[:], s)
463                    L = list(s)
464                    # Make sure we have a slice of exactly the right length,
465                    # but with different data.
466                    data = L[start:stop:step]
467                    data = bytes(reversed(data))
468                    L[start:stop:step] = data
469                    m[start:stop:step] = data
470                    self.assertEqual(m[:], bytes(L))
471
472    def make_mmap_file (self, f, halfsize):
473        # Write 2 pages worth of data to the file
474        f.write (b'\0' * halfsize)
475        f.write (b'foo')
476        f.write (b'\0' * (halfsize - 3))
477        f.flush ()
478        return mmap.mmap (f.fileno(), 0)
479
480    def test_empty_file (self):
481        f = open (TESTFN, 'w+b')
482        f.close()
483        with open(TESTFN, "rb") as f :
484            self.assertRaisesRegex(ValueError,
485                                   "cannot mmap an empty file",
486                                   mmap.mmap, f.fileno(), 0,
487                                   access=mmap.ACCESS_READ)
488
489    def test_offset (self):
490        f = open (TESTFN, 'w+b')
491
492        try: # unlink TESTFN no matter what
493            halfsize = mmap.ALLOCATIONGRANULARITY
494            m = self.make_mmap_file (f, halfsize)
495            m.close ()
496            f.close ()
497
498            mapsize = halfsize * 2
499            # Try invalid offset
500            f = open(TESTFN, "r+b")
501            for offset in [-2, -1, None]:
502                try:
503                    m = mmap.mmap(f.fileno(), mapsize, offset=offset)
504                    self.assertEqual(0, 1)
505                except (ValueError, TypeError, OverflowError):
506                    pass
507                else:
508                    self.assertEqual(0, 0)
509            f.close()
510
511            # Try valid offset, hopefully 8192 works on all OSes
512            f = open(TESTFN, "r+b")
513            m = mmap.mmap(f.fileno(), mapsize - halfsize, offset=halfsize)
514            self.assertEqual(m[0:3], b'foo')
515            f.close()
516
517            # Try resizing map
518            try:
519                m.resize(512)
520            except SystemError:
521                pass
522            else:
523                # resize() is supported
524                self.assertEqual(len(m), 512)
525                # Check that we can no longer seek beyond the new size.
526                self.assertRaises(ValueError, m.seek, 513, 0)
527                # Check that the content is not changed
528                self.assertEqual(m[0:3], b'foo')
529
530                # Check that the underlying file is truncated too
531                f = open(TESTFN, 'rb')
532                f.seek(0, 2)
533                self.assertEqual(f.tell(), halfsize + 512)
534                f.close()
535                self.assertEqual(m.size(), halfsize + 512)
536
537            m.close()
538
539        finally:
540            f.close()
541            try:
542                os.unlink(TESTFN)
543            except OSError:
544                pass
545
546    def test_subclass(self):
547        class anon_mmap(mmap.mmap):
548            def __new__(klass, *args, **kwargs):
549                return mmap.mmap.__new__(klass, -1, *args, **kwargs)
550        anon_mmap(PAGESIZE)
551
552    @unittest.skipUnless(hasattr(mmap, 'PROT_READ'), "needs mmap.PROT_READ")
553    def test_prot_readonly(self):
554        mapsize = 10
555        with open(TESTFN, "wb") as fp:
556            fp.write(b"a"*mapsize)
557        with open(TESTFN, "rb") as f:
558            m = mmap.mmap(f.fileno(), mapsize, prot=mmap.PROT_READ)
559            self.assertRaises(TypeError, m.write, "foo")
560
561    def test_error(self):
562        self.assertIs(mmap.error, OSError)
563
564    def test_io_methods(self):
565        data = b"0123456789"
566        with open(TESTFN, "wb") as fp:
567            fp.write(b"x"*len(data))
568        with open(TESTFN, "r+b") as f:
569            m = mmap.mmap(f.fileno(), len(data))
570        # Test write_byte()
571        for i in range(len(data)):
572            self.assertEqual(m.tell(), i)
573            m.write_byte(data[i])
574            self.assertEqual(m.tell(), i+1)
575        self.assertRaises(ValueError, m.write_byte, b"x"[0])
576        self.assertEqual(m[:], data)
577        # Test read_byte()
578        m.seek(0)
579        for i in range(len(data)):
580            self.assertEqual(m.tell(), i)
581            self.assertEqual(m.read_byte(), data[i])
582            self.assertEqual(m.tell(), i+1)
583        self.assertRaises(ValueError, m.read_byte)
584        # Test read()
585        m.seek(3)
586        self.assertEqual(m.read(3), b"345")
587        self.assertEqual(m.tell(), 6)
588        # Test write()
589        m.seek(3)
590        m.write(b"bar")
591        self.assertEqual(m.tell(), 6)
592        self.assertEqual(m[:], b"012bar6789")
593        m.write(bytearray(b"baz"))
594        self.assertEqual(m.tell(), 9)
595        self.assertEqual(m[:], b"012barbaz9")
596        self.assertRaises(ValueError, m.write, b"ba")
597
598    def test_non_ascii_byte(self):
599        for b in (129, 200, 255): # > 128
600            m = mmap.mmap(-1, 1)
601            m.write_byte(b)
602            self.assertEqual(m[0], b)
603            m.seek(0)
604            self.assertEqual(m.read_byte(), b)
605            m.close()
606
607    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
608    def test_tagname(self):
609        data1 = b"0123456789"
610        data2 = b"abcdefghij"
611        assert len(data1) == len(data2)
612
613        # Test same tag
614        m1 = mmap.mmap(-1, len(data1), tagname="foo")
615        m1[:] = data1
616        m2 = mmap.mmap(-1, len(data2), tagname="foo")
617        m2[:] = data2
618        self.assertEqual(m1[:], data2)
619        self.assertEqual(m2[:], data2)
620        m2.close()
621        m1.close()
622
623        # Test different tag
624        m1 = mmap.mmap(-1, len(data1), tagname="foo")
625        m1[:] = data1
626        m2 = mmap.mmap(-1, len(data2), tagname="boo")
627        m2[:] = data2
628        self.assertEqual(m1[:], data1)
629        self.assertEqual(m2[:], data2)
630        m2.close()
631        m1.close()
632
633    @cpython_only
634    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
635    def test_sizeof(self):
636        m1 = mmap.mmap(-1, 100)
637        tagname = "foo"
638        m2 = mmap.mmap(-1, 100, tagname=tagname)
639        self.assertEqual(sys.getsizeof(m2),
640                         sys.getsizeof(m1) + len(tagname) + 1)
641
642    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
643    def test_crasher_on_windows(self):
644        # Should not crash (Issue 1733986)
645        m = mmap.mmap(-1, 1000, tagname="foo")
646        try:
647            mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size
648        except:
649            pass
650        m.close()
651
652        # Should not crash (Issue 5385)
653        with open(TESTFN, "wb") as fp:
654            fp.write(b"x"*10)
655        f = open(TESTFN, "r+b")
656        m = mmap.mmap(f.fileno(), 0)
657        f.close()
658        try:
659            m.resize(0) # will raise OSError
660        except:
661            pass
662        try:
663            m[:]
664        except:
665            pass
666        m.close()
667
668    @unittest.skipUnless(os.name == 'nt', 'requires Windows')
669    def test_invalid_descriptor(self):
670        # socket file descriptors are valid, but out of range
671        # for _get_osfhandle, causing a crash when validating the
672        # parameters to _get_osfhandle.
673        s = socket.socket()
674        try:
675            with self.assertRaises(OSError):
676                m = mmap.mmap(s.fileno(), 10)
677        finally:
678            s.close()
679
680    def test_context_manager(self):
681        with mmap.mmap(-1, 10) as m:
682            self.assertFalse(m.closed)
683        self.assertTrue(m.closed)
684
685    def test_context_manager_exception(self):
686        # Test that the OSError gets passed through
687        with self.assertRaises(Exception) as exc:
688            with mmap.mmap(-1, 10) as m:
689                raise OSError
690        self.assertIsInstance(exc.exception, OSError,
691                              "wrong exception raised in context manager")
692        self.assertTrue(m.closed, "context manager failed")
693
694    def test_weakref(self):
695        # Check mmap objects are weakrefable
696        mm = mmap.mmap(-1, 16)
697        wr = weakref.ref(mm)
698        self.assertIs(wr(), mm)
699        del mm
700        gc_collect()
701        self.assertIs(wr(), None)
702
703    def test_write_returning_the_number_of_bytes_written(self):
704        mm = mmap.mmap(-1, 16)
705        self.assertEqual(mm.write(b""), 0)
706        self.assertEqual(mm.write(b"x"), 1)
707        self.assertEqual(mm.write(b"yz"), 2)
708        self.assertEqual(mm.write(b"python"), 6)
709
710    @unittest.skipIf(os.name == 'nt', 'cannot resize anonymous mmaps on Windows')
711    def test_resize_past_pos(self):
712        m = mmap.mmap(-1, 8192)
713        self.addCleanup(m.close)
714        m.read(5000)
715        try:
716            m.resize(4096)
717        except SystemError:
718            self.skipTest("resizing not supported")
719        self.assertEqual(m.read(14), b'')
720        self.assertRaises(ValueError, m.read_byte)
721        self.assertRaises(ValueError, m.write_byte, 42)
722        self.assertRaises(ValueError, m.write, b'abc')
723
724    def test_concat_repeat_exception(self):
725        m = mmap.mmap(-1, 16)
726        with self.assertRaises(TypeError):
727            m + m
728        with self.assertRaises(TypeError):
729            m * 2
730
731    def test_flush_return_value(self):
732        # mm.flush() should return None on success, raise an
733        # exception on error under all platforms.
734        mm = mmap.mmap(-1, 16)
735        self.addCleanup(mm.close)
736        mm.write(b'python')
737        result = mm.flush()
738        self.assertIsNone(result)
739        if sys.platform.startswith('linux'):
740            # 'offset' must be a multiple of mmap.PAGESIZE on Linux.
741            # See bpo-34754 for details.
742            self.assertRaises(OSError, mm.flush, 1, len(b'python'))
743
744    def test_repr(self):
745        open_mmap_repr_pat = re.compile(
746            r"<mmap.mmap closed=False, "
747            r"access=(?P<access>\S+), "
748            r"length=(?P<length>\d+), "
749            r"pos=(?P<pos>\d+), "
750            r"offset=(?P<offset>\d+)>")
751        closed_mmap_repr_pat = re.compile(r"<mmap.mmap closed=True>")
752        mapsizes = (50, 100, 1_000, 1_000_000, 10_000_000)
753        offsets = tuple((mapsize // 2 // mmap.ALLOCATIONGRANULARITY)
754                        * mmap.ALLOCATIONGRANULARITY for mapsize in mapsizes)
755        for offset, mapsize in zip(offsets, mapsizes):
756            data = b'a' * mapsize
757            length = mapsize - offset
758            accesses = ('ACCESS_DEFAULT', 'ACCESS_READ',
759                        'ACCESS_COPY', 'ACCESS_WRITE')
760            positions = (0, length//10, length//5, length//4)
761            with open(TESTFN, "wb+") as fp:
762                fp.write(data)
763                fp.flush()
764                for access, pos in itertools.product(accesses, positions):
765                    accint = getattr(mmap, access)
766                    with mmap.mmap(fp.fileno(),
767                                   length,
768                                   access=accint,
769                                   offset=offset) as mm:
770                        mm.seek(pos)
771                        match = open_mmap_repr_pat.match(repr(mm))
772                        self.assertIsNotNone(match)
773                        self.assertEqual(match.group('access'), access)
774                        self.assertEqual(match.group('length'), str(length))
775                        self.assertEqual(match.group('pos'), str(pos))
776                        self.assertEqual(match.group('offset'), str(offset))
777                    match = closed_mmap_repr_pat.match(repr(mm))
778                    self.assertIsNotNone(match)
779
780    @unittest.skipUnless(hasattr(mmap.mmap, 'madvise'), 'needs madvise')
781    def test_madvise(self):
782        size = 2 * PAGESIZE
783        m = mmap.mmap(-1, size)
784
785        with self.assertRaisesRegex(ValueError, "madvise start out of bounds"):
786            m.madvise(mmap.MADV_NORMAL, size)
787        with self.assertRaisesRegex(ValueError, "madvise start out of bounds"):
788            m.madvise(mmap.MADV_NORMAL, -1)
789        with self.assertRaisesRegex(ValueError, "madvise length invalid"):
790            m.madvise(mmap.MADV_NORMAL, 0, -1)
791        with self.assertRaisesRegex(OverflowError, "madvise length too large"):
792            m.madvise(mmap.MADV_NORMAL, PAGESIZE, sys.maxsize)
793        self.assertEqual(m.madvise(mmap.MADV_NORMAL), None)
794        self.assertEqual(m.madvise(mmap.MADV_NORMAL, PAGESIZE), None)
795        self.assertEqual(m.madvise(mmap.MADV_NORMAL, PAGESIZE, size), None)
796        self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, 2), None)
797        self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, size), None)
798
799
800class LargeMmapTests(unittest.TestCase):
801
802    def setUp(self):
803        unlink(TESTFN)
804
805    def tearDown(self):
806        unlink(TESTFN)
807
808    def _make_test_file(self, num_zeroes, tail):
809        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
810            requires('largefile',
811                'test requires %s bytes and a long time to run' % str(0x180000000))
812        f = open(TESTFN, 'w+b')
813        try:
814            f.seek(num_zeroes)
815            f.write(tail)
816            f.flush()
817        except (OSError, OverflowError, ValueError):
818            try:
819                f.close()
820            except (OSError, OverflowError):
821                pass
822            raise unittest.SkipTest("filesystem does not have largefile support")
823        return f
824
825    def test_large_offset(self):
826        with self._make_test_file(0x14FFFFFFF, b" ") as f:
827            with mmap.mmap(f.fileno(), 0, offset=0x140000000, access=mmap.ACCESS_READ) as m:
828                self.assertEqual(m[0xFFFFFFF], 32)
829
830    def test_large_filesize(self):
831        with self._make_test_file(0x17FFFFFFF, b" ") as f:
832            if sys.maxsize < 0x180000000:
833                # On 32 bit platforms the file is larger than sys.maxsize so
834                # mapping the whole file should fail -- Issue #16743
835                with self.assertRaises(OverflowError):
836                    mmap.mmap(f.fileno(), 0x180000000, access=mmap.ACCESS_READ)
837                with self.assertRaises(ValueError):
838                    mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
839            with mmap.mmap(f.fileno(), 0x10000, access=mmap.ACCESS_READ) as m:
840                self.assertEqual(m.size(), 0x180000000)
841
842    # Issue 11277: mmap() with large (~4 GiB) sparse files crashes on OS X.
843
844    def _test_around_boundary(self, boundary):
845        tail = b'  DEARdear  '
846        start = boundary - len(tail) // 2
847        end = start + len(tail)
848        with self._make_test_file(start, tail) as f:
849            with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
850                self.assertEqual(m[start:end], tail)
851
852    @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems")
853    def test_around_2GB(self):
854        self._test_around_boundary(_2G)
855
856    @unittest.skipUnless(sys.maxsize > _4G, "test cannot run on 32-bit systems")
857    def test_around_4GB(self):
858        self._test_around_boundary(_4G)
859
860
# Run the whole test suite when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
863