• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 #include "src/core/lib/event_engine/posix_engine/lockfree_event.h"
15 
16 #include <grpc/support/atm.h>
17 #include <grpc/support/port_platform.h>
18 
19 #include <atomic>
20 #include <cstdint>
21 
22 #include "absl/log/check.h"
23 #include "absl/status/status.h"
24 #include "src/core/lib/event_engine/posix_engine/event_poller.h"
25 #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h"
26 #include "src/core/util/crash.h"
27 #include "src/core/util/status_helper.h"
28 
29 //  'state' holds the to call when the fd is readable or writable respectively.
30 //    It can contain one of the following values:
31 //      kClosureReady     : The fd has an I/O event of interest but there is no
32 //                          closure yet to execute
33 
34 //      kClosureNotReady : The fd has no I/O event of interest
35 
36 //      closure ptr       : The closure to be executed when the fd has an I/O
37 //                          event of interest
38 
39 //      shutdown_error | kShutdownBit :
40 //                         'shutdown_error' field ORed with kShutdownBit.
41 //                          This indicates that the fd is shutdown. Since all
42 //                          memory allocations are word-aligned, the lower two
43 //                          bits of the shutdown_error pointer are always 0. So
44 //                          it is safe to OR these with kShutdownBit
45 
46 //    Valid state transitions:
47 
48 //    <closure ptr> <-----3------ kClosureNotReady -----1------->  kClosureReady
49 //        |  |                         ^   |    ^                         |  |
50 //        |  |                         |   |    |                         |  |
51 //        |  +--------------4----------+   6    +---------2---------------+  |
52 //        |                                |                                 |
53 //        |                                v                                 |
54 //        +-----5------->  [shutdown_error | kShutdownBit] <-------7---------+
55 
56 //     For 1, 4 : See SetReady() function
57 //     For 2, 3 : See NotifyOn() function
58 //     For 5,6,7: See SetShutdown() function
59 
60 namespace grpc_event_engine {
61 namespace experimental {
62 
InitEvent()63 void LockfreeEvent::InitEvent() {
64   // Perform an atomic store to start the state machine.
65 
66   // Note carefully that LockfreeEvent *MAY* be used whilst in a destroyed
67   // state, while a file descriptor is on a freelist. In such a state it may
68   // be SetReady'd, and so we need to perform an atomic operation here to
69   // ensure no races
70   state_.store(kClosureNotReady, std::memory_order_relaxed);
71 }
72 
DestroyEvent()73 void LockfreeEvent::DestroyEvent() {
74   intptr_t curr;
75   do {
76     curr = state_.load(std::memory_order_relaxed);
77     if (curr & kShutdownBit) {
78       grpc_core::internal::StatusFreeHeapPtr(curr & ~kShutdownBit);
79     } else {
80       CHECK(curr == kClosureNotReady || curr == kClosureReady);
81     }
82     // we CAS in a shutdown, no error value here. If this event is interacted
83     // with post-deletion (see the note in the constructor) we want the bit
84     // pattern to prevent error retention in a deleted object
85   } while (!state_.compare_exchange_strong(curr, kShutdownBit,
86                                            std::memory_order_acq_rel,
87                                            std::memory_order_acquire));
88 }
89 
NotifyOn(PosixEngineClosure * closure)90 void LockfreeEvent::NotifyOn(PosixEngineClosure* closure) {
91   // This load needs to be an acquire load because this can be a shutdown
92   // error that we might need to reference. Adding acquire semantics makes
93   // sure that the shutdown error has been initialized properly before us
94   // referencing it. The load() needs to be performed only once before entry
95   // into the loop. This is because if any of the compare_exchange_strong
96   // operations inside the loop return false, they automatically update curr
97   // with the new value. So it doesn't need to be loaded again.
98   intptr_t curr = state_.load(std::memory_order_acquire);
99   while (true) {
100     switch (curr) {
101       case kClosureNotReady: {
102         // kClosureNotReady -> <closure>.
103 
104         if (state_.compare_exchange_strong(
105                 curr, reinterpret_cast<intptr_t>(closure),
106                 std::memory_order_acq_rel, std::memory_order_acquire)) {
107           return;  // Successful. Return
108         }
109 
110         break;  // retry
111       }
112 
113       case kClosureReady: {
114         // Change the state to kClosureNotReady. Schedule the closure if
115         // successful. If not, the state most likely transitioned to shutdown.
116         // We should retry.
117 
118         if (state_.compare_exchange_strong(curr, kClosureNotReady,
119                                            std::memory_order_acq_rel,
120                                            std::memory_order_acquire)) {
121           scheduler_->Run(closure);
122           return;  // Successful. Return.
123         }
124         break;  // retry
125       }
126 
127       default: {
128         // 'curr' is either a closure or the fd is shutdown(in which case 'curr'
129         // contains a pointer to the shutdown-error). If the fd is shutdown,
130         // schedule the closure with the shutdown error
131         if ((curr & kShutdownBit) > 0) {
132           absl::Status shutdown_err =
133               grpc_core::internal::StatusGetFromHeapPtr(curr & ~kShutdownBit);
134           closure->SetStatus(shutdown_err);
135           scheduler_->Run(closure);
136           return;
137         }
138 
139         // There is already a closure!. This indicates a bug in the code.
140         grpc_core::Crash(
141             "LockfreeEvent::NotifyOn: notify_on called with a previous "
142             "callback still pending");
143       }
144     }
145   }
146 
147   GPR_UNREACHABLE_CODE(return);
148 }
149 
SetShutdown(absl::Status shutdown_error)150 bool LockfreeEvent::SetShutdown(absl::Status shutdown_error) {
151   intptr_t status_ptr = grpc_core::internal::StatusAllocHeapPtr(shutdown_error);
152   gpr_atm new_state = status_ptr | kShutdownBit;
153   // The load() needs to be performed only once before entry
154   // into the loop. This is because if any of the compare_exchange_strong
155   // operations inside the loop return false, they automatically update curr
156   // with the new value. So it doesn't need to be loaded again.
157   intptr_t curr = state_.load(std::memory_order_acquire);
158 
159   while (true) {
160     switch (curr) {
161       case kClosureReady:
162       case kClosureNotReady:
163         // Need a full barrier here so that the initial load in notify_on
164         // doesn't need a barrier
165         if (state_.compare_exchange_strong(curr, new_state,
166                                            std::memory_order_acq_rel,
167                                            std::memory_order_acquire)) {
168           return true;  // early out
169         }
170         break;  // retry
171 
172       default: {
173         // 'curr' is either a closure or the fd is already shutdown
174 
175         // If fd is already shutdown, we are done.
176         if ((curr & kShutdownBit) > 0) {
177           grpc_core::internal::StatusFreeHeapPtr(status_ptr);
178           return false;
179         }
180 
181         // Fd is not shutdown. Schedule the closure and move the state to
182         // shutdown state.
183         // Needs an acquire to pair with setting the closure (and get a
184         // happens-after on that edge), and a release to pair with anything
185         // loading the shutdown state.
186         if (state_.compare_exchange_strong(curr, new_state,
187                                            std::memory_order_acq_rel,
188                                            std::memory_order_acquire)) {
189           auto closure = reinterpret_cast<PosixEngineClosure*>(curr);
190           closure->SetStatus(shutdown_error);
191           scheduler_->Run(closure);
192           return true;
193         }
194         // 'curr' was a closure but now changed to a different state. We will
195         // have to retry
196         break;
197       }
198     }
199   }
200   GPR_UNREACHABLE_CODE(return false);
201 }
202 
SetReady()203 void LockfreeEvent::SetReady() {
204   // The load() needs to be performed only once before entry
205   // into the loop. This is because if any of the compare_exchange_strong
206   // operations inside the loop return false, they automatically update curr
207   // with the new value. So it doesn't need to be loaded again.
208   intptr_t curr = state_.load(std::memory_order_acquire);
209   while (true) {
210     switch (curr) {
211       case kClosureReady: {
212         // Already ready. We are done here.
213         return;
214       }
215 
216       case kClosureNotReady: {
217         if (state_.compare_exchange_strong(curr, kClosureReady,
218                                            std::memory_order_acq_rel,
219                                            std::memory_order_acquire)) {
220           return;  // early out
221         }
222         break;  // retry
223       }
224 
225       default: {
226         // 'curr' is either a closure or the fd is shutdown
227         if ((curr & kShutdownBit) > 0) {
228           // The fd is shutdown. Do nothing.
229           return;
230         } else if (state_.compare_exchange_strong(curr, kClosureNotReady,
231                                                   std::memory_order_acq_rel,
232                                                   std::memory_order_acquire)) {
233           // Full cas: acquire pairs with this cas' release in the event of a
234           // spurious set_ready; release pairs with this or the acquire in
235           // notify_on (or set_shutdown)
236           auto closure = reinterpret_cast<PosixEngineClosure*>(curr);
237           closure->SetStatus(absl::OkStatus());
238           scheduler_->Run(closure);
239           return;
240         }
241         // else the state changed again (only possible by either a racing
242         // set_ready or set_shutdown functions. In both these cases, the
243         // closure would have been scheduled for execution. So we are done
244         // here
245         return;
246       }
247     }
248   }
249 }
250 
251 }  // namespace experimental
252 }  // namespace grpc_event_engine
253