• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/resource_quota.h"
22 
23 #include <inttypes.h>
24 #include <limits.h>
25 #include <stdint.h>
26 #include <string.h>
27 
28 #include <string>
29 
30 #include "absl/strings/str_cat.h"
31 
32 #include <grpc/slice_buffer.h>
33 #include <grpc/support/alloc.h>
34 #include <grpc/support/log.h>
35 
36 #include "src/core/lib/gpr/useful.h"
37 #include "src/core/lib/iomgr/combiner.h"
38 #include "src/core/lib/slice/slice_internal.h"
39 
40 grpc_core::TraceFlag grpc_resource_quota_trace(false, "resource_quota");
41 
42 #define MEMORY_USAGE_ESTIMATION_MAX 65536
43 
/* Internal linked list pointers for a resource user. One link is embedded per
   list (see grpc_resource_user::links). The rulist_* helpers keep each list
   circular and reset both pointers to nullptr when the user is unlinked, so a
   null `next` means "not on this list". */
struct grpc_resource_user_link {
  grpc_resource_user* next;
  grpc_resource_user* prev;
};
/* Resource users are kept in (potentially) several intrusive linked lists
   at once. These are the list names; each value is also used as an index into
   grpc_resource_user::links and grpc_resource_quota::roots. */
typedef enum {
  /* Resource users that are waiting for an allocation */
  GRPC_RULIST_AWAITING_ALLOCATION,
  /* Resource users that have free memory available for internal reclamation */
  GRPC_RULIST_NON_EMPTY_FREE_POOL,
  /* Resource users that have published that a benign reclamation is
     available */
  GRPC_RULIST_RECLAIMER_BENIGN,
  /* Resource users that have published that a destructive reclamation is
     available */
  GRPC_RULIST_RECLAIMER_DESTRUCTIVE,
  /* Number of lists: must be last */
  GRPC_RULIST_COUNT
} grpc_rulist;
64 
/* Per-consumer accounting state for resources drawn from a
   grpc_resource_quota. */
struct grpc_resource_user {
  /* The quota this resource user consumes from */
  grpc_resource_quota* resource_quota;

  /* Closure to schedule an allocation under the resource quota combiner lock */
  grpc_closure allocate_closure;
  /* Closure to publish a non empty free pool under the resource quota combiner
     lock */
  grpc_closure add_to_free_pool_closure;

  /* one ref for each ref call (released by grpc_resource_user_unref), and one
     ref for each byte allocated (released by grpc_resource_user_free) */
  gpr_atm refs;
  /* has this resource user been shut down? starts at 0, increases for each
     shutdown call. Once non-zero, rq_alloc fails the pending allocation
     callbacks and ru_post_reclaimer cancels newly posted reclaimers. */
  gpr_atm shutdown;

  /* Held by rq_alloc, rq_reclaim_from_per_user_free_pool and ru_shutdown while
     they read or update this user's pool, allocation and reclaimer state */
  gpr_mu mu;
  /* The amount of memory (in bytes) this user has cached for its own use: to
     avoid quota contention, each resource user can keep some memory in
     addition to what it is immediately using (e.g., for caching), and the quota
     can pull it back under memory pressure.
     This value can become negative if more memory has been requested than
     existed in the free pool, at which point the quota is consulted to bring
     this value non-negative (asynchronously). */
  int64_t free_pool;
  /* A list of closures to call once free_pool becomes non-negative - ie when
     all outstanding allocations have been granted. */
  grpc_closure_list on_allocated;
  /* True if we are currently trying to allocate from the quota, false if not */
  bool allocating;
  /* The amount of memory (in bytes) that has been requested from this user
   * asynchronously but hasn't been granted yet. */
  int64_t outstanding_allocations;
  /* True if we are currently trying to add ourselves to the non-free quota
     list, false otherwise (cleared by rq_reclaim_from_per_user_free_pool once
     the user has been popped from that list) */
  bool added_to_free_pool;

  /* The number of threads currently allocated to this resource user */
  gpr_atm num_threads_allocated;

  /* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer
   */
  grpc_closure* reclaimers[2];
  /* Reclaimers just posted: once we're in the combiner lock, we'll move them
     to the array above */
  grpc_closure* new_reclaimers[2];
  /* Trampoline closures to finish reclamation and re-enter the quota combiner
     lock */
  grpc_closure post_reclaimer_closure[2];

  /* Closure to execute under the quota combiner to de-register and shutdown the
     resource user */
  grpc_closure destroy_closure;

  /* Links in the various grpc_rulist lists, indexed by grpc_rulist */
  grpc_resource_user_link links[GRPC_RULIST_COUNT];

  /* The name of this resource user, for debugging/tracing */
  std::string name;
};
126 
/* A pool of memory (and a thread budget) shared by a set of resource users. */
struct grpc_resource_quota {
  /* refcount */
  gpr_refcount refs;

  /* estimate of current memory usage
     scaled to the range [0..MEMORY_USAGE_ESTIMATION_MAX] */
  gpr_atm memory_usage_estimation;

  /* Main combiner lock: all activity on a quota executes under this combiner
   * (so no mutex is needed for this data structure) */
  grpc_core::Combiner* combiner;
  /* Size of the resource quota */
  int64_t size;
  /* Amount of free memory in the resource quota */
  int64_t free_pool;
  /* Used size of memory in the resource quota. Updated as soon as the resource
   * users start to allocate or free the memory. */
  gpr_atm used;

  /* Most recent size passed to grpc_resource_quota_resize (capped at
   * GPR_ATM_MAX); returned by grpc_resource_quota_peek_size */
  gpr_atm last_size;

