• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/resource_quota.h"
22 
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <stdint.h>
26 #include <string.h>
27 
28 #include <string>
29 
30 #include "absl/strings/str_cat.h"
31 
32 #include <grpc/slice_buffer.h>
33 #include <grpc/support/alloc.h>
34 #include <grpc/support/log.h>
35 
36 #include "src/core/lib/gpr/useful.h"
37 #include "src/core/lib/iomgr/combiner.h"
38 #include "src/core/lib/slice/slice_internal.h"
39 
40 grpc_core::TraceFlag grpc_resource_quota_trace(false, "resource_quota");
41 
42 #define MEMORY_USAGE_ESTIMATION_MAX 65536
43 
44 /* Internal linked list pointers for a resource user */
45 struct grpc_resource_user_link {
46   grpc_resource_user* next;
47   grpc_resource_user* prev;
48 };
/* A resource user may be linked into several intrusive lists at the same
   time. Each enumerator names one such list and is used as the index into
   grpc_resource_user::links and grpc_resource_quota::roots. */
enum grpc_rulist {
  /* Users waiting for an allocation to be granted */
  GRPC_RULIST_AWAITING_ALLOCATION,
  /* Users holding free memory that can be reclaimed internally */
  GRPC_RULIST_NON_EMPTY_FREE_POOL,
  /* Users that have published an available benign reclamation */
  GRPC_RULIST_RECLAIMER_BENIGN,
  /* Users that have published an available destructive reclamation */
  GRPC_RULIST_RECLAIMER_DESTRUCTIVE,
  /* Number of lists: must stay the last enumerator */
  GRPC_RULIST_COUNT
};
64 
65 struct grpc_resource_user {
66   /* The quota this resource user consumes from */
67   grpc_resource_quota* resource_quota;
68 
69   /* Closure to schedule an allocation under the resource quota combiner lock */
70   grpc_closure allocate_closure;
71   /* Closure to publish a non empty free pool under the resource quota combiner
72      lock */
73   grpc_closure add_to_free_pool_closure;
74 
75   /* one ref for each ref call (released by grpc_resource_user_unref), and one
76      ref for each byte allocated (released by grpc_resource_user_free) */
77   gpr_atm refs;
78   /* is this resource user unlocked? starts at 0, increases for each shutdown
79      call */
80   gpr_atm shutdown;
81 
82   gpr_mu mu;
83   /* The amount of memory (in bytes) this user has cached for its own use: to
84      avoid quota contention, each resource user can keep some memory in
85      addition to what it is immediately using (e.g., for caching), and the quota
86      can pull it back under memory pressure.
87      This value can become negative if more memory has been requested than
88      existed in the free pool, at which point the quota is consulted to bring
89      this value non-negative (asynchronously). */
90   int64_t free_pool;
91   /* A list of closures to call once free_pool becomes non-negative - ie when
92      all outstanding allocations have been granted. */
93   grpc_closure_list on_allocated;
94   /* True if we are currently trying to allocate from the quota, false if not */
95   bool allocating;
96   /* The amount of memory (in bytes) that has been requested from this user
97    * asynchronously but hasn't been granted yet. */
98   int64_t outstanding_allocations;
99   /* True if we are currently trying to add ourselves to the non-free quota
100      list, false otherwise */
101   bool added_to_free_pool;
102 
103   /* The number of threads currently allocated to this resource user */
104   gpr_atm num_threads_allocated;
105 
106   /* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer
107    */
108   grpc_closure* reclaimers[2];
109   /* Reclaimers just posted: once we're in the combiner lock, we'll move them
110      to the array above */
111   grpc_closure* new_reclaimers[2];
112   /* Trampoline closures to finish reclamation and re-enter the quota combiner
113      lock */
114   grpc_closure post_reclaimer_closure[2];
115 
116   /* Closure to execute under the quota combiner to de-register and shutdown the
117      resource user */
118   grpc_closure destroy_closure;
119 
120   /* Links in the various grpc_rulist lists */
121   grpc_resource_user_link links[GRPC_RULIST_COUNT];
122 
123   /* The name of this resource user, for debugging/tracing */
124   std::string name;
125 };
126 
127 struct grpc_resource_quota {
128   /* refcount */
129   gpr_refcount refs;
130 
131   /* estimate of current memory usage
132      scaled to the range [0..RESOURCE_USAGE_ESTIMATION_MAX] */
133   gpr_atm memory_usage_estimation;
134 
135   /* Master combiner lock: all activity on a quota executes under this combiner
136    * (so no mutex is needed for this data structure) */
137   grpc_core::Combiner* combiner;
138   /* Size of the resource quota */
139   int64_t size;
140   /* Amount of free memory in the resource quota */
141   int64_t free_pool;
142   /* Used size of memory in the resource quota. Updated as soon as the resource
143    * users start to allocate or free the memory. */
144   gpr_atm used;
145 
146   gpr_atm last_size;
147 
148   /* Mutex to protect max_threads and num_threads_allocated */
149   /* Note: We could have used gpr_atm for max_threads and num_threads_allocated
150    * and avoid having this mutex; but in that case, each invocation of the
151    * function grpc_resource_user_allocate_threads() would have had to do at
152    * least two atomic loads (for max_threads and num_threads_allocated) followed
153    * by a CAS (on num_threads_allocated).
154    * Moreover, we expect grpc_resource_user_allocate_threads() to be often
155    * called concurrently thereby increasing the chances of failing the CAS
156    * operation. This additional complexity is not worth the tiny perf gain we
157    * may (or may not) have by using atomics */
158   gpr_mu thread_count_mu;
159 
160   /* Max number of threads allowed */
161   int max_threads;
162 
163   /* Number of threads currently allocated via this resource_quota object */
164   int num_threads_allocated;
165 
166   /* Has rq_step been scheduled to occur? */
167   bool step_scheduled;
168 
169   /* Are we currently reclaiming memory */
170   bool reclaiming;
171 
172   /* Closure around rq_step */
173   grpc_closure rq_step_closure;
174 
175   /* Closure around rq_reclamation_done */
176   grpc_closure rq_reclamation_done_closure;
177 
178   /* This is only really usable for debugging: it's always a stale pointer, but
179      a stale pointer that might just be fresh enough to guide us to where the
180      reclamation system is stuck */
181   grpc_closure* debug_only_last_initiated_reclaimer;
182   grpc_resource_user* debug_only_last_reclaimer_resource_user;
183 
184   /* Roots of all resource user lists */
185   grpc_resource_user* roots[GRPC_RULIST_COUNT];
186 
187   std::string name;
188 };
189 
190 static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount);
191 
192 /*******************************************************************************
193  * list management
194  */
195 
rulist_add_head(grpc_resource_user * resource_user,grpc_rulist list)196 static void rulist_add_head(grpc_resource_user* resource_user,
197                             grpc_rulist list) {
198   grpc_resource_quota* resource_quota = resource_user->resource_quota;
199   grpc_resource_user** root = &resource_quota->roots[list];
200   if (*root == nullptr) {
201     *root = resource_user;
202     resource_user->links[list].next = resource_user->links[list].prev =
203         resource_user;
204   } else {
205     resource_user->links[list].next = *root;
206     resource_user->links[list].prev = (*root)->links[list].prev;
207     resource_user->links[list].next->links[list].prev =
208         resource_user->links[list].prev->links[list].next = resource_user;
209     *root = resource_user;
210   }
211 }
212 
rulist_add_tail(grpc_resource_user * resource_user,grpc_rulist list)213 static void rulist_add_tail(grpc_resource_user* resource_user,
214                             grpc_rulist list) {
215   grpc_resource_quota* resource_quota = resource_user->resource_quota;
216   grpc_resource_user** root = &resource_quota->roots[list];
217   if (*root == nullptr) {
218     *root = resource_user;
219     resource_user->links[list].next = resource_user->links[list].prev =
220         resource_user;
221   } else {
222     resource_user->links[list].next = (*root)->links[list].next;
223     resource_user->links[list].prev = *root;
224     resource_user->links[list].next->links[list].prev =
225         resource_user->links[list].prev->links[list].next = resource_user;
226   }
227 }
228 
rulist_empty(grpc_resource_quota * resource_quota,grpc_rulist list)229 static bool rulist_empty(grpc_resource_quota* resource_quota,
230                          grpc_rulist list) {
231   return resource_quota->roots[list] == nullptr;
232 }
233 
rulist_pop_head(grpc_resource_quota * resource_quota,grpc_rulist list)234 static grpc_resource_user* rulist_pop_head(grpc_resource_quota* resource_quota,
235                                            grpc_rulist list) {
236   grpc_resource_user** root = &resource_quota->roots[list];
237   grpc_resource_user* resource_user = *root;
238   if (resource_user == nullptr) {
239     return nullptr;
240   }
241   if (resource_user->links[list].next == resource_user) {
242     *root = nullptr;
243   } else {
244     resource_user->links[list].next->links[list].prev =
245         resource_user->links[list].prev;
246     resource_user->links[list].prev->links[list].next =
247         resource_user->links[list].next;
248     *root = resource_user->links[list].next;
249   }
250   resource_user->links[list].next = resource_user->links[list].prev = nullptr;
251   return resource_user;
252 }
253 
rulist_remove(grpc_resource_user * resource_user,grpc_rulist list)254 static void rulist_remove(grpc_resource_user* resource_user, grpc_rulist list) {
255   if (resource_user->links[list].next == nullptr) return;
256   grpc_resource_quota* resource_quota = resource_user->resource_quota;
257   if (resource_quota->roots[list] == resource_user) {
258     resource_quota->roots[list] = resource_user->links[list].next;
259     if (resource_quota->roots[list] == resource_user) {
260       resource_quota->roots[list] = nullptr;
261     }
262   }
263   resource_user->links[list].next->links[list].prev =
264       resource_user->links[list].prev;
265   resource_user->links[list].prev->links[list].next =
266       resource_user->links[list].next;
267   resource_user->links[list].next = resource_user->links[list].prev = nullptr;
268 }
269 
270 /*******************************************************************************
271  * resource quota state machine
272  */
273 
274 static bool rq_alloc(grpc_resource_quota* resource_quota);
275 static bool rq_reclaim_from_per_user_free_pool(
276     grpc_resource_quota* resource_quota);
277 static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive);
278 
rq_step(void * rq,grpc_error *)279 static void rq_step(void* rq, grpc_error* /*error*/) {
280   grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
281   resource_quota->step_scheduled = false;
282   do {
283     if (rq_alloc(resource_quota)) goto done;
284   } while (rq_reclaim_from_per_user_free_pool(resource_quota));
285 
286   if (!rq_reclaim(resource_quota, false)) {
287     rq_reclaim(resource_quota, true);
288   }
289 
290 done:
291   grpc_resource_quota_unref_internal(resource_quota);
292 }
293 
rq_step_sched(grpc_resource_quota * resource_quota)294 static void rq_step_sched(grpc_resource_quota* resource_quota) {
295   if (resource_quota->step_scheduled) return;
296   resource_quota->step_scheduled = true;
297   grpc_resource_quota_ref_internal(resource_quota);
298   resource_quota->combiner->FinallyRun(&resource_quota->rq_step_closure,
299                                        GRPC_ERROR_NONE);
300 }
301 
302 /* update the atomically available resource estimate - use no barriers since
303    timeliness of delivery really doesn't matter much */
rq_update_estimate(grpc_resource_quota * resource_quota)304 static void rq_update_estimate(grpc_resource_quota* resource_quota) {
305   gpr_atm memory_usage_estimation = MEMORY_USAGE_ESTIMATION_MAX;
306   if (resource_quota->size != 0) {
307     memory_usage_estimation =
308         GPR_CLAMP((gpr_atm)((1.0 - ((double)resource_quota->free_pool) /
309                                        ((double)resource_quota->size)) *
310                             MEMORY_USAGE_ESTIMATION_MAX),
311                   0, MEMORY_USAGE_ESTIMATION_MAX);
312   }
313   gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation,
314                            memory_usage_estimation);
315 }
316 
317 /* returns true if all allocations are completed */
rq_alloc(grpc_resource_quota * resource_quota)318 static bool rq_alloc(grpc_resource_quota* resource_quota) {
319   grpc_resource_user* resource_user;
320   while ((resource_user = rulist_pop_head(resource_quota,
321                                           GRPC_RULIST_AWAITING_ALLOCATION))) {
322     gpr_mu_lock(&resource_user->mu);
323     if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
324       gpr_log(GPR_INFO,
325               "RQ: check allocation for user %p shutdown=%" PRIdPTR
326               " free_pool=%" PRId64 " outstanding_allocations=%" PRId64,
327               resource_user, gpr_atm_no_barrier_load(&resource_user->shutdown),
328               resource_user->free_pool, resource_user->outstanding_allocations);
329     }
330     if (gpr_atm_no_barrier_load(&resource_user->shutdown)) {
331       resource_user->allocating = false;
332       grpc_closure_list_fail_all(
333           &resource_user->on_allocated,
334           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
335       int64_t aborted_allocations = resource_user->outstanding_allocations;
336       resource_user->outstanding_allocations = 0;
337       resource_user->free_pool += aborted_allocations;
338       grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &resource_user->on_allocated);
339       gpr_mu_unlock(&resource_user->mu);
340       if (aborted_allocations > 0) {
341         ru_unref_by(resource_user, static_cast<gpr_atm>(aborted_allocations));
342       }
343       continue;
344     }
345     if (resource_user->free_pool < 0 &&
346         -resource_user->free_pool <= resource_quota->free_pool) {
347       int64_t amt = -resource_user->free_pool;
348       resource_user->free_pool = 0;
349       resource_quota->free_pool -= amt;
350       rq_update_estimate(resource_quota);
351       if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
352         gpr_log(GPR_INFO,
353                 "RQ %s %s: grant alloc %" PRId64
354                 " bytes; rq_free_pool -> %" PRId64,
355                 resource_quota->name.c_str(), resource_user->name.c_str(), amt,
356                 resource_quota->free_pool);
357       }
358     } else if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace) &&
359                resource_user->free_pool >= 0) {
360       gpr_log(GPR_INFO, "RQ %s %s: discard already satisfied alloc request",
361               resource_quota->name.c_str(), resource_user->name.c_str());
362     }
363     if (resource_user->free_pool >= 0) {
364       resource_user->allocating = false;
365       resource_user->outstanding_allocations = 0;
366       grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &resource_user->on_allocated);
367       gpr_mu_unlock(&resource_user->mu);
368     } else {
369       rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
370       gpr_mu_unlock(&resource_user->mu);
371       return false;
372     }
373   }
374   return true;
375 }
376 
377 /* returns true if any memory could be reclaimed from buffers */
rq_reclaim_from_per_user_free_pool(grpc_resource_quota * resource_quota)378 static bool rq_reclaim_from_per_user_free_pool(
379     grpc_resource_quota* resource_quota) {
380   grpc_resource_user* resource_user;
381   while ((resource_user = rulist_pop_head(resource_quota,
382                                           GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
383     gpr_mu_lock(&resource_user->mu);
384     resource_user->added_to_free_pool = false;
385     if (resource_user->free_pool > 0) {
386       int64_t amt = resource_user->free_pool;
387       resource_user->free_pool = 0;
388       resource_quota->free_pool += amt;
389       rq_update_estimate(resource_quota);
390       if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
391         gpr_log(GPR_INFO,
392                 "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64
393                 " bytes; rq_free_pool -> %" PRId64,
394                 resource_quota->name.c_str(), resource_user->name.c_str(), amt,
395                 resource_quota->free_pool);
396       }
397       gpr_mu_unlock(&resource_user->mu);
398       return true;
399     } else {
400       if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
401         gpr_log(GPR_INFO,
402                 "RQ %s %s: failed to reclaim_from_per_user_free_pool; "
403                 "free_pool = %" PRId64 "; rq_free_pool = %" PRId64,
404                 resource_quota->name.c_str(), resource_user->name.c_str(),
405                 resource_user->free_pool, resource_quota->free_pool);
406       }
407       gpr_mu_unlock(&resource_user->mu);
408     }
409   }
410   return false;
411 }
412 
413 /* returns true if reclamation is proceeding */
rq_reclaim(grpc_resource_quota * resource_quota,bool destructive)414 static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) {
415   if (resource_quota->reclaiming) return true;
416   grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE
417                                  : GRPC_RULIST_RECLAIMER_BENIGN;
418   grpc_resource_user* resource_user = rulist_pop_head(resource_quota, list);
419   if (resource_user == nullptr) return false;
420   if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
421     gpr_log(GPR_INFO, "RQ %s %s: initiate %s reclamation",
422             resource_quota->name.c_str(), resource_user->name.c_str(),
423             destructive ? "destructive" : "benign");
424   }
425   resource_quota->reclaiming = true;
426   grpc_resource_quota_ref_internal(resource_quota);
427   grpc_closure* c = resource_user->reclaimers[destructive];
428   GPR_ASSERT(c);
429   resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
430   resource_quota->debug_only_last_initiated_reclaimer = c;
431   resource_user->reclaimers[destructive] = nullptr;
432   grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, GRPC_ERROR_NONE);
433   return true;
434 }
435 
436 /*******************************************************************************
437  * ru_slice: a slice implementation that is backed by a grpc_resource_user
438  */
439 
440 namespace grpc_core {
441 
442 class RuSliceRefcount {
443  public:
Destroy(void * p)444   static void Destroy(void* p) {
445     auto* rc = static_cast<RuSliceRefcount*>(p);
446     rc->~RuSliceRefcount();
447     gpr_free(rc);
448   }
RuSliceRefcount(grpc_resource_user * resource_user,size_t size)449   RuSliceRefcount(grpc_resource_user* resource_user, size_t size)
450       : base_(grpc_slice_refcount::Type::REGULAR, &refs_, Destroy, this,
451               &base_),
452         resource_user_(resource_user),
453         size_(size) {
454     // Nothing to do here.
455   }
~RuSliceRefcount()456   ~RuSliceRefcount() { grpc_resource_user_free(resource_user_, size_); }
457 
base_refcount()458   grpc_slice_refcount* base_refcount() { return &base_; }
459 
460  private:
461   grpc_slice_refcount base_;
462   RefCount refs_;
463   grpc_resource_user* resource_user_;
464   size_t size_;
465 };
466 
467 }  // namespace grpc_core
468 
ru_slice_create(grpc_resource_user * resource_user,size_t size)469 static grpc_slice ru_slice_create(grpc_resource_user* resource_user,
470                                   size_t size) {
471   auto* rc = static_cast<grpc_core::RuSliceRefcount*>(
472       gpr_malloc(sizeof(grpc_core::RuSliceRefcount) + size));
473   new (rc) grpc_core::RuSliceRefcount(resource_user, size);
474   grpc_slice slice;
475 
476   slice.refcount = rc->base_refcount();
477   slice.data.refcounted.bytes = reinterpret_cast<uint8_t*>(rc + 1);
478   slice.data.refcounted.length = size;
479   return slice;
480 }
481 
482 /*******************************************************************************
483  * grpc_resource_quota internal implementation: resource user manipulation under
484  * the combiner
485  */
486 
ru_allocate(void * ru,grpc_error *)487 static void ru_allocate(void* ru, grpc_error* /*error*/) {
488   grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
489   if (rulist_empty(resource_user->resource_quota,
490                    GRPC_RULIST_AWAITING_ALLOCATION)) {
491     rq_step_sched(resource_user->resource_quota);
492   }
493   rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
494 }
495 
ru_add_to_free_pool(void * ru,grpc_error *)496 static void ru_add_to_free_pool(void* ru, grpc_error* /*error*/) {
497   grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
498   if (!rulist_empty(resource_user->resource_quota,
499                     GRPC_RULIST_AWAITING_ALLOCATION) &&
500       rulist_empty(resource_user->resource_quota,
501                    GRPC_RULIST_NON_EMPTY_FREE_POOL)) {
502     rq_step_sched(resource_user->resource_quota);
503   }
504   rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL);
505 }
506 
ru_post_reclaimer(grpc_resource_user * resource_user,bool destructive)507 static bool ru_post_reclaimer(grpc_resource_user* resource_user,
508                               bool destructive) {
509   grpc_closure* closure = resource_user->new_reclaimers[destructive];
510   GPR_ASSERT(closure != nullptr);
511   resource_user->new_reclaimers[destructive] = nullptr;
512   GPR_ASSERT(resource_user->reclaimers[destructive] == nullptr);
513   if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
514     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_CANCELLED);
515     return false;
516   }
517   resource_user->reclaimers[destructive] = closure;
518   return true;
519 }
520 
ru_post_benign_reclaimer(void * ru,grpc_error *)521 static void ru_post_benign_reclaimer(void* ru, grpc_error* /*error*/) {
522   grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
523   if (!ru_post_reclaimer(resource_user, false)) return;
524   if (!rulist_empty(resource_user->resource_quota,
525                     GRPC_RULIST_AWAITING_ALLOCATION) &&
526       rulist_empty(resource_user->resource_quota,
527                    GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
528       rulist_empty(resource_user->resource_quota,
529                    GRPC_RULIST_RECLAIMER_BENIGN)) {
530     rq_step_sched(resource_user->resource_quota);
531   }
532   rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
533 }
534 
ru_post_destructive_reclaimer(void * ru,grpc_error *)535 static void ru_post_destructive_reclaimer(void* ru, grpc_error* /*error*/) {
536   grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
537   if (!ru_post_reclaimer(resource_user, true)) return;
538   if (!rulist_empty(resource_user->resource_quota,
539                     GRPC_RULIST_AWAITING_ALLOCATION) &&
540       rulist_empty(resource_user->resource_quota,
541                    GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
542       rulist_empty(resource_user->resource_quota,
543                    GRPC_RULIST_RECLAIMER_BENIGN) &&
544       rulist_empty(resource_user->resource_quota,
545                    GRPC_RULIST_RECLAIMER_DESTRUCTIVE)) {
546     rq_step_sched(resource_user->resource_quota);
547   }
548   rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
549 }
550 
ru_shutdown(void * ru,grpc_error *)551 static void ru_shutdown(void* ru, grpc_error* /*error*/) {
552   if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
553     gpr_log(GPR_INFO, "RU shutdown %p", ru);
554   }
555   grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
556   gpr_mu_lock(&resource_user->mu);
557   grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[0],
558                           GRPC_ERROR_CANCELLED);
559   grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[1],
560                           GRPC_ERROR_CANCELLED);
561   resource_user->reclaimers[0] = nullptr;
562   resource_user->reclaimers[1] = nullptr;
563   rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
564   rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
565   if (resource_user->allocating) {
566     rq_step_sched(resource_user->resource_quota);
567   }
568   gpr_mu_unlock(&resource_user->mu);
569 }
570 
ru_destroy(void * ru,grpc_error *)571 static void ru_destroy(void* ru, grpc_error* /*error*/) {
572   grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
573   GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0);
574   // Free all the remaining thread quota
575   grpc_resource_user_free_threads(resource_user,
576                                   static_cast<int>(gpr_atm_no_barrier_load(
577                                       &resource_user->num_threads_allocated)));
578 
579   for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
580     rulist_remove(resource_user, static_cast<grpc_rulist>(i));
581   }
582   grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[0],
583                           GRPC_ERROR_CANCELLED);
584   grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[1],
585                           GRPC_ERROR_CANCELLED);
586   if (resource_user->free_pool != 0) {
587     resource_user->resource_quota->free_pool += resource_user->free_pool;
588     rq_step_sched(resource_user->resource_quota);
589   }
590   grpc_resource_quota_unref_internal(resource_user->resource_quota);
591   gpr_mu_destroy(&resource_user->mu);
592   delete resource_user;
593 }
594 
ru_alloc_slices(grpc_resource_user_slice_allocator * slice_allocator)595 static void ru_alloc_slices(
596     grpc_resource_user_slice_allocator* slice_allocator) {
597   for (size_t i = 0; i < slice_allocator->count; i++) {
598     grpc_slice_buffer_add_indexed(
599         slice_allocator->dest, ru_slice_create(slice_allocator->resource_user,
600                                                slice_allocator->length));
601   }
602 }
603 
ru_allocated_slices(void * arg,grpc_error * error)604 static void ru_allocated_slices(void* arg, grpc_error* error) {
605   grpc_resource_user_slice_allocator* slice_allocator =
606       static_cast<grpc_resource_user_slice_allocator*>(arg);
607   if (error == GRPC_ERROR_NONE) ru_alloc_slices(slice_allocator);
608   grpc_core::Closure::Run(DEBUG_LOCATION, &slice_allocator->on_done,
609                           GRPC_ERROR_REF(error));
610 }
611 
612 /*******************************************************************************
613  * grpc_resource_quota internal implementation: quota manipulation under the
614  * combiner
615  */
616 
617 struct rq_resize_args {
618   int64_t size;
619   grpc_resource_quota* resource_quota;
620   grpc_closure closure;
621 };
rq_resize(void * args,grpc_error *)622 static void rq_resize(void* args, grpc_error* /*error*/) {
623   rq_resize_args* a = static_cast<rq_resize_args*>(args);
624   int64_t delta = a->size - a->resource_quota->size;
625   a->resource_quota->size += delta;
626   a->resource_quota->free_pool += delta;
627   rq_update_estimate(a->resource_quota);
628   rq_step_sched(a->resource_quota);
629   grpc_resource_quota_unref_internal(a->resource_quota);
630   gpr_free(a);
631 }
632 
rq_reclamation_done(void * rq,grpc_error *)633 static void rq_reclamation_done(void* rq, grpc_error* /*error*/) {
634   grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
635   resource_quota->reclaiming = false;
636   rq_step_sched(resource_quota);
637   grpc_resource_quota_unref_internal(resource_quota);
638 }
639 
640 /*******************************************************************************
641  * grpc_resource_quota api
642  */
643 
644 /* Public API */
grpc_resource_quota_create(const char * name)645 grpc_resource_quota* grpc_resource_quota_create(const char* name) {
646   grpc_resource_quota* resource_quota = new grpc_resource_quota;
647   gpr_ref_init(&resource_quota->refs, 1);
648   resource_quota->combiner = grpc_combiner_create();
649   resource_quota->free_pool = INT64_MAX;
650   resource_quota->size = INT64_MAX;
651   resource_quota->used = 0;
652   gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
653   gpr_mu_init(&resource_quota->thread_count_mu);
654   resource_quota->max_threads = INT_MAX;
655   resource_quota->num_threads_allocated = 0;
656   resource_quota->step_scheduled = false;
657   resource_quota->reclaiming = false;
658   gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0);
659   if (name != nullptr) {
660     resource_quota->name = name;
661   } else {
662     resource_quota->name = absl::StrCat(
663         "anonymous_pool_", reinterpret_cast<intptr_t>(resource_quota));
664   }
665   GRPC_CLOSURE_INIT(&resource_quota->rq_step_closure, rq_step, resource_quota,
666                     nullptr);
667   GRPC_CLOSURE_INIT(&resource_quota->rq_reclamation_done_closure,
668                     rq_reclamation_done, resource_quota, nullptr);
669   for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
670     resource_quota->roots[i] = nullptr;
671   }
672   return resource_quota;
673 }
674 
grpc_resource_quota_unref_internal(grpc_resource_quota * resource_quota)675 void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) {
676   if (gpr_unref(&resource_quota->refs)) {
677     // No outstanding thread quota
678     GPR_ASSERT(resource_quota->num_threads_allocated == 0);
679     GRPC_COMBINER_UNREF(resource_quota->combiner, "resource_quota");
680     gpr_mu_destroy(&resource_quota->thread_count_mu);
681     delete resource_quota;
682   }
683 }
684 
685 /* Public API */
grpc_resource_quota_unref(grpc_resource_quota * resource_quota)686 void grpc_resource_quota_unref(grpc_resource_quota* resource_quota) {
687   grpc_core::ExecCtx exec_ctx;
688   grpc_resource_quota_unref_internal(resource_quota);
689 }
690 
grpc_resource_quota_ref_internal(grpc_resource_quota * resource_quota)691 grpc_resource_quota* grpc_resource_quota_ref_internal(
692     grpc_resource_quota* resource_quota) {
693   gpr_ref(&resource_quota->refs);
694   return resource_quota;
695 }
696 
697 /* Public API */
grpc_resource_quota_ref(grpc_resource_quota * resource_quota)698 void grpc_resource_quota_ref(grpc_resource_quota* resource_quota) {
699   grpc_resource_quota_ref_internal(resource_quota);
700 }
701 
grpc_resource_quota_get_memory_pressure(grpc_resource_quota * resource_quota)702 double grpc_resource_quota_get_memory_pressure(
703     grpc_resource_quota* resource_quota) {
704   return (static_cast<double>(gpr_atm_no_barrier_load(
705              &resource_quota->memory_usage_estimation))) /
706          (static_cast<double>(MEMORY_USAGE_ESTIMATION_MAX));
707 }
708 
709 /* Public API */
grpc_resource_quota_set_max_threads(grpc_resource_quota * resource_quota,int new_max_threads)710 void grpc_resource_quota_set_max_threads(grpc_resource_quota* resource_quota,
711                                          int new_max_threads) {
712   GPR_ASSERT(new_max_threads >= 0);
713   gpr_mu_lock(&resource_quota->thread_count_mu);
714   resource_quota->max_threads = new_max_threads;
715   gpr_mu_unlock(&resource_quota->thread_count_mu);
716 }
717 
718 /* Public API */
grpc_resource_quota_resize(grpc_resource_quota * resource_quota,size_t size)719 void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
720                                 size_t size) {
721   grpc_core::ExecCtx exec_ctx;
722   rq_resize_args* a = static_cast<rq_resize_args*>(gpr_malloc(sizeof(*a)));
723   a->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
724   a->size = static_cast<int64_t>(size);
725   gpr_atm_no_barrier_store(&resource_quota->last_size,
726                            (gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size));
727   GRPC_CLOSURE_INIT(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
728   grpc_core::ExecCtx::Run(DEBUG_LOCATION, &a->closure, GRPC_ERROR_NONE);
729 }
730 
grpc_resource_quota_peek_size(grpc_resource_quota * resource_quota)731 size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) {
732   return static_cast<size_t>(
733       gpr_atm_no_barrier_load(&resource_quota->last_size));
734 }
735 
/*******************************************************************************
 * grpc_resource_quota channel args api
 */
