1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 //
17
18 #include <grpc/event_engine/event_engine.h>
19 #include <grpc/grpc.h>
20 #include <grpc/support/port_platform.h>
21 #include <grpc/support/sync.h>
22 #include <grpc/support/time.h>
23 #include <grpcpp/alarm.h>
24 #include <grpcpp/completion_queue.h>
25 #include <grpcpp/impl/completion_queue_tag.h>
26
27 #include <atomic>
28 #include <functional>
29 #include <memory>
30 #include <utility>
31
32 #include "absl/log/check.h"
33 #include "absl/status/status.h"
34 #include "src/core/lib/event_engine/default_event_engine.h"
35 #include "src/core/lib/iomgr/error.h"
36 #include "src/core/lib/iomgr/exec_ctx.h"
37 #include "src/core/lib/surface/completion_queue.h"
38 #include "src/core/util/time.h"
39
40 namespace grpc {
41
42 namespace internal {
43
44 namespace {
45 using grpc_event_engine::experimental::EventEngine;
46 } // namespace
47
48 class AlarmImpl : public grpc::internal::CompletionQueueTag {
49 public:
AlarmImpl()50 AlarmImpl()
51 : event_engine_(grpc_event_engine::experimental::GetDefaultEventEngine()),
52 cq_(nullptr),
53 tag_(nullptr) {
54 gpr_ref_init(&refs_, 1);
55 }
~AlarmImpl()56 ~AlarmImpl() override {}
FinalizeResult(void ** tag,bool *)57 bool FinalizeResult(void** tag, bool* /*status*/) override {
58 *tag = tag_;
59 Unref();
60 return true;
61 }
Set(grpc::CompletionQueue * cq,gpr_timespec deadline,void * tag)62 void Set(grpc::CompletionQueue* cq, gpr_timespec deadline, void* tag) {
63 grpc_core::ExecCtx exec_ctx;
64 GRPC_CQ_INTERNAL_REF(cq->cq(), "alarm");
65 cq_ = cq->cq();
66 tag_ = tag;
67 CHECK(grpc_cq_begin_op(cq_, this));
68 Ref();
69 CHECK(cq_armed_.exchange(true) == false);
70 CHECK(!callback_armed_.load());
71 cq_timer_handle_ = event_engine_->RunAfter(
72 grpc_core::Timestamp::FromTimespecRoundUp(deadline) -
73 grpc_core::ExecCtx::Get()->Now(),
74 [this] { OnCQAlarm(absl::OkStatus()); });
75 }
Set(gpr_timespec deadline,std::function<void (bool)> f)76 void Set(gpr_timespec deadline, std::function<void(bool)> f) {
77 grpc_core::ExecCtx exec_ctx;
78 // Don't use any CQ at all. Instead just use the timer to fire the function
79 callback_ = std::move(f);
80 Ref();
81 CHECK(callback_armed_.exchange(true) == false);
82 CHECK(!cq_armed_.load());
83 callback_timer_handle_ = event_engine_->RunAfter(
84 grpc_core::Timestamp::FromTimespecRoundUp(deadline) -
85 grpc_core::ExecCtx::Get()->Now(),
86 [this] { OnCallbackAlarm(true); });
87 }
Cancel()88 void Cancel() {
89 grpc_core::ExecCtx exec_ctx;
90 if (callback_armed_.load() &&
91 event_engine_->Cancel(callback_timer_handle_)) {
92 event_engine_->Run([this] { OnCallbackAlarm(/*is_ok=*/false); });
93 }
94 if (cq_armed_.load() && event_engine_->Cancel(cq_timer_handle_)) {
95 event_engine_->Run(
96 [this] { OnCQAlarm(absl::CancelledError("cancelled")); });
97 }
98 }
Destroy()99 void Destroy() {
100 Cancel();
101 Unref();
102 }
103
104 private:
OnCQAlarm(grpc_error_handle error)105 void OnCQAlarm(grpc_error_handle error) {
106 cq_armed_.store(false);
107 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
108 grpc_core::ExecCtx exec_ctx;
109 // Preserve the cq and reset the cq_ so that the alarm
110 // can be reset when the alarm tag is delivered.
111 grpc_completion_queue* cq = cq_;
112 cq_ = nullptr;
113 grpc_cq_end_op(
114 cq, this, error,
115 [](void* /*arg*/, grpc_cq_completion* /*completion*/) {}, nullptr,
116 &completion_);
117 GRPC_CQ_INTERNAL_UNREF(cq, "alarm");
118 }
119
OnCallbackAlarm(bool is_ok)120 void OnCallbackAlarm(bool is_ok) {
121 callback_armed_.store(false);
122 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
123 grpc_core::ExecCtx exec_ctx;
124 callback_(is_ok);
125 Unref();
126 }
127
Ref()128 void Ref() { gpr_ref(&refs_); }
Unref()129 void Unref() {
130 if (gpr_unref(&refs_)) {
131 delete this;
132 }
133 }
134
135 std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine_;
136 std::atomic<bool> cq_armed_{false};
137 EventEngine::TaskHandle cq_timer_handle_ = EventEngine::TaskHandle::kInvalid;
138 std::atomic<bool> callback_armed_{false};
139 EventEngine::TaskHandle callback_timer_handle_ =
140 EventEngine::TaskHandle::kInvalid;
141 gpr_refcount refs_;
142 grpc_cq_completion completion_;
143 // completion queue where events about this alarm will be posted
144 grpc_completion_queue* cq_;
145 void* tag_;
146 std::function<void(bool)> callback_;
147 };
148 } // namespace internal
149
Alarm()150 Alarm::Alarm() : alarm_(new internal::AlarmImpl()) {}
151
SetInternal(grpc::CompletionQueue * cq,gpr_timespec deadline,void * tag)152 void Alarm::SetInternal(grpc::CompletionQueue* cq, gpr_timespec deadline,
153 void* tag) {
154 // Note that we know that alarm_ is actually an internal::AlarmImpl
155 // but we declared it as the base pointer to avoid a forward declaration
156 // or exposing core data structures in the C++ public headers.
157 // Thus it is safe to use a static_cast to the subclass here, and the
158 // C++ style guide allows us to do so in this case
159 static_cast<internal::AlarmImpl*>(alarm_)->Set(cq, deadline, tag);
160 }
161
SetInternal(gpr_timespec deadline,std::function<void (bool)> f)162 void Alarm::SetInternal(gpr_timespec deadline, std::function<void(bool)> f) {
163 // Note that we know that alarm_ is actually an internal::AlarmImpl
164 // but we declared it as the base pointer to avoid a forward declaration
165 // or exposing core data structures in the C++ public headers.
166 // Thus it is safe to use a static_cast to the subclass here, and the
167 // C++ style guide allows us to do so in this case
168 static_cast<internal::AlarmImpl*>(alarm_)->Set(deadline, std::move(f));
169 }
170
~Alarm()171 Alarm::~Alarm() {
172 if (alarm_ != nullptr) {
173 static_cast<internal::AlarmImpl*>(alarm_)->Destroy();
174 }
175 }
176
Cancel()177 void Alarm::Cancel() { static_cast<internal::AlarmImpl*>(alarm_)->Cancel(); }
178 } // namespace grpc
179