  /* Mutex to protect max_threads and num_threads_allocated */
  /* Note: We could have used gpr_atm for max_threads and num_threads_allocated
   * and avoid having this mutex; but in that case, each invocation of the
   * function grpc_resource_user_allocate_threads() would have had to do at
   * least two atomic loads (for max_threads and num_threads_allocated) followed
   * by a CAS (on num_threads_allocated).
   * Moreover, we expect grpc_resource_user_allocate_threads() to be often
   * called concurrently thereby increasing the chances of failing the CAS
   * operation. This additional complexity is not worth the tiny perf gain we
   * may (or may not) have by using atomics */
  gpr_mu thread_count_mu;

  /* Max number of threads allowed */
  int max_threads;

  /* Number of threads currently allocated via this resource_quota object */
  int num_threads_allocated;

  /* Has rq_step been scheduled to occur? */
  bool step_scheduled;

  /* Are we currently reclaiming memory */
  bool reclaiming;

  /* Closure around rq_step */
  grpc_closure rq_step_closure;

  /* Closure around rq_reclamation_done */
  grpc_closure rq_reclamation_done_closure;

  /* This is only really usable for debugging: it's always a stale pointer, but
     a stale pointer that might just be fresh enough to guide us to where the
     reclamation system is stuck */
  grpc_closure* debug_only_last_initiated_reclaimer;
  grpc_resource_user* debug_only_last_reclaimer_resource_user;

  /* Roots of all resource user lists, indexed by grpc_rulist; nullptr means
     the list is empty */
  grpc_resource_user* roots[GRPC_RULIST_COUNT];