739 
grpc_resource_quota_from_channel_args(const grpc_channel_args * channel_args,bool create)740 grpc_resource_quota* grpc_resource_quota_from_channel_args(
741     const grpc_channel_args* channel_args, bool create) {
742   for (size_t i = 0; i < channel_args->num_args; i++) {
743     if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
744       if (channel_args->args[i].type == GRPC_ARG_POINTER) {
745         return grpc_resource_quota_ref_internal(
746             static_cast<grpc_resource_quota*>(
747                 channel_args->args[i].value.pointer.p));
748       } else {
749         gpr_log(GPR_DEBUG, GRPC_ARG_RESOURCE_QUOTA " should be a pointer");
750       }
751     }
752   }
753   return create ? grpc_resource_quota_create(nullptr) : nullptr;
754 }
755 
rq_copy(void * rq)756 static void* rq_copy(void* rq) {
757   grpc_resource_quota_ref(static_cast<grpc_resource_quota*>(rq));
758   return rq;
759 }
760 
rq_destroy(void * rq)761 static void rq_destroy(void* rq) {
762   grpc_resource_quota_unref_internal(static_cast<grpc_resource_quota*>(rq));
763 }
764 
rq_cmp(void * a,void * b)765 static int rq_cmp(void* a, void* b) { return GPR_ICMP(a, b); }
766 
grpc_resource_quota_arg_vtable(void)767 const grpc_arg_pointer_vtable* grpc_resource_quota_arg_vtable(void) {
768   static const grpc_arg_pointer_vtable vtable = {rq_copy, rq_destroy, rq_cmp};
769   return &vtable;
770 }
771 
772 /*******************************************************************************
773  * grpc_resource_user api
774  */
775 
grpc_resource_user_create(grpc_resource_quota * resource_quota,const char * name)776 grpc_resource_user* grpc_resource_user_create(
777     grpc_resource_quota* resource_quota, const char* name) {
778   grpc_resource_user* resource_user = new grpc_resource_user;
779   resource_user->resource_quota =
780       grpc_resource_quota_ref_internal(resource_quota);
781   GRPC_CLOSURE_INIT(&resource_user->allocate_closure, &ru_allocate,
782                     resource_user, nullptr);
783   GRPC_CLOSURE_INIT(&resource_user->add_to_free_pool_closure,
784                     &ru_add_to_free_pool, resource_user, nullptr);
785   GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[0],
786                     &ru_post_benign_reclaimer, resource_user, nullptr);
787   GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[1],
788                     &ru_post_destructive_reclaimer, resource_user, nullptr);
789   GRPC_CLOSURE_INIT(&resource_user->destroy_closure, &ru_destroy, resource_user,
790                     nullptr);
791   gpr_mu_init(&resource_user->mu);
792   gpr_atm_rel_store(&resource_user->refs, 1);
793   gpr_atm_rel_store(&resource_user->shutdown, 0);
794   resource_user->free_pool = 0;
795   grpc_closure_list_init(&resource_user->on_allocated);
796   resource_user->allocating = false;
797   resource_user->added_to_free_pool = false;
798   gpr_atm_no_barrier_store(&resource_user->num_threads_allocated, 0);
799   resource_user->reclaimers[0] = nullptr;
800   resource_user->reclaimers[1] = nullptr;
801   resource_user->new_reclaimers[0] = nullptr;
802   resource_user->new_reclaimers[1] = nullptr;
803   resource_user->outstanding_allocations = 0;
804   for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
805     resource_user->links[i].next = resource_user->links[i].prev = nullptr;
806   }
807   if (name != nullptr) {
808     resource_user->name = name;
809   } else {
810     resource_user->name = absl::StrCat(
811         "anonymous_resource_user_", reinterpret_cast<intptr_t>(resource_user));
812   }
813   return resource_user;
814 }
815 
grpc_resource_user_quota(grpc_resource_user * resource_user)816 grpc_resource_quota* grpc_resource_user_quota(
817     grpc_resource_user* resource_user) {
818   return resource_user->resource_quota;
819 }
820 
/* Adds `amount` (which must be positive) to the resource user's reference
   count. */
