/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/resource_quota.h"

#include <inttypes.h>
#include <limits.h>
#include <stdint.h>
#include <string.h>

#include <string>

#include "absl/strings/str_cat.h"

#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>

#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/slice/slice_internal.h"

grpc_core::TraceFlag grpc_resource_quota_trace(false, "resource_quota");

#define MEMORY_USAGE_ESTIMATION_MAX 65536

/* Internal linked list pointers for a resource user */
struct grpc_resource_user_link {
  grpc_resource_user* next;
  grpc_resource_user* prev;
};
/* Resource users are kept in (potentially) several intrusive linked lists
   at once. These are the list names. */
typedef enum {
  /* Resource users that are waiting for an allocation */
  GRPC_RULIST_AWAITING_ALLOCATION,
  /* Resource users that have free memory available for internal reclamation */
  GRPC_RULIST_NON_EMPTY_FREE_POOL,
  /* Resource users that have published that a benign reclamation is
     available */
  GRPC_RULIST_RECLAIMER_BENIGN,
  /* Resource users that have published that a destructive reclamation is
     available */
  GRPC_RULIST_RECLAIMER_DESTRUCTIVE,
  /* Number of lists: must be last */
  GRPC_RULIST_COUNT
} grpc_rulist;

struct grpc_resource_user {
  /* The quota this resource user consumes from */
  grpc_resource_quota* resource_quota;

  /* Closure to schedule an allocation under the resource quota combiner
     lock */
  grpc_closure allocate_closure;
  /* Closure to publish a non-empty free pool under the resource quota
     combiner lock */
  grpc_closure add_to_free_pool_closure;

  /* one ref for each ref call (released by grpc_resource_user_unref), and one
     ref for each byte allocated (released by grpc_resource_user_free) */
  gpr_atm refs;
  /* has this resource user been shut down? starts at 0, increases for each
     shutdown call */
  gpr_atm shutdown;

  gpr_mu mu;
  /* The amount of memory (in bytes) this user has cached for its own use: to
     avoid quota contention, each resource user can keep some memory in
     addition to what it is immediately using (e.g., for caching), and the
     quota can pull it back under memory pressure.
     This value can become negative if more memory has been requested than
     existed in the free pool, at which point the quota is consulted to bring
     this value non-negative (asynchronously). */
  int64_t free_pool;
  /* A list of closures to call once free_pool becomes non-negative - i.e.,
     when all outstanding allocations have been granted. */
  grpc_closure_list on_allocated;
  /* True if we are currently trying to allocate from the quota, false if
     not */
  bool allocating;
  /* The amount of memory (in bytes) that has been requested from this user
   * asynchronously but hasn't been granted yet. */
  int64_t outstanding_allocations;
  /* True if we are currently trying to add ourselves to the non-empty free
     pool list, false otherwise */
  bool added_to_free_pool;

  /* The number of threads currently allocated to this resource user */
  gpr_atm num_threads_allocated;

  /* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive
     reclaimer */
  grpc_closure* reclaimers[2];
  /* Reclaimers just posted: once we're in the combiner lock, we'll move them
     to the array above */
  grpc_closure* new_reclaimers[2];
  /* Trampoline closures to finish reclamation and re-enter the quota combiner
     lock */
  grpc_closure post_reclaimer_closure[2];

  /* Closure to execute under the quota combiner to de-register and shut down
     the resource user */
  grpc_closure destroy_closure;

  /* Links in the various grpc_rulist lists */
  grpc_resource_user_link links[GRPC_RULIST_COUNT];

  /* The name of this resource user, for debugging/tracing */
  std::string name;
};
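
// Illustration of the per-user free_pool accounting above (values invented
// for the example): free_pool == 4096 means 4 KiB is cached locally and can
// satisfy allocations immediately; free_pool == -2048 means 2 KiB has been
// requested beyond the local cache, so the user is queued on
// GRPC_RULIST_AWAITING_ALLOCATION until the quota asynchronously tops the
// pool back up to >= 0.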

struct grpc_resource_quota {
  /* refcount */
  gpr_refcount refs;

  /* estimate of current memory usage,
     scaled to the range [0..MEMORY_USAGE_ESTIMATION_MAX] */
  gpr_atm memory_usage_estimation;

  /* Main combiner lock: all activity on a quota executes under this combiner
   * (so no mutex is needed for this data structure) */
  grpc_core::Combiner* combiner;
  /* Size of the resource quota */
  int64_t size;
  /* Amount of free memory in the resource quota */
  int64_t free_pool;
  /* Used size of memory in the resource quota. Updated as soon as resource
   * users allocate or free memory. */
  gpr_atm used;

  gpr_atm last_size;

  /* Mutex to protect max_threads and num_threads_allocated */
  /* Note: We could have used gpr_atm for max_threads and num_threads_allocated
   * and avoided this mutex; but in that case, each invocation of
   * grpc_resource_user_allocate_threads() would have had to do at least two
   * atomic loads (for max_threads and num_threads_allocated) followed by a
   * CAS (on num_threads_allocated).
   * Moreover, we expect grpc_resource_user_allocate_threads() to often be
   * called concurrently, thereby increasing the chances of failing the CAS
   * operation. This additional complexity is not worth the tiny perf gain we
   * may (or may not) get by using atomics. */
  gpr_mu thread_count_mu;

  /* Max number of threads allowed */
  int max_threads;

  /* Number of threads currently allocated via this resource_quota object */
  int num_threads_allocated;

  /* Has rq_step been scheduled to occur? */
  bool step_scheduled;

  /* Are we currently reclaiming memory? */
  bool reclaiming;

  /* Closure around rq_step */
  grpc_closure rq_step_closure;

  /* Closure around rq_reclamation_done */
  grpc_closure rq_reclamation_done_closure;

  /* This is only really usable for debugging: it's always a stale pointer,
     but a stale pointer that might just be fresh enough to guide us to where
     the reclamation system is stuck */
  grpc_closure* debug_only_last_initiated_reclaimer;
  grpc_resource_user* debug_only_last_reclaimer_resource_user;

  /* Roots of all resource user lists */
  grpc_resource_user* roots[GRPC_RULIST_COUNT];

  std::string name;
};

static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount);

/*******************************************************************************
 * list management
 */

static void rulist_add_head(grpc_resource_user* resource_user,
                            grpc_rulist list) {
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  grpc_resource_user** root = &resource_quota->roots[list];
  if (*root == nullptr) {
    *root = resource_user;
    resource_user->links[list].next = resource_user->links[list].prev =
        resource_user;
  } else {
    resource_user->links[list].next = *root;
    resource_user->links[list].prev = (*root)->links[list].prev;
    resource_user->links[list].next->links[list].prev =
        resource_user->links[list].prev->links[list].next = resource_user;
    *root = resource_user;
  }
}

static void rulist_add_tail(grpc_resource_user* resource_user,
                            grpc_rulist list) {
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  grpc_resource_user** root = &resource_quota->roots[list];
  if (*root == nullptr) {
    *root = resource_user;
    resource_user->links[list].next = resource_user->links[list].prev =
        resource_user;
  } else {
    resource_user->links[list].next = (*root)->links[list].next;
    resource_user->links[list].prev = *root;
    resource_user->links[list].next->links[list].prev =
        resource_user->links[list].prev->links[list].next = resource_user;
  }
}

static bool rulist_empty(grpc_resource_quota* resource_quota,
                         grpc_rulist list) {
  return resource_quota->roots[list] == nullptr;
}

static grpc_resource_user* rulist_pop_head(grpc_resource_quota* resource_quota,
                                           grpc_rulist list) {
  grpc_resource_user** root = &resource_quota->roots[list];
  grpc_resource_user* resource_user = *root;
  if (resource_user == nullptr) {
    return nullptr;
  }
  if (resource_user->links[list].next == resource_user) {
    *root = nullptr;
  } else {
    resource_user->links[list].next->links[list].prev =
        resource_user->links[list].prev;
    resource_user->links[list].prev->links[list].next =
        resource_user->links[list].next;
    *root = resource_user->links[list].next;
  }
  resource_user->links[list].next = resource_user->links[list].prev = nullptr;
  return resource_user;
}

static void rulist_remove(grpc_resource_user* resource_user, grpc_rulist list) {
  if (resource_user->links[list].next == nullptr) return;
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  if (resource_quota->roots[list] == resource_user) {
    resource_quota->roots[list] = resource_user->links[list].next;
    if (resource_quota->roots[list] == resource_user) {
      resource_quota->roots[list] = nullptr;
    }
  }
  resource_user->links[list].next->links[list].prev =
      resource_user->links[list].prev;
  resource_user->links[list].prev->links[list].next =
      resource_user->links[list].next;
  resource_user->links[list].next = resource_user->links[list].prev = nullptr;
}
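
// The lists above are circular and intrusive: every resource user embeds one
// {next, prev} pair per list, so a single user can sit on all
// GRPC_RULIST_COUNT lists at once with no extra allocation. A hedged,
// self-contained sketch of the same head-insert on a simplified node type
// (Node, head, and add_head are illustrative names, not part of this file):
//
//   struct Node { Node* next; Node* prev; };
//   void add_head(Node** head, Node* n) {
//     if (*head == nullptr) {
//       *head = n;
//       n->next = n->prev = n;  // singleton circle points at itself
//     } else {
//       n->next = *head;
//       n->prev = (*head)->prev;
//       n->next->prev = n->prev->next = n;
//       *head = n;  // new node becomes the root
//     }
//   }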

/*******************************************************************************
 * resource quota state machine
 */

static bool rq_alloc(grpc_resource_quota* resource_quota);
static bool rq_reclaim_from_per_user_free_pool(
    grpc_resource_quota* resource_quota);
static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive);
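
/* One pass of the quota state machine: first satisfy pending allocations from
   the quota's free pool; when that runs dry, pull back per-user cached
   memory; if still short, initiate a benign reclaimer and, as a last resort,
   a destructive one. */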

static void rq_step(void* rq, grpc_error_handle /*error*/) {
  grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
  resource_quota->step_scheduled = false;
  do {
    if (rq_alloc(resource_quota)) goto done;
  } while (rq_reclaim_from_per_user_free_pool(resource_quota));

  if (!rq_reclaim(resource_quota, false)) {
    rq_reclaim(resource_quota, true);
  }

done:
  grpc_resource_quota_unref_internal(resource_quota);
}

static void rq_step_sched(grpc_resource_quota* resource_quota) {
  if (resource_quota->step_scheduled) return;
  resource_quota->step_scheduled = true;
  grpc_resource_quota_ref_internal(resource_quota);
  resource_quota->combiner->FinallyRun(&resource_quota->rq_step_closure,
                                       GRPC_ERROR_NONE);
}

/* update the atomically available resource estimate - use no barriers since
   timeliness of delivery really doesn't matter much */
static void rq_update_estimate(grpc_resource_quota* resource_quota) {
  gpr_atm memory_usage_estimation = MEMORY_USAGE_ESTIMATION_MAX;
  if (resource_quota->size != 0) {
    memory_usage_estimation =
        GPR_CLAMP((gpr_atm)((1.0 - ((double)resource_quota->free_pool) /
                                       ((double)resource_quota->size)) *
                            MEMORY_USAGE_ESTIMATION_MAX),
                  0, MEMORY_USAGE_ESTIMATION_MAX);
  }
  gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation,
                           memory_usage_estimation);
}
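
// Worked example of the estimate above (numbers invented for illustration):
// with size == 1 MiB and free_pool == 256 KiB, the scaled estimate is
// (1.0 - 262144.0 / 1048576.0) * 65536 == 49152, i.e. 75% of
// MEMORY_USAGE_ESTIMATION_MAX; grpc_resource_quota_get_memory_pressure()
// later divides this back down to 0.75.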

/* returns true if all allocations are completed */
static bool rq_alloc(grpc_resource_quota* resource_quota) {
  grpc_resource_user* resource_user;
  while ((resource_user = rulist_pop_head(resource_quota,
                                          GRPC_RULIST_AWAITING_ALLOCATION))) {
    gpr_mu_lock(&resource_user->mu);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
      gpr_log(GPR_INFO,
              "RQ: check allocation for user %p shutdown=%" PRIdPTR
              " free_pool=%" PRId64 " outstanding_allocations=%" PRId64,
              resource_user, gpr_atm_no_barrier_load(&resource_user->shutdown),
              resource_user->free_pool, resource_user->outstanding_allocations);
    }
    if (gpr_atm_no_barrier_load(&resource_user->shutdown)) {
      resource_user->allocating = false;
      grpc_closure_list_fail_all(
          &resource_user->on_allocated,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
      int64_t aborted_allocations = resource_user->outstanding_allocations;
      resource_user->outstanding_allocations = 0;
      resource_user->free_pool += aborted_allocations;
      grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &resource_user->on_allocated);
      gpr_mu_unlock(&resource_user->mu);
      if (aborted_allocations > 0) {
        ru_unref_by(resource_user, static_cast<gpr_atm>(aborted_allocations));
      }
      continue;
    }
    if (resource_user->free_pool < 0 &&
        -resource_user->free_pool <= resource_quota->free_pool) {
      int64_t amt = -resource_user->free_pool;
      resource_user->free_pool = 0;
      resource_quota->free_pool -= amt;
      rq_update_estimate(resource_quota);
      if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
        gpr_log(GPR_INFO,
                "RQ %s %s: grant alloc %" PRId64
                " bytes; rq_free_pool -> %" PRId64,
                resource_quota->name.c_str(), resource_user->name.c_str(), amt,
                resource_quota->free_pool);
      }
    } else if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace) &&
               resource_user->free_pool >= 0) {
      gpr_log(GPR_INFO, "RQ %s %s: discard already satisfied alloc request",
              resource_quota->name.c_str(), resource_user->name.c_str());
    }
    if (resource_user->free_pool >= 0) {
      resource_user->allocating = false;
      resource_user->outstanding_allocations = 0;
      grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &resource_user->on_allocated);
      gpr_mu_unlock(&resource_user->mu);
    } else {
      rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
      gpr_mu_unlock(&resource_user->mu);
      return false;
    }
  }
  return true;
}

/* returns true if any memory could be reclaimed from per-user free pools */
static bool rq_reclaim_from_per_user_free_pool(
    grpc_resource_quota* resource_quota) {
  grpc_resource_user* resource_user;
  while ((resource_user = rulist_pop_head(resource_quota,
                                          GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
    gpr_mu_lock(&resource_user->mu);
    resource_user->added_to_free_pool = false;
    if (resource_user->free_pool > 0) {
      int64_t amt = resource_user->free_pool;
      resource_user->free_pool = 0;
      resource_quota->free_pool += amt;
      rq_update_estimate(resource_quota);
      if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
        gpr_log(GPR_INFO,
                "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64
                " bytes; rq_free_pool -> %" PRId64,
                resource_quota->name.c_str(), resource_user->name.c_str(), amt,
                resource_quota->free_pool);
      }
      gpr_mu_unlock(&resource_user->mu);
      return true;
    } else {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
        gpr_log(GPR_INFO,
                "RQ %s %s: failed to reclaim_from_per_user_free_pool; "
                "free_pool = %" PRId64 "; rq_free_pool = %" PRId64,
                resource_quota->name.c_str(), resource_user->name.c_str(),
                resource_user->free_pool, resource_quota->free_pool);
      }
      gpr_mu_unlock(&resource_user->mu);
    }
  }
  return false;
}

/* returns true if reclamation is proceeding */
static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) {
  if (resource_quota->reclaiming) return true;
  grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE
                                 : GRPC_RULIST_RECLAIMER_BENIGN;
  grpc_resource_user* resource_user = rulist_pop_head(resource_quota, list);
  if (resource_user == nullptr) return false;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "RQ %s %s: initiate %s reclamation",
            resource_quota->name.c_str(), resource_user->name.c_str(),
            destructive ? "destructive" : "benign");
  }
  resource_quota->reclaiming = true;
  grpc_resource_quota_ref_internal(resource_quota);
  grpc_closure* c = resource_user->reclaimers[destructive];
  GPR_ASSERT(c);
  resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
  resource_quota->debug_only_last_initiated_reclaimer = c;
  resource_user->reclaimers[destructive] = nullptr;
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, GRPC_ERROR_NONE);
  return true;
}

/*******************************************************************************
 * ru_slice: a slice implementation that is backed by a grpc_resource_user
 */

namespace grpc_core {

class RuSliceRefcount {
 public:
  static void Destroy(void* p) {
    auto* rc = static_cast<RuSliceRefcount*>(p);
    rc->~RuSliceRefcount();
    gpr_free(rc);
  }
  RuSliceRefcount(grpc_resource_user* resource_user, size_t size)
      : base_(grpc_slice_refcount::Type::REGULAR, &refs_, Destroy, this,
              &base_),
        resource_user_(resource_user),
        size_(size) {
    // Nothing to do here.
  }
  ~RuSliceRefcount() { grpc_resource_user_free(resource_user_, size_); }

  grpc_slice_refcount* base_refcount() { return &base_; }

 private:
  grpc_slice_refcount base_;
  RefCount refs_;
  grpc_resource_user* resource_user_;
  size_t size_;
};

}  // namespace grpc_core

static grpc_slice ru_slice_create(grpc_resource_user* resource_user,
                                  size_t size) {
  auto* rc = static_cast<grpc_core::RuSliceRefcount*>(
      gpr_malloc(sizeof(grpc_core::RuSliceRefcount) + size));
  new (rc) grpc_core::RuSliceRefcount(resource_user, size);
  grpc_slice slice;

  slice.refcount = rc->base_refcount();
  slice.data.refcounted.bytes = reinterpret_cast<uint8_t*>(rc + 1);
  slice.data.refcounted.length = size;
  return slice;
}
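
// Memory layout produced by ru_slice_create: the refcount header and the
// payload share a single gpr_malloc block, so destroying the refcount frees
// the bytes too, and ~RuSliceRefcount returns `size` bytes to the quota:
//
//   [ RuSliceRefcount | payload (size bytes) ]
//   ^ rc              ^ rc + 1 == slice.data.refcounted.bytes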

/*******************************************************************************
 * grpc_resource_quota internal implementation: resource user manipulation under
 * the combiner
 */

static void ru_allocate(void* ru, grpc_error_handle /*error*/) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_AWAITING_ALLOCATION)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
}

static void ru_add_to_free_pool(void* ru, grpc_error_handle /*error*/) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (!rulist_empty(resource_user->resource_quota,
                    GRPC_RULIST_AWAITING_ALLOCATION) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_NON_EMPTY_FREE_POOL)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL);
}

static bool ru_post_reclaimer(grpc_resource_user* resource_user,
                              bool destructive) {
  grpc_closure* closure = resource_user->new_reclaimers[destructive];
  GPR_ASSERT(closure != nullptr);
  resource_user->new_reclaimers[destructive] = nullptr;
  GPR_ASSERT(resource_user->reclaimers[destructive] == nullptr);
  if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_CANCELLED);
    return false;
  }
  resource_user->reclaimers[destructive] = closure;
  return true;
}

static void ru_post_benign_reclaimer(void* ru, grpc_error_handle /*error*/) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (!ru_post_reclaimer(resource_user, false)) return;
  if (!rulist_empty(resource_user->resource_quota,
                    GRPC_RULIST_AWAITING_ALLOCATION) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_BENIGN)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
}

static void ru_post_destructive_reclaimer(void* ru,
                                          grpc_error_handle /*error*/) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (!ru_post_reclaimer(resource_user, true)) return;
  if (!rulist_empty(resource_user->resource_quota,
                    GRPC_RULIST_AWAITING_ALLOCATION) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_BENIGN) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_DESTRUCTIVE)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
}

static void ru_shutdown(void* ru, grpc_error_handle /*error*/) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "RU shutdown %p", ru);
  }
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  gpr_mu_lock(&resource_user->mu);
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[0],
                          GRPC_ERROR_CANCELLED);
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[1],
                          GRPC_ERROR_CANCELLED);
  resource_user->reclaimers[0] = nullptr;
  resource_user->reclaimers[1] = nullptr;
  rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
  rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
  if (resource_user->allocating) {
    rq_step_sched(resource_user->resource_quota);
  }
  gpr_mu_unlock(&resource_user->mu);
}

static void ru_destroy(void* ru, grpc_error_handle /*error*/) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0);
  // Free all the remaining thread quota
  grpc_resource_user_free_threads(resource_user,
                                  static_cast<int>(gpr_atm_no_barrier_load(
                                      &resource_user->num_threads_allocated)));

  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    rulist_remove(resource_user, static_cast<grpc_rulist>(i));
  }
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[0],
                          GRPC_ERROR_CANCELLED);
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[1],
                          GRPC_ERROR_CANCELLED);
  if (resource_user->free_pool != 0) {
    resource_user->resource_quota->free_pool += resource_user->free_pool;
    rq_step_sched(resource_user->resource_quota);
  }
  grpc_resource_quota_unref_internal(resource_user->resource_quota);
  gpr_mu_destroy(&resource_user->mu);
  delete resource_user;
}

static void ru_alloc_slices(
    grpc_resource_user_slice_allocator* slice_allocator) {
  for (size_t i = 0; i < slice_allocator->count; i++) {
    grpc_slice_buffer_add_indexed(
        slice_allocator->dest, ru_slice_create(slice_allocator->resource_user,
                                               slice_allocator->length));
  }
}

static void ru_allocated_slices(void* arg, grpc_error_handle error) {
  grpc_resource_user_slice_allocator* slice_allocator =
      static_cast<grpc_resource_user_slice_allocator*>(arg);
  if (error == GRPC_ERROR_NONE) ru_alloc_slices(slice_allocator);
  grpc_core::Closure::Run(DEBUG_LOCATION, &slice_allocator->on_done,
                          GRPC_ERROR_REF(error));
}

/*******************************************************************************
 * grpc_resource_quota internal implementation: quota manipulation under the
 * combiner
 */

struct rq_resize_args {
  int64_t size;
  grpc_resource_quota* resource_quota;
  grpc_closure closure;
};
static void rq_resize(void* args, grpc_error_handle /*error*/) {
  rq_resize_args* a = static_cast<rq_resize_args*>(args);
  int64_t delta = a->size - a->resource_quota->size;
  a->resource_quota->size += delta;
  a->resource_quota->free_pool += delta;
  rq_update_estimate(a->resource_quota);
  rq_step_sched(a->resource_quota);
  grpc_resource_quota_unref_internal(a->resource_quota);
  gpr_free(a);
}

static void rq_reclamation_done(void* rq, grpc_error_handle /*error*/) {
  grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
  resource_quota->reclaiming = false;
  rq_step_sched(resource_quota);
  grpc_resource_quota_unref_internal(resource_quota);
}

/*******************************************************************************
 * grpc_resource_quota api
 */
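
// Illustrative usage of the public quota API defined in this section (a
// hedged sketch, not from gRPC documentation; grpc_channel_arg_pointer_create
// lives in channel_args.h, and the 64 MiB / 128-thread limits are invented):
//
//   grpc_resource_quota* rq = grpc_resource_quota_create("my_quota");
//   grpc_resource_quota_resize(rq, 64 * 1024 * 1024);  // cap memory at 64 MiB
//   grpc_resource_quota_set_max_threads(rq, 128);      // cap threads at 128
//   grpc_arg arg = grpc_channel_arg_pointer_create(
//       const_cast<char*>(GRPC_ARG_RESOURCE_QUOTA), rq,
//       grpc_resource_quota_arg_vtable());
//   // ... attach `arg` to channel args when building a server/channel ...
//   grpc_resource_quota_unref(rq);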

/* Public API */
grpc_resource_quota* grpc_resource_quota_create(const char* name) {
  grpc_resource_quota* resource_quota = new grpc_resource_quota;
  gpr_ref_init(&resource_quota->refs, 1);
  resource_quota->combiner = grpc_combiner_create();
  resource_quota->free_pool = INT64_MAX;
  resource_quota->size = INT64_MAX;
  resource_quota->used = 0;
  gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
  gpr_mu_init(&resource_quota->thread_count_mu);
  resource_quota->max_threads = INT_MAX;
  resource_quota->num_threads_allocated = 0;
  resource_quota->step_scheduled = false;
  resource_quota->reclaiming = false;
  gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0);
  if (name != nullptr) {
    resource_quota->name = name;
  } else {
    resource_quota->name = absl::StrCat(
        "anonymous_pool_", reinterpret_cast<intptr_t>(resource_quota));
  }
  GRPC_CLOSURE_INIT(&resource_quota->rq_step_closure, rq_step, resource_quota,
                    nullptr);
  GRPC_CLOSURE_INIT(&resource_quota->rq_reclamation_done_closure,
                    rq_reclamation_done, resource_quota, nullptr);
  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    resource_quota->roots[i] = nullptr;
  }
  return resource_quota;
}

void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) {
  if (gpr_unref(&resource_quota->refs)) {
    // No outstanding thread quota
    GPR_ASSERT(resource_quota->num_threads_allocated == 0);
    GRPC_COMBINER_UNREF(resource_quota->combiner, "resource_quota");
    gpr_mu_destroy(&resource_quota->thread_count_mu);
    delete resource_quota;
  }
}

/* Public API */
void grpc_resource_quota_unref(grpc_resource_quota* resource_quota) {
  grpc_core::ExecCtx exec_ctx;
  grpc_resource_quota_unref_internal(resource_quota);
}

grpc_resource_quota* grpc_resource_quota_ref_internal(
    grpc_resource_quota* resource_quota) {
  gpr_ref(&resource_quota->refs);
  return resource_quota;
}

/* Public API */
void grpc_resource_quota_ref(grpc_resource_quota* resource_quota) {
  grpc_resource_quota_ref_internal(resource_quota);
}

double grpc_resource_quota_get_memory_pressure(
    grpc_resource_quota* resource_quota) {
  return (static_cast<double>(gpr_atm_no_barrier_load(
             &resource_quota->memory_usage_estimation))) /
         (static_cast<double>(MEMORY_USAGE_ESTIMATION_MAX));
}

/* Public API */
void grpc_resource_quota_set_max_threads(grpc_resource_quota* resource_quota,
                                         int new_max_threads) {
  GPR_ASSERT(new_max_threads >= 0);
  gpr_mu_lock(&resource_quota->thread_count_mu);
  resource_quota->max_threads = new_max_threads;
  gpr_mu_unlock(&resource_quota->thread_count_mu);
}

/* Public API */
void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
                                size_t size) {
  grpc_core::ExecCtx exec_ctx;
  rq_resize_args* a = static_cast<rq_resize_args*>(gpr_malloc(sizeof(*a)));
  a->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
  a->size = static_cast<int64_t>(size);
  gpr_atm_no_barrier_store(&resource_quota->last_size,
                           (gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size));
  GRPC_CLOSURE_INIT(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, &a->closure, GRPC_ERROR_NONE);
}

size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) {
  return static_cast<size_t>(
      gpr_atm_no_barrier_load(&resource_quota->last_size));
}

/*******************************************************************************
 * grpc_resource_quota channel args api
 */

grpc_resource_quota* grpc_resource_quota_from_channel_args(
    const grpc_channel_args* channel_args, bool create) {
  for (size_t i = 0; i < channel_args->num_args; i++) {
    if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
      if (channel_args->args[i].type == GRPC_ARG_POINTER) {
        return grpc_resource_quota_ref_internal(
            static_cast<grpc_resource_quota*>(
                channel_args->args[i].value.pointer.p));
      } else {
        gpr_log(GPR_DEBUG, GRPC_ARG_RESOURCE_QUOTA " should be a pointer");
      }
    }
  }
  return create ? grpc_resource_quota_create(nullptr) : nullptr;
}

static void* rq_copy(void* rq) {
  grpc_resource_quota_ref(static_cast<grpc_resource_quota*>(rq));
  return rq;
}

static void rq_destroy(void* rq) {
  grpc_resource_quota_unref_internal(static_cast<grpc_resource_quota*>(rq));
}

static int rq_cmp(void* a, void* b) { return GPR_ICMP(a, b); }

const grpc_arg_pointer_vtable* grpc_resource_quota_arg_vtable(void) {
  static const grpc_arg_pointer_vtable vtable = {rq_copy, rq_destroy, rq_cmp};
  return &vtable;
}

/*******************************************************************************
 * grpc_resource_user api
 */

grpc_resource_user* grpc_resource_user_create(
    grpc_resource_quota* resource_quota, const char* name) {
  grpc_resource_user* resource_user = new grpc_resource_user;
  resource_user->resource_quota =
      grpc_resource_quota_ref_internal(resource_quota);
  GRPC_CLOSURE_INIT(&resource_user->allocate_closure, &ru_allocate,
                    resource_user, nullptr);
  GRPC_CLOSURE_INIT(&resource_user->add_to_free_pool_closure,
                    &ru_add_to_free_pool, resource_user, nullptr);
  GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[0],
                    &ru_post_benign_reclaimer, resource_user, nullptr);
  GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[1],
                    &ru_post_destructive_reclaimer, resource_user, nullptr);
  GRPC_CLOSURE_INIT(&resource_user->destroy_closure, &ru_destroy,
                    resource_user, nullptr);
  gpr_mu_init(&resource_user->mu);
  gpr_atm_rel_store(&resource_user->refs, 1);
  gpr_atm_rel_store(&resource_user->shutdown, 0);
  resource_user->free_pool = 0;
  grpc_closure_list_init(&resource_user->on_allocated);
  resource_user->allocating = false;
  resource_user->added_to_free_pool = false;
  gpr_atm_no_barrier_store(&resource_user->num_threads_allocated, 0);
  resource_user->reclaimers[0] = nullptr;
  resource_user->reclaimers[1] = nullptr;
  resource_user->new_reclaimers[0] = nullptr;
  resource_user->new_reclaimers[1] = nullptr;
  resource_user->outstanding_allocations = 0;
  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    resource_user->links[i].next = resource_user->links[i].prev = nullptr;
  }
  if (name != nullptr) {
    resource_user->name = name;
  } else {
    resource_user->name = absl::StrCat(
        "anonymous_resource_user_", reinterpret_cast<intptr_t>(resource_user));
  }
  return resource_user;
}

grpc_resource_quota* grpc_resource_user_quota(
    grpc_resource_user* resource_user) {
  return resource_user->resource_quota;
}

static void ru_ref_by(grpc_resource_user* resource_user, gpr_atm amount) {
  GPR_ASSERT(amount > 0);
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&resource_user->refs, amount) != 0);
}

static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount) {
  GPR_ASSERT(amount > 0);
  gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount);
  GPR_ASSERT(old >= amount);
  if (old == amount) {
    resource_user->resource_quota->combiner->Run(
        &resource_user->destroy_closure, GRPC_ERROR_NONE);
  }
}

void grpc_resource_user_ref(grpc_resource_user* resource_user) {
  ru_ref_by(resource_user, 1);
}

void grpc_resource_user_unref(grpc_resource_user* resource_user) {
  ru_unref_by(resource_user, 1);
}

void grpc_resource_user_shutdown(grpc_resource_user* resource_user) {
  if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
    resource_user->resource_quota->combiner->Run(
        GRPC_CLOSURE_CREATE(ru_shutdown, resource_user, nullptr),
        GRPC_ERROR_NONE);
  }
}

bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
                                         int thread_count) {
  GPR_ASSERT(thread_count >= 0);
  bool is_success = false;
  gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
  grpc_resource_quota* rq = resource_user->resource_quota;
  if (rq->num_threads_allocated + thread_count <= rq->max_threads) {
    rq->num_threads_allocated += thread_count;
    gpr_atm_no_barrier_fetch_add(&resource_user->num_threads_allocated,
                                 thread_count);
    is_success = true;
  }
  gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
  return is_success;
}

void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
                                     int thread_count) {
  GPR_ASSERT(thread_count >= 0);
  gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
  grpc_resource_quota* rq = resource_user->resource_quota;
  rq->num_threads_allocated -= thread_count;
  int old_count = static_cast<int>(gpr_atm_no_barrier_fetch_add(
      &resource_user->num_threads_allocated, -thread_count));
  if (old_count < thread_count || rq->num_threads_allocated < 0) {
    gpr_log(GPR_ERROR,
            "Releasing more threads (%d) than currently allocated (rq threads: "
            "%d, ru threads: %d)",
            thread_count, rq->num_threads_allocated + thread_count, old_count);
    abort();
  }
  gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
}
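
// Hedged example of the thread-quota accounting above (`ru` and the count of
// 4 are invented for illustration):
//
//   if (grpc_resource_user_allocate_threads(ru, 4)) {
//     // ... spawn up to 4 worker threads ...
//     grpc_resource_user_free_threads(ru, 4);  // must match the grant
//   } else {
//     // quota exhausted: fewer than 4 slots remain under max_threads
//   }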

static bool resource_user_alloc_locked(grpc_resource_user* resource_user,
                                       size_t size,
                                       grpc_closure* optional_on_done) {
  ru_ref_by(resource_user, static_cast<gpr_atm>(size));
  resource_user->free_pool -= static_cast<int64_t>(size);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64,
            resource_user->resource_quota->name.c_str(),
            resource_user->name.c_str(), size, resource_user->free_pool);
  }
  if (GPR_LIKELY(resource_user->free_pool >= 0)) return true;
  // Slow path: We need to wait for the free pool to refill.
  if (optional_on_done != nullptr) {
    resource_user->outstanding_allocations += static_cast<int64_t>(size);
    grpc_closure_list_append(&resource_user->on_allocated, optional_on_done,
                             GRPC_ERROR_NONE);
  }
  if (!resource_user->allocating) {
    resource_user->allocating = true;
    resource_user->resource_quota->combiner->Run(
        &resource_user->allocate_closure, GRPC_ERROR_NONE);
  }
  return false;
}

bool grpc_resource_user_safe_alloc(grpc_resource_user* resource_user,
                                   size_t size) {
  if (gpr_atm_no_barrier_load(&resource_user->shutdown)) return false;
  gpr_mu_lock(&resource_user->mu);
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  bool cas_success;
  do {
    gpr_atm used = gpr_atm_no_barrier_load(&resource_quota->used);
    gpr_atm new_used = used + size;
    if (static_cast<size_t>(new_used) >
        grpc_resource_quota_peek_size(resource_quota)) {
      gpr_mu_unlock(&resource_user->mu);
      return false;
    }
    cas_success = gpr_atm_full_cas(&resource_quota->used, used, new_used);
  } while (!cas_success);
  resource_user_alloc_locked(resource_user, size, nullptr);
  gpr_mu_unlock(&resource_user->mu);
  return true;
}

bool grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
                              grpc_closure* optional_on_done) {
  // TODO(juanlishen): Maybe return immediately if shutting down. Deferring this
  // because some tests become flaky after the change.
  gpr_mu_lock(&resource_user->mu);
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  gpr_atm_no_barrier_fetch_add(&resource_quota->used, size);
  const bool ret =
      resource_user_alloc_locked(resource_user, size, optional_on_done);
  gpr_mu_unlock(&resource_user->mu);
  return ret;
}

void grpc_resource_user_free(grpc_resource_user* resource_user, size_t size) {
  gpr_mu_lock(&resource_user->mu);
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  gpr_atm prior = gpr_atm_no_barrier_fetch_add(&resource_quota->used, -size);
  GPR_ASSERT(prior >= static_cast<long>(size));
  bool was_zero_or_negative = resource_user->free_pool <= 0;
  resource_user->free_pool += static_cast<int64_t>(size);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "RQ %s %s: free %" PRIdPTR "; free_pool -> %" PRId64,
            resource_user->resource_quota->name.c_str(),
            resource_user->name.c_str(), size, resource_user->free_pool);
  }
  bool is_bigger_than_zero = resource_user->free_pool > 0;
  if (is_bigger_than_zero && was_zero_or_negative &&
      !resource_user->added_to_free_pool) {
    resource_user->added_to_free_pool = true;
    resource_quota->combiner->Run(&resource_user->add_to_free_pool_closure,
                                  GRPC_ERROR_NONE);
  }
  gpr_mu_unlock(&resource_user->mu);
  ru_unref_by(resource_user, static_cast<gpr_atm>(size));
}

void grpc_resource_user_post_reclaimer(grpc_resource_user* resource_user,
                                       bool destructive,
                                       grpc_closure* closure) {
  GPR_ASSERT(resource_user->new_reclaimers[destructive] == nullptr);
  resource_user->new_reclaimers[destructive] = closure;
  resource_user->resource_quota->combiner->Run(
      &resource_user->post_reclaimer_closure[destructive], GRPC_ERROR_NONE);
}

void grpc_resource_user_finish_reclamation(grpc_resource_user* resource_user) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "RQ %s %s: reclamation complete",
            resource_user->resource_quota->name.c_str(),
            resource_user->name.c_str());
  }
  resource_user->resource_quota->combiner->Run(
      &resource_user->resource_quota->rq_reclamation_done_closure,
      GRPC_ERROR_NONE);
}

void grpc_resource_user_slice_allocator_init(
    grpc_resource_user_slice_allocator* slice_allocator,
    grpc_resource_user* resource_user, grpc_iomgr_cb_func cb, void* p) {
  GRPC_CLOSURE_INIT(&slice_allocator->on_allocated, ru_allocated_slices,
                    slice_allocator, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&slice_allocator->on_done, cb, p,
                    grpc_schedule_on_exec_ctx);
  slice_allocator->resource_user = resource_user;
}

bool grpc_resource_user_alloc_slices(
    grpc_resource_user_slice_allocator* slice_allocator, size_t length,
    size_t count, grpc_slice_buffer* dest) {
  if (GPR_UNLIKELY(
          gpr_atm_no_barrier_load(&slice_allocator->resource_user->shutdown))) {
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, &slice_allocator->on_allocated,
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
    return false;
  }
  slice_allocator->length = length;
  slice_allocator->count = count;
  slice_allocator->dest = dest;
  const bool ret =
      grpc_resource_user_alloc(slice_allocator->resource_user, count * length,
                               &slice_allocator->on_allocated);
  if (ret) ru_alloc_slices(slice_allocator);
  return ret;
}
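
// Hedged end-to-end sketch of the slice allocator above (read_done_cb, arg,
// and the 8192-byte size are invented for illustration):
//
//   grpc_resource_user_slice_allocator alloc;
//   grpc_resource_user_slice_allocator_init(&alloc, ru, read_done_cb, arg);
//   grpc_slice_buffer buf;
//   grpc_slice_buffer_init(&buf);
//   if (grpc_resource_user_alloc_slices(&alloc, 8192, 1, &buf)) {
//     // one 8192-byte slice was added to buf synchronously
//   }  // otherwise read_done_cb runs via on_done once quota is granted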