  /* Name of this quota, used in trace logging */
  std::string name;
};
189 
190 static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount);
191 
192 /*******************************************************************************
193  * list management
194  */
195 
rulist_add_head(grpc_resource_user * resource_user,grpc_rulist list)196 static void rulist_add_head(grpc_resource_user* resource_user,
197                             grpc_rulist list) {
198   grpc_resource_quota* resource_quota = resource_user->resource_quota;
199   grpc_resource_user** root = &resource_quota->roots[list];
200   if (*root == nullptr) {
201     *root = resource_user;
202     resource_user->links[list].next = resource_user->links[list].prev =
203         resource_user;
204   } else {
205     resource_user->links[list].next = *root;
206     resource_user->links[list].prev = (*root)->links[list].prev;
207     resource_user->links[list].next->links[list].prev =
208         resource_user->links[list].prev->links[list].next = resource_user;
209     *root = resource_user;
210   }
211 }
212 
rulist_add_tail(grpc_resource_user * resource_user,grpc_rulist list)213 static void rulist_add_tail(grpc_resource_user* resource_user,
214                             grpc_rulist list) {
215   grpc_resource_quota* resource_quota = resource_user->resource_quota;
216   grpc_resource_user** root = &resource_quota->roots[list];
217   if (*root == nullptr) {
218     *root = resource_user;
219     resource_user->links[list].next = resource_user->links[list].prev =
220         resource_user;
221   } else {
222     resource_user->links[list].next = (*root)->links[list].next;
223     resource_user->links[list].prev = *root;
224     resource_user->links[list].next->links[list].prev =
225         resource_user->links[list].prev->links[list].next = resource_user;
226   }
227 }
228 
rulist_empty(grpc_resource_quota * resource_quota,grpc_rulist list)229 static bool rulist_empty(grpc_resource_quota* resource_quota,
230                          grpc_rulist list) {
231   return resource_quota->roots[list] == nullptr;
232 }
233 
rulist_pop_head(grpc_resource_quota * resource_quota,grpc_rulist list)234 static grpc_resource_user* rulist_pop_head(grpc_resource_quota* resource_quota,
235                                            grpc_rulist list) {
236   grpc_resource_user** root = &resource_quota->roots[list];
237   grpc_resource_user* resource_user = *root;
238   if (resource_user == nullptr) {
239     return nullptr;
240   }
241   if (resource_user->links[list].next == resource_user) {
242     *root = nullptr;
243   } else {
244     resource_user->links[list].next->links[list].prev =
245         resource_user->links[list].prev;
246     resource_user->links[list].prev->links[list].next =
247         resource_user->links[list].next;
248     *root = resource_user->links[list].next;
249   }
250   resource_user->links[list].next = resource_user->links[list].prev = nullptr;
251   return resource_user;
252 }
253 
rulist_remove(grpc_resource_user * resource_user,grpc_rulist list)254 static void rulist_remove(grpc_resource_user* resource_user, grpc_rulist list) {
255   if (resource_user->links[list].next == nullptr) return;
256   grpc_resource_quota* resource_quota = resource_user->resource_quota;
257   if (resource_quota->roots[list] == resource_user) {
258     resource_quota->roots[list] = resource_user->links[list].next;
259     if (resource_quota->roots[list] == resource_user) {
260       resource_quota->roots[list] = nullptr;
261     }
262   }
263   resource_user->links[list].next->links[list].prev =
264       resource_user->links[list].prev;
265   resource_user->links[list].prev->links[list].next =
266       resource_user->links[list].next;
267   resource_user->links[list].next = resource_user->links[list].prev = nullptr;
268 }
269 
270 /*******************************************************************************
271  * resource quota state machine
272  */
273 
274 static bool rq_alloc(grpc_resource_quota* resource_quota);
275 static bool rq_reclaim_from_per_user_free_pool(
276     grpc_resource_quota* resource_quota);
277 static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive);
278 
rq_step(void * rq,grpc_error_handle)279 static void rq_step(void* rq, grpc_error_handle /*error*/) {
280   grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
281   resource_quota->step_scheduled = false;
282   do {
283     if (rq_alloc(resource_quota)) goto done;
284   } while (rq_reclaim_from_per_user_free_pool(resource_quota));
285 
286   if (!rq_reclaim(resource_quota, false)) {
287     rq_reclaim(resource_quota, true);
288   }
289 
290 done:
291   grpc_resource_quota_unref_internal(resource_quota);
292 }
293 
rq_step_sched(grpc_resource_quota * resource_quota)294 static void rq_step_sched(grpc_resource_quota* resource_quota) {
295   if (resource_quota->step_scheduled) return;
296   resource_quota->step_scheduled = true;
297   grpc_resource_quota_ref_internal(resource_quota);
298   resource_quota->combiner->FinallyRun(&resource_quota->rq_step_closure,
299                                        GRPC_ERROR_NONE);
300 }
301 
302 /* update the atomically available resource estimate - use no barriers since
303    timeliness of delivery really doesn't matter much */
rq_update_estimate(grpc_resource_quota * resource_quota)304 static void rq_update_estimate(grpc_resource_quota* resource_quota) {
305   gpr_atm memory_usage_estimation = MEMORY_USAGE_ESTIMATION_MAX;
306   if (resource_quota->size != 0) {
307     memory_usage_estimation =
308         GPR_CLAMP((gpr_atm)((1.0 - ((double)resource_quota->free_pool) /
309                                        ((double)resource_quota->size)) *
310                             MEMORY_USAGE_ESTIMATION_MAX),
311                   0, MEMORY_USAGE_ESTIMATION_MAX);
312   }
313   gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation,
314                            memory_usage_estimation);
315 }
316 
317 /* returns true if all allocations are completed */
rq_alloc(grpc_resource_quota * resource_quota)318 static bool rq_alloc(grpc_resource_quota* resource_quota) {
319   grpc_resource_user* resource_user;
320   while ((resource_user = rulist_pop_head(resource_quota,
321                                           GRPC_RULIST_AWAITING_ALLOCATION))) {
322     gpr_mu_lock(&resource_user->mu);
323     if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
324       gpr_log(GPR_INFO,
325               "RQ: check allocation for user %p shutdown=%" PRIdPTR
326               " free_pool=%" PRId64 " outstanding_allocations=%" PRId64,
327               resource_user, gpr_atm_no_barrier_load(&resource_user->shutdown),
328               resource_user->free_pool, resource_user->outstanding_allocations);
329     }
330     if (gpr_atm_no_barrier_load(&resource_user->shutdown)) {
331       resource_user->allocating = false;
332       grpc_closure_list_fail_all(
333           &resource_user->on_allocated,
334           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
335       int64_t aborted_allocations = resource_user->outstanding_allocations;
336       resource_user->outstanding_allocations = 0;
337       resource_user->free_pool += aborted_allocations;
338       grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &resource_user->on_allocated);
339       gpr_mu_unlock(&resource_user->mu);
340       if (aborted_allocations > 0) {
341         ru_unref_by(resource_user, static_cast<gpr_atm>(aborted_allocations));
342       }
343       continue;
344     }
345     if (resource_user->free_pool < 0 &&
346         -resource_user->free_pool <= resource_quota->free_pool) {
347       int64_t amt = -resource_user->free_pool;
348       resource_user->free_pool = 0;
349       resource_quota->free_pool -= amt;
350       rq_update_estimate(resource_quota);
351       if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
352         gpr_log(GPR_INFO,
353                 "RQ %s %s: grant alloc %" PRId64
354                 " bytes; rq_free_pool -> %" PRId64,
355                 resource_quota->name.c_str(), resource_user->name.c_str(), amt,
356                 resource_quota->free_pool);
357       }
358     } else if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace) &&
359                resource_user->free_pool >= 0) {
360       gpr_log(GPR_INFO, "RQ %s %s: discard already satisfied alloc request",
361               resource_quota->name.c_str(), resource_user->name.c_str());
362     }
363     if (resource_user->free_pool >= 0) {
364       resource_user->allocating = false;
365       resource_user->outstanding_allocations = 0;
366       grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &resource_user->on_allocated);
367       gpr_mu_unlock(&resource_user->mu);
368     } else {
369       rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
370       gpr_mu_unlock(&resource_user->mu);
371       return false;
372     }
373   }
374   return true;
375 }
376 
377 /* returns true if any memory could be reclaimed from buffers */
rq_reclaim_from_per_user_free_pool(grpc_resource_quota * resource_quota)378 static bool rq_reclaim_from_per_user_free_pool(
379     grpc_resource_quota* resource_quota) {
380   grpc_resource_user* resource_user;
381   while ((resource_user = rulist_pop_head(resource_quota,
382                                           GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
383     gpr_mu_lock(&resource_user->mu);
384     resource_user->added_to_free_pool = false;
385     if (resource_user->free_pool > 0) {
386       int64_t amt = resource_user->free_pool;
387       resource_user->free_pool = 0;
388       resource_quota->free_pool += amt;
389       rq_update_estimate(resource_quota);
390       if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
391         gpr_log(GPR_INFO,
392                 "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64
393                 " bytes; rq_free_pool -> %" PRId64,
394                 resource_quota->name.c_str(), resource_user->name.c_str(), amt,
395                 resource_quota->free_pool);
396       }
397       gpr_mu_unlock(&resource_user->mu);
398       return true;
399     } else {
400       if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
401         gpr_log(GPR_INFO,
402                 "RQ %s %s: failed to reclaim_from_per_user_free_pool; "
403                 "free_pool = %" PRId64 "; rq_free_pool = %" PRId64,
404                 resource_quota->name.c_str(), resource_user->name.c_str(),
405                 resource_user->free_pool, resource_quota->free_pool);
406       }
407       gpr_mu_unlock(&resource_user->mu);
408     }
409   }
410   return false;
411 }
412 
413 /* returns true if reclamation is proceeding */
rq_reclaim(grpc_resource_quota * resource_quota,bool destructive)414 static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) {
415   if (resource_quota->reclaiming) return true;
416   grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE
417                                  : GRPC_RULIST_RECLAIMER_BENIGN;
418   grpc_resource_user* resource_user = rulist_pop_head(resource_quota, list);
419   if (resource_user == nullptr) return false;
420   if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
421     gpr_log(GPR_INFO, "RQ %s %s: initiate %s reclamation",
422             resource_quota->name.c_str(), resource_user->name.c_str(),
423             destructive ? "destructive" : "benign");
424   }
425   resource_quota->reclaiming = true;
426   grpc_resource_quota_ref_internal(resource_quota);
427   grpc_closure* c = resource_user->reclaimers[destructive];
428   GPR_ASSERT(c);
429   resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
430   resource_quota->debug_only_last_initiated_reclaimer = c;
431   resource_user->reclaimers[destructive] = nullptr;
432   grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, GRPC_ERROR_NONE);
433   return true;
434 }
435 
436 /*******************************************************************************
437  * ru_slice: a slice implementation that is backed by a grpc_resource_user
438  */
439 
440 namespace grpc_core {
441 
442 class RuSliceRefcount {
443  public:
Destroy(void * p)444   static void Destroy(void* p) {
445     auto* rc = static_cast<RuSliceRefcount*>(p);
446     rc->~RuSliceRefcount();
447     gpr_free(rc);
448   }
RuSliceRefcount(grpc_resource_user * resource_user,size_t size)449   RuSliceRefcount(grpc_resource_user* resource_user, size_t size)
450       : base_(grpc_slice_refcount::Type::REGULAR, &refs_, Destroy, this,
451               &base_),
452         resource_user_(resource_user),
453         size_(size) {
454     // Nothing to do here.
455   }
~RuSliceRefcount()456   ~RuSliceRefcount() { grpc_resource_user_free(resource_user_, size_); }
457 
base_refcount()458   grpc_slice_refcount* base_refcount() { return &base_; }
459 
460  private:
461   grpc_slice_refcount base_;
462   RefCount refs_;
463   grpc_resource_user* resource_user_;
464   size_t size_;
465 };
466 
467 }  // namespace grpc_core
468 
ru_slice_create(grpc_resource_user * resource_user,size_t size)469 static grpc_slice ru_slice_create(grpc_resource_user* resource_user,
470                                   size_t size) {
471   auto* rc = static_cast<grpc_core::RuSliceRefcount*>(
472       gpr_malloc(sizeof(grpc_core::RuSliceRefcount) + size));
473   new (rc) grpc_core::RuSliceRefcount(resource_user, size);
474   grpc_slice slice;
475 
476   slice.refcount = rc->base_refcount();
477   slice.data.refcounted.bytes = reinterpret_cast<uint8_t*>(rc + 1);
478   slice.data.refcounted.length = size;
479   return slice;
480 }
481 
482 /*******************************************************************************
483  * grpc_resource_quota internal implementation: resource user manipulation under
484  * the combiner
485  */
486 
ru_allocate(void * ru,grpc_error_handle)487 static void ru_allocate(void* ru, grpc_error_handle /*error*/) {
488   grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
489   if (rulist_empty(resource_user->resource_quota,
490                    GRPC_RULIST_AWAITING_ALLOCATION)) {
491     rq_step_sched(resource_user->resource_quota);
492   }
493   rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
494 }
495 
ru_add_to_free_pool(void * ru,grpc_error_handle)496 static void ru_add_to_free_pool(void* ru, grpc_error_handle /*error*/) {
497   grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
498   if (!rulist_empty(resource_user->resource_quota,
499                     GRPC_RULIST_AWAITING_ALLOCATION) &&
500       rulist_empty(resource_user->resource_quota,
501                    GRPC_RULIST_NON_EMPTY_FREE_POOL)) {
502     rq_step_sched(resource_user->resource_quota);
503   }
504   rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL);
505 }
506 
ru_post_reclaimer(grpc_resource_user * resource_user,bool destructive)507 static bool ru_post_reclaimer(grpc_resource_user* resource_user,
508                               bool destructive) {
509   grpc_closure* closure = resource_user->new_reclaimers[destructive];
510   GPR_ASSERT(closure != nullptr);
511   resource_user->new_reclaimers[destructive] = nullptr;
512   GPR_ASSERT(resource_user->reclaimers[destructive] == nullptr);
513   if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
514     grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_CANCELLED);
515     return false;
516   }
517   resource_user->reclaimers[destructive] = closure;
518   return true;
519 }
520 
ru_post_benign_reclaimer(void * ru,grpc_error_handle)521 static void ru_post_benign_reclaimer(void* ru, grpc_error_handle /*error*/) {
522   grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
523   if (!ru_post_reclaimer(resource_user, false)) return;
524   if (!rulist_empty(resource_user->resource_quota,
525                     GRPC_RULIST_AWAITING_ALLOCATION) &&
526       rulist_empty(resource_user->resource_quota,
527                    GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
528       rulist_empty(resource_user->resource_quota,
529                    GRPC_RULIST_RECLAIMER_BENIGN)) {
530     rq_step_sched(resource_user->resource_quota);
531   }
532   rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
533 }
534 
ru_post_destructive_reclaimer(void * ru,grpc_error_handle)535 static void ru_post_destructive_reclaimer(void* ru,
536                                           grpc_error_handle /*error*/) {
537   grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
538   if (!ru_post_reclaimer(resource_user, true)) return;
539   if (!rulist_empty(resource_user->resource_quota,
540                     GRPC_RULIST_AWAITING_ALLOCATION) &&
541       rulist_empty(resource_user->resource_quota,
542                    GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
543       rulist_empty(resource_user->resource_quota,
544                    GRPC_RULIST_RECLAIMER_BENIGN) &&
545       rulist_empty(resource_user->resource_quota,
546                    GRPC_RULIST_RECLAIMER_DESTRUCTIVE)) {
547     rq_step_sched(resource_user->resource_quota);
548   }
549   rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
550 }
551 
ru_shutdown(void * ru,grpc_error_handle)552 static void ru_shutdown(void* ru, grpc_error_handle /*error*/) {
553   if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
554     gpr_log(GPR_INFO, "RU shutdown %p", ru);
555   }
556   grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
557   gpr_mu_lock(&resource_user->mu);
558   grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[0],
559                           GRPC_ERROR_CANCELLED);
560   grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[1],
561                           GRPC_ERROR_CANCELLED);
562   resource_user->reclaimers[0] = nullptr;
563   resource_user->reclaimers[1] = nullptr;
564   rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
565   rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
566   if (resource_user->allocating) {
567     rq_step_sched(resource_user->resource_quota);
568   }
569   gpr_mu_unlock(&resource_user->mu);
570 }
571 
ru_destroy(void * ru,grpc_error_handle)572 static void ru_destroy(void* ru, grpc_error_handle /*error*/) {
573   grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
574   GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0);
575   // Free all the remaining thread quota
576   grpc_resource_user_free_threads(resource_user,
577                                   static_cast<int>(gpr_atm_no_barrier_load(
578                                       &resource_user->num_threads_allocated)));
579 
580   for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
581     rulist_remove(resource_user, static_cast<grpc_rulist>(i));
582   }
583   grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[0],
584                           GRPC_ERROR_CANCELLED);
585   grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[1],
586                           GRPC_ERROR_CANCELLED);
587   if (resource_user->free_pool != 0) {
588     resource_user->resource_quota->free_pool += resource_user->free_pool;
589     rq_step_sched(resource_user->resource_quota);
590   }
591   grpc_resource_quota_unref_internal(resource_user->resource_quota);
592   gpr_mu_destroy(&resource_user->mu);
593   delete resource_user;
594 }
595 
ru_alloc_slices(grpc_resource_user_slice_allocator * slice_allocator)596 static void ru_alloc_slices(
597     grpc_resource_user_slice_allocator* slice_allocator) {
598   for (size_t i = 0; i < slice_allocator->count; i++) {
599     grpc_slice_buffer_add_indexed(
600         slice_allocator->dest, ru_slice_create(slice_allocator->resource_user,
601                                                slice_allocator->length));
602   }
603 }
604 
ru_allocated_slices(void * arg,grpc_error_handle error)605 static void ru_allocated_slices(void* arg, grpc_error_handle error) {
606   grpc_resource_user_slice_allocator* slice_allocator =
607       static_cast<grpc_resource_user_slice_allocator*>(arg);
608   if (error == GRPC_ERROR_NONE) ru_alloc_slices(slice_allocator);
609   grpc_core::Closure::Run(DEBUG_LOCATION, &slice_allocator->on_done,
610                           GRPC_ERROR_REF(error));
611 }
612 
613 /*******************************************************************************
614  * grpc_resource_quota internal implementation: quota manipulation under the
615  * combiner
616  */
617 
/* Heap-allocated argument bundle for one pending resize. It is created by
   grpc_resource_quota_resize and freed by rq_resize. */
