1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include "src/core/lib/event_engine/posix_engine/lockfree_event.h"
15
16 #include <grpc/support/atm.h>
17 #include <grpc/support/port_platform.h>
18
19 #include <atomic>
20 #include <cstdint>
21
22 #include "absl/log/check.h"
23 #include "absl/status/status.h"
24 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
25 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
26 #include "src/core/util/crash.h"
27 #include "src/core/util/status_helper.h"
28
// 'state' holds the closure to call when the fd is readable or writable
// respectively.
30 // It can contain one of the following values:
31 // kClosureReady : The fd has an I/O event of interest but there is no
32 // closure yet to execute
33
34 // kClosureNotReady : The fd has no I/O event of interest
35
36 // closure ptr : The closure to be executed when the fd has an I/O
37 // event of interest
38
39 // shutdown_error | kShutdownBit :
40 // 'shutdown_error' field ORed with kShutdownBit.
41 // This indicates that the fd is shutdown. Since all
42 // memory allocations are word-aligned, the lower two
43 // bits of the shutdown_error pointer are always 0. So
44 // it is safe to OR these with kShutdownBit
45
46 // Valid state transitions:
47
48 // <closure ptr> <-----3------ kClosureNotReady -----1-------> kClosureReady
49 // | | ^ | ^ | |
50 // | | | | | | |
51 // | +--------------4----------+ 6 +---------2---------------+ |
52 // | | |
53 // | v |
54 // +-----5-------> [shutdown_error | kShutdownBit] <-------7---------+
55
56 // For 1, 4 : See SetReady() function
57 // For 2, 3 : See NotifyOn() function
58 // For 5,6,7: See SetShutdown() function
59
60 namespace grpc_event_engine {
61 namespace experimental {
62
InitEvent()63 void LockfreeEvent::InitEvent() {
64 // Perform an atomic store to start the state machine.
65
66 // Note carefully that LockfreeEvent *MAY* be used whilst in a destroyed
67 // state, while a file descriptor is on a freelist. In such a state it may
68 // be SetReady'd, and so we need to perform an atomic operation here to
69 // ensure no races
70 state_.store(kClosureNotReady, std::memory_order_relaxed);
71 }
72
DestroyEvent()73 void LockfreeEvent::DestroyEvent() {
74 intptr_t curr;
75 do {
76 curr = state_.load(std::memory_order_relaxed);
77 if (curr & kShutdownBit) {
78 grpc_core::internal::StatusFreeHeapPtr(curr & ~kShutdownBit);
79 } else {
80 CHECK(curr == kClosureNotReady || curr == kClosureReady);
81 }
82 // we CAS in a shutdown, no error value here. If this event is interacted
83 // with post-deletion (see the note in the constructor) we want the bit
84 // pattern to prevent error retention in a deleted object
85 } while (!state_.compare_exchange_strong(curr, kShutdownBit,
86 std::memory_order_acq_rel,
87 std::memory_order_acquire));
88 }
89
NotifyOn(PosixEngineClosure * closure)90 void LockfreeEvent::NotifyOn(PosixEngineClosure* closure) {
91 // This load needs to be an acquire load because this can be a shutdown
92 // error that we might need to reference. Adding acquire semantics makes
93 // sure that the shutdown error has been initialized properly before us
94 // referencing it. The load() needs to be performed only once before entry
95 // into the loop. This is because if any of the compare_exchange_strong
96 // operations inside the loop return false, they automatically update curr
97 // with the new value. So it doesn't need to be loaded again.
98 intptr_t curr = state_.load(std::memory_order_acquire);
99 while (true) {
100 switch (curr) {
101 case kClosureNotReady: {
102 // kClosureNotReady -> <closure>.
103
104 if (state_.compare_exchange_strong(
105 curr, reinterpret_cast<intptr_t>(closure),
106 std::memory_order_acq_rel, std::memory_order_acquire)) {
107 return; // Successful. Return
108 }
109
110 break; // retry
111 }
112
113 case kClosureReady: {
114 // Change the state to kClosureNotReady. Schedule the closure if
115 // successful. If not, the state most likely transitioned to shutdown.
116 // We should retry.
117
118 if (state_.compare_exchange_strong(curr, kClosureNotReady,
119 std::memory_order_acq_rel,
120 std::memory_order_acquire)) {
121 scheduler_->Run(closure);
122 return; // Successful. Return.
123 }
124 break; // retry
125 }
126
127 default: {
128 // 'curr' is either a closure or the fd is shutdown(in which case 'curr'
129 // contains a pointer to the shutdown-error). If the fd is shutdown,
130 // schedule the closure with the shutdown error
131 if ((curr & kShutdownBit) > 0) {
132 absl::Status shutdown_err =
133 grpc_core::internal::StatusGetFromHeapPtr(curr & ~kShutdownBit);
134 closure->SetStatus(shutdown_err);
135 scheduler_->Run(closure);
136 return;
137 }
138
139 // There is already a closure!. This indicates a bug in the code.
140 grpc_core::Crash(
141 "LockfreeEvent::NotifyOn: notify_on called with a previous "
142 "callback still pending");
143 }
144 }
145 }
146
147 GPR_UNREACHABLE_CODE(return);
148 }
149
SetShutdown(absl::Status shutdown_error)150 bool LockfreeEvent::SetShutdown(absl::Status shutdown_error) {
151 intptr_t status_ptr = grpc_core::internal::StatusAllocHeapPtr(shutdown_error);
152 gpr_atm new_state = status_ptr | kShutdownBit;
153 // The load() needs to be performed only once before entry
154 // into the loop. This is because if any of the compare_exchange_strong
155 // operations inside the loop return false, they automatically update curr
156 // with the new value. So it doesn't need to be loaded again.
157 intptr_t curr = state_.load(std::memory_order_acquire);
158
159 while (true) {
160 switch (curr) {
161 case kClosureReady:
162 case kClosureNotReady:
163 // Need a full barrier here so that the initial load in notify_on
164 // doesn't need a barrier
165 if (state_.compare_exchange_strong(curr, new_state,
166 std::memory_order_acq_rel,
167 std::memory_order_acquire)) {
168 return true; // early out
169 }
170 break; // retry
171
172 default: {
173 // 'curr' is either a closure or the fd is already shutdown
174
175 // If fd is already shutdown, we are done.
176 if ((curr & kShutdownBit) > 0) {
177 grpc_core::internal::StatusFreeHeapPtr(status_ptr);
178 return false;
179 }
180
181 // Fd is not shutdown. Schedule the closure and move the state to
182 // shutdown state.
183 // Needs an acquire to pair with setting the closure (and get a
184 // happens-after on that edge), and a release to pair with anything
185 // loading the shutdown state.
186 if (state_.compare_exchange_strong(curr, new_state,
187 std::memory_order_acq_rel,
188 std::memory_order_acquire)) {
189 auto closure = reinterpret_cast<PosixEngineClosure*>(curr);
190 closure->SetStatus(shutdown_error);
191 scheduler_->Run(closure);
192 return true;
193 }
194 // 'curr' was a closure but now changed to a different state. We will
195 // have to retry
196 break;
197 }
198 }
199 }
200 GPR_UNREACHABLE_CODE(return false);
201 }
202
SetReady()203 void LockfreeEvent::SetReady() {
204 // The load() needs to be performed only once before entry
205 // into the loop. This is because if any of the compare_exchange_strong
206 // operations inside the loop return false, they automatically update curr
207 // with the new value. So it doesn't need to be loaded again.
208 intptr_t curr = state_.load(std::memory_order_acquire);
209 while (true) {
210 switch (curr) {
211 case kClosureReady: {
212 // Already ready. We are done here.
213 return;
214 }
215
216 case kClosureNotReady: {
217 if (state_.compare_exchange_strong(curr, kClosureReady,
218 std::memory_order_acq_rel,
219 std::memory_order_acquire)) {
220 return; // early out
221 }
222 break; // retry
223 }
224
225 default: {
226 // 'curr' is either a closure or the fd is shutdown
227 if ((curr & kShutdownBit) > 0) {
228 // The fd is shutdown. Do nothing.
229 return;
230 } else if (state_.compare_exchange_strong(curr, kClosureNotReady,
231 std::memory_order_acq_rel,
232 std::memory_order_acquire)) {
233 // Full cas: acquire pairs with this cas' release in the event of a
234 // spurious set_ready; release pairs with this or the acquire in
235 // notify_on (or set_shutdown)
236 auto closure = reinterpret_cast<PosixEngineClosure*>(curr);
237 closure->SetStatus(absl::OkStatus());
238 scheduler_->Run(closure);
239 return;
240 }
241 // else the state changed again (only possible by either a racing
242 // set_ready or set_shutdown functions. In both these cases, the
243 // closure would have been scheduled for execution. So we are done
244 // here
245 return;
246 }
247 }
248 }
249 }
250
251 } // namespace experimental
252 } // namespace grpc_event_engine
253