#
# Module which supports allocation of memory from an mmap
#
# multiprocessing/heap.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

import bisect
import mmap
import os
import sys
import tempfile
import threading

from .context import reduction, assert_spawning
from . import util

__all__ = ['BufferWrapper']

#
# Inheritable class which wraps an mmap, and from which blocks can be allocated
#

if sys.platform == 'win32':

    import _winapi

    class Arena(object):

        _rand = tempfile._RandomNameSequence()

        def __init__(self, size):
            self.size = size
            for i in range(100):
                name = 'pym-%d-%s' % (os.getpid(), next(self._rand))
                buf = mmap.mmap(-1, size, tagname=name)
                if _winapi.GetLastError() == 0:
                    break
                # We have reopened a preexisting mmap.
                buf.close()
            else:
                raise FileExistsError('Cannot find name for new mmap')
            self.name = name
            self.buffer = buf
            self._state = (self.size, self.name)

        def __getstate__(self):
            assert_spawning(self)
            return self._state

        def __setstate__(self, state):
            self.size, self.name = self._state = state
            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
            # XXX Temporarily preventing buildbot failures while determining
            # XXX the correct long-term fix. See issue 23060
            #assert _winapi.GetLastError() == _winapi.ERROR_ALREADY_EXISTS

else:

    class Arena(object):
        if sys.platform == 'linux':
            _dir_candidates = ['/dev/shm']
        else:
            _dir_candidates = []

        def __init__(self, size, fd=-1):
            self.size = size
            self.fd = fd
            if fd == -1:
                self.fd, name = tempfile.mkstemp(
                    prefix='pym-%d-' % os.getpid(),
                    dir=self._choose_dir(size))
                os.unlink(name)
                util.Finalize(self, os.close, (self.fd,))
                os.ftruncate(self.fd, size)
            self.buffer = mmap.mmap(self.fd, self.size)

        def _choose_dir(self, size):
            # Choose a non-storage backed directory if possible,
            # to improve performance
            for d in self._dir_candidates:
                st = os.statvfs(d)
                if st.f_bavail * st.f_frsize >= size:  # enough free space?
                    return d
            return util.get_temp_dir()
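
    #
    # A file-backed Arena is pickled by duplicating its file descriptor
    # (reduction.DupFd); the receiving process then re-creates the mmap
    # from the duplicated descriptor in rebuild_arena() below.
    #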

    def reduce_arena(a):
        if a.fd == -1:
            raise ValueError('Arena is unpicklable because '
                             'forking was enabled when it was created')
        return rebuild_arena, (a.size, reduction.DupFd(a.fd))

    def rebuild_arena(size, dupfd):
        return Arena(size, dupfd.detach())

    reduction.register(Arena, reduce_arena)

#
# Class allowing allocation of chunks of memory from arenas
#

class Heap(object):

    _alignment = 8

    def __init__(self, size=mmap.PAGESIZE):
        self._lastpid = os.getpid()
        self._lock = threading.Lock()
        self._size = size
        self._lengths = []
        self._len_to_seq = {}
        self._start_to_block = {}
        self._stop_to_block = {}
        self._allocated_blocks = set()
        self._arenas = []
        # list of pending blocks to free - see free() comment below
        self._pending_free_blocks = []

    @staticmethod
    def _roundup(n, alignment):
        # alignment must be a power of 2
        mask = alignment - 1
        return (n + mask) & ~mask

    def _malloc(self, size):
        # returns a large enough block -- it might be much larger
        i = bisect.bisect_left(self._lengths, size)
        if i == len(self._lengths):
            length = self._roundup(max(self._size, size), mmap.PAGESIZE)
            self._size *= 2
            util.info('allocating a new mmap of length %d', length)
            arena = Arena(length)
            self._arenas.append(arena)
            return (arena, 0, length)
        else:
            length = self._lengths[i]
            seq = self._len_to_seq[length]
            block = seq.pop()
            if not seq:
                del self._len_to_seq[length], self._lengths[i]

        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]
        return block

    def _free(self, block):
        # free location and try to merge with neighbours
        (arena, start, stop) = block

        try:
            prev_block = self._stop_to_block[(arena, start)]
        except KeyError:
            pass
        else:
            start, _ = self._absorb(prev_block)

        try:
            next_block = self._start_to_block[(arena, stop)]
        except KeyError:
            pass
        else:
            _, stop = self._absorb(next_block)

        block = (arena, start, stop)
        length = stop - start

        try:
            self._len_to_seq[length].append(block)
        except KeyError:
            self._len_to_seq[length] = [block]
            bisect.insort(self._lengths, length)

        self._start_to_block[(arena, start)] = block
        self._stop_to_block[(arena, stop)] = block
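
    # For example: with free blocks (a, 0, 16) and (a, 32, 48) already
    # registered, freeing (a, 16, 32) finds the left neighbour through
    # _stop_to_block[(a, 16)] and the right neighbour through
    # _start_to_block[(a, 32)], absorbs both, and registers the single
    # merged free block (a, 0, 48) in their place.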

    def _absorb(self, block):
        # deregister this block so it can be merged with a neighbour
        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]

        length = stop - start
        seq = self._len_to_seq[length]
        seq.remove(block)
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)

        return start, stop

    def _free_pending_blocks(self):
        # Free all the blocks in the pending list - called with the lock held.
        while True:
            try:
                block = self._pending_free_blocks.pop()
            except IndexError:
                break
            self._allocated_blocks.remove(block)
            self._free(block)

    def free(self, block):
        # free a block returned by malloc()
        # Since free() can be called asynchronously by the GC, it could happen
        # that it's called while self._lock is held: in that case,
        # self._lock.acquire() would deadlock (issue #12352). To avoid that,
        # a trylock is used instead, and if the lock can't be acquired
        # immediately, the block is added to a list of blocks to be freed
        # synchronously sometime later from malloc() or free(), by calling
        # _free_pending_blocks() (appending to and popping from a list is not
        # strictly thread-safe but under CPython it's atomic thanks to the GIL).
        if os.getpid() != self._lastpid:
            raise ValueError(
                "My pid ({0:n}) is not last pid {1:n}".format(
                    os.getpid(), self._lastpid))
        if not self._lock.acquire(False):
            # can't acquire the lock right now, add the block to the list of
            # pending blocks to free
            self._pending_free_blocks.append(block)
        else:
            # we hold the lock
            try:
                self._free_pending_blocks()
                self._allocated_blocks.remove(block)
                self._free(block)
            finally:
                self._lock.release()

    def malloc(self, size):
        # return a block of the right size (possibly rounded up)
        if size < 0:
            raise ValueError("Size {0:n} out of range".format(size))
        if sys.maxsize <= size:
            raise OverflowError("Size {0:n} too large".format(size))
        if os.getpid() != self._lastpid:
            self.__init__()                     # reinitialize after fork
        with self._lock:
            self._free_pending_blocks()
            size = self._roundup(max(size, 1), self._alignment)
            (arena, start, stop) = self._malloc(size)
            new_stop = start + size
            if new_stop < stop:
                self._free((arena, new_stop, stop))
            block = (arena, start, new_stop)
            self._allocated_blocks.add(block)
            return block

#
# Class representing a chunk of an mmap -- can be inherited by child process
#

class BufferWrapper(object):

    _heap = Heap()

    def __init__(self, size):
        if size < 0:
            raise ValueError("Size {0:n} out of range".format(size))
        if sys.maxsize <= size:
            raise OverflowError("Size {0:n} too large".format(size))
        block = BufferWrapper._heap.malloc(size)
        self._state = (block, size)
        util.Finalize(self, BufferWrapper._heap.free, args=(block,))

    def create_memoryview(self):
        (arena, start, stop), size = self._state
        return memoryview(arena.buffer)[start:start+size]
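
#
# Minimal usage sketch (illustrative only, not part of the module's public
# API): multiprocessing.sharedctypes allocates its shared memory through
# BufferWrapper, which hands back a writable memoryview onto a slice of an
# mmap-backed Arena; the block is freed automatically via util.Finalize.
#

if __name__ == '__main__':
    wrapper = BufferWrapper(16)            # 16 bytes, rounded up internally
    view = wrapper.create_memoryview()
    view[:5] = b'hello'                    # write through the shared view
    assert bytes(view[:5]) == b'hello'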