• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 //
17 
18 #include <grpc/event_engine/event_engine.h>
19 #include <grpc/grpc.h>
20 #include <grpc/support/port_platform.h>
21 #include <grpc/support/sync.h>
22 #include <grpc/support/time.h>
23 #include <grpcpp/alarm.h>
24 #include <grpcpp/completion_queue.h>
25 #include <grpcpp/impl/completion_queue_tag.h>
26 
27 #include <atomic>
28 #include <functional>
29 #include <memory>
30 #include <utility>
31 
32 #include "absl/log/check.h"
33 #include "absl/status/status.h"
34 #include "src/core/lib/event_engine/default_event_engine.h"
35 #include "src/core/lib/iomgr/error.h"
36 #include "src/core/lib/iomgr/exec_ctx.h"
37 #include "src/core/lib/surface/completion_queue.h"
38 #include "src/core/util/time.h"
39 
40 namespace grpc {
41 
42 namespace internal {
43 
44 namespace {
45 using grpc_event_engine::experimental::EventEngine;
46 }  // namespace
47 
48 class AlarmImpl : public grpc::internal::CompletionQueueTag {
49  public:
AlarmImpl()50   AlarmImpl()
51       : event_engine_(grpc_event_engine::experimental::GetDefaultEventEngine()),
52         cq_(nullptr),
53         tag_(nullptr) {
54     gpr_ref_init(&refs_, 1);
55   }
~AlarmImpl()56   ~AlarmImpl() override {}
FinalizeResult(void ** tag,bool *)57   bool FinalizeResult(void** tag, bool* /*status*/) override {
58     *tag = tag_;
59     Unref();
60     return true;
61   }
Set(grpc::CompletionQueue * cq,gpr_timespec deadline,void * tag)62   void Set(grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) {
63     grpc_core::ExecCtx exec_ctx;
64     GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm");
65     cq_ = cq->cq();
66     tag_ = tag;
67     CHECK(grpc_cq_begin_op(cq_, this));
68     Ref();
69     CHECK(cq_armed_.exchange(true) == false);
70     CHECK(!callback_armed_.load());
71     cq_timer_handle_ = event_engine_->RunAfter(
72         grpc_core::Timestamp::FromTimespecRoundUp(deadline) -
73             grpc_core::ExecCtx::Get()->Now(),
74         [this] { OnCQAlarm(absl::OkStatus()); });
75   }
Set(gpr_timespec deadline,std::function<void (bool)> f)76   void Set(gpr_timespec deadline, std::function<void(bool)> f) {
77     grpc_core::ExecCtx exec_ctx;
78     // Don't use any CQ at all. Instead just use the timer to fire the function
79     callback_ = std::move(f);
80     Ref();
81     CHECK(callback_armed_.exchange(true) == false);
82     CHECK(!cq_armed_.load());
83     callback_timer_handle_ = event_engine_->RunAfter(
84         grpc_core::Timestamp::FromTimespecRoundUp(deadline) -
85             grpc_core::ExecCtx::Get()->Now(),
86         [this] { OnCallbackAlarm(true); });
87   }
Cancel()88   void Cancel() {
89     grpc_core::ExecCtx exec_ctx;
90     if (callback_armed_.load() &&
91         event_engine_->Cancel(callback_timer_handle_)) {
92       event_engine_->Run([this] { OnCallbackAlarm(/*is_ok=*/false); });
93     }
94     if (cq_armed_.load() && event_engine_->Cancel(cq_timer_handle_)) {
95       event_engine_->Run(
96           [this] { OnCQAlarm(absl::CancelledError("cancelled")); });
97     }
98   }
Destroy()99   void Destroy() {
100     Cancel();
101     Unref();
102   }
103 
104  private:
OnCQAlarm(grpc_error_handle error)105   void OnCQAlarm(grpc_error_handle error) {
106     cq_armed_.store(false);
107     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
108     grpc_core::ExecCtx exec_ctx;
109     // Preserve the cq and reset the cq_ so that the alarm
110     // can be reset when the alarm tag is delivered.
111     grpc_completion_queue* cq = cq_;
112     cq_ = nullptr;
113     grpc_cq_end_op(
114         cq, this, error,
115         [](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, nullptr,
116         &completion_);
117     GRPC_CQ_INTERNAL_UNREF(cq, "alarm");
118   }
119 
OnCallbackAlarm(bool is_ok)120   void OnCallbackAlarm(bool is_ok) {
121     callback_armed_.store(false);
122     grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
123     grpc_core::ExecCtx exec_ctx;
124     callback_(is_ok);
125     Unref();
126   }
127 
Ref()128   void Ref() { gpr_ref(&refs_); }
Unref()129   void Unref() {
130     if (gpr_unref(&refs_)) {
131       delete this;
132     }
133   }
134 
135   std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
136   std::atomic<bool> cq_armed_{false};
137   EventEngine::TaskHandle cq_timer_handle_ = EventEngine::TaskHandle::kInvalid;
138   std::atomic<bool> callback_armed_{false};
139   EventEngine::TaskHandle callback_timer_handle_ =
140       EventEngine::TaskHandle::kInvalid;
141   gpr_refcount refs_;
142   grpc_cq_completion completion_;
143   // completion queue where events about this alarm will be posted
144   grpc_completion_queue* cq_;
145   void* tag_;
146   std::function<void(bool)> callback_;
147 };
148 }  // namespace internal
149 
Alarm()150 Alarm::Alarm() : alarm_(new internal::AlarmImpl()) {}
151 
SetInternal(grpc::CompletionQueue * cq,gpr_timespec deadline,void * tag)152 void Alarm::SetInternal(grpc::CompletionQueue* cq, gpr_timespec deadline,
153                         void* tag) {
154   // Note that we know that alarm_ is actually an internal::AlarmImpl
155   // but we declared it as the base pointer to avoid a forward declaration
156   // or exposing core data structures in the C++ public headers.
157   // Thus it is safe to use a static_cast to the subclass here, and the
158   // C++ style guide allows us to do so in this case
159   static_cast<internal::AlarmImpl*>(alarm_)->Set(cq, deadline, tag);
160 }
161 
SetInternal(gpr_timespec deadline,std::function<void (bool)> f)162 void Alarm::SetInternal(gpr_timespec deadline, std::function<void(bool)> f) {
163   // Note that we know that alarm_ is actually an internal::AlarmImpl
164   // but we declared it as the base pointer to avoid a forward declaration
165   // or exposing core data structures in the C++ public headers.
166   // Thus it is safe to use a static_cast to the subclass here, and the
167   // C++ style guide allows us to do so in this case
168   static_cast<internal::AlarmImpl*>(alarm_)->Set(deadline, std::move(f));
169 }
170 
~Alarm()171 Alarm::~Alarm() {
172   if (alarm_ != nullptr) {
173     static_cast<internal::AlarmImpl*>(alarm_)->Destroy();
174   }
175 }
176 
Cancel()177 void Alarm::Cancel() { static_cast<internal::AlarmImpl*>(alarm_)->Cancel(); }
178 }  // namespace grpc
179