/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/resource_quota.h"

#include <inttypes.h>
#include <limits.h>
#include <stdint.h>
#include <string.h>

#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>

#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/combiner.h"

grpc_core::TraceFlag grpc_resource_quota_trace(false, "resource_quota");

#define MEMORY_USAGE_ESTIMATION_MAX 65536

/* Internal linked list pointers for a resource user */
typedef struct {
  grpc_resource_user* next;
  grpc_resource_user* prev;
} grpc_resource_user_link;

/* Resource users are kept in (potentially) several intrusive linked lists
   at once. These are the list names. */
typedef enum {
  /* Resource users that are waiting for an allocation */
  GRPC_RULIST_AWAITING_ALLOCATION,
  /* Resource users that have free memory available for internal reclamation */
  GRPC_RULIST_NON_EMPTY_FREE_POOL,
  /* Resource users that have published that a benign reclamation is
     available */
  GRPC_RULIST_RECLAIMER_BENIGN,
  /* Resource users that have published that a destructive reclamation is
     available */
  GRPC_RULIST_RECLAIMER_DESTRUCTIVE,
  /* Number of lists: must be last */
  GRPC_RULIST_COUNT
} grpc_rulist;

struct grpc_resource_user {
  /* The quota this resource user consumes from */
  grpc_resource_quota* resource_quota;

  /* Closure to schedule an allocation under the resource quota combiner
     lock */
  grpc_closure allocate_closure;
  /* Closure to publish a non-empty free pool under the resource quota
     combiner lock */
  grpc_closure add_to_free_pool_closure;

  /* One ref for each ref call (released by grpc_resource_user_unref), and one
     ref for each byte allocated (released by grpc_resource_user_free) */
  gpr_atm refs;
  /* Has this resource user been shut down? Starts at 0, increases with each
     shutdown call */
  gpr_atm shutdown;

  gpr_mu mu;
  /* The amount of memory (in bytes) this user has cached for its own use: to
     avoid quota contention, each resource user can keep some memory in
     addition to what it is immediately using (e.g., for caching), and the
     quota can pull it back under memory pressure.
     This value can become negative if more memory has been requested than
     existed in the free pool, at which point the quota is consulted to bring
     this value non-negative (asynchronously). */
  int64_t free_pool;
  /* A list of closures to call once free_pool becomes non-negative, i.e. when
     all outstanding allocations have been granted. */
  grpc_closure_list on_allocated;
  /* True if we are currently trying to allocate from the quota, false if
     not */
  bool allocating;
  /* How many bytes of allocations are outstanding */
  int64_t outstanding_allocations;
  /* True if we have scheduled ourselves onto the quota's non-empty free pool
     list, false otherwise */
  bool added_to_free_pool;

  /* The number of threads currently allocated to this resource user */
  gpr_atm num_threads_allocated;

  /* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive
     reclaimer */
  grpc_closure* reclaimers[2];
  /* Reclaimers just posted: once we're in the combiner lock, we'll move them
     to the array above */
  grpc_closure* new_reclaimers[2];
  /* Trampoline closures to finish reclamation and re-enter the quota combiner
     lock */
  grpc_closure post_reclaimer_closure[2];

  /* Closure to execute under the quota combiner to deregister and shut down
     the resource user */
  grpc_closure destroy_closure;

  /* Links in the various grpc_rulist lists */
  grpc_resource_user_link links[GRPC_RULIST_COUNT];

  /* The name of this resource user, for debugging/tracing */
  char* name;
};

struct grpc_resource_quota {
  /* refcount */
  gpr_refcount refs;

  /* Estimate of current memory usage,
     scaled to the range [0..MEMORY_USAGE_ESTIMATION_MAX] */
  gpr_atm memory_usage_estimation;

  /* Master combiner lock: all activity on a quota executes under this combiner
   * (so no mutex is needed for this data structure) */
  grpc_combiner* combiner;
  /* Size of the resource quota */
  int64_t size;
  /* Amount of free memory in the resource quota */
  int64_t free_pool;

  gpr_atm last_size;

  /* Mutex to protect max_threads and num_threads_allocated */
  /* Note: We could have used gpr_atm for max_threads and num_threads_allocated
   * and avoided having this mutex; but in that case, each invocation of the
   * function grpc_resource_user_allocate_threads() would have had to do at
   * least two atomic loads (for max_threads and num_threads_allocated)
   * followed by a CAS (on num_threads_allocated).
   * Moreover, we expect grpc_resource_user_allocate_threads() to often be
   * called concurrently, thereby increasing the chances of failing the CAS
   * operation. This additional complexity is not worth the tiny perf gain we
   * may (or may not) get by using atomics. */
  gpr_mu thread_count_mu;

  /* Max number of threads allowed */
  int max_threads;

  /* Number of threads currently allocated via this resource_quota object */
  int num_threads_allocated;

  /* Has rq_step been scheduled to occur? */
  bool step_scheduled;

  /* Are we currently reclaiming memory? */
  bool reclaiming;

  /* Closure around rq_step */
  grpc_closure rq_step_closure;

  /* Closure around rq_reclamation_done */
  grpc_closure rq_reclamation_done_closure;

  /* This is only really usable for debugging: it's always a stale pointer, but
     a stale pointer that might just be fresh enough to guide us to where the
     reclamation system is stuck */
  grpc_closure* debug_only_last_initiated_reclaimer;
  grpc_resource_user* debug_only_last_reclaimer_resource_user;

  /* Roots of all resource user lists */
  grpc_resource_user* roots[GRPC_RULIST_COUNT];

  char* name;
};

static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount);

/*******************************************************************************
 * list management
 */

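/* Each list is an intrusive circular doubly-linked list: roots[list] points at
   the head (or is nullptr when the list is empty), and each resource user
   carries its own next/prev links for that list in links[list]. A null next
   pointer means "not on this list". */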
static void rulist_add_head(grpc_resource_user* resource_user,
                            grpc_rulist list) {
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  grpc_resource_user** root = &resource_quota->roots[list];
  if (*root == nullptr) {
    *root = resource_user;
    resource_user->links[list].next = resource_user->links[list].prev =
        resource_user;
  } else {
    resource_user->links[list].next = *root;
    resource_user->links[list].prev = (*root)->links[list].prev;
    resource_user->links[list].next->links[list].prev =
        resource_user->links[list].prev->links[list].next = resource_user;
    *root = resource_user;
  }
}

static void rulist_add_tail(grpc_resource_user* resource_user,
                            grpc_rulist list) {
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  grpc_resource_user** root = &resource_quota->roots[list];
  if (*root == nullptr) {
    *root = resource_user;
    resource_user->links[list].next = resource_user->links[list].prev =
        resource_user;
  } else {
    resource_user->links[list].next = (*root)->links[list].next;
    resource_user->links[list].prev = *root;
    resource_user->links[list].next->links[list].prev =
        resource_user->links[list].prev->links[list].next = resource_user;
  }
}

static bool rulist_empty(grpc_resource_quota* resource_quota,
                         grpc_rulist list) {
  return resource_quota->roots[list] == nullptr;
}

static grpc_resource_user* rulist_pop_head(grpc_resource_quota* resource_quota,
                                           grpc_rulist list) {
  grpc_resource_user** root = &resource_quota->roots[list];
  grpc_resource_user* resource_user = *root;
  if (resource_user == nullptr) {
    return nullptr;
  }
  if (resource_user->links[list].next == resource_user) {
    *root = nullptr;
  } else {
    resource_user->links[list].next->links[list].prev =
        resource_user->links[list].prev;
    resource_user->links[list].prev->links[list].next =
        resource_user->links[list].next;
    *root = resource_user->links[list].next;
  }
  resource_user->links[list].next = resource_user->links[list].prev = nullptr;
  return resource_user;
}

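/* Removes resource_user from the given list if present (no-op otherwise),
   advancing the root past the removed element when necessary. */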
static void rulist_remove(grpc_resource_user* resource_user, grpc_rulist list) {
  if (resource_user->links[list].next == nullptr) return;
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  if (resource_quota->roots[list] == resource_user) {
    resource_quota->roots[list] = resource_user->links[list].next;
    if (resource_quota->roots[list] == resource_user) {
      resource_quota->roots[list] = nullptr;
    }
  }
  resource_user->links[list].next->links[list].prev =
      resource_user->links[list].prev;
  resource_user->links[list].prev->links[list].next =
      resource_user->links[list].next;
  resource_user->links[list].next = resource_user->links[list].prev = nullptr;
}

/*******************************************************************************
 * resource quota state machine
 */

static bool rq_alloc(grpc_resource_quota* resource_quota);
static bool rq_reclaim_from_per_user_free_pool(
    grpc_resource_quota* resource_quota);
static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive);

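/* One step of the quota state machine: satisfy as many pending allocations as
   possible from the quota's free pool, refilling that pool from per-user free
   pools as needed; if allocations still cannot be satisfied, initiate benign
   and then destructive reclamation. Runs under the combiner. */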
static void rq_step(void* rq, grpc_error* error) {
  grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
  resource_quota->step_scheduled = false;
  do {
    if (rq_alloc(resource_quota)) goto done;
  } while (rq_reclaim_from_per_user_free_pool(resource_quota));

  if (!rq_reclaim(resource_quota, false)) {
    rq_reclaim(resource_quota, true);
  }

done:
  grpc_resource_quota_unref_internal(resource_quota);
}

static void rq_step_sched(grpc_resource_quota* resource_quota) {
  if (resource_quota->step_scheduled) return;
  resource_quota->step_scheduled = true;
  grpc_resource_quota_ref_internal(resource_quota);
  GRPC_CLOSURE_SCHED(&resource_quota->rq_step_closure, GRPC_ERROR_NONE);
}

/* Update the atomically available resource estimate - use no barriers since
   timeliness of delivery really doesn't matter much */
static void rq_update_estimate(grpc_resource_quota* resource_quota) {
  gpr_atm memory_usage_estimation = MEMORY_USAGE_ESTIMATION_MAX;
  if (resource_quota->size != 0) {
    memory_usage_estimation =
        GPR_CLAMP((gpr_atm)((1.0 - ((double)resource_quota->free_pool) /
                                       ((double)resource_quota->size)) *
                            MEMORY_USAGE_ESTIMATION_MAX),
                  0, MEMORY_USAGE_ESTIMATION_MAX);
  }
  gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation,
                           memory_usage_estimation);
}

/* Returns true if all allocations are completed */
static bool rq_alloc(grpc_resource_quota* resource_quota) {
  grpc_resource_user* resource_user;
  while ((resource_user = rulist_pop_head(resource_quota,
                                          GRPC_RULIST_AWAITING_ALLOCATION))) {
    gpr_mu_lock(&resource_user->mu);
    if (grpc_resource_quota_trace.enabled()) {
      gpr_log(GPR_INFO,
              "RQ: check allocation for user %p shutdown=%" PRIdPTR
              " free_pool=%" PRId64,
              resource_user, gpr_atm_no_barrier_load(&resource_user->shutdown),
              resource_user->free_pool);
    }
    if (gpr_atm_no_barrier_load(&resource_user->shutdown)) {
      resource_user->allocating = false;
      grpc_closure_list_fail_all(
          &resource_user->on_allocated,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
      int64_t aborted_allocations = resource_user->outstanding_allocations;
      resource_user->outstanding_allocations = 0;
      resource_user->free_pool += aborted_allocations;
      GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated);
      gpr_mu_unlock(&resource_user->mu);
      ru_unref_by(resource_user, static_cast<gpr_atm>(aborted_allocations));
      continue;
    }
    if (resource_user->free_pool < 0 &&
        -resource_user->free_pool <= resource_quota->free_pool) {
      int64_t amt = -resource_user->free_pool;
      resource_user->free_pool = 0;
      resource_quota->free_pool -= amt;
      rq_update_estimate(resource_quota);
      if (grpc_resource_quota_trace.enabled()) {
        gpr_log(GPR_INFO,
                "RQ %s %s: grant alloc %" PRId64
                " bytes; rq_free_pool -> %" PRId64,
                resource_quota->name, resource_user->name, amt,
                resource_quota->free_pool);
      }
    } else if (grpc_resource_quota_trace.enabled() &&
               resource_user->free_pool >= 0) {
      gpr_log(GPR_INFO, "RQ %s %s: discard already satisfied alloc request",
              resource_quota->name, resource_user->name);
    }
    if (resource_user->free_pool >= 0) {
      resource_user->allocating = false;
      resource_user->outstanding_allocations = 0;
      GRPC_CLOSURE_LIST_SCHED(&resource_user->on_allocated);
      gpr_mu_unlock(&resource_user->mu);
    } else {
      rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
      gpr_mu_unlock(&resource_user->mu);
      return false;
    }
  }
  return true;
}

/* Returns true if any memory could be reclaimed from buffers */
static bool rq_reclaim_from_per_user_free_pool(
    grpc_resource_quota* resource_quota) {
  grpc_resource_user* resource_user;
  while ((resource_user = rulist_pop_head(resource_quota,
                                          GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
    gpr_mu_lock(&resource_user->mu);
    if (resource_user->free_pool > 0) {
      int64_t amt = resource_user->free_pool;
      resource_user->free_pool = 0;
      resource_quota->free_pool += amt;
      rq_update_estimate(resource_quota);
      if (grpc_resource_quota_trace.enabled()) {
        gpr_log(GPR_INFO,
                "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64
                " bytes; rq_free_pool -> %" PRId64,
                resource_quota->name, resource_user->name, amt,
                resource_quota->free_pool);
      }
      gpr_mu_unlock(&resource_user->mu);
      return true;
    } else {
      gpr_mu_unlock(&resource_user->mu);
    }
  }
  return false;
}

/* Returns true if reclamation is proceeding */
static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) {
  if (resource_quota->reclaiming) return true;
  grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE
                                 : GRPC_RULIST_RECLAIMER_BENIGN;
  grpc_resource_user* resource_user = rulist_pop_head(resource_quota, list);
  if (resource_user == nullptr) return false;
  if (grpc_resource_quota_trace.enabled()) {
    gpr_log(GPR_INFO, "RQ %s %s: initiate %s reclamation",
            resource_quota->name, resource_user->name,
            destructive ? "destructive" : "benign");
  }
  resource_quota->reclaiming = true;
  grpc_resource_quota_ref_internal(resource_quota);
  grpc_closure* c = resource_user->reclaimers[destructive];
  GPR_ASSERT(c);
  resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
  resource_quota->debug_only_last_initiated_reclaimer = c;
  resource_user->reclaimers[destructive] = nullptr;
  GRPC_CLOSURE_SCHED(c, GRPC_ERROR_NONE);
  return true;
}

/*******************************************************************************
 * ru_slice: a slice implementation that is backed by a grpc_resource_user
 */

typedef struct {
  grpc_slice_refcount base;
  gpr_refcount refs;
  grpc_resource_user* resource_user;
  size_t size;
} ru_slice_refcount;
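/* The slice payload is allocated inline, immediately after this header (see
   the `rc + 1` in ru_slice_create below), so a single gpr_malloc covers
   both. */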

static void ru_slice_ref(void* p) {
  ru_slice_refcount* rc = static_cast<ru_slice_refcount*>(p);
  gpr_ref(&rc->refs);
}

static void ru_slice_unref(void* p) {
  ru_slice_refcount* rc = static_cast<ru_slice_refcount*>(p);
  if (gpr_unref(&rc->refs)) {
    grpc_resource_user_free(rc->resource_user, rc->size);
    gpr_free(rc);
  }
}

static const grpc_slice_refcount_vtable ru_slice_vtable = {
    ru_slice_ref, ru_slice_unref, grpc_slice_default_eq_impl,
    grpc_slice_default_hash_impl};

static grpc_slice ru_slice_create(grpc_resource_user* resource_user,
                                  size_t size) {
  ru_slice_refcount* rc = static_cast<ru_slice_refcount*>(
      gpr_malloc(sizeof(ru_slice_refcount) + size));
  rc->base.vtable = &ru_slice_vtable;
  rc->base.sub_refcount = &rc->base;
  gpr_ref_init(&rc->refs, 1);
  rc->resource_user = resource_user;
  rc->size = size;
  grpc_slice slice;
  slice.refcount = &rc->base;
  slice.data.refcounted.bytes = reinterpret_cast<uint8_t*>(rc + 1);
  slice.data.refcounted.length = size;
  return slice;
}

/*******************************************************************************
 * grpc_resource_quota internal implementation: resource user manipulation under
 * the combiner
 */

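/* Combiner callback: queue the resource user for allocation, kicking the
   quota state machine if the awaiting-allocation list was previously empty. */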
static void ru_allocate(void* ru, grpc_error* error) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_AWAITING_ALLOCATION)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
}

static void ru_add_to_free_pool(void* ru, grpc_error* error) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (!rulist_empty(resource_user->resource_quota,
                    GRPC_RULIST_AWAITING_ALLOCATION) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_NON_EMPTY_FREE_POOL)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL);
}

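/* Moves a just-posted reclaimer from new_reclaimers into its active slot.
   Returns false (after cancelling the closure) if the user has already been
   shut down. */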
static bool ru_post_reclaimer(grpc_resource_user* resource_user,
                              bool destructive) {
  grpc_closure* closure = resource_user->new_reclaimers[destructive];
  GPR_ASSERT(closure != nullptr);
  resource_user->new_reclaimers[destructive] = nullptr;
  GPR_ASSERT(resource_user->reclaimers[destructive] == nullptr);
  if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
    GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_CANCELLED);
    return false;
  }
  resource_user->reclaimers[destructive] = closure;
  return true;
}

static void ru_post_benign_reclaimer(void* ru, grpc_error* error) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (!ru_post_reclaimer(resource_user, false)) return;
  if (!rulist_empty(resource_user->resource_quota,
                    GRPC_RULIST_AWAITING_ALLOCATION) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_BENIGN)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
}

static void ru_post_destructive_reclaimer(void* ru, grpc_error* error) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (!ru_post_reclaimer(resource_user, true)) return;
  if (!rulist_empty(resource_user->resource_quota,
                    GRPC_RULIST_AWAITING_ALLOCATION) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_BENIGN) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_DESTRUCTIVE)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
}

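/* Combiner callback for grpc_resource_user_shutdown: cancel any pending
   reclaimers, drop the user from the reclaimer lists, and kick the state
   machine if an allocation is still in flight so it can be failed. */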
static void ru_shutdown(void* ru, grpc_error* error) {
  if (grpc_resource_quota_trace.enabled()) {
    gpr_log(GPR_INFO, "RU shutdown %p", ru);
  }
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  gpr_mu_lock(&resource_user->mu);
  GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED);
  GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED);
  resource_user->reclaimers[0] = nullptr;
  resource_user->reclaimers[1] = nullptr;
  rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
  rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
  if (resource_user->allocating) {
    rq_step_sched(resource_user->resource_quota);
  }
  gpr_mu_unlock(&resource_user->mu);
}

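/* Combiner callback run once the last ref on the resource user is released:
   returns any remaining thread and memory budget to the quota, cancels
   pending reclaimers, and frees the user. */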
static void ru_destroy(void* ru, grpc_error* error) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0);
  // Free all the remaining thread quota
  grpc_resource_user_free_threads(resource_user,
                                  static_cast<int>(gpr_atm_no_barrier_load(
                                      &resource_user->num_threads_allocated)));

  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    rulist_remove(resource_user, static_cast<grpc_rulist>(i));
  }
  GRPC_CLOSURE_SCHED(resource_user->reclaimers[0], GRPC_ERROR_CANCELLED);
  GRPC_CLOSURE_SCHED(resource_user->reclaimers[1], GRPC_ERROR_CANCELLED);
  if (resource_user->free_pool != 0) {
    resource_user->resource_quota->free_pool += resource_user->free_pool;
    rq_step_sched(resource_user->resource_quota);
  }
  grpc_resource_quota_unref_internal(resource_user->resource_quota);
  gpr_mu_destroy(&resource_user->mu);
  gpr_free(resource_user->name);
  gpr_free(resource_user);
}

static void ru_allocated_slices(void* arg, grpc_error* error) {
  grpc_resource_user_slice_allocator* slice_allocator =
      static_cast<grpc_resource_user_slice_allocator*>(arg);
  if (error == GRPC_ERROR_NONE) {
    for (size_t i = 0; i < slice_allocator->count; i++) {
      grpc_slice_buffer_add_indexed(
          slice_allocator->dest,
          ru_slice_create(slice_allocator->resource_user,
                          slice_allocator->length));
    }
  }
  GRPC_CLOSURE_RUN(&slice_allocator->on_done, GRPC_ERROR_REF(error));
}

/*******************************************************************************
 * grpc_resource_quota internal implementation: quota manipulation under the
 * combiner
 */

typedef struct {
  int64_t size;
  grpc_resource_quota* resource_quota;
  grpc_closure closure;
} rq_resize_args;

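/* Applies a quota resize: the size delta is credited to (or debited from)
   both the total size and the free pool, after which the state machine is
   kicked so newly available memory can be handed out. */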
static void rq_resize(void* args, grpc_error* error) {
  rq_resize_args* a = static_cast<rq_resize_args*>(args);
  int64_t delta = a->size - a->resource_quota->size;
  a->resource_quota->size += delta;
  a->resource_quota->free_pool += delta;
  rq_update_estimate(a->resource_quota);
  rq_step_sched(a->resource_quota);
  grpc_resource_quota_unref_internal(a->resource_quota);
  gpr_free(a);
}

static void rq_reclamation_done(void* rq, grpc_error* error) {
  grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
  resource_quota->reclaiming = false;
  rq_step_sched(resource_quota);
  grpc_resource_quota_unref_internal(resource_quota);
}

/*******************************************************************************
 * grpc_resource_quota api
 */

/* Public API */
grpc_resource_quota* grpc_resource_quota_create(const char* name) {
  grpc_resource_quota* resource_quota =
      static_cast<grpc_resource_quota*>(gpr_malloc(sizeof(*resource_quota)));
  gpr_ref_init(&resource_quota->refs, 1);
  resource_quota->combiner = grpc_combiner_create();
  resource_quota->free_pool = INT64_MAX;
  resource_quota->size = INT64_MAX;
  gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
  gpr_mu_init(&resource_quota->thread_count_mu);
  resource_quota->max_threads = INT_MAX;
  resource_quota->num_threads_allocated = 0;
  resource_quota->step_scheduled = false;
  resource_quota->reclaiming = false;
  gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0);
  if (name != nullptr) {
    resource_quota->name = gpr_strdup(name);
  } else {
    gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR,
                 (intptr_t)resource_quota);
  }
  GRPC_CLOSURE_INIT(&resource_quota->rq_step_closure, rq_step, resource_quota,
                    grpc_combiner_finally_scheduler(resource_quota->combiner));
  GRPC_CLOSURE_INIT(&resource_quota->rq_reclamation_done_closure,
                    rq_reclamation_done, resource_quota,
                    grpc_combiner_scheduler(resource_quota->combiner));
  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    resource_quota->roots[i] = nullptr;
  }
  return resource_quota;
}

void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) {
  if (gpr_unref(&resource_quota->refs)) {
    // No outstanding thread quota
    GPR_ASSERT(resource_quota->num_threads_allocated == 0);
    GRPC_COMBINER_UNREF(resource_quota->combiner, "resource_quota");
    gpr_free(resource_quota->name);
    gpr_free(resource_quota);
  }
}

/* Public API */
void grpc_resource_quota_unref(grpc_resource_quota* resource_quota) {
  grpc_core::ExecCtx exec_ctx;
  grpc_resource_quota_unref_internal(resource_quota);
}

grpc_resource_quota* grpc_resource_quota_ref_internal(
    grpc_resource_quota* resource_quota) {
  gpr_ref(&resource_quota->refs);
  return resource_quota;
}

/* Public API */
void grpc_resource_quota_ref(grpc_resource_quota* resource_quota) {
  grpc_resource_quota_ref_internal(resource_quota);
}

double grpc_resource_quota_get_memory_pressure(
    grpc_resource_quota* resource_quota) {
  return (static_cast<double>(gpr_atm_no_barrier_load(
             &resource_quota->memory_usage_estimation))) /
         (static_cast<double>(MEMORY_USAGE_ESTIMATION_MAX));
}

/* Public API */
void grpc_resource_quota_set_max_threads(grpc_resource_quota* resource_quota,
                                         int new_max_threads) {
  GPR_ASSERT(new_max_threads >= 0);
  gpr_mu_lock(&resource_quota->thread_count_mu);
  resource_quota->max_threads = new_max_threads;
  gpr_mu_unlock(&resource_quota->thread_count_mu);
}

/* Public API */
void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
                                size_t size) {
  grpc_core::ExecCtx exec_ctx;
  rq_resize_args* a = static_cast<rq_resize_args*>(gpr_malloc(sizeof(*a)));
  a->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
  a->size = static_cast<int64_t>(size);
  gpr_atm_no_barrier_store(&resource_quota->last_size,
                           (gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size));
  GRPC_CLOSURE_INIT(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_SCHED(&a->closure, GRPC_ERROR_NONE);
}

size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) {
  return static_cast<size_t>(
      gpr_atm_no_barrier_load(&resource_quota->last_size));
}

/*******************************************************************************
 * grpc_resource_quota channel args api
 */

grpc_resource_quota* grpc_resource_quota_from_channel_args(
    const grpc_channel_args* channel_args) {
  for (size_t i = 0; i < channel_args->num_args; i++) {
    if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
      if (channel_args->args[i].type == GRPC_ARG_POINTER) {
        return grpc_resource_quota_ref_internal(
            static_cast<grpc_resource_quota*>(
                channel_args->args[i].value.pointer.p));
      } else {
        gpr_log(GPR_DEBUG, GRPC_ARG_RESOURCE_QUOTA " should be a pointer");
      }
    }
  }
  return grpc_resource_quota_create(nullptr);
}

static void* rq_copy(void* rq) {
  grpc_resource_quota_ref(static_cast<grpc_resource_quota*>(rq));
  return rq;
}

static void rq_destroy(void* rq) {
  grpc_resource_quota_unref_internal(static_cast<grpc_resource_quota*>(rq));
}

static int rq_cmp(void* a, void* b) { return GPR_ICMP(a, b); }

const grpc_arg_pointer_vtable* grpc_resource_quota_arg_vtable(void) {
  static const grpc_arg_pointer_vtable vtable = {rq_copy, rq_destroy, rq_cmp};
  return &vtable;
}

/*******************************************************************************
 * grpc_resource_user api
 */

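/* Typical lifecycle (a sketch only; real callers thread closures through an
   ExecCtx and handle allocation failure):

     grpc_resource_quota* rq = grpc_resource_quota_create("example_quota");
     grpc_resource_quota_resize(rq, 1024 * 1024);
     grpc_resource_user* ru = grpc_resource_user_create(rq, "example_user");
     // ... grpc_resource_user_alloc / grpc_resource_user_free ...
     grpc_resource_user_shutdown(ru);
     grpc_resource_user_unref(ru);
     grpc_resource_quota_unref(rq);
*/
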
grpc_resource_user* grpc_resource_user_create(
    grpc_resource_quota* resource_quota, const char* name) {
  grpc_resource_user* resource_user =
      static_cast<grpc_resource_user*>(gpr_malloc(sizeof(*resource_user)));
  resource_user->resource_quota =
      grpc_resource_quota_ref_internal(resource_quota);
  GRPC_CLOSURE_INIT(&resource_user->allocate_closure, &ru_allocate,
                    resource_user,
                    grpc_combiner_scheduler(resource_quota->combiner));
  GRPC_CLOSURE_INIT(&resource_user->add_to_free_pool_closure,
                    &ru_add_to_free_pool, resource_user,
                    grpc_combiner_scheduler(resource_quota->combiner));
  GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[0],
                    &ru_post_benign_reclaimer, resource_user,
                    grpc_combiner_scheduler(resource_quota->combiner));
  GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[1],
                    &ru_post_destructive_reclaimer, resource_user,
                    grpc_combiner_scheduler(resource_quota->combiner));
  GRPC_CLOSURE_INIT(&resource_user->destroy_closure, &ru_destroy,
                    resource_user,
                    grpc_combiner_scheduler(resource_quota->combiner));
  gpr_mu_init(&resource_user->mu);
  gpr_atm_rel_store(&resource_user->refs, 1);
  gpr_atm_rel_store(&resource_user->shutdown, 0);
  resource_user->free_pool = 0;
  grpc_closure_list_init(&resource_user->on_allocated);
  resource_user->allocating = false;
  resource_user->added_to_free_pool = false;
  gpr_atm_no_barrier_store(&resource_user->num_threads_allocated, 0);
  resource_user->reclaimers[0] = nullptr;
  resource_user->reclaimers[1] = nullptr;
  resource_user->new_reclaimers[0] = nullptr;
  resource_user->new_reclaimers[1] = nullptr;
  resource_user->outstanding_allocations = 0;
  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    resource_user->links[i].next = resource_user->links[i].prev = nullptr;
  }
  if (name != nullptr) {
    resource_user->name = gpr_strdup(name);
  } else {
    gpr_asprintf(&resource_user->name, "anonymous_resource_user_%" PRIxPTR,
                 (intptr_t)resource_user);
  }
  return resource_user;
}

grpc_resource_quota* grpc_resource_user_quota(
    grpc_resource_user* resource_user) {
  return resource_user->resource_quota;
}

static void ru_ref_by(grpc_resource_user* resource_user, gpr_atm amount) {
  GPR_ASSERT(amount > 0);
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&resource_user->refs, amount) != 0);
}

static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount) {
  GPR_ASSERT(amount > 0);
  gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount);
  GPR_ASSERT(old >= amount);
  if (old == amount) {
    GRPC_CLOSURE_SCHED(&resource_user->destroy_closure, GRPC_ERROR_NONE);
  }
}

void grpc_resource_user_ref(grpc_resource_user* resource_user) {
  ru_ref_by(resource_user, 1);
}

void grpc_resource_user_unref(grpc_resource_user* resource_user) {
  ru_unref_by(resource_user, 1);
}

void grpc_resource_user_shutdown(grpc_resource_user* resource_user) {
  if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
    GRPC_CLOSURE_SCHED(
        GRPC_CLOSURE_CREATE(
            ru_shutdown, resource_user,
            grpc_combiner_scheduler(resource_user->resource_quota->combiner)),
        GRPC_ERROR_NONE);
  }
}

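/* Thread reservation is all-or-nothing: either the whole thread_count fits
   under the quota's max_threads cap and is recorded, or nothing changes and
   false is returned. Guarded by thread_count_mu (see the note on that
   field). */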
bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
                                         int thread_count) {
  GPR_ASSERT(thread_count >= 0);
  bool is_success = false;
  gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
  grpc_resource_quota* rq = resource_user->resource_quota;
  if (rq->num_threads_allocated + thread_count <= rq->max_threads) {
    rq->num_threads_allocated += thread_count;
    gpr_atm_no_barrier_fetch_add(&resource_user->num_threads_allocated,
                                 thread_count);
    is_success = true;
  }
  gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
  return is_success;
}

void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
                                     int thread_count) {
  GPR_ASSERT(thread_count >= 0);
  gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
  grpc_resource_quota* rq = resource_user->resource_quota;
  rq->num_threads_allocated -= thread_count;
  int old_count = static_cast<int>(gpr_atm_no_barrier_fetch_add(
      &resource_user->num_threads_allocated, -thread_count));
  if (old_count < thread_count || rq->num_threads_allocated < 0) {
    gpr_log(GPR_ERROR,
            "Releasing more threads (%d) than currently allocated "
            "(rq threads: %d, ru threads: %d)",
            thread_count, rq->num_threads_allocated + thread_count, old_count);
    abort();
  }
  gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
}

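/* Memory allocation takes one ref per byte (paired with the unref in
   grpc_resource_user_free). If the user's local free pool goes negative the
   request cannot be satisfied immediately: optional_on_done is queued on
   on_allocated and the quota is asked (asynchronously) for the shortfall;
   otherwise optional_on_done is scheduled at once. */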
void grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
                              grpc_closure* optional_on_done) {
  gpr_mu_lock(&resource_user->mu);
  ru_ref_by(resource_user, static_cast<gpr_atm>(size));
  resource_user->free_pool -= static_cast<int64_t>(size);
  resource_user->outstanding_allocations += static_cast<int64_t>(size);
  if (grpc_resource_quota_trace.enabled()) {
    gpr_log(GPR_INFO, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64,
            resource_user->resource_quota->name, resource_user->name, size,
            resource_user->free_pool);
  }
  if (resource_user->free_pool < 0) {
    grpc_closure_list_append(&resource_user->on_allocated, optional_on_done,
                             GRPC_ERROR_NONE);
    if (!resource_user->allocating) {
      resource_user->allocating = true;
      GRPC_CLOSURE_SCHED(&resource_user->allocate_closure, GRPC_ERROR_NONE);
    }
  } else {
    resource_user->outstanding_allocations -= static_cast<int64_t>(size);
    GRPC_CLOSURE_SCHED(optional_on_done, GRPC_ERROR_NONE);
  }
  gpr_mu_unlock(&resource_user->mu);
}

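/* Returns size bytes to the user's local free pool. When the pool transitions
   from non-positive to positive, the surplus is advertised to the quota (via
   add_to_free_pool_closure) so other users can draw on it. */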
void grpc_resource_user_free(grpc_resource_user* resource_user, size_t size) {
  gpr_mu_lock(&resource_user->mu);
  bool was_zero_or_negative = resource_user->free_pool <= 0;
  resource_user->free_pool += static_cast<int64_t>(size);
  if (grpc_resource_quota_trace.enabled()) {
    gpr_log(GPR_INFO, "RQ %s %s: free %" PRIdPTR "; free_pool -> %" PRId64,
            resource_user->resource_quota->name, resource_user->name, size,
            resource_user->free_pool);
  }
  bool is_bigger_than_zero = resource_user->free_pool > 0;
  if (is_bigger_than_zero && was_zero_or_negative &&
      !resource_user->added_to_free_pool) {
    resource_user->added_to_free_pool = true;
    GRPC_CLOSURE_SCHED(&resource_user->add_to_free_pool_closure,
                       GRPC_ERROR_NONE);
  }
  gpr_mu_unlock(&resource_user->mu);
  ru_unref_by(resource_user, static_cast<gpr_atm>(size));
}

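/* Stages a reclaimer in new_reclaimers; the combiner callback
   (post_reclaimer_closure) later promotes it to the active reclaimers slot
   and enqueues the user on the matching reclaimer list. */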
void grpc_resource_user_post_reclaimer(grpc_resource_user* resource_user,
                                       bool destructive,
                                       grpc_closure* closure) {
  GPR_ASSERT(resource_user->new_reclaimers[destructive] == nullptr);
  resource_user->new_reclaimers[destructive] = closure;
  GRPC_CLOSURE_SCHED(&resource_user->post_reclaimer_closure[destructive],
                     GRPC_ERROR_NONE);
}

void grpc_resource_user_finish_reclamation(grpc_resource_user* resource_user) {
  if (grpc_resource_quota_trace.enabled()) {
    gpr_log(GPR_INFO, "RQ %s %s: reclamation complete",
            resource_user->resource_quota->name, resource_user->name);
  }
  GRPC_CLOSURE_SCHED(
      &resource_user->resource_quota->rq_reclamation_done_closure,
      GRPC_ERROR_NONE);
}

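/* The slice allocator is initialized once and then reused: each call to
   grpc_resource_user_alloc_slices requests count * length bytes, and on
   success ru_allocated_slices fills dest with count freshly created slices
   before invoking the user's callback. */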
void grpc_resource_user_slice_allocator_init(
    grpc_resource_user_slice_allocator* slice_allocator,
    grpc_resource_user* resource_user, grpc_iomgr_cb_func cb, void* p) {
  GRPC_CLOSURE_INIT(&slice_allocator->on_allocated, ru_allocated_slices,
                    slice_allocator, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&slice_allocator->on_done, cb, p,
                    grpc_schedule_on_exec_ctx);
  slice_allocator->resource_user = resource_user;
}

void grpc_resource_user_alloc_slices(
    grpc_resource_user_slice_allocator* slice_allocator, size_t length,
    size_t count, grpc_slice_buffer* dest) {
  slice_allocator->length = length;
  slice_allocator->count = count;
  slice_allocator->dest = dest;
  grpc_resource_user_alloc(slice_allocator->resource_user, count * length,
                           &slice_allocator->on_allocated);
}
949