#
# Module which supports allocation of memory from an mmap
#
# multiprocessing/heap.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

import bisect
from collections import defaultdict
import mmap
import os
import sys
import tempfile
import threading

from .context import reduction, assert_spawning
from . import util

__all__ = ['BufferWrapper']

#
# Inheritable class which wraps an mmap, and from which blocks can be allocated
#

if sys.platform == 'win32':

    import _winapi

    class Arena(object):
        """
        A shared memory area backed by anonymous memory (Windows).

        The mapping is identified by a process-unique ``tagname`` so that a
        child process can reopen the same memory after unpickling.
        """

        # Shared generator of random name suffixes for mmap tagnames.
        _rand = tempfile._RandomNameSequence()

        def __init__(self, size):
            self.size = size
            # Retry with fresh random tagnames until we create a *new*
            # mapping: GetLastError() == 0 means the name was not in use.
            for i in range(100):
                name = 'pym-%d-%s' % (os.getpid(), next(self._rand))
                buf = mmap.mmap(-1, size, tagname=name)
                if _winapi.GetLastError() == 0:
                    break
                # We have reopened a preexisting mmap.
                buf.close()
            else:
                # 100 collisions in a row: give up.
                raise FileExistsError('Cannot find name for new mmap')
            self.name = name
            self.buffer = buf
            # Everything a child needs to reopen this arena (see __getstate__).
            self._state = (self.size, self.name)

        def __getstate__(self):
            # Picklable only while spawning a child process.
            assert_spawning(self)
            return self._state

        def __setstate__(self, state):
            self.size, self.name = self._state = state
            # Reopen existing mmap
            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
            # XXX Temporarily preventing buildbot failures while determining
            # XXX the correct long-term fix. See issue 23060
            #assert _winapi.GetLastError() == _winapi.ERROR_ALREADY_EXISTS

else:

    class Arena(object):
        """
        A shared memory area backed by a temporary file (POSIX).

        The file is unlinked immediately after creation, so the memory
        lives only as long as some process keeps the descriptor open.
        """

        if sys.platform == 'linux':
            # Prefer a RAM-backed filesystem, if it has enough free space.
            _dir_candidates = ['/dev/shm']
        else:
            _dir_candidates = []

        def __init__(self, size, fd=-1):
            self.size = size
            self.fd = fd
            if fd == -1:
                # Arena is created anew (if fd != -1, it means we're coming
                # from rebuild_arena() below)
                self.fd, name = tempfile.mkstemp(
                     prefix='pym-%d-'%os.getpid(),
                     dir=self._choose_dir(size))
                # Unlink right away: the fd keeps the file alive, and the
                # Finalize below closes it when this Arena is collected.
                os.unlink(name)
                util.Finalize(self, os.close, (self.fd,))
                os.ftruncate(self.fd, size)
            self.buffer = mmap.mmap(self.fd, self.size)

        def _choose_dir(self, size):
            # Choose a non-storage backed directory if possible,
            # to improve performance
            for d in self._dir_candidates:
                st = os.statvfs(d)
                if st.f_bavail * st.f_frsize >= size:  # enough free space?
                    return d
            return util.get_temp_dir()

    def reduce_arena(a):
        # Pickle an Arena by duplicating its file descriptor into the child.
        if a.fd == -1:
            raise ValueError('Arena is unpicklable because '
                             'forking was enabled when it was created')
        return rebuild_arena, (a.size, reduction.DupFd(a.fd))

    def rebuild_arena(size, dupfd):
        # Recreate the Arena in the child around the duplicated descriptor.
        return Arena(size, dupfd.detach())

    reduction.register(Arena, reduce_arena)

#
# Class allowing allocation of chunks of memory from arenas
#