struct rq_resize_args {
  /* Requested new quota size in bytes */
  int64_t size;
  /* Quota being resized; holds a ref that rq_resize releases */
  grpc_resource_quota* resource_quota;
  /* Closure that runs rq_resize with this struct as its argument */
  grpc_closure closure;
};
rq_resize(void * args,grpc_error_handle)623 static void rq_resize(void* args, grpc_error_handle /*error*/) {
624   rq_resize_args* a = static_cast<rq_resize_args*>(args);
625   int64_t delta = a->size - a->resource_quota->size;
626   a->resource_quota->size += delta;
627   a->resource_quota->free_pool += delta;
628   rq_update_estimate(a->resource_quota);
629   rq_step_sched(a->resource_quota);
630   grpc_resource_quota_unref_internal(a->resource_quota);
631   gpr_free(a);
632 }
633 
rq_reclamation_done(void * rq,grpc_error_handle)634 static void rq_reclamation_done(void* rq, grpc_error_handle /*error*/) {
635   grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
636   resource_quota->reclaiming = false;
637   rq_step_sched(resource_quota);
638   grpc_resource_quota_unref_internal(resource_quota);
639 }
640 
641 /*******************************************************************************
642  * grpc_resource_quota api
643  */
644 
645 /* Public API */
grpc_resource_quota_create(const char * name)646 grpc_resource_quota* grpc_resource_quota_create(const char* name) {
647   grpc_resource_quota* resource_quota = new grpc_resource_quota;
648   gpr_ref_init(&resource_quota->refs, 1);
649   resource_quota->combiner = grpc_combiner_create();
650   resource_quota->free_pool = INT64_MAX;
651   resource_quota->size = INT64_MAX;
652   resource_quota->used = 0;
653   gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
654   gpr_mu_init(&resource_quota->thread_count_mu);
655   resource_quota->max_threads = INT_MAX;
656   resource_quota->num_threads_allocated = 0;
657   resource_quota->step_scheduled = false;
658   resource_quota->reclaiming = false;
659   gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0);
660   if (name != nullptr) {
661     resource_quota->name = name;
662   } else {
663     resource_quota->name = absl::StrCat(
664         "anonymous_pool_", reinterpret_cast<intptr_t>(resource_quota));
665   }
666   GRPC_CLOSURE_INIT(&resource_quota->rq_step_closure, rq_step, resource_quota,
667                     nullptr);
668   GRPC_CLOSURE_INIT(&resource_quota->rq_reclamation_done_closure,
669                     rq_reclamation_done, resource_quota, nullptr);
670   for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
671     resource_quota->roots[i] = nullptr;
672   }
673   return resource_quota;
674 }
675 
grpc_resource_quota_unref_internal(grpc_resource_quota * resource_quota)676 void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) {
677   if (gpr_unref(&resource_quota->refs)) {
678     // No outstanding thread quota
679     GPR_ASSERT(resource_quota->num_threads_allocated == 0);
680     GRPC_COMBINER_UNREF(resource_quota->combiner, "resource_quota");
681     gpr_mu_destroy(&resource_quota->thread_count_mu);
682     delete resource_quota;
683   }
684 }
685 
/* Public API */
/* Drops one reference to the quota. An ExecCtx is created on the stack first
   and is in scope for the duration of the internal unref. */
