// Copyright 2021 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "src/core/lib/resource_quota/memory_quota.h"

#include <grpc/event_engine/internal/memory_allocator_impl.h>
#include <grpc/slice.h>
#include <grpc/support/port_platform.h>
#include <inttypes.h>

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <tuple>
#include <utility>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/promise/exec_ctx_wakeup_scheduler.h"
#include "src/core/lib/promise/loop.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/race.h"
#include "src/core/lib/promise/seq.h"
#include "src/core/lib/slice/slice_refcount.h"
#include "src/core/util/mpscq.h"
#include "src/core/util/useful.h"

namespace grpc_core {

namespace {
// Maximum number of bytes an allocator will request from a quota in one step.
// Larger allocations than this will require multiple allocation requests.
constexpr size_t kMaxReplenishBytes = 1024 * 1024;

// Minimum number of bytes an allocator will request from a quota in one step.
constexpr size_t kMinReplenishBytes = 4096;

class MemoryQuotaTracker {
 public:
  static MemoryQuotaTracker& Get() {
    static MemoryQuotaTracker* tracker = new MemoryQuotaTracker();
    return *tracker;
  }

  void Add(std::shared_ptr<BasicMemoryQuota> quota) {
    MutexLock lock(&mu_);
    // Common usage is that we only create a few (one or two) quotas.
    // We'd like to ensure that we don't OOM if more are added - and
    // using a weak_ptr here, whilst nicely braindead, does run that
    // risk.
    // If usage patterns change sufficiently we'll likely want to
    // change this class to have a more sophisticated data structure
    // and probably a Remove() method.
    GatherAndGarbageCollect();
    quotas_.push_back(quota);
  }

  std::vector<std::shared_ptr<BasicMemoryQuota>> All() {
    MutexLock lock(&mu_);
    return GatherAndGarbageCollect();
  }

 private:
  MemoryQuotaTracker() {}

  std::vector<std::shared_ptr<BasicMemoryQuota>> GatherAndGarbageCollect()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    std::vector<std::weak_ptr<BasicMemoryQuota>> new_quotas;
    std::vector<std::shared_ptr<BasicMemoryQuota>> all_quotas;
    for (const auto& quota : quotas_) {
      auto p = quota.lock();
      if (p == nullptr) continue;
      new_quotas.push_back(quota);
      all_quotas.push_back(p);
    }
    quotas_.swap(new_quotas);
    return all_quotas;
  }

  Mutex mu_;
  std::vector<std::weak_ptr<BasicMemoryQuota>> quotas_ ABSL_GUARDED_BY(mu_);
};

// Reference count for a slice allocated by MemoryAllocator::MakeSlice.
// Takes care of releasing memory back when the slice is destroyed.
class SliceRefCount : public grpc_slice_refcount {
 public:
  SliceRefCount(
      std::shared_ptr<
          grpc_event_engine::experimental::internal::MemoryAllocatorImpl>
          allocator,
      size_t size)
      : grpc_slice_refcount(Destroy),
        allocator_(std::move(allocator)),
        size_(size) {
    // Nothing to do here.
  }
  ~SliceRefCount() {
    allocator_->Release(size_);
    allocator_.reset();
  }

 private:
  static void Destroy(grpc_slice_refcount* p) {
    auto* rc = static_cast<SliceRefCount*>(p);
    rc->~SliceRefCount();
    free(rc);
  }

  std::shared_ptr<
      grpc_event_engine::experimental::internal::MemoryAllocatorImpl>
      allocator_;
  size_t size_;
};

std::atomic<double> container_memory_pressure{0.0};

}  // namespace

void SetContainerMemoryPressure(double pressure) {
  container_memory_pressure.store(pressure, std::memory_order_relaxed);
}

double ContainerMemoryPressure() {
  return container_memory_pressure.load(std::memory_order_relaxed);
}

//
// Reclaimer
//

ReclamationSweep::~ReclamationSweep() {
  if (memory_quota_ != nullptr) {
    memory_quota_->FinishReclamation(sweep_token_, std::move(waker_));
  }
}

//
// ReclaimerQueue
//

struct ReclaimerQueue::QueuedNode
    : public MultiProducerSingleConsumerQueue::Node {
  explicit QueuedNode(RefCountedPtr<Handle> reclaimer_handle)
      : reclaimer_handle(std::move(reclaimer_handle)) {}
  RefCountedPtr<Handle> reclaimer_handle;
};

struct ReclaimerQueue::State {
  Mutex reader_mu;
  MultiProducerSingleConsumerQueue queue;  // reader_mu must be held to pop
  Waker waker ABSL_GUARDED_BY(reader_mu);

  ~State() {
    bool empty = false;
    do {
      delete static_cast<QueuedNode*>(queue.PopAndCheckEnd(&empty));
    } while (!empty);
  }
};

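// Orphan() and Run() below race to claim the reclaimer: whichever exchanges
// sweep_ to nullptr first runs the sweep (Orphan passes nullopt to signal
// cancellation) and the other becomes a no-op, so each reclaimer fires at
// most once.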
void ReclaimerQueue::Handle::Orphan() {
  if (auto* sweep = sweep_.exchange(nullptr, std::memory_order_acq_rel)) {
    sweep->RunAndDelete(absl::nullopt);
  }
  Unref();
}

void ReclaimerQueue::Handle::Run(ReclamationSweep reclamation_sweep) {
  if (auto* sweep = sweep_.exchange(nullptr, std::memory_order_acq_rel)) {
    sweep->RunAndDelete(std::move(reclamation_sweep));
  }
}

bool ReclaimerQueue::Handle::Requeue(ReclaimerQueue* new_queue) {
  if (sweep_.load(std::memory_order_relaxed)) {
    new_queue->Enqueue(Ref());
    return true;
  } else {
    return false;
  }
}

void ReclaimerQueue::Handle::Sweep::MarkCancelled() {
  // When we cancel a reclaimer we rotate the elements of the queue once -
  // taking one non-cancelled node from the start, and placing it on the end.
  // This ensures that we don't suffer from head of line blocking whereby a
  // non-cancelled reclaimer at the head of the queue, in the absence of memory
  // pressure, prevents the remainder of the queue from being cleaned up.
  MutexLock lock(&state_->reader_mu);
  while (true) {
    bool empty = false;
    std::unique_ptr<QueuedNode> node(
        static_cast<QueuedNode*>(state_->queue.PopAndCheckEnd(&empty)));
    if (node == nullptr) break;
    if (node->reclaimer_handle->sweep_.load(std::memory_order_relaxed) !=
        nullptr) {
      state_->queue.Push(node.release());
      break;
    }
  }
}

ReclaimerQueue::ReclaimerQueue() : state_(std::make_shared<State>()) {}

ReclaimerQueue::~ReclaimerQueue() = default;

void ReclaimerQueue::Enqueue(RefCountedPtr<Handle> handle) {
  if (state_->queue.Push(new QueuedNode(std::move(handle)))) {
    MutexLock lock(&state_->reader_mu);
    state_->waker.Wakeup();
  }
}

Poll<RefCountedPtr<ReclaimerQueue::Handle>> ReclaimerQueue::PollNext() {
  MutexLock lock(&state_->reader_mu);
  bool empty = false;
  // Try to pull from the queue.
  std::unique_ptr<QueuedNode> node(
      static_cast<QueuedNode*>(state_->queue.PopAndCheckEnd(&empty)));
  // If we get something, great.
  if (node != nullptr) return std::move(node->reclaimer_handle);
  if (!empty) {
    // If we don't, but the queue is probably not empty, schedule an immediate
    // repoll.
    GetContext<Activity>()->ForceImmediateRepoll();
  } else {
    // Otherwise, schedule a wakeup for whenever something is pushed.
    state_->waker = GetContext<Activity>()->MakeNonOwningWaker();
  }
  return Pending{};
}

//
// GrpcMemoryAllocatorImpl
//

GrpcMemoryAllocatorImpl::GrpcMemoryAllocatorImpl(
    std::shared_ptr<BasicMemoryQuota> memory_quota)
    : memory_quota_(memory_quota) {
  memory_quota_->Take(
      /*allocator=*/this, taken_bytes_);
  memory_quota_->AddNewAllocator(this);
}

GrpcMemoryAllocatorImpl::~GrpcMemoryAllocatorImpl() {
  CHECK_EQ(free_bytes_.load(std::memory_order_acquire) +
               sizeof(GrpcMemoryAllocatorImpl),
           taken_bytes_.load(std::memory_order_relaxed));
  memory_quota_->Return(taken_bytes_.load(std::memory_order_relaxed));
}

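// Shutdown unregisters this allocator from the quota and detaches any
// registered reclaimers. The handles are swapped out under reclaimer_mu_ but
// only destroyed (and therefore orphaned) after the lock is released, when
// the local array goes out of scope.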
void GrpcMemoryAllocatorImpl::Shutdown() {
  memory_quota_->RemoveAllocator(this);
  std::shared_ptr<BasicMemoryQuota> memory_quota;
  OrphanablePtr<ReclaimerQueue::Handle>
      reclamation_handles[kNumReclamationPasses];
  {
    MutexLock lock(&reclaimer_mu_);
    CHECK(!shutdown_);
    shutdown_ = true;
    memory_quota = memory_quota_;
    for (size_t i = 0; i < kNumReclamationPasses; i++) {
      reclamation_handles[i] = std::exchange(reclamation_handles_[i], nullptr);
    }
  }
}

size_t GrpcMemoryAllocatorImpl::Reserve(MemoryRequest request) {
  // Validate request - performed here so we don't bloat the generated code
  // with inlined asserts.
  CHECK(request.min() <= request.max());
  CHECK(request.max() <= MemoryRequest::max_allowed_size());
  size_t old_free = free_bytes_.load(std::memory_order_relaxed);

  while (true) {
    // Attempt to reserve memory from our pool.
    auto reservation = TryReserve(request);
    if (reservation.has_value()) {
      size_t new_free = free_bytes_.load(std::memory_order_relaxed);
      memory_quota_->MaybeMoveAllocator(this, old_free, new_free);
      return *reservation;
    }

    // If that failed, grab more from the quota and retry.
    Replenish();
  }
}

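// Illustrative numbers for the scaling below: with request.min() == 1KiB,
// request.max() == 64KiB and pressure == 0.9, the flexible portion is scaled
// by (1.0 - 0.9) / 0.2 == 0.5, so we try to reserve about 1KiB + 31.5KiB.
// At pressure <= 0.8 we try for the full 64KiB, still subject to the
// max_recommended_allocation_size cap.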
absl::optional<size_t> GrpcMemoryAllocatorImpl::TryReserve(
    MemoryRequest request) {
  // How much memory should we request? (see the scaling below)
  size_t scaled_size_over_min = request.max() - request.min();
  // Scale the request down according to memory pressure if we have that
  // flexibility.
  if (scaled_size_over_min != 0) {
    const auto pressure_info = memory_quota_->GetPressureInfo();
    double pressure = pressure_info.pressure_control_value;
    size_t max_recommended_allocation_size =
        pressure_info.max_recommended_allocation_size;
    // Reduce the allocation size in proportion to how far memory pressure
    // exceeds 80% usage.
    if (pressure > 0.8) {
      scaled_size_over_min =
          std::min(scaled_size_over_min,
                   static_cast<size_t>((request.max() - request.min()) *
                                       (1.0 - pressure) / 0.2));
    }
    if (max_recommended_allocation_size < request.min()) {
      scaled_size_over_min = 0;
    } else if (request.min() + scaled_size_over_min >
               max_recommended_allocation_size) {
      scaled_size_over_min = max_recommended_allocation_size - request.min();
    }
  }

  // How much do we want to reserve?
  const size_t reserve = request.min() + scaled_size_over_min;
  // See how many bytes are available.
  size_t available = free_bytes_.load(std::memory_order_acquire);
  while (true) {
    // Does the current free pool satisfy the request?
    if (available < reserve) {
      return {};
    }
    // Try to reserve the requested amount.
    // If the amount of free memory changed through this loop, then available
    // will be set to the new value and we'll repeat.
    if (free_bytes_.compare_exchange_weak(available, available - reserve,
                                          std::memory_order_acq_rel,
                                          std::memory_order_acquire)) {
      return reserve;
    }
  }
}

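// Opportunistically donate part of our free pool back to the quota: small
// pools (8KiB or less) are returned in full, larger pools return roughly
// half, and - unless the unconstrained max quota buffer size experiment is
// enabled - anything above half of kMaxQuotaBufferSize is always given back.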
void GrpcMemoryAllocatorImpl::MaybeDonateBack() {
  size_t free = free_bytes_.load(std::memory_order_relaxed);
  while (free > 0) {
    size_t ret = 0;
    if (!IsUnconstrainedMaxQuotaBufferSizeEnabled() &&
        free > kMaxQuotaBufferSize / 2) {
      ret = std::max(ret, free - (kMaxQuotaBufferSize / 2));
    }
    ret = std::max(ret, free > 8192 ? free / 2 : free);
    const size_t new_free = free - ret;
    if (free_bytes_.compare_exchange_weak(free, new_free,
                                          std::memory_order_acq_rel,
                                          std::memory_order_acquire)) {
      GRPC_TRACE_LOG(resource_quota, INFO)
          << "[" << this << "] Early return " << ret << " bytes";
      CHECK(taken_bytes_.fetch_sub(ret, std::memory_order_relaxed) >= ret);
      memory_quota_->Return(ret);
      return;
    }
  }
}

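// Worked example (illustrative): an allocator that has taken 300KiB so far
// requests Clamp(300KiB / 3, kMinReplenishBytes, kMaxReplenishBytes) ==
// 100KiB; a nearly empty allocator is clamped up to 4KiB and a very large
// one is capped at 1MiB per step.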
void GrpcMemoryAllocatorImpl::Replenish() {
  // Attempt fairly low-rate exponential growth of the request size, bounded
  // between some reasonable limits declared at the top of this file.
  auto amount = Clamp(taken_bytes_.load(std::memory_order_relaxed) / 3,
                      kMinReplenishBytes, kMaxReplenishBytes);
  // Take the requested amount from the quota.
  memory_quota_->Take(
      /*allocator=*/this, amount);
  // Record that we've taken it.
  taken_bytes_.fetch_add(amount, std::memory_order_relaxed);
  // Add the taken amount to the free pool.
  free_bytes_.fetch_add(amount, std::memory_order_acq_rel);
}

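// The slice produced here is a single malloc'd block: the SliceRefCount sits
// at the front and the usable payload (size - sizeof(SliceRefCount) bytes)
// starts immediately after it. When the slice is destroyed, Destroy() runs
// the refcount's destructor, releasing the reservation back to this
// allocator, and then frees the block.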
grpc_slice GrpcMemoryAllocatorImpl::MakeSlice(MemoryRequest request) {
  auto size = Reserve(request.Increase(sizeof(SliceRefCount)));
  void* p = malloc(size);
  new (p) SliceRefCount(shared_from_this(), size);
  grpc_slice slice;
  slice.refcount = static_cast<SliceRefCount*>(p);
  slice.data.refcounted.bytes =
      static_cast<uint8_t*>(p) + sizeof(SliceRefCount);
  slice.data.refcounted.length = size - sizeof(SliceRefCount);
  return slice;
}

//
// BasicMemoryQuota
//

class BasicMemoryQuota::WaitForSweepPromise {
 public:
  WaitForSweepPromise(std::shared_ptr<BasicMemoryQuota> memory_quota,
                      uint64_t token)
      : memory_quota_(std::move(memory_quota)), token_(token) {}

  Poll<Empty> operator()() {
    if (memory_quota_->reclamation_counter_.load(std::memory_order_relaxed) !=
        token_) {
      return Empty{};
    } else {
      return Pending{};
    }
  }

 private:
  std::shared_ptr<BasicMemoryQuota> memory_quota_;
  uint64_t token_;
};

BasicMemoryQuota::BasicMemoryQuota(std::string name) : name_(std::move(name)) {}

void BasicMemoryQuota::Start() {
  auto self = shared_from_this();

  MemoryQuotaTracker::Get().Add(self);

  // Reclamation loop:
  // basically, wait until we are in overcommit (free_bytes_ < 0), and then:
  // while (free_bytes_ < 0) reclaim_memory()
  // ... and repeat
  auto reclamation_loop = Loop(Seq(
      [self]() -> Poll<int> {
        // If there's free memory we no longer need to reclaim memory!
        if (self->free_bytes_.load(std::memory_order_acquire) > 0) {
          return Pending{};
        }
        return 0;
      },
      [self]() {
        // Race biases to the first thing that completes... so this will
        // choose the highest priority/least destructive thing to do that's
        // available.
        auto annotate = [](const char* name) {
          return [name](RefCountedPtr<ReclaimerQueue::Handle> f) {
            return std::make_tuple(name, std::move(f));
          };
        };
        return Race(Map(self->reclaimers_[0].Next(), annotate("benign")),
                    Map(self->reclaimers_[1].Next(), annotate("idle")),
                    Map(self->reclaimers_[2].Next(), annotate("destructive")));
      },
      [self](
          std::tuple<const char*, RefCountedPtr<ReclaimerQueue::Handle>> arg) {
        auto reclaimer = std::move(std::get<1>(arg));
        if (GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
          double free = std::max(intptr_t{0}, self->free_bytes_.load());
          size_t quota_size = self->quota_size_.load();
          LOG(INFO) << "RQ: " << self->name_ << " perform " << std::get<0>(arg)
                    << " reclamation. Available free bytes: " << free
                    << ", total quota_size: " << quota_size;
        }
        // One of the reclaimer queues gave us a way to get back memory.
        // Call the reclaimer with a sweep token that carries enough
        // information to wake us up again once reclamation completes.
        const uint64_t token =
            self->reclamation_counter_.fetch_add(1, std::memory_order_relaxed) +
            1;
        reclaimer->Run(ReclamationSweep(
            self, token, GetContext<Activity>()->MakeNonOwningWaker()));
        // Return a promise that will wait for our barrier. This will be
        // awoken when the reclamation sweep created above is destroyed. So,
        // once that sweep is done, we'll be able to proceed.
        return WaitForSweepPromise(self, token);
      },
      []() -> LoopCtl<absl::Status> {
        // Continue the loop!
        return Continue{};
      }));

  reclaimer_activity_ =
      MakeActivity(std::move(reclamation_loop), ExecCtxWakeupScheduler(),
                   [](absl::Status status) {
                     CHECK(status.code() == absl::StatusCode::kCancelled);
                   });
}

void BasicMemoryQuota::Stop() { reclaimer_activity_.reset(); }

void BasicMemoryQuota::SetSize(size_t new_size) {
  size_t old_size = quota_size_.exchange(new_size, std::memory_order_relaxed);
  if (old_size < new_size) {
    // We're growing the quota.
    Return(new_size - old_size);
  } else {
    // We're shrinking the quota.
    Take(/*allocator=*/nullptr, old_size - new_size);
  }
}

void BasicMemoryQuota::Take(GrpcMemoryAllocatorImpl* allocator, size_t amount) {
  // If there's a request for nothing, then do nothing!
  if (amount == 0) return;
  DCHECK(amount <= std::numeric_limits<intptr_t>::max());
  // Grab memory from the quota.
  auto prior = free_bytes_.fetch_sub(amount, std::memory_order_acq_rel);
  // If we push into overcommit, awake the reclaimer.
  if (prior >= 0 && prior < static_cast<intptr_t>(amount)) {
    if (reclaimer_activity_ != nullptr) reclaimer_activity_->ForceWakeup();
  }

  if (IsFreeLargeAllocatorEnabled()) {
    if (allocator == nullptr) return;
    GrpcMemoryAllocatorImpl* chosen_allocator = nullptr;
    // Use calling allocator's shard index to choose shard.
    auto& shard = big_allocators_.shards[allocator->IncrementShardIndex() %
                                         big_allocators_.shards.size()];

    if (shard.shard_mu.TryLock()) {
      if (!shard.allocators.empty()) {
        chosen_allocator = *shard.allocators.begin();
      }
      shard.shard_mu.Unlock();
    }

    if (chosen_allocator != nullptr) {
      chosen_allocator->ReturnFree();
    }
  }
}

void BasicMemoryQuota::FinishReclamation(uint64_t token, Waker waker) {
  uint64_t current = reclamation_counter_.load(std::memory_order_relaxed);
  if (current != token) return;
  if (reclamation_counter_.compare_exchange_strong(current, current + 1,
                                                   std::memory_order_relaxed,
                                                   std::memory_order_relaxed)) {
    if (GRPC_TRACE_FLAG_ENABLED(resource_quota)) {
      double free = std::max(intptr_t{0}, free_bytes_.load());
      size_t quota_size = quota_size_.load();
      LOG(INFO) << "RQ: " << name_
                << " reclamation complete. Available free bytes: " << free
                << ", total quota_size: " << quota_size;
    }
    waker.Wakeup();
  }
}

void BasicMemoryQuota::Return(size_t amount) {
  free_bytes_.fetch_add(amount, std::memory_order_relaxed);
}

void BasicMemoryQuota::AddNewAllocator(GrpcMemoryAllocatorImpl* allocator) {
  GRPC_TRACE_LOG(resource_quota, INFO) << "Adding allocator " << allocator;

  AllocatorBucket::Shard& shard = small_allocators_.SelectShard(allocator);

  {
    MutexLock l(&shard.shard_mu);
    shard.allocators.emplace(allocator);
  }
}

void BasicMemoryQuota::RemoveAllocator(GrpcMemoryAllocatorImpl* allocator) {
  GRPC_TRACE_LOG(resource_quota, INFO) << "Removing allocator " << allocator;

  AllocatorBucket::Shard& small_shard =
      small_allocators_.SelectShard(allocator);

  {
    MutexLock l(&small_shard.shard_mu);
    if (small_shard.allocators.erase(allocator) == 1) {
      return;
    }
  }

  AllocatorBucket::Shard& big_shard = big_allocators_.SelectShard(allocator);

  {
    MutexLock l(&big_shard.shard_mu);
    big_shard.allocators.erase(allocator);
  }
}

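// Allocators are bucketed by free-byte count with hysteresis: below
// kSmallAllocatorThreshold they belong in the small bucket, above
// kBigAllocatorThreshold in the big bucket, and anywhere in between they stay
// where they already are. The loop below re-checks after each move in case
// the free count changed concurrently.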
void BasicMemoryQuota::MaybeMoveAllocator(GrpcMemoryAllocatorImpl* allocator,
                                          size_t old_free_bytes,
                                          size_t new_free_bytes) {
  while (true) {
    if (new_free_bytes < kSmallAllocatorThreshold) {
      // Still in small bucket. No move.
      if (old_free_bytes < kSmallAllocatorThreshold) return;
      MaybeMoveAllocatorBigToSmall(allocator);
    } else if (new_free_bytes > kBigAllocatorThreshold) {
      // Still in big bucket. No move.
      if (old_free_bytes > kBigAllocatorThreshold) return;
      MaybeMoveAllocatorSmallToBig(allocator);
    } else {
      // Somewhere between thresholds. No move.
      return;
    }

    // Loop to make sure move is eventually stable.
    old_free_bytes = new_free_bytes;
    new_free_bytes = allocator->GetFreeBytes();
  }
}

void BasicMemoryQuota::MaybeMoveAllocatorBigToSmall(
    GrpcMemoryAllocatorImpl* allocator) {
  GRPC_TRACE_LOG(resource_quota, INFO)
      << "Moving allocator " << allocator << " to small";

  AllocatorBucket::Shard& old_shard = big_allocators_.SelectShard(allocator);

  {
    MutexLock l(&old_shard.shard_mu);
    if (old_shard.allocators.erase(allocator) == 0) return;
  }

  AllocatorBucket::Shard& new_shard = small_allocators_.SelectShard(allocator);

  {
    MutexLock l(&new_shard.shard_mu);
    new_shard.allocators.emplace(allocator);
  }
}

void BasicMemoryQuota::MaybeMoveAllocatorSmallToBig(
    GrpcMemoryAllocatorImpl* allocator) {
  GRPC_TRACE_LOG(resource_quota, INFO)
      << "Moving allocator " << allocator << " to big";

  AllocatorBucket::Shard& old_shard = small_allocators_.SelectShard(allocator);

  {
    MutexLock l(&old_shard.shard_mu);
    if (old_shard.allocators.erase(allocator) == 0) return;
  }

  AllocatorBucket::Shard& new_shard = big_allocators_.SelectShard(allocator);

  {
    MutexLock l(&new_shard.shard_mu);
    new_shard.allocators.emplace(allocator);
  }
}

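// Pressure is reported in two forms: instantaneous_pressure is the raw
// fraction of the quota in use (or the container-level pressure, whichever
// is higher), while pressure_control_value is the smoothed value produced by
// the PressureTracker below and is what TryReserve uses to scale allocations.
// The recommended per-allocation cap is 1/16th of the quota.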
BasicMemoryQuota::PressureInfo BasicMemoryQuota::GetPressureInfo() {
  double free = free_bytes_.load();
  if (free < 0) free = 0;
  size_t quota_size = quota_size_.load();
  double size = quota_size;
  if (size < 1) return PressureInfo{1, 1, 1};
  PressureInfo pressure_info;
  pressure_info.instantaneous_pressure =
      std::max({0.0, (size - free) / size, ContainerMemoryPressure()});
  pressure_info.pressure_control_value =
      pressure_tracker_.AddSampleAndGetControlValue(
          pressure_info.instantaneous_pressure);
  pressure_info.max_recommended_allocation_size = quota_size / 16;
  return pressure_info;
}

//
// PressureTracker
//

namespace memory_quota_detail {

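// Updates the control value given the current error term, i.e. the measured
// pressure minus the set point (0.95 at the time of writing); a negative
// error means pressure is below target. min_ and max_ bracket the reported
// value: they drift towards each other as the sign of the error flips, and
// are pushed back out towards 0 or 1 when one sign persists for too long.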
double PressureController::Update(double error) {
  bool is_low = error < 0;
  bool was_low = std::exchange(last_was_low_, is_low);
  double new_control;  // left unset so the compiler can flag bad branches
  if (is_low && was_low) {
    // Memory pressure is too low this round, and was last round too.
    // If we reached the min reporting value last time, then we will report
    // the same value again this time and can start to increase the ticks_same_
    // counter.
    if (last_control_ == min_) {
      ticks_same_++;
      if (ticks_same_ >= max_ticks_same_) {
        // If it's been the same for too long, reduce the min reported value
        // down towards zero.
        min_ /= 2.0;
        ticks_same_ = 0;
      }
    }
    // Target the min reporting value.
    new_control = min_;
  } else if (!is_low && !was_low) {
    // Memory pressure is high, and was high previously.
    ticks_same_++;
    if (ticks_same_ >= max_ticks_same_) {
      // It's been high for too long, increase the max reporting value up
      // towards 1.0.
      max_ = (1.0 + max_) / 2.0;
      ticks_same_ = 0;
    }
    // Target the max reporting value.
    new_control = max_;
  } else if (is_low) {
    // Memory pressure is low, but was high last round.
    // Target the min reporting value, but first update it to be closer to the
    // current max (that we've been reporting lately).
    // In this way the min will gradually climb towards the max as we find a
    // stable point.
    // If this is too high, then we'll eventually move it back towards zero.
    ticks_same_ = 0;
    min_ = (min_ + max_) / 2.0;
    new_control = min_;
  } else {
    // Memory pressure is high, but was low last round.
    // Target the max reporting value, but first update it to be closer to the
    // last reported value.
    // The first switchover will have last_control_ being 0, and max_ being 2,
    // so we'll immediately choose 1.0 which will tend to really slow down
    // progress.
    // If we end up targeting too low, we'll eventually move it back towards
    // 1.0 after max_ticks_same_ ticks.
    ticks_same_ = 0;
    max_ = (last_control_ + max_) / 2.0;
    new_control = max_;
  }
  // If the control value is decreasing we do it slowly. This avoids rapid
  // oscillations.
  // (If we want a control value that's higher than the last one we snap
  // immediately because it's likely that memory pressure is growing unchecked).
  if (new_control < last_control_) {
    new_control = std::max(new_control,
                           last_control_ - (max_reduction_per_tick_ / 1000.0));
  }
  last_control_ = new_control;
  return new_control;
}

std::string PressureController::DebugString() const {
  return absl::StrCat(last_was_low_ ? "low" : "high", " min=", min_,
                      " max=", max_, " ticks=", ticks_same_,
                      " last_control=", last_control_);
}

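// Samples are folded into a running maximum for the current round; when
// update_.Tick runs its callback, the controller above is fed
// (max_sample - kSetPoint) and the published report_ value is refreshed.
// A sample of 0.99 or above short-circuits this and pushes the report
// straight to 1.0. Between ticks callers just read the last published value,
// which keeps the common call from GetPressureInfo() cheap.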
double PressureTracker::AddSampleAndGetControlValue(double sample) {
  static const double kSetPoint = 0.95;

  double max_so_far = max_this_round_.load(std::memory_order_relaxed);
  if (sample > max_so_far) {
    max_this_round_.compare_exchange_weak(max_so_far, sample,
                                          std::memory_order_relaxed,
                                          std::memory_order_relaxed);
  }
  // If memory pressure is essentially maxed out, immediately hit the brakes
  // and report full memory usage.
  if (sample >= 0.99) {
    report_.store(1.0, std::memory_order_relaxed);
  }
  update_.Tick([&](Duration) {
    // Reset the round tracker with the new sample.
    const double current_estimate =
        max_this_round_.exchange(sample, std::memory_order_relaxed);
    double report;
    if (current_estimate > 0.99) {
      // Under very high memory pressure we... just max things out.
      report = controller_.Update(1e99);
    } else {
      report = controller_.Update(current_estimate - kSetPoint);
    }
    GRPC_TRACE_LOG(resource_quota, INFO)
        << "RQ: pressure:" << current_estimate << " report:" << report
        << " controller:" << controller_.DebugString();
    report_.store(report, std::memory_order_relaxed);
  });
  return report_.load(std::memory_order_relaxed);
}

}  // namespace memory_quota_detail

//
// MemoryQuota
//

MemoryAllocator MemoryQuota::CreateMemoryAllocator(
    GRPC_UNUSED absl::string_view name) {
  auto impl = std::make_shared<GrpcMemoryAllocatorImpl>(memory_quota_);
  return MemoryAllocator(std::move(impl));
}

MemoryOwner MemoryQuota::CreateMemoryOwner() {
  // Note: we will likely want to add a name or some way to distinguish
  // between memory owners once resource quota is fully rolled out and we need
  // full metrics. One thing to note, however, is that manipulating the name
  // here (e.g. concatenation) can significantly increase memory usage when
  // many owners are created.
  auto impl = std::make_shared<GrpcMemoryAllocatorImpl>(memory_quota_);
  return MemoryOwner(std::move(impl));
}

std::vector<std::shared_ptr<BasicMemoryQuota>> AllMemoryQuotas() {
  return MemoryQuotaTracker::Get().All();
}

}  // namespace grpc_core