/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/resource_quota.h"

#include <inttypes.h>
#include <limits.h>
#include <stdint.h>
#include <string.h>

#include <string>

#include "absl/strings/str_cat.h"

#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>

#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/slice/slice_internal.h"

grpc_core::TraceFlag grpc_resource_quota_trace(false, "resource_quota");

/* Scale of the atomically-published memory usage estimate: usage is reported
   in the range [0, MEMORY_USAGE_ESTIMATION_MAX]. */
#define MEMORY_USAGE_ESTIMATION_MAX 65536

/* Internal linked list pointers for a resource user */
struct grpc_resource_user_link {
  grpc_resource_user* next;
  grpc_resource_user* prev;
};
/* Resource users are kept in (potentially) several intrusive linked lists
   at once. These are the list names. */
typedef enum {
  /* Resource users that are waiting for an allocation */
  GRPC_RULIST_AWAITING_ALLOCATION,
  /* Resource users that have free memory available for internal reclamation */
  GRPC_RULIST_NON_EMPTY_FREE_POOL,
  /* Resource users that have published that a benign reclamation is
     available */
  GRPC_RULIST_RECLAIMER_BENIGN,
  /* Resource users that have published that a destructive reclamation is
     available */
  GRPC_RULIST_RECLAIMER_DESTRUCTIVE,
  /* Number of lists: must be last */
  GRPC_RULIST_COUNT
} grpc_rulist;

struct grpc_resource_user {
  /* The quota this resource user consumes from */
  grpc_resource_quota* resource_quota;

  /* Closure to schedule an allocation under the resource quota combiner lock */
  grpc_closure allocate_closure;
  /* Closure to publish a non-empty free pool under the resource quota combiner
     lock */
  grpc_closure add_to_free_pool_closure;

  /* one ref for each ref call (released by grpc_resource_user_unref), and one
     ref for each byte allocated (released by grpc_resource_user_free) */
  gpr_atm refs;
  /* has this resource user been shut down? starts at 0, incremented for each
     shutdown call */
  gpr_atm shutdown;

  gpr_mu mu;
  /* The amount of memory (in bytes) this user has cached for its own use: to
     avoid quota contention, each resource user can keep some memory in
     addition to what it is immediately using (e.g., for caching), and the quota
     can pull it back under memory pressure.
     This value can become negative if more memory has been requested than
     existed in the free pool, at which point the quota is consulted to bring
     this value non-negative (asynchronously). */
  int64_t free_pool;
  /* A list of closures to call once free_pool becomes non-negative - i.e. when
     all outstanding allocations have been granted. */
  grpc_closure_list on_allocated;
  /* True if we are currently trying to allocate from the quota, false if not */
  bool allocating;
  /* The amount of memory (in bytes) that has been requested from this user
   * asynchronously but hasn't been granted yet. */
  int64_t outstanding_allocations;
  /* True if we have already added ourselves to the quota's non-empty-free-pool
     list, false otherwise */
  bool added_to_free_pool;

  /* The number of threads currently allocated to this resource user */
  gpr_atm num_threads_allocated;

  /* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer
   */
  grpc_closure* reclaimers[2];
  /* Reclaimers just posted: once we're in the combiner lock, we'll move them
     to the array above */
  grpc_closure* new_reclaimers[2];
  /* Trampoline closures to finish reclamation and re-enter the quota combiner
     lock */
  grpc_closure post_reclaimer_closure[2];

  /* Closure to execute under the quota combiner to deregister and shut down the
     resource user */
  grpc_closure destroy_closure;

  /* Links in the various grpc_rulist lists */
  grpc_resource_user_link links[GRPC_RULIST_COUNT];

  /* The name of this resource user, for debugging/tracing */
  std::string name;
};

struct grpc_resource_quota {
  /* refcount */
  gpr_refcount refs;

  /* estimate of current memory usage
     scaled to the range [0..MEMORY_USAGE_ESTIMATION_MAX] */
  gpr_atm memory_usage_estimation;

  /* Master combiner lock: all activity on a quota executes under this combiner
   * (so no mutex is needed for this data structure) */
  grpc_core::Combiner* combiner;
  /* Size of the resource quota */
  int64_t size;
  /* Amount of free memory in the resource quota */
  int64_t free_pool;
  /* Used size of memory in the resource quota. Updated as soon as the resource
   * users start to allocate or free the memory. */
  gpr_atm used;

  /* The most recent size set via grpc_resource_quota_resize (clamped to
   * GPR_ATM_MAX); read back by grpc_resource_quota_peek_size */
  gpr_atm last_size;

  /* Mutex to protect max_threads and num_threads_allocated */
  /* Note: We could have used gpr_atm for max_threads and num_threads_allocated
   * and avoided this mutex; but in that case, each invocation of
   * grpc_resource_user_allocate_threads() would have had to do at
   * least two atomic loads (for max_threads and num_threads_allocated) followed
   * by a CAS (on num_threads_allocated).
   * Moreover, we expect grpc_resource_user_allocate_threads() to often be
   * called concurrently, thereby increasing the chances of failing the CAS
   * operation. This additional complexity is not worth the tiny perf gain we
   * may (or may not) get by using atomics. */
  gpr_mu thread_count_mu;

  /* Max number of threads allowed */
  int max_threads;

  /* Number of threads currently allocated via this resource_quota object */
  int num_threads_allocated;

  /* Has rq_step been scheduled to occur? */
  bool step_scheduled;

  /* Are we currently reclaiming memory? */
  bool reclaiming;

  /* Closure around rq_step */
  grpc_closure rq_step_closure;

  /* Closure around rq_reclamation_done */
  grpc_closure rq_reclamation_done_closure;

  /* This is only really usable for debugging: it's always a stale pointer, but
     a stale pointer that might just be fresh enough to guide us to where the
     reclamation system is stuck */
  grpc_closure* debug_only_last_initiated_reclaimer;
  grpc_resource_user* debug_only_last_reclaimer_resource_user;

  /* Roots of all resource user lists */
  grpc_resource_user* roots[GRPC_RULIST_COUNT];

  std::string name;
};

static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount);

/*******************************************************************************
 * list management
 */

static void rulist_add_head(grpc_resource_user* resource_user,
                            grpc_rulist list) {
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  grpc_resource_user** root = &resource_quota->roots[list];
  if (*root == nullptr) {
    *root = resource_user;
    resource_user->links[list].next = resource_user->links[list].prev =
        resource_user;
  } else {
    resource_user->links[list].next = *root;
    resource_user->links[list].prev = (*root)->links[list].prev;
    resource_user->links[list].next->links[list].prev =
        resource_user->links[list].prev->links[list].next = resource_user;
    *root = resource_user;
  }
}

static void rulist_add_tail(grpc_resource_user* resource_user,
                            grpc_rulist list) {
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  grpc_resource_user** root = &resource_quota->roots[list];
  if (*root == nullptr) {
    *root = resource_user;
    resource_user->links[list].next = resource_user->links[list].prev =
        resource_user;
  } else {
    resource_user->links[list].next = (*root)->links[list].next;
    resource_user->links[list].prev = *root;
    resource_user->links[list].next->links[list].prev =
        resource_user->links[list].prev->links[list].next = resource_user;
  }
}

static bool rulist_empty(grpc_resource_quota* resource_quota,
                         grpc_rulist list) {
  return resource_quota->roots[list] == nullptr;
}

static grpc_resource_user* rulist_pop_head(grpc_resource_quota* resource_quota,
                                           grpc_rulist list) {
  grpc_resource_user** root = &resource_quota->roots[list];
  grpc_resource_user* resource_user = *root;
  if (resource_user == nullptr) {
    return nullptr;
  }
  if (resource_user->links[list].next == resource_user) {
    *root = nullptr;
  } else {
    resource_user->links[list].next->links[list].prev =
        resource_user->links[list].prev;
    resource_user->links[list].prev->links[list].next =
        resource_user->links[list].next;
    *root = resource_user->links[list].next;
  }
  resource_user->links[list].next = resource_user->links[list].prev = nullptr;
  return resource_user;
}

static void rulist_remove(grpc_resource_user* resource_user, grpc_rulist list) {
  if (resource_user->links[list].next == nullptr) return;
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  if (resource_quota->roots[list] == resource_user) {
    resource_quota->roots[list] = resource_user->links[list].next;
    if (resource_quota->roots[list] == resource_user) {
      resource_quota->roots[list] = nullptr;
    }
  }
  resource_user->links[list].next->links[list].prev =
      resource_user->links[list].prev;
  resource_user->links[list].prev->links[list].next =
      resource_user->links[list].next;
  resource_user->links[list].next = resource_user->links[list].prev = nullptr;
}
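
/* Illustrative sketch (not part of the implementation): these lists are
   circular and intrusive, so a user U that is the sole member of a list
   satisfies U->links[list].next == U->links[list].prev == U, and membership
   is encoded entirely in the user's own link slots. For example, after

     rulist_add_head(a, GRPC_RULIST_AWAITING_ALLOCATION);
     rulist_add_tail(b, GRPC_RULIST_AWAITING_ALLOCATION);

   the root points at a, a's next is b, b's next is a, and rulist_pop_head()
   returns a and then b. A nullptr next pointer is how rulist_remove() detects
   "not on this list". */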

/*******************************************************************************
 * resource quota state machine
 */

static bool rq_alloc(grpc_resource_quota* resource_quota);
static bool rq_reclaim_from_per_user_free_pool(
    grpc_resource_quota* resource_quota);
static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive);

static void rq_step(void* rq, grpc_error* /*error*/) {
  grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
  resource_quota->step_scheduled = false;
  do {
    if (rq_alloc(resource_quota)) goto done;
  } while (rq_reclaim_from_per_user_free_pool(resource_quota));

  if (!rq_reclaim(resource_quota, false)) {
    rq_reclaim(resource_quota, true);
  }

done:
  grpc_resource_quota_unref_internal(resource_quota);
}

static void rq_step_sched(grpc_resource_quota* resource_quota) {
  if (resource_quota->step_scheduled) return;
  resource_quota->step_scheduled = true;
  grpc_resource_quota_ref_internal(resource_quota);
  resource_quota->combiner->FinallyRun(&resource_quota->rq_step_closure,
                                       GRPC_ERROR_NONE);
}

/* update the atomically available resource estimate - use no barriers since
   timeliness of delivery really doesn't matter much */
static void rq_update_estimate(grpc_resource_quota* resource_quota) {
  gpr_atm memory_usage_estimation = MEMORY_USAGE_ESTIMATION_MAX;
  if (resource_quota->size != 0) {
    memory_usage_estimation =
        GPR_CLAMP((gpr_atm)((1.0 - ((double)resource_quota->free_pool) /
                                       ((double)resource_quota->size)) *
                            MEMORY_USAGE_ESTIMATION_MAX),
                  0, MEMORY_USAGE_ESTIMATION_MAX);
  }
  gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation,
                           memory_usage_estimation);
}
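
/* Worked example (illustrative only): with size = 100 and free_pool = 25, the
   estimate is (1.0 - 25/100) * 65536 = 49152, so
   grpc_resource_quota_get_memory_pressure() reports 49152 / 65536 = 0.75.
   A negative free_pool (outstanding demand) clamps the estimate to
   MEMORY_USAGE_ESTIMATION_MAX, i.e. a reported pressure of 1.0. */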

/* returns true if all allocations are completed */
static bool rq_alloc(grpc_resource_quota* resource_quota) {
  grpc_resource_user* resource_user;
  while ((resource_user = rulist_pop_head(resource_quota,
                                          GRPC_RULIST_AWAITING_ALLOCATION))) {
    gpr_mu_lock(&resource_user->mu);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
      gpr_log(GPR_INFO,
              "RQ: check allocation for user %p shutdown=%" PRIdPTR
              " free_pool=%" PRId64 " outstanding_allocations=%" PRId64,
              resource_user, gpr_atm_no_barrier_load(&resource_user->shutdown),
              resource_user->free_pool, resource_user->outstanding_allocations);
    }
    if (gpr_atm_no_barrier_load(&resource_user->shutdown)) {
      resource_user->allocating = false;
      grpc_closure_list_fail_all(
          &resource_user->on_allocated,
          GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
      int64_t aborted_allocations = resource_user->outstanding_allocations;
      resource_user->outstanding_allocations = 0;
      resource_user->free_pool += aborted_allocations;
      grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &resource_user->on_allocated);
      gpr_mu_unlock(&resource_user->mu);
      if (aborted_allocations > 0) {
        ru_unref_by(resource_user, static_cast<gpr_atm>(aborted_allocations));
      }
      continue;
    }
    if (resource_user->free_pool < 0 &&
        -resource_user->free_pool <= resource_quota->free_pool) {
      int64_t amt = -resource_user->free_pool;
      resource_user->free_pool = 0;
      resource_quota->free_pool -= amt;
      rq_update_estimate(resource_quota);
      if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
        gpr_log(GPR_INFO,
                "RQ %s %s: grant alloc %" PRId64
                " bytes; rq_free_pool -> %" PRId64,
                resource_quota->name.c_str(), resource_user->name.c_str(), amt,
                resource_quota->free_pool);
      }
    } else if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace) &&
               resource_user->free_pool >= 0) {
      gpr_log(GPR_INFO, "RQ %s %s: discard already satisfied alloc request",
              resource_quota->name.c_str(), resource_user->name.c_str());
    }
    if (resource_user->free_pool >= 0) {
      resource_user->allocating = false;
      resource_user->outstanding_allocations = 0;
      grpc_core::ExecCtx::RunList(DEBUG_LOCATION, &resource_user->on_allocated);
      gpr_mu_unlock(&resource_user->mu);
    } else {
      rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
      gpr_mu_unlock(&resource_user->mu);
      return false;
    }
  }
  return true;
}

/* returns true if any memory could be reclaimed from buffers */
static bool rq_reclaim_from_per_user_free_pool(
    grpc_resource_quota* resource_quota) {
  grpc_resource_user* resource_user;
  while ((resource_user = rulist_pop_head(resource_quota,
                                          GRPC_RULIST_NON_EMPTY_FREE_POOL))) {
    gpr_mu_lock(&resource_user->mu);
    resource_user->added_to_free_pool = false;
    if (resource_user->free_pool > 0) {
      int64_t amt = resource_user->free_pool;
      resource_user->free_pool = 0;
      resource_quota->free_pool += amt;
      rq_update_estimate(resource_quota);
      if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
        gpr_log(GPR_INFO,
                "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64
                " bytes; rq_free_pool -> %" PRId64,
                resource_quota->name.c_str(), resource_user->name.c_str(), amt,
                resource_quota->free_pool);
      }
      gpr_mu_unlock(&resource_user->mu);
      return true;
    } else {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
        gpr_log(GPR_INFO,
                "RQ %s %s: failed to reclaim_from_per_user_free_pool; "
                "free_pool = %" PRId64 "; rq_free_pool = %" PRId64,
                resource_quota->name.c_str(), resource_user->name.c_str(),
                resource_user->free_pool, resource_quota->free_pool);
      }
      gpr_mu_unlock(&resource_user->mu);
    }
  }
  return false;
}

/* returns true if reclamation is proceeding */
static bool rq_reclaim(grpc_resource_quota* resource_quota, bool destructive) {
  if (resource_quota->reclaiming) return true;
  grpc_rulist list = destructive ? GRPC_RULIST_RECLAIMER_DESTRUCTIVE
                                 : GRPC_RULIST_RECLAIMER_BENIGN;
  grpc_resource_user* resource_user = rulist_pop_head(resource_quota, list);
  if (resource_user == nullptr) return false;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "RQ %s %s: initiate %s reclamation",
            resource_quota->name.c_str(), resource_user->name.c_str(),
            destructive ? "destructive" : "benign");
  }
  resource_quota->reclaiming = true;
  grpc_resource_quota_ref_internal(resource_quota);
  grpc_closure* c = resource_user->reclaimers[destructive];
  GPR_ASSERT(c);
  resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
  resource_quota->debug_only_last_initiated_reclaimer = c;
  resource_user->reclaimers[destructive] = nullptr;
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, c, GRPC_ERROR_NONE);
  return true;
}

/*******************************************************************************
 * ru_slice: a slice implementation that is backed by a grpc_resource_user
 */

namespace grpc_core {

class RuSliceRefcount {
 public:
  static void Destroy(void* p) {
    auto* rc = static_cast<RuSliceRefcount*>(p);
    rc->~RuSliceRefcount();
    gpr_free(rc);
  }
  RuSliceRefcount(grpc_resource_user* resource_user, size_t size)
      : base_(grpc_slice_refcount::Type::REGULAR, &refs_, Destroy, this,
              &base_),
        resource_user_(resource_user),
        size_(size) {
    // Nothing to do here.
  }
  ~RuSliceRefcount() { grpc_resource_user_free(resource_user_, size_); }

  grpc_slice_refcount* base_refcount() { return &base_; }

 private:
  grpc_slice_refcount base_;
  RefCount refs_;
  grpc_resource_user* resource_user_;
  size_t size_;
};

}  // namespace grpc_core

static grpc_slice ru_slice_create(grpc_resource_user* resource_user,
                                  size_t size) {
  auto* rc = static_cast<grpc_core::RuSliceRefcount*>(
      gpr_malloc(sizeof(grpc_core::RuSliceRefcount) + size));
  new (rc) grpc_core::RuSliceRefcount(resource_user, size);
  grpc_slice slice;

  slice.refcount = rc->base_refcount();
  slice.data.refcounted.bytes = reinterpret_cast<uint8_t*>(rc + 1);
  slice.data.refcounted.length = size;
  return slice;
}

/*******************************************************************************
 * grpc_resource_quota internal implementation: resource user manipulation under
 * the combiner
 */

static void ru_allocate(void* ru, grpc_error* /*error*/) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_AWAITING_ALLOCATION)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
}

static void ru_add_to_free_pool(void* ru, grpc_error* /*error*/) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (!rulist_empty(resource_user->resource_quota,
                    GRPC_RULIST_AWAITING_ALLOCATION) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_NON_EMPTY_FREE_POOL)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_NON_EMPTY_FREE_POOL);
}

static bool ru_post_reclaimer(grpc_resource_user* resource_user,
                              bool destructive) {
  grpc_closure* closure = resource_user->new_reclaimers[destructive];
  GPR_ASSERT(closure != nullptr);
  resource_user->new_reclaimers[destructive] = nullptr;
  GPR_ASSERT(resource_user->reclaimers[destructive] == nullptr);
  if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
    grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_CANCELLED);
    return false;
  }
  resource_user->reclaimers[destructive] = closure;
  return true;
}

static void ru_post_benign_reclaimer(void* ru, grpc_error* /*error*/) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (!ru_post_reclaimer(resource_user, false)) return;
  if (!rulist_empty(resource_user->resource_quota,
                    GRPC_RULIST_AWAITING_ALLOCATION) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_BENIGN)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
}

static void ru_post_destructive_reclaimer(void* ru, grpc_error* /*error*/) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  if (!ru_post_reclaimer(resource_user, true)) return;
  if (!rulist_empty(resource_user->resource_quota,
                    GRPC_RULIST_AWAITING_ALLOCATION) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_NON_EMPTY_FREE_POOL) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_BENIGN) &&
      rulist_empty(resource_user->resource_quota,
                   GRPC_RULIST_RECLAIMER_DESTRUCTIVE)) {
    rq_step_sched(resource_user->resource_quota);
  }
  rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
}

static void ru_shutdown(void* ru, grpc_error* /*error*/) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "RU shutdown %p", ru);
  }
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  gpr_mu_lock(&resource_user->mu);
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[0],
                          GRPC_ERROR_CANCELLED);
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[1],
                          GRPC_ERROR_CANCELLED);
  resource_user->reclaimers[0] = nullptr;
  resource_user->reclaimers[1] = nullptr;
  rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
  rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE);
  if (resource_user->allocating) {
    rq_step_sched(resource_user->resource_quota);
  }
  gpr_mu_unlock(&resource_user->mu);
}

static void ru_destroy(void* ru, grpc_error* /*error*/) {
  grpc_resource_user* resource_user = static_cast<grpc_resource_user*>(ru);
  GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0);
  // Free all the remaining thread quota
  grpc_resource_user_free_threads(resource_user,
                                  static_cast<int>(gpr_atm_no_barrier_load(
                                      &resource_user->num_threads_allocated)));

  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    rulist_remove(resource_user, static_cast<grpc_rulist>(i));
  }
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[0],
                          GRPC_ERROR_CANCELLED);
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, resource_user->reclaimers[1],
                          GRPC_ERROR_CANCELLED);
  if (resource_user->free_pool != 0) {
    resource_user->resource_quota->free_pool += resource_user->free_pool;
    rq_step_sched(resource_user->resource_quota);
  }
  grpc_resource_quota_unref_internal(resource_user->resource_quota);
  gpr_mu_destroy(&resource_user->mu);
  delete resource_user;
}

static void ru_alloc_slices(
    grpc_resource_user_slice_allocator* slice_allocator) {
  for (size_t i = 0; i < slice_allocator->count; i++) {
    grpc_slice_buffer_add_indexed(
        slice_allocator->dest, ru_slice_create(slice_allocator->resource_user,
                                               slice_allocator->length));
  }
}

static void ru_allocated_slices(void* arg, grpc_error* error) {
  grpc_resource_user_slice_allocator* slice_allocator =
      static_cast<grpc_resource_user_slice_allocator*>(arg);
  if (error == GRPC_ERROR_NONE) ru_alloc_slices(slice_allocator);
  grpc_core::Closure::Run(DEBUG_LOCATION, &slice_allocator->on_done,
                          GRPC_ERROR_REF(error));
}

/*******************************************************************************
 * grpc_resource_quota internal implementation: quota manipulation under the
 * combiner
 */

struct rq_resize_args {
  int64_t size;
  grpc_resource_quota* resource_quota;
  grpc_closure closure;
};
static void rq_resize(void* args, grpc_error* /*error*/) {
  rq_resize_args* a = static_cast<rq_resize_args*>(args);
  int64_t delta = a->size - a->resource_quota->size;
  a->resource_quota->size += delta;
  a->resource_quota->free_pool += delta;
  rq_update_estimate(a->resource_quota);
  rq_step_sched(a->resource_quota);
  grpc_resource_quota_unref_internal(a->resource_quota);
  gpr_free(a);
}

static void rq_reclamation_done(void* rq, grpc_error* /*error*/) {
  grpc_resource_quota* resource_quota = static_cast<grpc_resource_quota*>(rq);
  resource_quota->reclaiming = false;
  rq_step_sched(resource_quota);
  grpc_resource_quota_unref_internal(resource_quota);
}

/*******************************************************************************
 * grpc_resource_quota api
 */

/* Public API */
grpc_resource_quota* grpc_resource_quota_create(const char* name) {
  grpc_resource_quota* resource_quota = new grpc_resource_quota;
  gpr_ref_init(&resource_quota->refs, 1);
  resource_quota->combiner = grpc_combiner_create();
  resource_quota->free_pool = INT64_MAX;
  resource_quota->size = INT64_MAX;
  resource_quota->used = 0;
  gpr_atm_no_barrier_store(&resource_quota->last_size, GPR_ATM_MAX);
  gpr_mu_init(&resource_quota->thread_count_mu);
  resource_quota->max_threads = INT_MAX;
  resource_quota->num_threads_allocated = 0;
  resource_quota->step_scheduled = false;
  resource_quota->reclaiming = false;
  gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0);
  if (name != nullptr) {
    resource_quota->name = name;
  } else {
    resource_quota->name = absl::StrCat(
        "anonymous_pool_", reinterpret_cast<intptr_t>(resource_quota));
  }
  GRPC_CLOSURE_INIT(&resource_quota->rq_step_closure, rq_step, resource_quota,
                    nullptr);
  GRPC_CLOSURE_INIT(&resource_quota->rq_reclamation_done_closure,
                    rq_reclamation_done, resource_quota, nullptr);
  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    resource_quota->roots[i] = nullptr;
  }
  return resource_quota;
}
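
/* Usage sketch (illustrative only, not part of this translation unit): a
   newly created quota is effectively unbounded (INT64_MAX) and is typically
   sized and released by the application:

     grpc_resource_quota* rq = grpc_resource_quota_create("my_quota");
     grpc_resource_quota_resize(rq, 64 * 1024 * 1024);  // cap at 64 MiB
     // ... attach to channels via channel args (see the vtable below) ...
     grpc_resource_quota_unref(rq);  // drop the application's ref
*/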

void grpc_resource_quota_unref_internal(grpc_resource_quota* resource_quota) {
  if (gpr_unref(&resource_quota->refs)) {
    // No outstanding thread quota
    GPR_ASSERT(resource_quota->num_threads_allocated == 0);
    GRPC_COMBINER_UNREF(resource_quota->combiner, "resource_quota");
    gpr_mu_destroy(&resource_quota->thread_count_mu);
    delete resource_quota;
  }
}

/* Public API */
void grpc_resource_quota_unref(grpc_resource_quota* resource_quota) {
  grpc_core::ExecCtx exec_ctx;
  grpc_resource_quota_unref_internal(resource_quota);
}

grpc_resource_quota* grpc_resource_quota_ref_internal(
    grpc_resource_quota* resource_quota) {
  gpr_ref(&resource_quota->refs);
  return resource_quota;
}

/* Public API */
void grpc_resource_quota_ref(grpc_resource_quota* resource_quota) {
  grpc_resource_quota_ref_internal(resource_quota);
}

double grpc_resource_quota_get_memory_pressure(
    grpc_resource_quota* resource_quota) {
  return (static_cast<double>(gpr_atm_no_barrier_load(
             &resource_quota->memory_usage_estimation))) /
         (static_cast<double>(MEMORY_USAGE_ESTIMATION_MAX));
}

/* Public API */
void grpc_resource_quota_set_max_threads(grpc_resource_quota* resource_quota,
                                         int new_max_threads) {
  GPR_ASSERT(new_max_threads >= 0);
  gpr_mu_lock(&resource_quota->thread_count_mu);
  resource_quota->max_threads = new_max_threads;
  gpr_mu_unlock(&resource_quota->thread_count_mu);
}

/* Public API */
void grpc_resource_quota_resize(grpc_resource_quota* resource_quota,
                                size_t size) {
  grpc_core::ExecCtx exec_ctx;
  rq_resize_args* a = static_cast<rq_resize_args*>(gpr_malloc(sizeof(*a)));
  a->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
  a->size = static_cast<int64_t>(size);
  gpr_atm_no_barrier_store(&resource_quota->last_size,
                           (gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size));
  GRPC_CLOSURE_INIT(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
  grpc_core::ExecCtx::Run(DEBUG_LOCATION, &a->closure, GRPC_ERROR_NONE);
}

size_t grpc_resource_quota_peek_size(grpc_resource_quota* resource_quota) {
  return static_cast<size_t>(
      gpr_atm_no_barrier_load(&resource_quota->last_size));
}

/*******************************************************************************
 * grpc_resource_quota channel args api
 */

grpc_resource_quota* grpc_resource_quota_from_channel_args(
    const grpc_channel_args* channel_args, bool create) {
  for (size_t i = 0; i < channel_args->num_args; i++) {
    if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
      if (channel_args->args[i].type == GRPC_ARG_POINTER) {
        return grpc_resource_quota_ref_internal(
            static_cast<grpc_resource_quota*>(
                channel_args->args[i].value.pointer.p));
      } else {
        gpr_log(GPR_DEBUG, GRPC_ARG_RESOURCE_QUOTA " should be a pointer");
      }
    }
  }
  return create ? grpc_resource_quota_create(nullptr) : nullptr;
}

static void* rq_copy(void* rq) {
  grpc_resource_quota_ref(static_cast<grpc_resource_quota*>(rq));
  return rq;
}

static void rq_destroy(void* rq) {
  grpc_resource_quota_unref_internal(static_cast<grpc_resource_quota*>(rq));
}

static int rq_cmp(void* a, void* b) { return GPR_ICMP(a, b); }

const grpc_arg_pointer_vtable* grpc_resource_quota_arg_vtable(void) {
  static const grpc_arg_pointer_vtable vtable = {rq_copy, rq_destroy, rq_cmp};
  return &vtable;
}
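
/* Usage sketch (illustrative only): attaching a quota `rq` (as created above)
   to a channel through channel args. grpc_channel_arg_pointer_create is the
   channel_args helper assumed to be available to the caller.

     grpc_arg arg = grpc_channel_arg_pointer_create(
         const_cast<char*>(GRPC_ARG_RESOURCE_QUOTA), rq,
         grpc_resource_quota_arg_vtable());
     grpc_channel_args args = {1, &arg};
     // ... pass &args to channel/server creation ...
*/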

/*******************************************************************************
 * grpc_resource_user api
 */

grpc_resource_user* grpc_resource_user_create(
    grpc_resource_quota* resource_quota, const char* name) {
  grpc_resource_user* resource_user = new grpc_resource_user;
  resource_user->resource_quota =
      grpc_resource_quota_ref_internal(resource_quota);
  GRPC_CLOSURE_INIT(&resource_user->allocate_closure, &ru_allocate,
                    resource_user, nullptr);
  GRPC_CLOSURE_INIT(&resource_user->add_to_free_pool_closure,
                    &ru_add_to_free_pool, resource_user, nullptr);
  GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[0],
                    &ru_post_benign_reclaimer, resource_user, nullptr);
  GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[1],
                    &ru_post_destructive_reclaimer, resource_user, nullptr);
  GRPC_CLOSURE_INIT(&resource_user->destroy_closure, &ru_destroy, resource_user,
                    nullptr);
  gpr_mu_init(&resource_user->mu);
  gpr_atm_rel_store(&resource_user->refs, 1);
  gpr_atm_rel_store(&resource_user->shutdown, 0);
  resource_user->free_pool = 0;
  grpc_closure_list_init(&resource_user->on_allocated);
  resource_user->allocating = false;
  resource_user->added_to_free_pool = false;
  gpr_atm_no_barrier_store(&resource_user->num_threads_allocated, 0);
  resource_user->reclaimers[0] = nullptr;
  resource_user->reclaimers[1] = nullptr;
  resource_user->new_reclaimers[0] = nullptr;
  resource_user->new_reclaimers[1] = nullptr;
  resource_user->outstanding_allocations = 0;
  for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
    resource_user->links[i].next = resource_user->links[i].prev = nullptr;
  }
  if (name != nullptr) {
    resource_user->name = name;
  } else {
    resource_user->name = absl::StrCat(
        "anonymous_resource_user_", reinterpret_cast<intptr_t>(resource_user));
  }
  return resource_user;
}
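
/* Lifecycle sketch (illustrative only): a typical owner creates a user against
   a quota, allocates and frees through it, then shuts it down and drops its
   ref; when the last ref (including per-byte refs) is released, ru_destroy()
   runs under the combiner.

     grpc_resource_user* ru = grpc_resource_user_create(rq, "my_transport");
     // ... grpc_resource_user_alloc / grpc_resource_user_free ...
     grpc_resource_user_shutdown(ru);
     grpc_resource_user_unref(ru);
*/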

grpc_resource_quota* grpc_resource_user_quota(
    grpc_resource_user* resource_user) {
  return resource_user->resource_quota;
}

static void ru_ref_by(grpc_resource_user* resource_user, gpr_atm amount) {
  GPR_ASSERT(amount > 0);
  GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&resource_user->refs, amount) != 0);
}

static void ru_unref_by(grpc_resource_user* resource_user, gpr_atm amount) {
  GPR_ASSERT(amount > 0);
  gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount);
  GPR_ASSERT(old >= amount);
  if (old == amount) {
    resource_user->resource_quota->combiner->Run(
        &resource_user->destroy_closure, GRPC_ERROR_NONE);
  }
}

void grpc_resource_user_ref(grpc_resource_user* resource_user) {
  ru_ref_by(resource_user, 1);
}

void grpc_resource_user_unref(grpc_resource_user* resource_user) {
  ru_unref_by(resource_user, 1);
}

void grpc_resource_user_shutdown(grpc_resource_user* resource_user) {
  if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
    resource_user->resource_quota->combiner->Run(
        GRPC_CLOSURE_CREATE(ru_shutdown, resource_user, nullptr),
        GRPC_ERROR_NONE);
  }
}

bool grpc_resource_user_allocate_threads(grpc_resource_user* resource_user,
                                         int thread_count) {
  GPR_ASSERT(thread_count >= 0);
  bool is_success = false;
  gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
  grpc_resource_quota* rq = resource_user->resource_quota;
  if (rq->num_threads_allocated + thread_count <= rq->max_threads) {
    rq->num_threads_allocated += thread_count;
    gpr_atm_no_barrier_fetch_add(&resource_user->num_threads_allocated,
                                 thread_count);
    is_success = true;
  }
  gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
  return is_success;
}
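
/* Usage sketch (illustrative only): thread allocation is all-or-nothing, so
   callers must check the result and later release exactly what they acquired.

     if (grpc_resource_user_allocate_threads(ru, 1)) {
       // ... spawn a worker thread ...
       // later, when the thread exits:
       grpc_resource_user_free_threads(ru, 1);
     } else {
       // over quota: degrade gracefully (e.g., queue the work instead)
     }
*/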

void grpc_resource_user_free_threads(grpc_resource_user* resource_user,
                                     int thread_count) {
  GPR_ASSERT(thread_count >= 0);
  gpr_mu_lock(&resource_user->resource_quota->thread_count_mu);
  grpc_resource_quota* rq = resource_user->resource_quota;
  rq->num_threads_allocated -= thread_count;
  int old_count = static_cast<int>(gpr_atm_no_barrier_fetch_add(
      &resource_user->num_threads_allocated, -thread_count));
  if (old_count < thread_count || rq->num_threads_allocated < 0) {
    gpr_log(GPR_ERROR,
            "Releasing more threads (%d) than currently allocated (rq threads: "
            "%d, ru threads: %d)",
            thread_count, rq->num_threads_allocated + thread_count, old_count);
    abort();
  }
  gpr_mu_unlock(&resource_user->resource_quota->thread_count_mu);
}

static bool resource_user_alloc_locked(grpc_resource_user* resource_user,
                                       size_t size,
                                       grpc_closure* optional_on_done) {
  ru_ref_by(resource_user, static_cast<gpr_atm>(size));
  resource_user->free_pool -= static_cast<int64_t>(size);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64,
            resource_user->resource_quota->name.c_str(),
            resource_user->name.c_str(), size, resource_user->free_pool);
  }
  if (GPR_LIKELY(resource_user->free_pool >= 0)) return true;
  // Slow path: We need to wait for the free pool to refill.
  if (optional_on_done != nullptr) {
    resource_user->outstanding_allocations += static_cast<int64_t>(size);
    grpc_closure_list_append(&resource_user->on_allocated, optional_on_done,
                             GRPC_ERROR_NONE);
  }
  if (!resource_user->allocating) {
    resource_user->allocating = true;
    resource_user->resource_quota->combiner->Run(
        &resource_user->allocate_closure, GRPC_ERROR_NONE);
  }
  return false;
}

bool grpc_resource_user_safe_alloc(grpc_resource_user* resource_user,
                                   size_t size) {
  if (gpr_atm_no_barrier_load(&resource_user->shutdown)) return false;
  gpr_mu_lock(&resource_user->mu);
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  bool cas_success;
  do {
    gpr_atm used = gpr_atm_no_barrier_load(&resource_quota->used);
    gpr_atm new_used = used + size;
    if (static_cast<size_t>(new_used) >
        grpc_resource_quota_peek_size(resource_quota)) {
      gpr_mu_unlock(&resource_user->mu);
      return false;
    }
    cas_success = gpr_atm_full_cas(&resource_quota->used, used, new_used);
  } while (!cas_success);
  resource_user_alloc_locked(resource_user, size, nullptr);
  gpr_mu_unlock(&resource_user->mu);
  return true;
}

bool grpc_resource_user_alloc(grpc_resource_user* resource_user, size_t size,
                              grpc_closure* optional_on_done) {
  // TODO(juanlishen): Maybe return immediately if shutting down. Deferring this
  // because some tests become flaky after the change.
  gpr_mu_lock(&resource_user->mu);
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  gpr_atm_no_barrier_fetch_add(&resource_quota->used, size);
  const bool ret =
      resource_user_alloc_locked(resource_user, size, optional_on_done);
  gpr_mu_unlock(&resource_user->mu);
  return ret;
}
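
/* Usage sketch (illustrative only): grpc_resource_user_alloc() returns true if
   the allocation was satisfied synchronously; otherwise the hypothetical
   on_alloc_done closure fires later, once rq_step() has granted or failed the
   request. On the synchronous path the closure is never appended, so it will
   not be invoked.

     static void on_alloc_done(void* arg, grpc_error* error) {
       if (error == GRPC_ERROR_NONE) { ... memory granted; proceed ... }
     }
     ...
     grpc_closure done;
     GRPC_CLOSURE_INIT(&done, on_alloc_done, arg, grpc_schedule_on_exec_ctx);
     if (grpc_resource_user_alloc(ru, 4096, &done)) {
       // granted immediately; `done` will NOT be invoked
     }
*/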

void grpc_resource_user_free(grpc_resource_user* resource_user, size_t size) {
  gpr_mu_lock(&resource_user->mu);
  grpc_resource_quota* resource_quota = resource_user->resource_quota;
  gpr_atm prior = gpr_atm_no_barrier_fetch_add(&resource_quota->used, -size);
  GPR_ASSERT(prior >= static_cast<long>(size));
  bool was_zero_or_negative = resource_user->free_pool <= 0;
  resource_user->free_pool += static_cast<int64_t>(size);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "RQ %s %s: free %" PRIdPTR "; free_pool -> %" PRId64,
            resource_user->resource_quota->name.c_str(),
            resource_user->name.c_str(), size, resource_user->free_pool);
  }
  bool is_bigger_than_zero = resource_user->free_pool > 0;
  if (is_bigger_than_zero && was_zero_or_negative &&
      !resource_user->added_to_free_pool) {
    resource_user->added_to_free_pool = true;
    resource_quota->combiner->Run(&resource_user->add_to_free_pool_closure,
                                  GRPC_ERROR_NONE);
  }
  gpr_mu_unlock(&resource_user->mu);
  ru_unref_by(resource_user, static_cast<gpr_atm>(size));
}

void grpc_resource_user_post_reclaimer(grpc_resource_user* resource_user,
                                       bool destructive,
                                       grpc_closure* closure) {
  GPR_ASSERT(resource_user->new_reclaimers[destructive] == nullptr);
  resource_user->new_reclaimers[destructive] = nullptr;
  resource_user->new_reclaimers[destructive] = closure;
  resource_user->resource_quota->combiner->Run(
      &resource_user->post_reclaimer_closure[destructive], GRPC_ERROR_NONE);
}
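
/* Usage sketch (illustrative only): a reclaimer closure frees whatever it can
   and then must call grpc_resource_user_finish_reclamation() so the quota's
   reclaiming flag is cleared and rq_step() can continue. The benign_reclaim
   helper and `ru`/`reclaim_closure` names here are hypothetical.

     static void benign_reclaim(void* arg, grpc_error* error) {
       if (error == GRPC_ERROR_NONE) {
         // ... drop caches, return bytes via grpc_resource_user_free() ...
         grpc_resource_user_finish_reclamation(ru);
       }  // GRPC_ERROR_CANCELLED means shutdown: do not re-post
     }
     ...
     grpc_resource_user_post_reclaimer(ru, false, &reclaim_closure);  // benign
*/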

void grpc_resource_user_finish_reclamation(grpc_resource_user* resource_user) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_resource_quota_trace)) {
    gpr_log(GPR_INFO, "RQ %s %s: reclamation complete",
            resource_user->resource_quota->name.c_str(),
            resource_user->name.c_str());
  }
  resource_user->resource_quota->combiner->Run(
      &resource_user->resource_quota->rq_reclamation_done_closure,
      GRPC_ERROR_NONE);
}

void grpc_resource_user_slice_allocator_init(
    grpc_resource_user_slice_allocator* slice_allocator,
    grpc_resource_user* resource_user, grpc_iomgr_cb_func cb, void* p) {
  GRPC_CLOSURE_INIT(&slice_allocator->on_allocated, ru_allocated_slices,
                    slice_allocator, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_INIT(&slice_allocator->on_done, cb, p,
                    grpc_schedule_on_exec_ctx);
  slice_allocator->resource_user = resource_user;
}

bool grpc_resource_user_alloc_slices(
    grpc_resource_user_slice_allocator* slice_allocator, size_t length,
    size_t count, grpc_slice_buffer* dest) {
  if (GPR_UNLIKELY(
          gpr_atm_no_barrier_load(&slice_allocator->resource_user->shutdown))) {
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION, &slice_allocator->on_allocated,
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resource user shutdown"));
    return false;
  }
  slice_allocator->length = length;
  slice_allocator->count = count;
  slice_allocator->dest = dest;
  const bool ret =
      grpc_resource_user_alloc(slice_allocator->resource_user, count * length,
                               &slice_allocator->on_allocated);
  if (ret) ru_alloc_slices(slice_allocator);
  return ret;
}
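
/* Usage sketch (illustrative only): the slice allocator is initialized once
   per owner and reused for each read; on_read_slices below is a hypothetical
   callback that runs once the quota has granted count * length bytes
   asynchronously.

     grpc_resource_user_slice_allocator alloc;
     grpc_resource_user_slice_allocator_init(&alloc, ru, on_read_slices, arg);
     ...
     grpc_slice_buffer buf;
     grpc_slice_buffer_init(&buf);
     if (grpc_resource_user_alloc_slices(&alloc, 8192, 1, &buf)) {
       // satisfied synchronously: buf already holds one 8192-byte slice
     }
*/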