// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "src/core/lib/resource_quota/memory_quota.h"

#include <grpc/event_engine/internal/memory_allocator_impl.h>
#include <grpc/slice.h>
#include <grpc/support/port_platform.h>
#include <inttypes.h>

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <tuple>
#include <utility>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/race.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/slice/slice_refcount.h"
#include "src/core/util/mpscq.h"
#include "src/core/util/useful.h"

namespace grpc_core {

namespace {
// Maximum number of bytes an allocator will request from a quota in one step.
// Larger allocations than this will require multiple allocation requests.
constexpr size_t kMaxReplenishBytes = 1024 * 1024;

// Minimum number of bytes an allocator will request from a quota in one step.
constexpr size_t kMinReplenishBytes = 4096;

class MemoryQuotaTracker {
 public:
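  // Singleton accessor. The tracker object is intentionally leaked (never
  // destroyed) so that it remains safe to use during process shutdown
  // regardless of static destruction order.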
  static MemoryQuotaTracker& Get() {
    static MemoryQuotaTracker* tracker = new MemoryQuotaTracker();
    return *tracker;
  }

  void Add(std::shared_ptr<BasicMemoryQuota> quota) {
    MutexLock lock(&mu_);
    // Common usage is that we only create a few (one or two) quotas.
    // We'd like to ensure that we don't OOM if many more are added - and
    // simply appending weak_ptrs here forever, whilst nicely simple, does
    // run that risk - so we garbage collect dead entries on each Add.
    // If usage patterns change sufficiently we'll likely want to
    // change this class to have a more sophisticated data structure
    // and probably a Remove() method.
    GatherAndGarbageCollect();
    quotas_.push_back(quota);
  }

  std::vector<std::shared_ptr<BasicMemoryQuota>> All() {
    MutexLock lock(&mu_);
    return GatherAndGarbageCollect();
  }

 private:
  MemoryQuotaTracker() {}

  std::vector<std::shared_ptr<BasicMemoryQuota>> GatherAndGarbageCollect()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    std::vector<std::weak_ptr<BasicMemoryQuota>> new_quotas;
    std::vector<std::shared_ptr<BasicMemoryQuota>> all_quotas;
    for (const auto& quota : quotas_) {
      auto p = quota.lock();
      if (p == nullptr) continue;
      new_quotas.push_back(quota);
      all_quotas.push_back(p);
    }
    quotas_.swap(new_quotas);
    return all_quotas;
  }

  Mutex mu_;
  std::vector<std::weak_ptr<BasicMemoryQuota>> quotas_ ABSL_GUARDED_BY(mu_);
};

// Reference count for a slice allocated by MemoryAllocator::MakeSlice.
// Takes care of releasing memory back when the slice is destroyed.
class SliceRefCount : public grpc_slice_refcount {
 public:
  SliceRefCount(
      std::shared_ptr<
          grpc_event_engine::experimental::internal::MemoryAllocatorImpl>
          allocator,
      size_t size)
      : grpc_slice_refcount(Destroy),
        allocator_(std::move(allocator)),
        size_(size) {
    // Nothing to do here.
  }
  ~SliceRefCount() {
    allocator_->Release(size_);
    allocator_.reset();
  }

 private:
  static void Destroy(grpc_slice_refcount* p) {
    auto* rc = static_cast<SliceRefCount*>(p);
    rc->~SliceRefCount();
    free(rc);
  }

  std::shared_ptr<
      grpc_event_engine::experimental::internal::MemoryAllocatorImpl>
      allocator_;
  size_t size_;
};

std::atomic<double> container_memory_pressure{0.0};

}  // namespace

void SetContainerMemoryPressure(double pressure) {
  container_memory_pressure.store(pressure, std::memory_order_relaxed);
}

double ContainerMemoryPressure() {
  return container_memory_pressure.load(std::memory_order_relaxed);
}

//
// Reclaimer
//

ReclamationSweep::~ReclamationSweep() {
  if (memory_quota_ != nullptr) {
    memory_quota_->FinishReclamation(sweep_token_, std::move(waker_));
  }
}

//
// ReclaimerQueue
//

struct ReclaimerQueue::QueuedNode
    : public MultiProducerSingleConsumerQueue::Node {
  explicit QueuedNode(RefCountedPtr<Handle> reclaimer_handle)
      : reclaimer_handle(std::move(reclaimer_handle)) {}
  RefCountedPtr<Handle> reclaimer_handle;
};

struct ReclaimerQueue::State {
  Mutex reader_mu;
  MultiProducerSingleConsumerQueue queue;  // reader_mu must be held to pop
  Waker waker ABSL_GUARDED_BY(reader_mu);

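  // On destruction, drain anything still queued; the queued handles are
  // simply dropped (unreffed) without being run.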
  ~State() {
    bool empty = false;
    do {
      delete static_cast<QueuedNode*>(queue.PopAndCheckEnd(&empty));
    } while (!empty);
  }
};

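// Cancel the reclaimer: if it hasn't been run yet, invoke it one final time
// with an empty sweep so that it can observe the cancellation, then drop our
// reference.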
void ReclaimerQueue::Handle::Orphan() {
  if (auto* sweep = sweep_.exchange(nullptr, std::memory_order_acq_rel)) {
    sweep->RunAndDelete(absl::nullopt);
  }
  Unref();
}

void ReclaimerQueue::Handle::Run(ReclamationSweep reclamation_sweep) {
  if (auto* sweep = sweep_.exchange(nullptr, std::memory_order_acq_rel)) {
    sweep->RunAndDelete(std::move(reclamation_sweep));
  }
}

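// Re-enqueue this reclaimer on new_queue, but only if it has not already been
// run or cancelled (i.e. its sweep is still pending). Returns whether it was
// requeued.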
bool ReclaimerQueue::Handle::Requeue(ReclaimerQueue* new_queue) {
  if (sweep_.load(std::memory_order_relaxed)) {
    new_queue->Enqueue(Ref());
    return true;
  } else {
    return false;
  }
}

void ReclaimerQueue::Handle::Sweep::MarkCancelled() {
  // When we cancel a reclaimer we rotate the elements of the queue once -
  // taking one non-cancelled node from the start, and placing it on the end.
  // This ensures that we don't suffer from head of line blocking whereby a
  // non-cancelled reclaimer at the head of the queue, in the absence of memory
  // pressure, prevents the remainder of the queue from being cleaned up.
  MutexLock lock(&state_->reader_mu);
  while (true) {
    bool empty = false;
    std::unique_ptr<QueuedNode> node(
        static_cast<QueuedNode*>(state_->queue.PopAndCheckEnd(&empty)));
    if (node == nullptr) break;
    if (node->reclaimer_handle->sweep_.load(std::memory_order_relaxed) !=
        nullptr) {
      state_->queue.Push(node.release());
      break;
    }
  }
}

ReclaimerQueue::ReclaimerQueue() : state_(std::make_shared<State>()) {}

ReclaimerQueue::~ReclaimerQueue() = default;

void ReclaimerQueue::Enqueue(RefCountedPtr<Handle> handle) {
  if (state_->queue.Push(new QueuedNode(std::move(handle)))) {
    MutexLock lock(&state_->reader_mu);
    state_->waker.Wakeup();
  }
}

Poll<RefCountedPtr<ReclaimerQueue::Handle>> ReclaimerQueue::PollNext() {
  MutexLock lock(&state_->reader_mu);
  bool empty = false;
  // Try to pull from the queue.
  std::unique_ptr<QueuedNode> node(
      static_cast<QueuedNode*>(state_->queue.PopAndCheckEnd(&empty)));
  // If we get something, great.
  if (node != nullptr) return std::move(node->reclaimer_handle);
  if (!empty) {
    // If we don't, but the queue is probably not empty, schedule an immediate
    // repoll.
    GetContext<Activity>()->ForceImmediateRepoll();
  } else {
    // Otherwise, schedule a wakeup for whenever something is pushed.
    state_->waker = GetContext<Activity>()->MakeNonOwningWaker();
  }
  return Pending{};
}

//
// GrpcMemoryAllocatorImpl
//

GrpcMemoryAllocatorImpl::GrpcMemoryAllocatorImpl(
    std::shared_ptr<BasicMemoryQuota> memory_quota)
    : memory_quota_(memory_quota) {
  memory_quota_->Take(
      /*allocator=*/this, taken_bytes_);
  memory_quota_->AddNewAllocator(this);
}

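// At destruction every byte we ever took from the quota must have been
// released back into our free pool, except for the bytes that account for
// this allocator object itself (charged at construction).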
GrpcMemoryAllocatorImpl::~GrpcMemoryAllocatorImpl() {
  CHECK_EQ(free_bytes_.load(std::memory_order_acquire) +
               sizeof(GrpcMemoryAllocatorImpl),
           taken_bytes_.load(std::memory_order_relaxed));
  memory_quota_->Return(taken_bytes_.load(std::memory_order_relaxed));
}

void GrpcMemoryAllocatorImpl::Shutdown() {
  memory_quota_->RemoveAllocator(this);
  std::shared_ptr<BasicMemoryQuota> memory_quota;
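  // Stash the reclamation handles under the lock, but destroy them (which may
  // run their cancelled reclaimers) only after the lock has been released.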
  OrphanablePtr<ReclaimerQueue::Handle>
      reclamation_handles[kNumReclamationPasses];
  {
    MutexLock lock(&reclaimer_mu_);
    CHECK(!shutdown_);
    shutdown_ = true;
    memory_quota = memory_quota_;
    for (size_t i = 0; i < kNumReclamationPasses; i++) {
      reclamation_handles[i] = std::exchange(reclamation_handles_[i], nullptr);
    }
  }
}

size_t GrpcMemoryAllocatorImpl::Reserve(MemoryRequest request) {
  // Validate request - performed here so we don't bloat the generated code
  // with inlined asserts.
  CHECK(request.min() <= request.max());
  CHECK(request.max() <= MemoryRequest::max_allowed_size());
  size_t old_free = free_bytes_.load(std::memory_order_relaxed);

  while (true) {
    // Attempt to reserve memory from our pool.
    auto reservation = TryReserve(request);
    if (reservation.has_value()) {
      size_t new_free = free_bytes_.load(std::memory_order_relaxed);
      memory_quota_->MaybeMoveAllocator(this, old_free, new_free);
      return *reservation;
    }

    // If that failed, grab more from the quota and retry.
    Replenish();
  }
}

absl::optional<size_t> GrpcMemoryAllocatorImpl::TryReserve(
    MemoryRequest request) {
  // How much memory should we request? (see the scaling below)
  size_t scaled_size_over_min = request.max() - request.min();
  // Scale the request down according to memory pressure if we have that
  // flexibility.
  if (scaled_size_over_min != 0) {
    const auto pressure_info = memory_quota_->GetPressureInfo();
    double pressure = pressure_info.pressure_control_value;
    size_t max_recommended_allocation_size =
        pressure_info.max_recommended_allocation_size;
    // Once pressure exceeds 80% usage, shrink the flexible part of the
    // request in proportion to how far past 80% we are.
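    // e.g. at 90% pressure only half of the (max - min) flexibility is
    // granted; at 100% pressure none of it is.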
    if (pressure > 0.8) {
      scaled_size_over_min =
          std::min(scaled_size_over_min,
                   static_cast<size_t>((request.max() - request.min()) *
                                       (1.0 - pressure) / 0.2));
    }
    if (max_recommended_allocation_size < request.min()) {
      scaled_size_over_min = 0;
    } else if (request.min() + scaled_size_over_min >
               max_recommended_allocation_size) {
      scaled_size_over_min = max_recommended_allocation_size - request.min();
    }
  }

  // How much do we want to reserve?
  const size_t reserve = request.min() + scaled_size_over_min;
  // See how many bytes are available.
  size_t available = free_bytes_.load(std::memory_order_acquire);
  while (true) {
    // Does the current free pool satisfy the request?
    if (available < reserve) {
      return {};
    }
    // Try to reserve the requested amount.
    // If the amount of free memory changed through this loop, then available
    // will be set to the new value and we'll repeat.
    if (free_bytes_.compare_exchange_weak(available, available - reserve,
                                          std::memory_order_acq_rel,
                                          std::memory_order_acquire)) {
      return reserve;
    }
  }
}

void GrpcMemoryAllocatorImpl::MaybeDonateBack() {
  size_t free = free_bytes_.load(std::memory_order_relaxed);
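  // Heuristic for how much to hand back: everything above half of the quota
  // buffer cap (unless the unconstrained-buffer experiment is enabled), and
  // at least half of what's free - or all of it if the pool is 8KiB or less.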
  while (free > 0) {
    size_t ret = 0;
    if (!IsUnconstrainedMaxQuotaBufferSizeEnabled() &&
        free > kMaxQuotaBufferSize / 2) {
      ret = std::max(ret, free - (kMaxQuotaBufferSize / 2));
    }
    ret = std::max(ret, free > 8192 ? free / 2 : free);
    const size_t new_free = free - ret;
    if (free_bytes_.compare_exchange_weak(free, new_free,
                                          std::memory_order_acq_rel,
                                          std::memory_order_acquire)) {
      GRPC_TRACE_LOG(resource_quota, INFO)
          << "[" << this << "] Early return " << ret << " bytes";
      CHECK(taken_bytes_.fetch_sub(ret, std::memory_order_relaxed) >= ret);
      memory_quota_->Return(ret);
      return;
    }
  }
}

void GrpcMemoryAllocatorImpl::Replenish() {
  // Request a fairly low-rate exponential growth in replenish size (a third
  // of what we've already taken), bounded between the reasonable limits
  // declared at the top of this file.
  auto amount = Clamp(taken_bytes_.load(std::memory_order_relaxed) / 3,
                      kMinReplenishBytes, kMaxReplenishBytes);
  // Take the requested amount from the quota.
  memory_quota_->Take(
      /*allocator=*/this, amount);
  // Record that we've taken it.
  taken_bytes_.fetch_add(amount, std::memory_order_relaxed);
  // Add the taken amount to the free pool.
  free_bytes_.fetch_add(amount, std::memory_order_acq_rel);
}

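// The returned slice is a single heap block laid out as
// [ SliceRefCount | payload bytes ]: the refcount object sits at the start
// and the usable slice data begins immediately after it.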
grpc_slice GrpcMemoryAllocatorImpl::MakeSlice(MemoryRequest request) {
  auto size = Reserve(request.Increase(sizeof(SliceRefCount)));
  void* p = malloc(size);
  new (p) SliceRefCount(shared_from_this(), size);
  grpc_slice slice;
  slice.refcount = static_cast<SliceRefCount*>(p);
  slice.data.refcounted.bytes =
      static_cast<uint8_t*>(p) + sizeof(SliceRefCount);
  slice.data.refcounted.length = size - sizeof(SliceRefCount);
  return slice;
}

//
// BasicMemoryQuota
//

class BasicMemoryQuota::WaitForSweepPromise {
 public:
  WaitForSweepPromise(std::shared_ptr<BasicMemoryQuota> memory_quota,
                      uint64_t token)
      : memory_quota_(std::move(memory_quota)), token_(token) {}

  Poll<Empty> operator()() {
    if (memory_quota_->reclamation_counter_.load(std::memory_order_relaxed) !=
        token_) {
      return Empty{};
    } else {
      return Pending{};
    }
  }

 private:
  std::shared_ptr<BasicMemoryQuota> memory_quota_;
  uint64_t token_;
};

BasicMemoryQuota::BasicMemoryQuota(std::string name) : name_(std::move(name)) {}

void BasicMemoryQuota::Start() {
  auto self = shared_from_this();

  MemoryQuotaTracker::Get().Add(self);

  // Reclamation loop:
  // basically, wait until we are in overcommit (free_bytes_ < 0), and then:
  // while (free_bytes_ < 0) reclaim_memory()
  // ... and repeat
  auto reclamation_loop = Loop(Seq(
      [self]() -> Poll<int> {
        // If there's free memory we no longer need to reclaim memory!
        if (self->free_bytes_.load(std::memory_order_acquire) > 0) {
          return Pending{};
        }
        return 0;
      },
      [self]() {
        // Race biases to the first thing that completes... so this will
        // choose the highest priority/least destructive thing to do that's
        // available.
        auto annotate = [](const char* name) {
          return [name](RefCountedPtr<ReclaimerQueue::Handle> f) {
            return std::make_tuple(name, std::move(f));
          };
        };
        return Race(Map(self->reclaimers_[0].Next(), annotate("benign")),
                    Map(self->reclaimers_[1].Next(), annotate("idle")),
                    Map(self->reclaimers_[2].Next(), annotate("destructive")));
      },
      [self](
          std::tuple<const char*, RefCountedPtr<ReclaimerQueue::Handle>> arg) {
        auto reclaimer = std::move(std::get<1>(arg));
        if (GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
          double free = std::max(intptr_t{0}, self->free_bytes_.load());
          size_t quota_size = self->quota_size_.load();
          LOG(INFO) << "RQ: " << self->name_ << " perform " << std::get<0>(arg)
                    << " reclamation. Available free bytes: " << free
                    << ", total quota_size: " << quota_size;
        }
        // One of the reclaimer queues gave us a way to get back memory.
        // Call the reclaimer with a sweep carrying a token and a waker -
        // enough to wake us up again once the sweep completes.
        const uint64_t token =
            self->reclamation_counter_.fetch_add(1, std::memory_order_relaxed) +
            1;
        reclaimer->Run(ReclamationSweep(
            self, token, GetContext<Activity>()->MakeNonOwningWaker()));
        // Return a promise that will wait for our barrier. This will be
        // awoken by the token above being destroyed. So, once that token is
        // destroyed, we'll be able to proceed.
        return WaitForSweepPromise(self, token);
      },
      []() -> LoopCtl<absl::Status> {
        // Continue the loop!
        return Continue{};
      }));

  reclaimer_activity_ =
      MakeActivity(std::move(reclamation_loop), ExecCtxWakeupScheduler(),
                   [](absl::Status status) {
                     CHECK(status.code() == absl::StatusCode::kCancelled);
                   });
}

void BasicMemoryQuota::Stop() { reclaimer_activity_.reset(); }

void BasicMemoryQuota::SetSize(size_t new_size) {
  size_t old_size = quota_size_.exchange(new_size, std::memory_order_relaxed);
  if (old_size < new_size) {
    // We're growing the quota.
    Return(new_size - old_size);
  } else {
    // We're shrinking the quota.
    Take(/*allocator=*/nullptr, old_size - new_size);
  }
}

void BasicMemoryQuota::Take(GrpcMemoryAllocatorImpl* allocator, size_t amount) {
  // If there's a request for nothing, then do nothing!
  if (amount == 0) return;
  DCHECK(amount <= std::numeric_limits<intptr_t>::max());
  // Grab memory from the quota.
  auto prior = free_bytes_.fetch_sub(amount, std::memory_order_acq_rel);
  // If we push into overcommit, awake the reclaimer.
  if (prior >= 0 && prior < static_cast<intptr_t>(amount)) {
    if (reclaimer_activity_ != nullptr) reclaimer_activity_->ForceWakeup();
  }

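  // Under the free_large_allocator experiment: pick one allocator from the
  // 'big' bucket (allocators sitting on a lot of free bytes) and ask it to
  // hand its free bytes back to the quota.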
  if (IsFreeLargeAllocatorEnabled()) {
    if (allocator == nullptr) return;
    GrpcMemoryAllocatorImpl* chosen_allocator = nullptr;
    // Use calling allocator's shard index to choose shard.
    auto& shard = big_allocators_.shards[allocator->IncrementShardIndex() %
                                         big_allocators_.shards.size()];

    if (shard.shard_mu.TryLock()) {
      if (!shard.allocators.empty()) {
        chosen_allocator = *shard.allocators.begin();
      }
      shard.shard_mu.Unlock();
    }

    if (chosen_allocator != nullptr) {
      chosen_allocator->ReturnFree();
    }
  }
}

void BasicMemoryQuota::FinishReclamation(uint64_t token, Waker waker) {
  uint64_t current = reclamation_counter_.load(std::memory_order_relaxed);
  if (current != token) return;
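  // Advance the counter so that any other sweeps carrying this token become
  // stale; only the winner of this exchange gets to wake the reclamation loop.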
  if (reclamation_counter_.compare_exchange_strong(current, current + 1,
                                                   std::memory_order_relaxed,
                                                   std::memory_order_relaxed)) {
    if (GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
      double free = std::max(intptr_t{0}, free_bytes_.load());
      size_t quota_size = quota_size_.load();
      LOG(INFO) << "RQ: " << name_
                << " reclamation complete. Available free bytes: " << free
                << ", total quota_size: " << quota_size;
    }
    waker.Wakeup();
  }
}

void BasicMemoryQuota::Return(size_t amount) {
  free_bytes_.fetch_add(amount, std::memory_order_relaxed);
}

void BasicMemoryQuota::AddNewAllocator(GrpcMemoryAllocatorImpl* allocator) {
  GRPC_TRACE_LOG(resource_quota, INFO) << "Adding allocator " << allocator;

  AllocatorBucket::Shard& shard = small_allocators_.SelectShard(allocator);

  {
    MutexLock l(&shard.shard_mu);
    shard.allocators.emplace(allocator);
  }
}

void BasicMemoryQuota::RemoveAllocator(GrpcMemoryAllocatorImpl* allocator) {
  GRPC_TRACE_LOG(resource_quota, INFO) << "Removing allocator " << allocator;

  AllocatorBucket::Shard& small_shard =
      small_allocators_.SelectShard(allocator);

  {
    MutexLock l(&small_shard.shard_mu);
    if (small_shard.allocators.erase(allocator) == 1) {
      return;
    }
  }

  AllocatorBucket::Shard& big_shard = big_allocators_.SelectShard(allocator);

  {
    MutexLock l(&big_shard.shard_mu);
    big_shard.allocators.erase(allocator);
  }
}

void BasicMemoryQuota::MaybeMoveAllocator(GrpcMemoryAllocatorImpl* allocator,
                                          size_t old_free_bytes,
                                          size_t new_free_bytes) {
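  // Allocators below kSmallAllocatorThreshold free bytes belong in the small
  // bucket and those above kBigAllocatorThreshold in the big bucket; the gap
  // between the two thresholds provides hysteresis so that allocators don't
  // ping-pong between buckets on every reservation.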
  while (true) {
    if (new_free_bytes < kSmallAllocatorThreshold) {
      // Still in small bucket. No move.
      if (old_free_bytes < kSmallAllocatorThreshold) return;
      MaybeMoveAllocatorBigToSmall(allocator);
    } else if (new_free_bytes > kBigAllocatorThreshold) {
      // Still in big bucket. No move.
      if (old_free_bytes > kBigAllocatorThreshold) return;
      MaybeMoveAllocatorSmallToBig(allocator);
    } else {
      // Somewhere between thresholds. No move.
      return;
    }

    // Loop to make sure move is eventually stable.
    old_free_bytes = new_free_bytes;
    new_free_bytes = allocator->GetFreeBytes();
  }
}

void BasicMemoryQuota::MaybeMoveAllocatorBigToSmall(
    GrpcMemoryAllocatorImpl* allocator) {
  GRPC_TRACE_LOG(resource_quota, INFO)
      << "Moving allocator " << allocator << " to small";

  AllocatorBucket::Shard& old_shard = big_allocators_.SelectShard(allocator);

  {
    MutexLock l(&old_shard.shard_mu);
    if (old_shard.allocators.erase(allocator) == 0) return;
  }

  AllocatorBucket::Shard& new_shard = small_allocators_.SelectShard(allocator);

  {
    MutexLock l(&new_shard.shard_mu);
    new_shard.allocators.emplace(allocator);
  }
}

void BasicMemoryQuota::MaybeMoveAllocatorSmallToBig(
    GrpcMemoryAllocatorImpl* allocator) {
  GRPC_TRACE_LOG(resource_quota, INFO)
      << "Moving allocator " << allocator << " to big";

  AllocatorBucket::Shard& old_shard = small_allocators_.SelectShard(allocator);

  {
    MutexLock l(&old_shard.shard_mu);
    if (old_shard.allocators.erase(allocator) == 0) return;
  }

  AllocatorBucket::Shard& new_shard = big_allocators_.SelectShard(allocator);

  {
    MutexLock l(&new_shard.shard_mu);
    new_shard.allocators.emplace(allocator);
  }
}

BasicMemoryQuota::PressureInfo BasicMemoryQuota::GetPressureInfo() {
  double free = free_bytes_.load();
  if (free < 0) free = 0;
  size_t quota_size = quota_size_.load();
  double size = quota_size;
  if (size < 1) return PressureInfo{1, 1, 1};
  PressureInfo pressure_info;
  pressure_info.instantaneous_pressure =
      std::max({0.0, (size - free) / size, ContainerMemoryPressure()});
  pressure_info.pressure_control_value =
      pressure_tracker_.AddSampleAndGetControlValue(
          pressure_info.instantaneous_pressure);
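  // Recommend that no single allocation take more than 1/16th of the whole
  // quota in one go.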
  pressure_info.max_recommended_allocation_size = quota_size / 16;
  return pressure_info;
}

//
// PressureTracker
//

namespace memory_quota_detail {

double PressureController::Update(double error) {
  bool is_low = error < 0;
  bool was_low = std::exchange(last_was_low_, is_low);
  double new_control;  // deliberately left unset so the compiler can flag any
                       // branch below that fails to assign it
  if (is_low && was_low) {
    // Memory pressure is too low this round, and was last round too.
    // If we reached the min reporting value last time, then we'll report the
    // same value again this time, so start incrementing the ticks_same_
    // counter.
    if (last_control_ == min_) {
      ticks_same_++;
      if (ticks_same_ >= max_ticks_same_) {
        // If it's been the same for too long, reduce the min reported value
        // down towards zero.
        min_ /= 2.0;
        ticks_same_ = 0;
      }
    }
    // Target the min reporting value.
    new_control = min_;
  } else if (!is_low && !was_low) {
    // Memory pressure is high, and was high previously.
    ticks_same_++;
    if (ticks_same_ >= max_ticks_same_) {
      // It's been high for too long, increase the max reporting value up
      // towards 1.0.
      max_ = (1.0 + max_) / 2.0;
      ticks_same_ = 0;
    }
    // Target the max reporting value.
    new_control = max_;
  } else if (is_low) {
    // Memory pressure is low, but was high last round.
    // Target the min reporting value, but first update it to be closer to the
    // current max (that we've been reporting lately).
    // In this way the min will gradually climb towards the max as we find a
    // stable point.
    // If this is too high, then we'll eventually move it back towards zero.
    ticks_same_ = 0;
    min_ = (min_ + max_) / 2.0;
    new_control = min_;
  } else {
    // Memory pressure is high, but was low last round.
    // Target the max reporting value, but first update it to be closer to the
    // last reported value.
    // The first switchover will have last_control_ being 0, and max_ being 2,
    // so we'll immediately choose 1.0 which will tend to really slow down
    // progress.
    // If we end up targeting too low, we'll eventually move it back towards
    // 1.0 after max_ticks_same_ ticks.
    ticks_same_ = 0;
    max_ = (last_control_ + max_) / 2.0;
    new_control = max_;
  }
  // If the control value is decreasing we do it slowly. This avoids rapid
  // oscillations.
  // (If we want a control value that's higher than the last one we snap
  // immediately because it's likely that memory pressure is growing
  // unchecked).
  if (new_control < last_control_) {
    new_control = std::max(new_control,
                           last_control_ - (max_reduction_per_tick_ / 1000.0));
  }
  last_control_ = new_control;
  return new_control;
}

std::string PressureController::DebugString() const {
  return absl::StrCat(last_was_low_ ? "low" : "high", " min=", min_,
                      " max=", max_, " ticks=", ticks_same_,
                      " last_control=", last_control_);
}

double PressureTracker::AddSampleAndGetControlValue(double sample) {
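  // The controller drives the reported control value so that measured memory
  // pressure settles around 95% utilization (the set point below).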
  static const double kSetPoint = 0.95;

  double max_so_far = max_this_round_.load(std::memory_order_relaxed);
  if (sample > max_so_far) {
    max_this_round_.compare_exchange_weak(max_so_far, sample,
                                          std::memory_order_relaxed,
                                          std::memory_order_relaxed);
  }
  // If memory is almost exhausted, immediately hit the brakes and report full
  // memory usage.
  if (sample >= 0.99) {
    report_.store(1.0, std::memory_order_relaxed);
  }
  update_.Tick([&](Duration) {
    // Reset the round tracker with the new sample.
    const double current_estimate =
        max_this_round_.exchange(sample, std::memory_order_relaxed);
    double report;
    if (current_estimate > 0.99) {
      // Under very high memory pressure we... just max things out.
      report = controller_.Update(1e99);
    } else {
      report = controller_.Update(current_estimate - kSetPoint);
    }
    GRPC_TRACE_LOG(resource_quota, INFO)
        << "RQ: pressure:" << current_estimate << " report:" << report
        << " controller:" << controller_.DebugString();
    report_.store(report, std::memory_order_relaxed);
  });
  return report_.load(std::memory_order_relaxed);
}

}  // namespace memory_quota_detail

//
// MemoryQuota
//

MemoryAllocator MemoryQuota::CreateMemoryAllocator(
    GRPC_UNUSED absl::string_view name) {
  auto impl = std::make_shared<GrpcMemoryAllocatorImpl>(memory_quota_);
  return MemoryAllocator(std::move(impl));
}

MemoryOwner MemoryQuota::CreateMemoryOwner() {
  // Note: we will likely want to add a name or some way to distinguish
  // between memory owners once resource quota is fully rolled out and we need
  // full metrics. One thing to note, however, is that manipulating the name
  // here (e.g. concatenation) can add significant memory overhead when many
  // owners are created.
  auto impl = std::make_shared<GrpcMemoryAllocatorImpl>(memory_quota_);
  return MemoryOwner(std::move(impl));
}

std::vector<std::shared_ptr<BasicMemoryQuota>> AllMemoryQuotas() {
  return MemoryQuotaTracker::Get().All();
}

}  // namespace grpc_core