• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Provides shared memory for direct access across processes.
2
3The API of this package is currently provisional. Refer to the
4documentation for details.
5"""
6
7
# Names exported by ``from <module> import *``.
__all__ = ['SharedMemory', 'ShareableList']
9
10
11from functools import partial
12import mmap
13import os
14import errno
15import struct
16import secrets
17import types
18
19if os.name == "nt":
20    import _winapi
21    _USE_POSIX = False
22else:
23    import _posixshmem
24    _USE_POSIX = True
25
26
# Flag combination meaning "create, and fail if it already exists".
_O_CREX = os.O_CREAT | os.O_EXCL

# FreeBSD (and perhaps other BSDs) limit names to 14 characters.
_SHM_SAFE_NAME_LENGTH = 14

# Shared memory block name prefix (platform dependent).
_SHM_NAME_PREFIX = '/psm_' if _USE_POSIX else 'wnsm_'
37
38
def _make_filename():
    "Create a random filename for the shared memory object."
    # Room left for the random part once the prefix is accounted for.
    # token_hex() emits two hex digits per random byte, hence the halving.
    room = _SHM_SAFE_NAME_LENGTH - len(_SHM_NAME_PREFIX)
    nbytes = room // 2
    assert nbytes >= 2, '_SHM_NAME_PREFIX too long'
    random_part = secrets.token_hex(nbytes)
    name = f"{_SHM_NAME_PREFIX}{random_part}"
    assert len(name) <= _SHM_SAFE_NAME_LENGTH
    return name
47
48
class SharedMemory:
    """Creates a new shared memory block or attaches to an existing
    shared memory block.

    Every shared memory block is assigned a unique name.  This enables
    one process to create a shared memory block with a particular name
    so that a different process can attach to that same shared memory
    block using that same name.

    As a resource for sharing data across processes, shared memory blocks
    may outlive the original process that created them.  When one process
    no longer needs access to a shared memory block that might still be
    needed by other processes, the close() method should be called.
    When a shared memory block is no longer needed by any process, the
    unlink() method should be called to ensure proper cleanup."""

    # Defaults; enables close() and unlink() to run without errors.
    _name = None
    _fd = -1
    _mmap = None
    _buf = None
    _flags = os.O_RDWR
    _mode = 0o600
    _prepend_leading_slash = True if _USE_POSIX else False

    def __init__(self, name=None, create=False, size=0):
        """Create a new block (create=True) or attach to an existing one.

        name   -- name of the block; may be None only when create=True, in
                  which case a random name is generated.
        create -- when true, a new block is created exclusively (an
                  existing block of the same name is an error).
        size   -- requested size in bytes; must be non-zero when creating.
                  When attaching, the size is taken from the existing
                  block instead.

        Raises ValueError for an invalid size/name combination and
        FileExistsError when creating a block whose explicit name is
        already in use.
        """
        if not size >= 0:
            raise ValueError("'size' must be a positive integer")
        if create:
            self._flags = _O_CREX | os.O_RDWR
            if size == 0:
                raise ValueError("'size' must be a positive number different from zero")
        if name is None and not self._flags & os.O_EXCL:
            raise ValueError("'name' can only be None if create=True")

        if _USE_POSIX:

            # POSIX Shared Memory

            if name is None:
                # Keep drawing random names until one is not already taken.
                while True:
                    name = _make_filename()
                    try:
                        self._fd = _posixshmem.shm_open(
                            name,
                            self._flags,
                            mode=self._mode
                        )
                    except FileExistsError:
                        continue
                    self._name = name
                    break
            else:
                name = "/" + name if self._prepend_leading_slash else name
                self._fd = _posixshmem.shm_open(
                    name,
                    self._flags,
                    mode=self._mode
                )
                self._name = name
            try:
                if create and size:
                    os.ftruncate(self._fd, size)
                # Use the size reported by the OS rather than the request.
                stats = os.fstat(self._fd)
                size = stats.st_size
                self._mmap = mmap.mmap(self._fd, size)
            except OSError:
                self.unlink()
                raise

            from .resource_tracker import register
            register(self._name, "shared_memory")

        else:

            # Windows Named Shared Memory

            if create:
                while True:
                    temp_name = _make_filename() if name is None else name
                    # Create and reserve shared memory block with this name
                    # until it can be attached to by mmap.
                    h_map = _winapi.CreateFileMapping(
                        _winapi.INVALID_HANDLE_VALUE,
                        _winapi.NULL,
                        _winapi.PAGE_READWRITE,
                        (size >> 32) & 0xFFFFFFFF,
                        size & 0xFFFFFFFF,
                        temp_name
                    )
                    try:
                        last_error_code = _winapi.GetLastError()
                        if last_error_code == _winapi.ERROR_ALREADY_EXISTS:
                            if name is not None:
                                raise FileExistsError(
                                    errno.EEXIST,
                                    os.strerror(errno.EEXIST),
                                    name,
                                    _winapi.ERROR_ALREADY_EXISTS
                                )
                            else:
                                # Random name collided; try another one.
                                continue
                        self._mmap = mmap.mmap(-1, size, tagname=temp_name)
                    finally:
                        _winapi.CloseHandle(h_map)
                    self._name = temp_name
                    break

            else:
                self._name = name
                # Dynamically determine the existing named shared memory
                # block's size which is likely a multiple of mmap.PAGESIZE.
                h_map = _winapi.OpenFileMapping(
                    _winapi.FILE_MAP_READ,
                    False,
                    name
                )
                try:
                    p_buf = _winapi.MapViewOfFile(
                        h_map,
                        _winapi.FILE_MAP_READ,
                        0,
                        0,
                        0
                    )
                finally:
                    _winapi.CloseHandle(h_map)
                try:
                    size = _winapi.VirtualQuerySize(p_buf)
                finally:
                    # The view was mapped only to query its size; release it
                    # so that every attach does not leak a mapped view.
                    # Looked up defensively because not every runtime's
                    # _winapi exposes UnmapViewOfFile.
                    unmap_view = getattr(_winapi, "UnmapViewOfFile", None)
                    if unmap_view is not None:
                        unmap_view(p_buf)
                self._mmap = mmap.mmap(-1, size, tagname=name)

        self._size = size
        self._buf = memoryview(self._mmap)

    def __del__(self):
        # Best-effort cleanup of this instance's handles; the block itself
        # is not destroyed here.
        try:
            self.close()
        except OSError:
            pass

    def __reduce__(self):
        # Re-created by attaching (create=False) to the block of the same
        # name rather than by copying its contents.
        return (
            self.__class__,
            (
                self.name,
                False,
                self.size,
            ),
        )

    def __repr__(self):
        return f'{self.__class__.__name__}({self.name!r}, size={self.size})'

    @property
    def buf(self):
        "A memoryview of contents of the shared memory block."
        return self._buf

    @property
    def name(self):
        "Unique name that identifies the shared memory block."
        reported_name = self._name
        if _USE_POSIX and self._prepend_leading_slash:
            # Hide the leading slash that was added for the POSIX API.
            if self._name.startswith("/"):
                reported_name = self._name[1:]
        return reported_name

    @property
    def size(self):
        "Size in bytes."
        return self._size

    def close(self):
        """Closes access to the shared memory from this instance but does
        not destroy the shared memory block."""
        if self._buf is not None:
            self._buf.release()
            self._buf = None
        if self._mmap is not None:
            self._mmap.close()
            self._mmap = None
        if _USE_POSIX and self._fd >= 0:
            os.close(self._fd)
            self._fd = -1

    def unlink(self):
        """Requests that the underlying shared memory block be destroyed.

        In order to ensure proper cleanup of resources, unlink should be
        called once (and only once) across all processes which have access
        to the shared memory block."""
        # Only the POSIX branch does anything; elsewhere this is a no-op.
        if _USE_POSIX and self._name:
            from .resource_tracker import unregister
            _posixshmem.shm_unlink(self._name)
            unregister(self._name, "shared_memory")
243
244
# Text encoding used for str items and for the stored struct format strings.
_encoding = "utf8"

class ShareableList:
    """Pattern for a mutable list-like object shareable via a shared
    memory block.  It differs from the built-in list type in that these
    lists can not change their overall length (i.e. no append, insert,
    etc.)

    Because values are packed into a memoryview as bytes, the struct
    packing format for any storable value must require no more than 8
    characters to describe its format.

    Note that str and bytes values are read back with trailing NUL bytes
    stripped, so a value ending in b'\\x00' does not round-trip exactly."""

    # The shared memory area is organized as follows:
    # - 8 bytes: number of items (N) as a 64-bit integer
    # - (N + 1) * 8 bytes: offsets of each element from the start of the
    #                      data area
    # - K bytes: the data area storing item values (with encoding and size
    #            depending on their respective types)
    # - N * 8 bytes: `struct` format string for each element
    # - N bytes: index into _back_transforms_mapping for each element
    #            (for reconstructing the corresponding Python value)
    _types_mapping = {
        int: "q",
        float: "d",
        bool: "xxxxxxx?",
        str: "%ds",
        bytes: "%ds",
        None.__class__: "xxxxxx?x",
    }
    _alignment = 8
    _back_transforms_mapping = {
        0: lambda value: value,                   # int, float, bool
        1: lambda value: value.rstrip(b'\x00').decode(_encoding),  # str
        2: lambda value: value.rstrip(b'\x00'),   # bytes
        3: lambda _value: None,                   # None
    }

    @staticmethod
    def _extract_recreation_code(value):
        """Used in concert with _back_transforms_mapping to convert values
        into the appropriate Python objects when retrieving them from
        the list as well as when storing them."""
        if not isinstance(value, (str, bytes, None.__class__)):
            return 0
        elif isinstance(value, str):
            return 1
        elif isinstance(value, bytes):
            return 2
        else:
            return 3  # NoneType

    def __init__(self, sequence=None, *, name=None):
        """Create a new shared list from *sequence*, or attach to the
        existing list stored in the shared memory block called *name*
        (when *sequence* is None and *name* is given)."""
        if name is None or sequence is not None:
            # Materialize once: the items are traversed several times
            # below, which would exhaust a one-shot iterator.
            sequence = tuple(sequence or ())
            # Values exactly as they will be stored; str items are encoded
            # up front so that their storage is sized in bytes, not in
            # characters (multi-byte text would otherwise be truncated).
            _values = [
                item.encode(_encoding) if isinstance(item, str) else item
                for item in sequence
            ]
            _formats = [
                self._types_mapping[type(item)]
                    if not isinstance(item, (str, bytes))
                    else self._types_mapping[type(item)] % (
                        self._alignment * (len(stored) // self._alignment + 1),
                    )
                for item, stored in zip(sequence, _values)
            ]
            self._list_len = len(_formats)
            # Each format string must fit in its 8-byte metadata slot.
            assert all(len(fmt) <= 8 for fmt in _formats)
            offset = 0
            # The offsets of each list element into the shared memory's
            # data area (0 meaning the start of the data area, not the start
            # of the shared memory area).
            self._allocated_offsets = [0]
            for fmt in _formats:
                offset += self._alignment if fmt[-1] != "s" else int(fmt[:-1])
                self._allocated_offsets.append(offset)
            _recreation_codes = [
                self._extract_recreation_code(item) for item in sequence
            ]
            requested_size = struct.calcsize(
                "q" + self._format_size_metainfo +
                "".join(_formats) +
                self._format_packing_metainfo +
                self._format_back_transform_codes
            )

            self.shm = SharedMemory(name, create=True, size=requested_size)

            # Write the header, the values, and the per-item metadata.
            struct.pack_into(
                "q" + self._format_size_metainfo,
                self.shm.buf,
                0,
                self._list_len,
                *(self._allocated_offsets)
            )
            struct.pack_into(
                "".join(_formats),
                self.shm.buf,
                self._offset_data_start,
                *_values
            )
            struct.pack_into(
                self._format_packing_metainfo,
                self.shm.buf,
                self._offset_packing_formats,
                *(v.encode(_encoding) for v in _formats)
            )
            struct.pack_into(
                self._format_back_transform_codes,
                self.shm.buf,
                self._offset_back_transform_codes,
                *(_recreation_codes)
            )

        else:
            self.shm = SharedMemory(name)
            self._list_len = len(self)  # Obtains size from offset 0 in buffer.
            self._allocated_offsets = list(
                struct.unpack_from(
                    self._format_size_metainfo,
                    self.shm.buf,
                    1 * 8
                )
            )

    def _get_packing_format(self, position):
        "Gets the packing format for a single value stored in the list."
        position = position if position >= 0 else position + self._list_len
        # Reject positions that are still negative after normalization too.
        if not 0 <= position < self._list_len:
            raise IndexError("Requested position out of range.")

        v = struct.unpack_from(
            "8s",
            self.shm.buf,
            self._offset_packing_formats + position * 8
        )[0]
        fmt = v.rstrip(b'\x00')
        fmt_as_str = fmt.decode(_encoding)

        return fmt_as_str

    def _get_back_transform(self, position):
        "Gets the back transformation function for a single value."

        # Expects an already normalized (non-negative) position.
        if not 0 <= position < self._list_len:
            raise IndexError("Requested position out of range.")

        transform_code = struct.unpack_from(
            "b",
            self.shm.buf,
            self._offset_back_transform_codes + position
        )[0]
        transform_function = self._back_transforms_mapping[transform_code]

        return transform_function

    def _set_packing_format_and_transform(self, position, fmt_as_str, value):
        """Sets the packing format and back transformation code for a
        single value in the list at the specified position."""

        # Expects an already normalized (non-negative) position.
        if not 0 <= position < self._list_len:
            raise IndexError("Requested position out of range.")

        struct.pack_into(
            "8s",
            self.shm.buf,
            self._offset_packing_formats + position * 8,
            fmt_as_str.encode(_encoding)
        )

        transform_code = self._extract_recreation_code(value)
        struct.pack_into(
            "b",
            self.shm.buf,
            self._offset_back_transform_codes + position,
            transform_code
        )

    def __getitem__(self, position):
        """Return the item at *position* (negative indices count from the
        end); raises IndexError when out of range."""
        position = position if position >= 0 else position + self._list_len
        # Explicit bounds check: a too-negative index must not fall through
        # to Python's own negative indexing of the offsets table.
        if not 0 <= position < self._list_len:
            raise IndexError("index out of range")

        offset = self._offset_data_start + self._allocated_offsets[position]
        (v,) = struct.unpack_from(
            self._get_packing_format(position),
            self.shm.buf,
            offset
        )

        back_transform = self._get_back_transform(position)
        v = back_transform(v)

        return v

    def __setitem__(self, position, value):
        """Store *value* at *position*.  Raises IndexError when out of
        range and ValueError when a str/bytes value does not fit in the
        storage originally allocated for that position."""
        position = position if position >= 0 else position + self._list_len
        # Same explicit bounds check as in __getitem__.
        if not 0 <= position < self._list_len:
            raise IndexError("assignment index out of range")

        item_offset = self._allocated_offsets[position]
        offset = self._offset_data_start + item_offset
        current_format = self._get_packing_format(position)

        if not isinstance(value, (str, bytes)):
            new_format = self._types_mapping[type(value)]
            encoded_value = value
        else:
            allocated_length = self._allocated_offsets[position + 1] - item_offset

            encoded_value = (value.encode(_encoding)
                             if isinstance(value, str) else value)
            if len(encoded_value) > allocated_length:
                raise ValueError("bytes/str item exceeds available storage")
            if current_format[-1] == "s":
                new_format = current_format
            else:
                new_format = self._types_mapping[str] % (
                    allocated_length,
                )

        self._set_packing_format_and_transform(
            position,
            new_format,
            value
        )
        struct.pack_into(new_format, self.shm.buf, offset, encoded_value)

    def __reduce__(self):
        # Re-created by attaching to the same shared memory block by name.
        return partial(self.__class__, name=self.shm.name), ()

    def __len__(self):
        # The item count is stored as the first 64-bit integer of the block.
        return struct.unpack_from("q", self.shm.buf, 0)[0]

    def __repr__(self):
        return f'{self.__class__.__name__}({list(self)}, name={self.shm.name!r})'

    @property
    def format(self):
        "The struct packing format used by all currently stored items."
        return "".join(
            self._get_packing_format(i) for i in range(self._list_len)
        )

    @property
    def _format_size_metainfo(self):
        "The struct packing format used for the items' storage offsets."
        return "q" * (self._list_len + 1)

    @property
    def _format_packing_metainfo(self):
        "The struct packing format used for the items' packing formats."
        return "8s" * self._list_len

    @property
    def _format_back_transform_codes(self):
        "The struct packing format used for the items' back transforms."
        return "b" * self._list_len

    @property
    def _offset_data_start(self):
        # - 8 bytes for the list length
        # - (N + 1) * 8 bytes for the element offsets
        return (self._list_len + 2) * 8

    @property
    def _offset_packing_formats(self):
        # The format strings follow directly after the data area.
        return self._offset_data_start + self._allocated_offsets[-1]

    @property
    def _offset_back_transform_codes(self):
        # The transform codes follow the N 8-byte format strings.
        return self._offset_packing_formats + self._list_len * 8

    def count(self, value):
        "L.count(value) -> integer -- return number of occurrences of value."

        return sum(value == entry for entry in self)

    def index(self, value):
        """L.index(value) -> integer -- return first index of value.
        Raises ValueError if the value is not present."""

        for position, entry in enumerate(self):
            if value == entry:
                return position
        else:
            raise ValueError(f"{value!r} not in this container")

    __class_getitem__ = classmethod(types.GenericAlias)
533