1 // Copyright 2022 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #include "src/core/lib/promise/sleep.h" 16 17 #include <grpc/event_engine/event_engine.h> 18 #include <grpc/support/port_platform.h> 19 20 #include <utility> 21 22 #include "src/core/lib/event_engine/event_engine_context.h" // IWYU pragma: keep 23 #include "src/core/lib/iomgr/exec_ctx.h" 24 #include "src/core/lib/promise/activity.h" 25 #include "src/core/lib/promise/context.h" 26 #include "src/core/lib/promise/poll.h" 27 #include "src/core/util/time.h" 28 29 namespace grpc_core { 30 31 using ::grpc_event_engine::experimental::EventEngine; 32 Sleep(Timestamp deadline)33Sleep::Sleep(Timestamp deadline) : deadline_(deadline) {} 34 ~Sleep()35Sleep::~Sleep() { 36 if (closure_ != nullptr) closure_->Cancel(); 37 } 38 operator ()()39Poll<absl::Status> Sleep::operator()() { 40 // Invalidate now so that we see a fresh version of the time. 41 // TODO(ctiller): the following can be safely removed when we remove ExecCtx. 42 ExecCtx::Get()->InvalidateNow(); 43 const auto now = Timestamp::Now(); 44 // If the deadline is earlier than now we can just return. 45 if (deadline_ <= now) return absl::OkStatus(); 46 if (closure_ == nullptr) { 47 // TODO(ctiller): it's likely we'll want a pool of closures - probably per 48 // cpu? - to avoid allocating/deallocating on fast paths. 49 closure_ = new ActiveClosure(deadline_); 50 } 51 if (closure_->HasRun()) return absl::OkStatus(); 52 return Pending{}; 53 } 54 ActiveClosure(Timestamp deadline)55Sleep::ActiveClosure::ActiveClosure(Timestamp deadline) 56 : waker_(GetContext<Activity>()->MakeOwningWaker()), 57 timer_handle_(GetContext<EventEngine>()->RunAfter( 58 deadline - Timestamp::Now(), this)) {} 59 Run()60void Sleep::ActiveClosure::Run() { 61 ApplicationCallbackExecCtx callback_exec_ctx; 62 ExecCtx exec_ctx; 63 auto waker = std::move(waker_); 64 if (Unref()) { 65 delete this; 66 } else { 67 waker.Wakeup(); 68 } 69 } 70 Cancel()71void Sleep::ActiveClosure::Cancel() { 72 // If we cancel correctly then we must own both refs still and can simply 73 // delete without unreffing twice, otherwise try unreffing since this may be 74 // the last owned ref. 75 if (HasRun() || GetContext<EventEngine>()->Cancel(timer_handle_) || Unref()) { 76 delete this; 77 } 78 } 79 Unref()80bool Sleep::ActiveClosure::Unref() { 81 return (refs_.fetch_sub(1, std::memory_order_acq_rel) == 1); 82 } 83 HasRun() const84bool Sleep::ActiveClosure::HasRun() const { 85 // If the closure has run (ie woken up the activity) then it will have 86 // decremented this ref count once. 87 return refs_.load(std::memory_order_acquire) == 1; 88 } 89 90 } // namespace grpc_core 91