static void ru_ref_by(grpc_resource_user* resource_user, gpr_atm amount) {
  GPR_ASSERT(amount > 0);
  // The increment happens inside the assertion: the fetch-add returns the
  // previous count, which must be non-zero (i.e. the caller already held a
  // reference on a live user).
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&resource_user->refs, amount) != 0);
}
825 
ru_unref_by(grpc_resource_user * resource_user,gpr_atm amount)826 static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount) {
827   GPR_ASSERT(amount > 0);
828   gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount);
829   GPR_ASSERT(old >= amount);
830   if (old == amount) {
831     resource_user->resource_quota->combiner->Run(
832         &resource_user->destroy_closure, GRPC_ERROR_NONE);
833   }
834 }
835 
grpc_resource_user_ref(grpc_resource_user * resource_user)836 void grpc_resource_user_ref(grpc_resource_user* resource_user) {
837   ru_ref_by(resource_user, 1);
838 }
839 
grpc_resource_user_unref(grpc_resource_user * resource_user)840 void grpc_resource_user_unref(grpc_resource_user* resource_user) {
841   ru_unref_by(resource_user, 1);
842 }
843 
grpc_resource_user_shutdown(grpc_resource_user * resource_user)844 void grpc_resource_user_shutdown(grpc_resource_user* resource_user) {
845   if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
846     resource_user->resource_quota->combiner->Run(
847         GRPC_CLOSURE_CREATE(ru_shutdown, resource_user, nullptr),
848         GRPC_ERROR_NONE);
849   }
850 }
851 
grpc_resource_user_allocate_threads(grpc_resource_user * resource_user,int thread_count)852 bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
853                                          int thread_count) {
854   GPR_ASSERT(thread_count >= 0);
855   bool is_success = false;
856   gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
857   grpc_resource_quota* rq = resource_user->resource_quota;
858   if (rq->num_threads_allocated + thread_count <= rq->max_threads) {
859     rq->num_threads_allocated += thread_count;
860     gpr_atm_no_barrier_fetch_add(&resource_user->num_threads_allocated,
861                                  thread_count);
862     is_success = true;
863   }
864   gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
865   return is_success;
866 }
867 
grpc_resource_user_free_threads(grpc_resource_user * resource_user,int thread_count)868 void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
869                                      int thread_count) {
870   GPR_ASSERT(thread_count >= 0);
871   gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
872   grpc_resource_quota* rq = resource_user->resource_quota;
873   rq->num_threads_allocated -= thread_count;
874   int old_count = static_cast<int>(gpr_atm_no_barrier_fetch_add(
875       &resource_user->num_threads_allocated, -thread_count));
876   if (old_count < thread_count || rq->num_threads_allocated < 0) {
877     gpr_log(GPR_ERROR,
878             "Releasing more threads (%d) than currently allocated (rq threads: "
879             "%d, ru threads: %d)",
880             thread_count, rq->num_threads_allocated + thread_count, old_count);
881     abort();
882   }
883   gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
884 }
885 
resource_user_alloc_locked(grpc_resource_user * resource_user,size_t size,grpc_closure * optional_on_done)886 static bool resource_user_alloc_locked(grpc_resource_user* resource_user,
887                                        size_t size,
888                                        grpc_closure* optional_on_done) {
889   ru_ref_by(resource_user, static_cast<gpr_atm>(size));
890   resource_user->free_pool -= static_cast<int64_t>(size);
891   if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
892     gpr_log(GPR_INFO, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64,
893             resource_user->resource_quota->name.c_str(),
894             resource_user->name.c_str(), size, resource_user->free_pool);
895   }
896   if (GPR_LIKELY(resource_user->free_pool >= 0)) return true;
897   // Slow path: We need to wait for the free pool to refill.
898   if (optional_on_done != nullptr) {
899     resource_user->outstanding_allocations += static_cast<int64_t>(size);
900     grpc_closure_list_append(&resource_user->on_allocated, optional_on_done,
901                              GRPC_ERROR_NONE);
902   }
903   if (!resource_user->allocating) {
904     resource_user->allocating = true;
905     resource_user->resource_quota->combiner->Run(
906         &resource_user->allocate_closure, GRPC_ERROR_NONE);
907   }
908   return false;
909 }
910 
grpc_resource_user_safe_alloc(grpc_resource_user * resource_user,size_t size)911 bool grpc_resource_user_safe_alloc(grpc_resource_user* resource_user,
912                                    size_t size) {
913   if (gpr_atm_no_barrier_load(&resource_user->shutdown)) return false;
914   gpr_mu_lock(&resource_user->mu);
915   grpc_resource_quota* resource_quota = resource_user->resource_quota;
916   bool cas_success;
917   do {
918     gpr_atm used = gpr_atm_no_barrier_load(&resource_quota->used);
919     gpr_atm new_used = used + size;
920     if (static_cast<size_t>(new_used) >
921         grpc_resource_quota_peek_size(resource_quota)) {
922       gpr_mu_unlock(&resource_user->mu);
923       return false;
924     }
925     cas_success = gpr_atm_full_cas(&resource_quota->used, used, new_used);
926   } while (!cas_success);
927   resource_user_alloc_locked(resource_user, size, nullptr);
928   gpr_mu_unlock(&resource_user->mu);
929   return true;
930 }
931 
grpc_resource_user_alloc(grpc_resource_user * resource_user,size_t size,grpc_closure * optional_on_done)932 bool grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
933                               grpc_closure* optional_on_done) {
934   // TODO(juanlishen): Maybe return immediately if shutting down. Deferring this
935   // because some tests become flaky after the change.
936   gpr_mu_lock(&resource_user->mu);
937   grpc_resource_quota* resource_quota = resource_user->resource_quota;
938   gpr_atm_no_barrier_fetch_add(&resource_quota->used, size);
939   const bool ret =
940       resource_user_alloc_locked(resource_user, size, optional_on_done);
941   gpr_mu_unlock(&resource_user->mu);
942   return ret;
943 }
944 
grpc_resource_user_free(grpc_resource_user * resource_user,size_t size)945 void grpc_resource_user_free(grpc_resource_user* resource_user, size_t size) {
946   gpr_mu_lock(&resource_user->mu);
947   grpc_resource_quota* resource_quota = resource_user->resource_quota;
948   gpr_atm prior = gpr_atm_no_barrier_fetch_add(&resource_quota->used, -size);
949   GPR_ASSERT(prior >= static_cast<long>(size));
950   bool was_zero_or_negative = resource_user->free_pool <= 0;
951   resource_user->free_pool += static_cast<int64_t>(size);
952   if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
953     gpr_log(GPR_INFO, "RQ %s %s: free %" PRIdPTR "; free_pool -> %" PRId64,
954             resource_user->resource_quota->name.c_str(),
955             resource_user->name.c_str(), size, resource_user->free_pool);
956   }
957   bool is_bigger_than_zero = resource_user->free_pool > 0;
958   if (is_bigger_than_zero && was_zero_or_negative &&
959       !resource_user->added_to_free_pool) {
960     resource_user->added_to_free_pool = true;
961     resource_quota->combiner->Run(&resource_user->add_to_free_pool_closure,
962                                   GRPC_ERROR_NONE);
963   }
964   gpr_mu_unlock(&resource_user->mu);
965   ru_unref_by(resource_user, static_cast<gpr_atm>(size));
966 }
967 
grpc_resource_user_post_reclaimer(grpc_resource_user * resource_user,bool destructive,grpc_closure * closure)968 void grpc_resource_user_post_reclaimer(grpc_resource_user* resource_user,
969                                        bool destructive,
970                                        grpc_closure* closure) {
971   GPR_ASSERT(resource_user->new_reclaimers[destructive] == nullptr);
972   resource_user->new_reclaimers[destructive] = closure;
973   resource_user->resource_quota->combiner->Run(
974       &resource_user->post_reclaimer_closure[destructive], GRPC_ERROR_NONE);
975 }
976 
grpc_resource_user_finish_reclamation(grpc_resource_user * resource_user)977 void grpc_resource_user_finish_reclamation(grpc_resource_user* resource_user) {
978   if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
979     gpr_log(GPR_INFO, "RQ %s %s: reclamation complete",
980             resource_user->resource_quota->name.c_str(),
981             resource_user->name.c_str());
982   }
983   resource_user->resource_quota->combiner->Run(
984       &resource_user->resource_quota->rq_reclamation_done_closure,
985       GRPC_ERROR_NONE);
986 }
987 
grpc_resource_user_slice_allocator_init(grpc_resource_user_slice_allocator * slice_allocator,grpc_resource_user * resource_user,grpc_iomgr_cb_func cb,void * p)988 void grpc_resource_user_slice_allocator_init(
989     grpc_resource_user_slice_allocator* slice_allocator,
990     grpc_resource_user* resource_user, grpc_iomgr_cb_func cb, void* p) {
991   GRPC_CLOSURE_INIT(&slice_allocator->on_allocated, ru_allocated_slices,
992                     slice_allocator, grpc_schedule_on_exec_ctx);
993   GRPC_CLOSURE_INIT(&slice_allocator->on_done, cb, p,
994                     grpc_schedule_on_exec_ctx);
995   slice_allocator->resource_user = resource_user;
996 }
997 
grpc_resource_user_alloc_slices(grpc_resource_user_slice_allocator * slice_allocator,size_t length,size_t count,grpc_slice_buffer * dest)998 bool grpc_resource_user_alloc_slices(
999     grpc_resource_user_slice_allocator* slice_allocator, size_t length,
1000     size_t count, grpc_slice_buffer* dest) {
1001   if (GPR_UNLIKELY(
1002           gpr_atm_no_barrier_load(&slice_allocator->resource_user->shutdown))) {
1003     grpc_core::ExecCtx::Run(
1004         DEBUG_LOCATION, &slice_allocator->on_allocated,
1005         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
1006     return false;
1007   }
1008   slice_allocator->length = length;
1009   slice_allocator->count = count;
1010   slice_allocator->dest = dest;
1011   const bool ret =
1012       grpc_resource_user_alloc(slice_allocator->resource_user, count * length,
1013                                &slice_allocator->on_allocated);
1014   if (ret) ru_alloc_slices(slice_allocator);
1015   return ret;
1016 }
1017