#
# Module which supports allocation of memory from an mmap
#
# multiprocessing/heap.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

import bisect
import mmap
import os
import sys
import tempfile
import threading

from .context import reduction, assert_spawning
from . import util

__all__ = ['BufferWrapper']

#
# Inheritable class which wraps an mmap, and from which blocks can be allocated
#

if sys.platform == 'win32':

    import _winapi

    class Arena(object):

        _rand = tempfile._RandomNameSequence()

        def __init__(self, size):
            self.size = size
            for i in range(100):
                name = 'pym-%d-%s' % (os.getpid(), next(self._rand))
                buf = mmap.mmap(-1, size, tagname=name)
                if _winapi.GetLastError() == 0:
                    break
                # We have reopened a preexisting mmap.
                buf.close()
            else:
                raise FileExistsError('Cannot find name for new mmap')
            self.name = name
            self.buffer = buf
            self._state = (self.size, self.name)

        def __getstate__(self):
            assert_spawning(self)
            return self._state

        def __setstate__(self, state):
            self.size, self.name = self._state = state
            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
            # XXX Temporarily preventing buildbot failures while determining
            # XXX the correct long-term fix. See issue 23060
            #assert _winapi.GetLastError() == _winapi.ERROR_ALREADY_EXISTS

else:

    class Arena(object):

        def __init__(self, size, fd=-1):
            self.size = size
            self.fd = fd
            if fd == -1:
                self.fd, name = tempfile.mkstemp(
                    prefix='pym-%d-' % os.getpid(), dir=util.get_temp_dir())
                os.unlink(name)
                util.Finalize(self, os.close, (self.fd,))
                with open(self.fd, 'wb', closefd=False) as f:
                    bs = 1024 * 1024
                    if size >= bs:
                        zeros = b'\0' * bs
                        for _ in range(size // bs):
                            f.write(zeros)
                        del zeros
                    f.write(b'\0' * (size % bs))
                    assert f.tell() == size
            self.buffer = mmap.mmap(self.fd, self.size)

    def reduce_arena(a):
        if a.fd == -1:
            raise ValueError('Arena is unpicklable because '
                             'forking was enabled when it was created')
        return rebuild_arena, (a.size, reduction.DupFd(a.fd))

    def rebuild_arena(size, dupfd):
        return Arena(size, dupfd.detach())

    reduction.register(Arena, reduce_arena)
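#
# Illustrative sketch (not part of the original module): an Arena is just a
# zero-filled mmap, so writes through its buffer behave like writes to any
# other mutable byte buffer.  The helper name below is an assumption added
# for illustration only; nothing in this module calls it.
#

def _demo_arena():
    a = Arena(mmap.PAGESIZE)            # one page of zero-filled memory
    a.buffer[:5] = b'hello'             # write through the underlying mmap
    assert bytes(a.buffer[:5]) == b'hello'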
#
# Class allowing allocation of chunks of memory from arenas
#

class Heap(object):

    _alignment = 8

    def __init__(self, size=mmap.PAGESIZE):
        self._lastpid = os.getpid()
        self._lock = threading.Lock()
        self._size = size
        self._lengths = []
        self._len_to_seq = {}
        self._start_to_block = {}
        self._stop_to_block = {}
        self._allocated_blocks = set()
        self._arenas = []
        # list of pending blocks to free - see free() comment below
        self._pending_free_blocks = []

    @staticmethod
    def _roundup(n, alignment):
        # alignment must be a power of 2, so that (alignment - 1) is an
        # all-ones bit mask; e.g. _roundup(13, 8) == 16
        mask = alignment - 1
        return (n + mask) & ~mask

    def _malloc(self, size):
        # returns a large enough block -- it might be much larger
        i = bisect.bisect_left(self._lengths, size)
        if i == len(self._lengths):
            length = self._roundup(max(self._size, size), mmap.PAGESIZE)
            self._size *= 2
            util.info('allocating a new mmap of length %d', length)
            arena = Arena(length)
            self._arenas.append(arena)
            return (arena, 0, length)
        else:
            length = self._lengths[i]
            seq = self._len_to_seq[length]
            block = seq.pop()
            if not seq:
                del self._len_to_seq[length], self._lengths[i]

        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]
        return block

    def _free(self, block):
        # free location and try to merge with neighbours
        (arena, start, stop) = block

        try:
            prev_block = self._stop_to_block[(arena, start)]
        except KeyError:
            pass
        else:
            start, _ = self._absorb(prev_block)

        try:
            next_block = self._start_to_block[(arena, stop)]
        except KeyError:
            pass
        else:
            _, stop = self._absorb(next_block)

        block = (arena, start, stop)
        length = stop - start

        try:
            self._len_to_seq[length].append(block)
        except KeyError:
            self._len_to_seq[length] = [block]
            bisect.insort(self._lengths, length)

        self._start_to_block[(arena, start)] = block
        self._stop_to_block[(arena, stop)] = block

    def _absorb(self, block):
        # deregister this block so it can be merged with a neighbour
        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]

        length = stop - start
        seq = self._len_to_seq[length]
        seq.remove(block)
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)

        return start, stop

    def _free_pending_blocks(self):
        # Free all the blocks in the pending list - called with the lock held.
        while True:
            try:
                block = self._pending_free_blocks.pop()
            except IndexError:
                break
            self._allocated_blocks.remove(block)
            self._free(block)

    def free(self, block):
        # free a block returned by malloc()
        # Since free() can be called asynchronously by the GC, it could happen
        # that it's called while self._lock is held: in that case,
        # self._lock.acquire() would deadlock (issue #12352). To avoid that, a
        # trylock is used instead, and if the lock can't be acquired
        # immediately, the block is added to a list of blocks to be freed
        # synchronously sometime later from malloc() or free(), by calling
        # _free_pending_blocks() (appending and retrieving from a list is not
        # strictly thread-safe but under CPython it's atomic thanks to the GIL).
        assert os.getpid() == self._lastpid
        if not self._lock.acquire(False):
            # can't acquire the lock right now, add the block to the list of
            # pending blocks to free
            self._pending_free_blocks.append(block)
        else:
            # we hold the lock
            try:
                self._free_pending_blocks()
                self._allocated_blocks.remove(block)
                self._free(block)
            finally:
                self._lock.release()

    def malloc(self, size):
        # return a block of the right size (possibly rounded up)
        assert 0 <= size < sys.maxsize
        if os.getpid() != self._lastpid:
            self.__init__()                     # reinitialize after fork
        with self._lock:
            self._free_pending_blocks()
            size = self._roundup(max(size, 1), self._alignment)
            (arena, start, stop) = self._malloc(size)
            new_stop = start + size
            if new_stop < stop:
                self._free((arena, new_stop, stop))
            block = (arena, start, new_stop)
            self._allocated_blocks.add(block)
            return block
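#
# Illustrative sketch (not part of the original module): malloc() hands out
# (arena, start, stop) triples rounded up to the 8-byte alignment, and
# free() returns them to the free lists, merging with adjacent free blocks.
# The helper name below is an assumption added for illustration only.
#

def _demo_heap():
    heap = Heap()
    block = heap.malloc(100)
    arena, start, stop = block
    # 100 is rounded up to a multiple of Heap._alignment, i.e. 104
    assert stop - start == Heap._roundup(100, Heap._alignment)
    heap.free(block)                    # block can now be handed out again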
#
# Class representing a chunk of an mmap -- can be inherited by child process
#

class BufferWrapper(object):

    _heap = Heap()

    def __init__(self, size):
        assert 0 <= size < sys.maxsize
        block = BufferWrapper._heap.malloc(size)
        self._state = (block, size)
        util.Finalize(self, BufferWrapper._heap.free, args=(block,))

    def create_memoryview(self):
        (arena, start, stop), size = self._state
        return memoryview(arena.buffer)[start:start+size]
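#
# Illustrative sketch (not part of the original module): BufferWrapper is
# what the rest of the package uses to hand a chunk of shared memory to a
# child process (e.g. multiprocessing.sharedctypes builds Value/Array on
# top of it).  The helper name below is an assumption for illustration.
#

def _demo_buffer_wrapper():
    bw = BufferWrapper(16)
    view = bw.create_memoryview()
    view[:4] = b'\x01\x02\x03\x04'      # writable view over the shared block
    assert bytes(view[:4]) == b'\x01\x02\x03\x04'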