void grpc_resource_quota_unref(grpc_resource_quota* resource_quota) {
  grpc_core::ExecCtx exec_ctx;
  grpc_resource_quota_unref_internal(resource_quota);
}
691 
/* Takes one additional reference to the quota and returns the same pointer,
   so the call can be used inline in an expression. */
grpc_resource_quota* grpc_resource_quota_ref_internal(
    grpc_resource_quota* resource_quota) {
  gpr_ref(&resource_quota->refs);
  return resource_quota;
}
697 
/* Public API */
/* Takes one additional reference to the quota; pair each call with
   grpc_resource_quota_unref. */
void grpc_resource_quota_ref(grpc_resource_quota* resource_quota) {
  grpc_resource_quota_ref_internal(resource_quota);
}
702 
grpc_resource_quota_get_memory_pressure(grpc_resource_quota * resource_quota)703 double grpc_resource_quota_get_memory_pressure(
704     grpc_resource_quota* resource_quota) {
705   return (static_cast<double>(gpr_atm_no_barrier_load(
706              &resource_quota->memory_usage_estimation))) /
707          (static_cast<double>(MEMORY_USAGE_ESTIMATION_MAX));
708 }
709 
710 /* Public API */
grpc_resource_quota_set_max_threads(grpc_resource_quota * resource_quota,int new_max_threads)711 void grpc_resource_quota_set_max_threads(grpc_resource_quota* resource_quota,
712                                          int new_max_threads) {
713   GPR_ASSERT(new_max_threads >= 0);
714   gpr_mu_lock(&resource_quota->thread_count_mu);
715   resource_quota->max_threads = new_max_threads;
716   gpr_mu_unlock(&resource_quota->thread_count_mu);
717 }
718 
719 /* Public API */
grpc_resource_quota_resize(grpc_resource_quota * resource_quota,size_t size)720 void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
721                                 size_t size) {
722   grpc_core::ExecCtx exec_ctx;
723   rq_resize_args* a = static_cast<rq_resize_args*>(gpr_malloc(sizeof(*a)));
724   a->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
725   a->size = static_cast<int64_t>(size);
726   gpr_atm_no_barrier_store(&resource_quota->last_size,
727                            (gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size));
728   GRPC_CLOSURE_INIT(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
729   grpc_core::ExecCtx::Run(DEBUG_LOCATION, &a->closure, GRPC_ERROR_NONE);
730 }
731 
grpc_resource_quota_peek_size(grpc_resource_quota * resource_quota)732 size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) {
733   return static_cast<size_t>(
734       gpr_atm_no_barrier_load(&resource_quota->last_size));
735 }
736 
737 /*******************************************************************************
 * grpc_resource_quota channel args api
739  */
740 
grpc_resource_quota_from_channel_args(const grpc_channel_args * channel_args,bool create)741 grpc_resource_quota* grpc_resource_quota_from_channel_args(
742     const grpc_channel_args* channel_args, bool create) {
743   for (size_t i = 0; i < channel_args->num_args; i++) {
744     if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
745       if (channel_args->args[i].type == GRPC_ARG_POINTER) {
746         return grpc_resource_quota_ref_internal(
747             static_cast<grpc_resource_quota*>(
748                 channel_args->args[i].value.pointer.p));
749       } else {
750         gpr_log(GPR_DEBUG, GRPC_ARG_RESOURCE_QUOTA " should be a pointer");
751       }
752     }
753   }
754   return create ? grpc_resource_quota_create(nullptr) : nullptr;
755 }
756 
rq_copy(void * rq)757 static void* rq_copy(void* rq) {
758   grpc_resource_quota_ref(static_cast<grpc_resource_quota*>(rq));
759   return rq;
760 }
761 
rq_destroy(void * rq)762 static void rq_destroy(void* rq) {
763   grpc_resource_quota_unref_internal(static_cast<grpc_resource_quota*>(rq));
764 }
765 
rq_cmp(void * a,void * b)766 static int rq_cmp(void* a, void* b) { return GPR_ICMP(a, b); }
767 
grpc_resource_quota_arg_vtable(void)768 const grpc_arg_pointer_vtable* grpc_resource_quota_arg_vtable(void) {
769   static const grpc_arg_pointer_vtable vtable = {rq_copy, rq_destroy, rq_cmp};
770   return &vtable;
771 }
772 
773 /*******************************************************************************
774  * grpc_resource_user api
775  */
776 
grpc_resource_user_create(grpc_resource_quota * resource_quota,const char * name)777 grpc_resource_user* grpc_resource_user_create(
778     grpc_resource_quota* resource_quota, const char* name) {
779   grpc_resource_user* resource_user = new grpc_resource_user;
780   resource_user->resource_quota =
781       grpc_resource_quota_ref_internal(resource_quota);
782   GRPC_CLOSURE_INIT(&resource_user->allocate_closure, &ru_allocate,
783                     resource_user, nullptr);
784   GRPC_CLOSURE_INIT(&resource_user->add_to_free_pool_closure,
785                     &ru_add_to_free_pool, resource_user, nullptr);
786   GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[0],
787                     &ru_post_benign_reclaimer, resource_user, nullptr);
788   GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[1],
789                     &ru_post_destructive_reclaimer, resource_user, nullptr);
790   GRPC_CLOSURE_INIT(&resource_user->destroy_closure, &ru_destroy, resource_user,
791                     nullptr);
792   gpr_mu_init(&resource_user->mu);
793   gpr_atm_rel_store(&resource_user->refs, 1);
794   gpr_atm_rel_store(&resource_user->shutdown, 0);
795   resource_user->free_pool = 0;
796   grpc_closure_list_init(&resource_user->on_allocated);
797   resource_user->allocating = false;
798   resource_user->added_to_free_pool = false;
799   gpr_atm_no_barrier_store(&resource_user->num_threads_allocated, 0);
800   resource_user->reclaimers[0] = nullptr;
801   resource_user->reclaimers[1] = nullptr;
802   resource_user->new_reclaimers[0] = nullptr;
803   resource_user->new_reclaimers[1] = nullptr;
804   resource_user->outstanding_allocations = 0;
805   for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
806     resource_user->links[i].next = resource_user->links[i].prev = nullptr;
807   }
808   if (name != nullptr) {
809     resource_user->name = name;
810   } else {
811     resource_user->name = absl::StrCat(
812         "anonymous_resource_user_", reinterpret_cast<intptr_t>(resource_user));
813   }
814   return resource_user;
815 }
816 
grpc_resource_user_quota(grpc_resource_user * resource_user)817 grpc_resource_quota* grpc_resource_user_quota(
818     grpc_resource_user* resource_user) {
819   return resource_user->resource_quota;
820 }
821 
ru_ref_by(grpc_resource_user * resource_user,gpr_atm amount)822 static void ru_ref_by(grpc_resource_user* resource_user, gpr_atm amount) {
823   GPR_ASSERT(amount > 0);
824   GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&resource_user->refs, amount) != 0);
825 }
826 
ru_unref_by(grpc_resource_user * resource_user,gpr_atm amount)827 static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount) {
828   GPR_ASSERT(amount > 0);
829   gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount);
830   GPR_ASSERT(old >= amount);
831   if (old == amount) {
832     resource_user->resource_quota->combiner->Run(
833         &resource_user->destroy_closure, GRPC_ERROR_NONE);
834   }
835 }
836 
grpc_resource_user_ref(grpc_resource_user * resource_user)837 void grpc_resource_user_ref(grpc_resource_user* resource_user) {
838   ru_ref_by(resource_user, 1);
839 }
840 
grpc_resource_user_unref(grpc_resource_user * resource_user)841 void grpc_resource_user_unref(grpc_resource_user* resource_user) {
842   ru_unref_by(resource_user, 1);
843 }
844 
grpc_resource_user_shutdown(grpc_resource_user * resource_user)845 void grpc_resource_user_shutdown(grpc_resource_user* resource_user) {
846   if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
847     resource_user->resource_quota->combiner->Run(
848         GRPC_CLOSURE_CREATE(ru_shutdown, resource_user, nullptr),
849         GRPC_ERROR_NONE);
850   }
851 }
852 
grpc_resource_user_allocate_threads(grpc_resource_user * resource_user,int thread_count)853 bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
854                                          int thread_count) {
855   GPR_ASSERT(thread_count >= 0);
856   bool is_success = false;
857   gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
858   grpc_resource_quota* rq = resource_user->resource_quota;
859   if (rq->num_threads_allocated + thread_count <= rq->max_threads) {
860     rq->num_threads_allocated += thread_count;
861     gpr_atm_no_barrier_fetch_add(&resource_user->num_threads_allocated,
862                                  thread_count);
863     is_success = true;
864   }
865   gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
866   return is_success;
867 }
868 
grpc_resource_user_free_threads(grpc_resource_user * resource_user,int thread_count)869 void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
870                                      int thread_count) {
871   GPR_ASSERT(thread_count >= 0);
872   gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
873   grpc_resource_quota* rq = resource_user->resource_quota;
874   rq->num_threads_allocated -= thread_count;
875   int old_count = static_cast<int>(gpr_atm_no_barrier_fetch_add(
876       &resource_user->num_threads_allocated, -thread_count));
877   if (old_count < thread_count || rq->num_threads_allocated < 0) {
878     gpr_log(GPR_ERROR,
879             "Releasing more threads (%d) than currently allocated (rq threads: "
880             "%d, ru threads: %d)",
881             thread_count, rq->num_threads_allocated + thread_count, old_count);
882     abort();
883   }
884   gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
885 }
886 
resource_user_alloc_locked(grpc_resource_user * resource_user,size_t size,grpc_closure * optional_on_done)887 static bool resource_user_alloc_locked(grpc_resource_user* resource_user,
888                                        size_t size,
889                                        grpc_closure* optional_on_done) {
890   ru_ref_by(resource_user, static_cast<gpr_atm>(size));
891   resource_user->free_pool -= static_cast<int64_t>(size);
892   if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
893     gpr_log(GPR_INFO, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64,
894             resource_user->resource_quota->name.c_str(),
895             resource_user->name.c_str(), size, resource_user->free_pool);
896   }
897   if (GPR_LIKELY(resource_user->free_pool >= 0)) return true;
898   // Slow path: We need to wait for the free pool to refill.
899   if (optional_on_done != nullptr) {
900     resource_user->outstanding_allocations += static_cast<int64_t>(size);
901     grpc_closure_list_append(&resource_user->on_allocated, optional_on_done,
902                              GRPC_ERROR_NONE);
903   }
904   if (!resource_user->allocating) {
905     resource_user->allocating = true;
906     resource_user->resource_quota->combiner->Run(
907         &resource_user->allocate_closure, GRPC_ERROR_NONE);
908   }
909   return false;
910 }
911 
grpc_resource_user_safe_alloc(grpc_resource_user * resource_user,size_t size)912 bool grpc_resource_user_safe_alloc(grpc_resource_user* resource_user,
913                                    size_t size) {
914   if (gpr_atm_no_barrier_load(&resource_user->shutdown)) return false;
915   gpr_mu_lock(&resource_user->mu);
916   grpc_resource_quota* resource_quota = resource_user->resource_quota;
917   bool cas_success;
918   do {
919     gpr_atm used = gpr_atm_no_barrier_load(&resource_quota->used);
920     gpr_atm new_used = used + size;
921     if (static_cast<size_t>(new_used) >
922         grpc_resource_quota_peek_size(resource_quota)) {
923       gpr_mu_unlock(&resource_user->mu);
924       return false;
925     }
926     cas_success = gpr_atm_full_cas(&resource_quota->used, used, new_used);
927   } while (!cas_success);
928   resource_user_alloc_locked(resource_user, size, nullptr);
929   gpr_mu_unlock(&resource_user->mu);
930   return true;
931 }
932 
grpc_resource_user_alloc(grpc_resource_user * resource_user,size_t size,grpc_closure * optional_on_done)933 bool grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
934                               grpc_closure* optional_on_done) {
935   // TODO(juanlishen): Maybe return immediately if shutting down. Deferring this
936   // because some tests become flaky after the change.
937   gpr_mu_lock(&resource_user->mu);
938   grpc_resource_quota* resource_quota = resource_user->resource_quota;
939   gpr_atm_no_barrier_fetch_add(&resource_quota->used, size);
940   const bool ret =
941       resource_user_alloc_locked(resource_user, size, optional_on_done);
942   gpr_mu_unlock(&resource_user->mu);
943   return ret;
944 }
945 
grpc_resource_user_free(grpc_resource_user * resource_user,size_t size)946 void grpc_resource_user_free(grpc_resource_user* resource_user, size_t size) {
947   gpr_mu_lock(&resource_user->mu);
948   grpc_resource_quota* resource_quota = resource_user->resource_quota;
949   gpr_atm prior = gpr_atm_no_barrier_fetch_add(&resource_quota->used, -size);
950   GPR_ASSERT(prior >= static_cast<long>(size));
951   bool was_zero_or_negative = resource_user->free_pool <= 0;
952   resource_user->free_pool += static_cast<int64_t>(size);
953   if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
954     gpr_log(GPR_INFO, "RQ %s %s: free %" PRIdPTR "; free_pool -> %" PRId64,
955             resource_user->resource_quota->name.c_str(),
956             resource_user->name.c_str(), size, resource_user->free_pool);
957   }
958   bool is_bigger_than_zero = resource_user->free_pool > 0;
959   if (is_bigger_than_zero && was_zero_or_negative &&
960       !resource_user->added_to_free_pool) {
961     resource_user->added_to_free_pool = true;
962     resource_quota->combiner->Run(&resource_user->add_to_free_pool_closure,
963                                   GRPC_ERROR_NONE);
964   }
965   gpr_mu_unlock(&resource_user->mu);
966   ru_unref_by(resource_user, static_cast<gpr_atm>(size));
967 }
968 
grpc_resource_user_post_reclaimer(grpc_resource_user * resource_user,bool destructive,grpc_closure * closure)969 void grpc_resource_user_post_reclaimer(grpc_resource_user* resource_user,
970                                        bool destructive,
971                                        grpc_closure* closure) {
972   GPR_ASSERT(resource_user->new_reclaimers[destructive] == nullptr);
973   resource_user->new_reclaimers[destructive] = closure;
974   resource_user->resource_quota->combiner->Run(
975       &resource_user->post_reclaimer_closure[destructive], GRPC_ERROR_NONE);
976 }
977 
grpc_resource_user_finish_reclamation(grpc_resource_user * resource_user)978 void grpc_resource_user_finish_reclamation(grpc_resource_user* resource_user) {
979   if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
980     gpr_log(GPR_INFO, "RQ %s %s: reclamation complete",
981             resource_user->resource_quota->name.c_str(),
982             resource_user->name.c_str());
983   }
984   resource_user->resource_quota->combiner->Run(
985       &resource_user->resource_quota->rq_reclamation_done_closure,
986       GRPC_ERROR_NONE);
987 }
988 
grpc_resource_user_slice_allocator_init(grpc_resource_user_slice_allocator * slice_allocator,grpc_resource_user * resource_user,grpc_iomgr_cb_func cb,void * p)989 void grpc_resource_user_slice_allocator_init(
990     grpc_resource_user_slice_allocator* slice_allocator,
991     grpc_resource_user* resource_user, grpc_iomgr_cb_func cb, void* p) {
992   GRPC_CLOSURE_INIT(&slice_allocator->on_allocated, ru_allocated_slices,
993                     slice_allocator, grpc_schedule_on_exec_ctx);
994   GRPC_CLOSURE_INIT(&slice_allocator->on_done, cb, p,
995                     grpc_schedule_on_exec_ctx);
996   slice_allocator->resource_user = resource_user;
997 }
998 
grpc_resource_user_alloc_slices(grpc_resource_user_slice_allocator * slice_allocator,size_t length,size_t count,grpc_slice_buffer * dest)999 bool grpc_resource_user_alloc_slices(
1000     grpc_resource_user_slice_allocator* slice_allocator, size_t length,
1001     size_t count, grpc_slice_buffer* dest) {
1002   if (GPR_UNLIKELY(
1003           gpr_atm_no_barrier_load(&slice_allocator->resource_user->shutdown))) {
1004     grpc_core::ExecCtx::Run(
1005         DEBUG_LOCATION, &slice_allocator->on_allocated,
1006         GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
1007     return false;
1008   }
1009   slice_allocator->length = length;
1010   slice_allocator->count = count;
1011   slice_allocator->dest = dest;
1012   const bool ret =
1013       grpc_resource_user_alloc(slice_allocator->resource_user, count * length,
1014                                &slice_allocator->on_allocated);
1015   if (ret) ru_alloc_slices(slice_allocator);
1016   return ret;
1017 }
1018