// Implementation of biased reference counting inter-thread queue.
//
// Biased reference counting maintains two refcount fields in each object:
// ob_ref_local and ob_ref_shared. The true refcount is the sum of these two
// fields. In some cases, when refcounting operations are split across threads,
// the ob_ref_shared field can be negative (although the total refcount must
// be at least zero). In this case, the thread that decremented the refcount
// requests that the owning thread give up ownership and merge the refcount
// fields. This file implements the mechanism for doing so.
//
// Each thread state maintains a queue of objects whose refcounts it should
// merge. The thread states are stored in a per-interpreter hash table by
// thread id. The hash table has a fixed size and uses a linked list to store
// thread states within each bucket.
//
// The queueing thread uses the eval breaker mechanism to notify the owning
// thread that it has objects to merge. Additionally, all queued objects are
// merged during GC.
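//
// As a rough illustration only (the authoritative logic is Py_REFCNT() in
// Include/object.h), the logical refcount combines both fields. The shared
// field stores its count shifted left by _Py_REF_SHARED_SHIFT, with the low
// bits reserved for flags, and the shifted-down value may be negative:
//
//     // Hypothetical helper, for exposition only. The real code uses atomic
//     // loads, Py_ARITHMETIC_RIGHT_SHIFT, and special-cases immortal
//     // objects.
//     static Py_ssize_t
//     logical_refcount(PyObject *ob)
//     {
//         Py_ssize_t local = (Py_ssize_t)ob->ob_ref_local;
//         Py_ssize_t shared = ob->ob_ref_shared >> _Py_REF_SHARED_SHIFT;
//         return local + shared;
//     }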
#include "Python.h"
#include "pycore_object.h"      // _Py_ExplicitMergeRefcount
#include "pycore_brc.h"         // struct _brc_thread_state
#include "pycore_ceval.h"       // _Py_set_eval_breaker_bit
#include "pycore_llist.h"       // struct llist_node
#include "pycore_pystate.h"     // _PyThreadStateImpl

#ifdef Py_GIL_DISABLED

// Get the hashtable bucket for a given thread id.
static struct _brc_bucket *
get_bucket(PyInterpreterState *interp, uintptr_t tid)
{
    return &interp->brc.table[tid % _Py_BRC_NUM_BUCKETS];
}

// Find the thread state in a hash table bucket by thread id.
static _PyThreadStateImpl *
find_thread_state(struct _brc_bucket *bucket, uintptr_t thread_id)
{
    struct llist_node *node;
    llist_for_each(node, &bucket->root) {
        // Get the containing _PyThreadStateImpl from the linked-list node.
        _PyThreadStateImpl *ts = llist_data(node, _PyThreadStateImpl,
                                            brc.bucket_node);
        if (ts->brc.tid == thread_id) {
            return ts;
        }
    }
    return NULL;
}

// Enqueue an object to be merged by the owning thread. This steals a
// reference to the object.
void
_Py_brc_queue_object(PyObject *ob)
{
    PyInterpreterState *interp = _PyInterpreterState_GET();

    uintptr_t ob_tid = _Py_atomic_load_uintptr(&ob->ob_tid);
    if (ob_tid == 0) {
        // The owning thread may have concurrently decided to merge the
        // refcount fields.
        Py_DECREF(ob);
        return;
    }

    struct _brc_bucket *bucket = get_bucket(interp, ob_tid);
    PyMutex_Lock(&bucket->mutex);
    _PyThreadStateImpl *tstate = find_thread_state(bucket, ob_tid);
    if (tstate == NULL) {
        // If we didn't find the owning thread then it must have already exited.
        // It's safe (and necessary) to merge the refcount. Subtract one when
        // merging because we've stolen a reference.
        Py_ssize_t refcount = _Py_ExplicitMergeRefcount(ob, -1);
        PyMutex_Unlock(&bucket->mutex);
        if (refcount == 0) {
            _Py_Dealloc(ob);
        }
        return;
    }

    if (_PyObjectStack_Push(&tstate->brc.objects_to_merge, ob) < 0) {
        PyMutex_Unlock(&bucket->mutex);

        // Fall back to stopping all threads and manually merging the refcount
        // if we can't enqueue the object to be merged.
        _PyEval_StopTheWorld(interp);
        Py_ssize_t refcount = _Py_ExplicitMergeRefcount(ob, -1);
        _PyEval_StartTheWorld(interp);

        if (refcount == 0) {
            _Py_Dealloc(ob);
        }
        return;
    }

    // Notify owning thread
    _Py_set_eval_breaker_bit(&tstate->base, _PY_EVAL_EXPLICIT_MERGE_BIT);

    PyMutex_Unlock(&bucket->mutex);
}
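
// For context, _Py_brc_queue_object() is reached from the slow path of a
// decref performed by a thread that does not own the object; the actual
// caller lives in Objects/object.c and also manages the shared field's flag
// bits. A minimal, hypothetical sketch of the "steals a reference" contract:
//
//     // Hypothetical helper, for exposition only: give up a strong
//     // reference to an object owned by another thread by handing it to
//     // that thread's merge queue.
//     static void
//     release_foreign_reference(PyObject *op)
//     {
//         // Consumes the reference to op; the owning thread subtracts it
//         // again when it merges the refcount fields.
//         _Py_brc_queue_object(op);
//     }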

static void
merge_queued_objects(_PyObjectStack *to_merge)
{
    PyObject *ob;
    while ((ob = _PyObjectStack_Pop(to_merge)) != NULL) {
        // Subtract one when merging because the queue had a reference.
        Py_ssize_t refcount = _Py_ExplicitMergeRefcount(ob, -1);
        if (refcount == 0) {
            _Py_Dealloc(ob);
        }
    }
}

// Process this thread's queue of objects to merge.
void
_Py_brc_merge_refcounts(PyThreadState *tstate)
{
    struct _brc_thread_state *brc = &((_PyThreadStateImpl *)tstate)->brc;
    struct _brc_bucket *bucket = get_bucket(tstate->interp, brc->tid);

    assert(brc->tid == _Py_ThreadId());

    // Append all objects into a local stack. We don't want to hold the lock
    // while calling destructors.
    PyMutex_Lock(&bucket->mutex);
    _PyObjectStack_Merge(&brc->local_objects_to_merge, &brc->objects_to_merge);
    PyMutex_Unlock(&bucket->mutex);

    // Process the local stack until it's empty
    merge_queued_objects(&brc->local_objects_to_merge);
}
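
// The owning thread reaches _Py_brc_merge_refcounts() via the eval breaker:
// _PY_EVAL_EXPLICIT_MERGE_BIT (set in _Py_brc_queue_object() above) is
// noticed by the eval loop's pending-work handler. A simplified sketch of
// that consumer, assuming the bit helpers declared in pycore_ceval.h and the
// handler in Python/ceval_gil.c (shown for exposition only):
//
//     if (_Py_eval_breaker_bit_is_set(tstate, _PY_EVAL_EXPLICIT_MERGE_BIT)) {
//         _Py_unset_eval_breaker_bit(tstate, _PY_EVAL_EXPLICIT_MERGE_BIT);
//         _Py_brc_merge_refcounts(tstate);
//     }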

void
_Py_brc_init_state(PyInterpreterState *interp)
{
    struct _brc_state *brc = &interp->brc;
    for (Py_ssize_t i = 0; i < _Py_BRC_NUM_BUCKETS; i++) {
        llist_init(&brc->table[i].root);
    }
}

void
_Py_brc_init_thread(PyThreadState *tstate)
{
    struct _brc_thread_state *brc = &((_PyThreadStateImpl *)tstate)->brc;
    uintptr_t tid = _Py_ThreadId();

    // Add ourselves to the hashtable
    struct _brc_bucket *bucket = get_bucket(tstate->interp, tid);
    PyMutex_Lock(&bucket->mutex);
    brc->tid = tid;
    llist_insert_tail(&bucket->root, &brc->bucket_node);
    PyMutex_Unlock(&bucket->mutex);
}

void
_Py_brc_remove_thread(PyThreadState *tstate)
{
    struct _brc_thread_state *brc = &((_PyThreadStateImpl *)tstate)->brc;
    if (brc->tid == 0) {
        // The thread state may have been created, but never bound to a native
        // thread and therefore never added to the hashtable.
        assert(tstate->_status.bound == 0);
        return;
    }

    struct _brc_bucket *bucket = get_bucket(tstate->interp, brc->tid);

    // We need to fully process any objects to merge before removing ourselves
    // from the hashtable. It is not safe to perform any refcount operations
    // after we are removed. After that point, other threads treat our objects
    // as abandoned and may merge the objects' refcounts directly.
    bool empty = false;
    while (!empty) {
        // Process the local stack until it's empty
        merge_queued_objects(&brc->local_objects_to_merge);

        PyMutex_Lock(&bucket->mutex);
        empty = (brc->objects_to_merge.head == NULL);
        if (empty) {
            llist_remove(&brc->bucket_node);
        }
        else {
            _PyObjectStack_Merge(&brc->local_objects_to_merge,
                                 &brc->objects_to_merge);
        }
        PyMutex_Unlock(&bucket->mutex);
    }

    assert(brc->local_objects_to_merge.head == NULL);
    assert(brc->objects_to_merge.head == NULL);
}
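
// Once removed from the hashtable, this thread's tid no longer matches any
// entry, so find_thread_state() returns NULL for it and a later
// _Py_brc_queue_object() call from another thread takes the "owning thread
// has already exited" path above, merging the refcount fields directly
// instead of queueing.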

void
_Py_brc_after_fork(PyInterpreterState *interp)
{
    // Unlock all bucket mutexes. Some of the buckets may be locked because
    // locks can be handed off to a parked thread (see lock.c). We don't have
    // to worry about consistency here, because no thread can be actively
    // modifying a bucket, but it might be paused (not yet woken up) on a
    // PyMutex_Lock while holding that lock.
    for (Py_ssize_t i = 0; i < _Py_BRC_NUM_BUCKETS; i++) {
        _PyMutex_at_fork_reinit(&interp->brc.table[i].mutex);
    }
}

#endif  /* Py_GIL_DISABLED */