class Heap(object):
    """
    A process-private heap which hands out ``(arena, start, stop)`` blocks
    carved from one or more Arenas.

    Free blocks are indexed three ways (by length, by start offset and by
    stop offset) so that malloc() can find a best-fit block with bisect and
    free() can coalesce a block with its neighbours in O(1) lookups.
    """

    # Minimum malloc() alignment
    _alignment = 8

    _DISCARD_FREE_SPACE_LARGER_THAN = 4 * 1024 ** 2  # 4 MB
    _DOUBLE_ARENA_SIZE_UNTIL = 4 * 1024 ** 2

    def __init__(self, size=mmap.PAGESIZE):
        # Pid of the process that created this heap; malloc() reinitializes
        # the heap if it notices it is running in a forked child.
        self._lastpid = os.getpid()
        self._lock = threading.Lock()
        # Current arena allocation size
        self._size = size
        # A sorted list of available block sizes in arenas
        self._lengths = []

        # Free block management:
        # - map each block size to a list of `(Arena, start, stop)` blocks
        self._len_to_seq = {}
        # - map `(Arena, start)` tuple to the `(Arena, start, stop)` block
        #   starting at that offset
        self._start_to_block = {}
        # - map `(Arena, stop)` tuple to the `(Arena, start, stop)` block
        #   ending at that offset
        self._stop_to_block = {}

        # Map each arena to the set of `(start, stop)` spans in use in it
        self._allocated_blocks = defaultdict(set)
        self._arenas = []

        # List of pending blocks to free - see comment in free() below
        self._pending_free_blocks = []

        # Statistics
        self._n_mallocs = 0
        self._n_frees = 0

    @staticmethod
    def _roundup(n, alignment):
        # alignment must be a power of 2
        mask = alignment - 1
        return (n + mask) & ~mask

    def _new_arena(self, size):
        # Create a new arena with at least the given *size*
        length = self._roundup(max(self._size, size), mmap.PAGESIZE)
        # We carve larger and larger arenas, for efficiency, until we
        # reach a large-ish size (roughly L3 cache-sized)
        if self._size < self._DOUBLE_ARENA_SIZE_UNTIL:
            self._size *= 2
        util.info('allocating a new mmap of length %d', length)
        arena = Arena(length)
        self._arenas.append(arena)
        # The whole arena is one big free block.
        return (arena, 0, length)

    def _discard_arena(self, arena):
        # Possibly delete the given (unused) arena
        length = arena.size
        # Reusing an existing arena is faster than creating a new one, so
        # we only reclaim space if it's large enough.
        if length < self._DISCARD_FREE_SPACE_LARGER_THAN:
            return
        blocks = self._allocated_blocks.pop(arena)
        assert not blocks
        # The arena is entirely free, so it is indexed as one block
        # spanning (0, length); drop it from every index.
        del self._start_to_block[(arena, 0)]
        del self._stop_to_block[(arena, length)]
        self._arenas.remove(arena)
        seq = self._len_to_seq[length]
        seq.remove((arena, 0, length))
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)

    def _malloc(self, size):
        # returns a large enough block -- it might be much larger
        i = bisect.bisect_left(self._lengths, size)
        if i == len(self._lengths):
            # No free block is big enough: create a fresh arena.
            return self._new_arena(size)
        else:
            # Best fit: the smallest free length >= size.
            length = self._lengths[i]
            seq = self._len_to_seq[length]
            block = seq.pop()
            if not seq:
                del self._len_to_seq[length], self._lengths[i]

        # The block is no longer free; remove it from the offset indexes.
        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]
        return block

    def _add_free_block(self, block):
        # make block available and try to merge with its neighbours in the arena
        (arena, start, stop) = block

        try:
            # Is there a free block ending exactly where this one starts?
            prev_block = self._stop_to_block[(arena, start)]
        except KeyError:
            pass
        else:
            start, _ = self._absorb(prev_block)

        try:
            # Is there a free block starting exactly where this one ends?
            next_block = self._start_to_block[(arena, stop)]
        except KeyError:
            pass
        else:
            _, stop = self._absorb(next_block)

        block = (arena, start, stop)
        length = stop - start

        # Register the (possibly merged) block under its new length...
        try:
            self._len_to_seq[length].append(block)
        except KeyError:
            self._len_to_seq[length] = [block]
            bisect.insort(self._lengths, length)

        # ...and under its offsets, for future merges.
        self._start_to_block[(arena, start)] = block
        self._stop_to_block[(arena, stop)] = block

    def _absorb(self, block):
        # deregister this block so it can be merged with a neighbour
        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]

        length = stop - start
        seq = self._len_to_seq[length]
        seq.remove(block)
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)

        return start, stop

    def _remove_allocated_block(self, block):
        arena, start, stop = block
        blocks = self._allocated_blocks[arena]
        blocks.remove((start, stop))
        if not blocks:
            # Arena is entirely free, discard it from this process
            self._discard_arena(arena)

    def _free_pending_blocks(self):
        # Free all the blocks in the pending list - called with the lock held.
        while True:
            try:
                block = self._pending_free_blocks.pop()
            except IndexError:
                break
            self._add_free_block(block)
            self._remove_allocated_block(block)

    def free(self, block):
        # free a block returned by malloc()
        # Since free() can be called asynchronously by the GC, it could happen
        # that it's called while self._lock is held: in that case,
        # self._lock.acquire() would deadlock (issue #12352). To avoid that, a
        # trylock is used instead, and if the lock can't be acquired
        # immediately, the block is added to a list of blocks to be freed
        # synchronously sometimes later from malloc() or free(), by calling
        # _free_pending_blocks() (appending and retrieving from a list is not
        # strictly thread-safe but under CPython it's atomic thanks to the GIL).
        if os.getpid() != self._lastpid:
            # A forked child must never free blocks of its parent's heap.
            raise ValueError(
                "My pid ({0:n}) is not last pid {1:n}".format(
                    os.getpid(),self._lastpid))
        if not self._lock.acquire(False):
            # can't acquire the lock right now, add the block to the list of
            # pending blocks to free
            self._pending_free_blocks.append(block)
        else:
            # we hold the lock
            try:
                self._n_frees += 1
                self._free_pending_blocks()
                self._add_free_block(block)
                self._remove_allocated_block(block)
            finally:
                self._lock.release()

    def malloc(self, size):
        # return a block of right size (possibly rounded up)
        if size < 0:
            raise ValueError("Size {0:n} out of range".format(size))
        if sys.maxsize <= size:
            raise OverflowError("Size {0:n} too large".format(size))
        if os.getpid() != self._lastpid:
            self.__init__()                     # reinitialize after fork
        with self._lock:
            self._n_mallocs += 1
            # allow pending blocks to be marked available
            self._free_pending_blocks()
            # Zero-sized requests still get a (minimal, aligned) block.
            size = self._roundup(max(size, 1), self._alignment)
            (arena, start, stop) = self._malloc(size)
            real_stop = start + size
            if real_stop < stop:
                # if the returned block is larger than necessary, mark
                # the remainder available
                self._add_free_block((arena, real_stop, stop))
            self._allocated_blocks[arena].add((start, real_stop))
            return (arena, start, real_stop)

#
# Class wrapping a block allocated out of a Heap -- can be inherited by child process
#

class BufferWrapper(object):
    """
    Picklable wrapper around a block of memory allocated from the
    process-wide Heap; can be inherited by a child process.
    """

    # Single heap shared by every BufferWrapper in this process.
    _heap = Heap()

    def __init__(self, size):
        # Reject out-of-range sizes up front, before touching the heap.
        # (The two conditions are mutually exclusive, so order is free.)
        if sys.maxsize <= size:
            raise OverflowError("Size {0:n} too large".format(size))
        if size < 0:
            raise ValueError("Size {0:n} out of range".format(size))
        allocated = BufferWrapper._heap.malloc(size)
        self._state = (allocated, size)
        # Give the block back to the heap once this wrapper is collected.
        util.Finalize(self, BufferWrapper._heap.free, args=(allocated,))

    def create_memoryview(self):
        # Expose only the requested *size* bytes, even if the underlying
        # block was rounded up by the heap.
        block, size = self._state
        arena, start, _stop = block
        return memoryview(arena.buffer)[start:start + size]
