// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "src/core/lib/promise/party.h"

#include <grpc/support/port_platform.h>

#include <atomic>
#include <cstdint>

#include "absl/base/thread_annotations.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/strings/str_format.h"
#include "src/core/lib/event_engine/event_engine_context.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/util/latent_see.h"
#include "src/core/util/sync.h"

#ifdef GRPC_MAXIMIZE_THREADYNESS
#include "absl/random/random.h"           // IWYU pragma: keep
#include "src/core/lib/iomgr/exec_ctx.h"  // IWYU pragma: keep
#include "src/core/util/thd.h"            // IWYU pragma: keep
#endif

namespace grpc_core {

///////////////////////////////////////////////////////////////////////////////
// PartySyncUsingAtomics

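// Increment the ref count only if it is currently non-zero, so that a caller
// holding only a weak reference (for example Party::Handle below) can safely
// upgrade to a strong ref without resurrecting a party whose last ref has
// already been dropped.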
GRPC_MUST_USE_RESULT bool Party::RefIfNonZero() {
  auto state = state_.load(std::memory_order_relaxed);
  do {
    // If the refcount is zero we are done (without an increment). Otherwise
    // we must CAS to maintain the contract: never increment a counter that
    // has already reached zero.
    if ((state & kRefMask) == 0) {
      return false;
    }
  } while (!state_.compare_exchange_weak(state, state + kOneRef,
                                         std::memory_order_acq_rel,
                                         std::memory_order_relaxed));
  LogStateChange("RefIfNonZero", state, state + kOneRef);
  return true;
}

///////////////////////////////////////////////////////////////////////////////
// Party::Handle

// Weak handle to a Party.
// Handle can persist while Party goes away.
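// The handle starts with one ref held on behalf of the party (dropped via
// DropActivity when the participant is destroyed) and one ref per
// outstanding wakeup, so it is deleted only once both sides are finished
// with it.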
class Party::Handle final : public Wakeable {
 public:
  explicit Handle(Party* party) : party_(party) {}

  // Ref the Handle (not the activity).
  void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }

  // Activity is going away... drop its reference and sever the connection
  // back.
  void DropActivity() ABSL_LOCKS_EXCLUDED(mu_) {
    mu_.Lock();
    CHECK_NE(party_, nullptr);
    party_ = nullptr;
    mu_.Unlock();
    Unref();
  }

  void WakeupGeneric(WakeupMask wakeup_mask,
                     void (Party::*wakeup_method)(WakeupMask))
      ABSL_LOCKS_EXCLUDED(mu_) {
    mu_.Lock();
    // Note that the activity refcount can drop to zero, but we could win the
    // lock against DropActivity, so we must only increase the activity's
    // refcount if it is non-zero.
    Party* party = party_;
    if (party != nullptr && party->RefIfNonZero()) {
      mu_.Unlock();
      // Activity still exists and we have a reference: wake it up, which will
      // drop the ref.
      (party->*wakeup_method)(wakeup_mask);
    } else {
      // Could not get the activity - it's either gone or going. No need to
      // wake it up!
      mu_.Unlock();
    }
    // Drop the ref to the handle (we have one ref = one wakeup semantics).
    Unref();
  }

  // Activity needs to wake up (if it still exists!) - wake it up, and drop
  // the ref that was kept for this handle.
  void Wakeup(WakeupMask wakeup_mask) override ABSL_LOCKS_EXCLUDED(mu_) {
    WakeupGeneric(wakeup_mask, &Party::Wakeup);
  }

  void WakeupAsync(WakeupMask wakeup_mask) override ABSL_LOCKS_EXCLUDED(mu_) {
    WakeupGeneric(wakeup_mask, &Party::WakeupAsync);
  }

  void Drop(WakeupMask) override { Unref(); }

  std::string ActivityDebugTag(WakeupMask) const override {
    MutexLock lock(&mu_);
    return party_ == nullptr ? "<unknown>" : party_->DebugTag();
  }

 private:
  // Unref the Handle (not the activity).
  void Unref() {
    if (1 == refs_.fetch_sub(1, std::memory_order_acq_rel)) {
      delete this;
    }
  }

  // Two initial refs: one for the waiter that caused instantiation, one for
  // the party.
  std::atomic<size_t> refs_{2};
  mutable Mutex mu_;
  Party* party_ ABSL_GUARDED_BY(mu_);
};

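// Lazily create the weak handle for this participant, or take another ref on
// the existing one. All non-owning wakers for a participant share a single
// handle; each waker handed out accounts for one handle ref, released when
// the waker fires or is dropped.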
Wakeable* Party::Participant::MakeNonOwningWakeable(Party* party) {
  if (handle_ == nullptr) {
    handle_ = new Handle(party);
    return handle_;
  }
  handle_->Ref();
  return handle_;
}

Party::Participant::~Participant() {
  if (handle_ != nullptr) {
    handle_->DropActivity();
  }
}

Party::~Party() {}

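// Destroy any participants that are still registered, clearing their slots
// and their allocated bits in the state word. The party is installed as the
// current activity (and its arena as context) so participant destructors run
// in the usual promise environment.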
void Party::CancelRemainingParticipants() {
  uint64_t prev_state = state_.load(std::memory_order_relaxed);
  if ((prev_state & kAllocatedMask) == 0) return;
  ScopedActivity activity(this);
  promise_detail::Context<Arena> arena_ctx(arena_.get());
  uint64_t clear_state = 0;
  do {
    for (size_t i = 0; i < party_detail::kMaxParticipants; i++) {
      if (auto* p =
              participants_[i].exchange(nullptr, std::memory_order_acquire)) {
        clear_state |= 1ull << i << kAllocatedShift;
        p->Destroy();
      }
    }
    if (clear_state == 0) return;
  } while (!state_.compare_exchange_weak(prev_state, prev_state & ~clear_state,
                                         std::memory_order_acq_rel));
  LogStateChange("CancelRemainingParticipants", prev_state,
                 prev_state & ~clear_state);
}

std::string Party::ActivityDebugTag(WakeupMask wakeup_mask) const {
  return absl::StrFormat("%s [parts:%x]", DebugTag(), wakeup_mask);
}

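// An owning waker keeps the party alive: we take a party ref here and it is
// released when the waker is consumed or dropped (see Drop below).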
Waker Party::MakeOwningWaker() {
  DCHECK(currently_polling_ != kNotPolling);
  IncrementRefCount();
  return Waker(this, 1u << currently_polling_);
}

Waker Party::MakeNonOwningWaker() {
  DCHECK(currently_polling_ != kNotPolling);
  return Waker(participants_[currently_polling_]
                   .load(std::memory_order_relaxed)
                   ->MakeNonOwningWakeable(this),
               1u << currently_polling_);
}

void Party::ForceImmediateRepoll(WakeupMask mask) {
  DCHECK(is_current());
  wakeup_mask_ |= mask;
}

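// Run a party that the caller has already locked, releasing the caller's ref
// when done. A small thread-local queue batches work between a call and its
// transport: if a party is already running on this thread the wakeup is
// queued behind it (or coalesced with an existing entry for the same party),
// and if the queue is full the oldest queued wakeup is offloaded to the
// EventEngine so no single party is starved for too long. Under
// GRPC_MAXIMIZE_THREADYNESS each wakeup instead runs on its own thread.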
void Party::RunLockedAndUnref(Party* party, uint64_t prev_state) {
  GRPC_LATENT_SEE_PARENT_SCOPE("Party::RunLocked");
#ifdef GRPC_MAXIMIZE_THREADYNESS
  Thread thd(
      "RunParty",
      [party, prev_state]() {
        ApplicationCallbackExecCtx app_exec_ctx;
        ExecCtx exec_ctx;
        party->RunPartyAndUnref(prev_state);
      },
      nullptr, Thread::Options().set_joinable(false));
  thd.Start();
#else
  struct RunState;
  static thread_local RunState* g_run_state = nullptr;
  struct PartyWakeup {
    PartyWakeup() : party{nullptr} {}
    PartyWakeup(Party* party, uint64_t prev_state)
        : party{party}, prev_state{prev_state} {}
    Party* party;
    uint64_t prev_state;
  };
  struct RunState {
    explicit RunState(PartyWakeup first) : first{first}, next{} {}
    PartyWakeup first;
    PartyWakeup next;
    GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void Run() {
      g_run_state = this;
      do {
        GRPC_LATENT_SEE_INNER_SCOPE("run_one_party");
        first.party->RunPartyAndUnref(first.prev_state);
        first = std::exchange(next, PartyWakeup{});
      } while (first.party != nullptr);
      DCHECK(g_run_state == this);
      g_run_state = nullptr;
    }
  };
  // If there is a party running, then we don't run it immediately
  // but instead add it to the end of the list of parties to run.
  // This enables a fairly straightforward batching of work from a
  // call to a transport (or back again).
  if (GPR_UNLIKELY(g_run_state != nullptr)) {
    if (g_run_state->first.party == party) {
      g_run_state->first.prev_state = prev_state;
      party->Unref();
      return;
    }
    if (g_run_state->next.party == party) {
      g_run_state->next.prev_state = prev_state;
      party->Unref();
      return;
    }
    if (g_run_state->next.party != nullptr) {
      // If there's already a different party queued, we're better off asking
      // event engine to run it so we can spread load.
      // We swap the oldest party to run on the event engine so that we don't
      // accidentally end up with a tail latency problem whereby one party
      // gets held for a really long time.
      auto wakeup =
          std::exchange(g_run_state->next, PartyWakeup{party, prev_state});
      auto arena = party->arena_.get();
      auto* event_engine =
          arena->GetContext<grpc_event_engine::experimental::EventEngine>();
      CHECK(event_engine != nullptr) << "; " << GRPC_DUMP_ARGS(party, arena);
      event_engine->Run([wakeup]() {
        GRPC_LATENT_SEE_PARENT_SCOPE("Party::RunLocked offload");
        ApplicationCallbackExecCtx app_exec_ctx;
        ExecCtx exec_ctx;
        RunState{wakeup}.Run();
      });
      return;
    }
    g_run_state->next = PartyWakeup{party, prev_state};
    return;
  }
  RunState{{party, prev_state}}.Run();
#endif
}

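// Poll all woken participants while holding the party lock. Completed
// participants have their slot cleared and their allocated bit dropped. Once
// a pass finds no more wakeups we try to CAS the lock bit away; if new
// wakeups or new participants arrived in the meantime the CAS fails and we
// loop to pick them up. The ref passed in by the caller is released on exit,
// and if it was the last ref the party is torn down via PartyIsOver.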
void Party::RunPartyAndUnref(uint64_t prev_state) {
  ScopedActivity activity(this);
  promise_detail::Context<Arena> arena_ctx(arena_.get());
  DCHECK_EQ(prev_state & kLocked, 0u)
      << "Party should be unlocked prior to first wakeup";
  DCHECK_GE(prev_state & kRefMask, kOneRef);
  // Now update prev_state to be what we want the CAS to see below.
  DCHECK_EQ(prev_state & ~(kRefMask | kAllocatedMask), 0u)
      << "Party should have contained no wakeups on lock";
  prev_state |= kLocked;
  absl::optional<ScopedTimeCache> time_cache;
#if !TARGET_OS_IPHONE
  if (IsTimeCachingInPartyEnabled()) {
    time_cache.emplace();
  }
#endif
  for (;;) {
    uint64_t keep_allocated_mask = kAllocatedMask;
    // For each wakeup bit...
    while (wakeup_mask_ != 0) {
      auto wakeup_mask = std::exchange(wakeup_mask_, 0);
      while (wakeup_mask != 0) {
        const uint64_t t = LowestOneBit(wakeup_mask);
        const int i = absl::countr_zero(t);
        wakeup_mask ^= t;
        // If the participant is null, skip.
        // This allows participants to complete whilst wakers still exist
        // somewhere.
        auto* participant = participants_[i].load(std::memory_order_acquire);
        if (GPR_UNLIKELY(participant == nullptr)) {
          GRPC_TRACE_LOG(promise_primitives, INFO)
              << "Party " << this << " Run:Wakeup " << i
              << " already complete";
          continue;
        }
        GRPC_TRACE_LOG(promise_primitives, INFO)
            << "Party " << this << " Run:Wakeup " << i;
        // Poll the participant.
        currently_polling_ = i;
        if (participant->PollParticipantPromise()) {
          participants_[i].store(nullptr, std::memory_order_relaxed);
          const uint64_t allocated_bit = (1u << i << kAllocatedShift);
          keep_allocated_mask &= ~allocated_bit;
        }
      }
    }
    currently_polling_ = kNotPolling;
    // Try to CAS the state we expected to have (with no wakeups or adds)
    // back to unlocked (by masking in only the ref and kept allocated bits,
    // sans the locked bit).
    // If this succeeds then no wakeups were added, no adds were added, and we
    // have successfully unlocked.
    // Otherwise, we need to loop again.
    // Note that if an owning waker is created or the weak cas spuriously
    // fails we will also loop again, but in that case we see no wakeups or
    // adds and so will get back here fairly quickly.
    // TODO(ctiller): consider mitigations for the accidental wakeup on owning
    // waker creation case -- I currently expect this will be more expensive
    // than this quick loop.
    if (state_.compare_exchange_weak(
            prev_state,
            (prev_state & (kRefMask | keep_allocated_mask)) - kOneRef,
            std::memory_order_acq_rel, std::memory_order_acquire)) {
      LogStateChange("Run:End", prev_state,
                     (prev_state & (kRefMask | keep_allocated_mask)) - kOneRef);
      if ((prev_state & kRefMask) == kOneRef) {
        // We're done with the party.
        PartyIsOver();
      }
      return;
    }
    // CAS out (but retrieve) any allocations and wakeups that occurred during
    // the run.
    while (!state_.compare_exchange_weak(
        prev_state, prev_state & (kRefMask | kLocked | keep_allocated_mask))) {
      // Nothing to do here.
    }
    LogStateChange("Run:Continue", prev_state,
                   prev_state & (kRefMask | kLocked | keep_allocated_mask));
    DCHECK(prev_state & kLocked)
        << "Party should be locked; prev_state=" << prev_state;
    DCHECK_GE(prev_state & kRefMask, kOneRef);
    // From the previous state, extract which participants we're to wakeup.
    wakeup_mask_ |= prev_state & kWakeupMask;
    // Now update prev_state to be what we want the CAS to see once wakeups
    // complete next iteration.
    prev_state &= kRefMask | kLocked | keep_allocated_mask;
  }
}

// Given a bitmask indicating the allocation status of promise slots, return a
// one-bit mask selecting the next slot to allocate.
// By default we use a deterministic and fast algorithm (first-fit), but we
// don't want to guarantee that this is the order of spawning -- if a promise
// is locked by another thread (for instance) a sequence of spawns may be
// reordered for initial execution.
// So for thready-tsan we provide an alternative implementation that
// additionally reorders promises.
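// For example, with slots 0..2 already allocated (mask 0b0111) the first-fit
// implementation below returns 0b1000, i.e. it selects slot 3.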
#ifndef GRPC_MAXIMIZE_THREADYNESS
uint64_t Party::NextAllocationMask(uint64_t current_allocation_mask) {
  return LowestOneBit(~current_allocation_mask);
}
#else
uint64_t Party::NextAllocationMask(uint64_t current_allocation_mask) {
  CHECK_EQ(current_allocation_mask & ~kWakeupMask, 0);
  if (current_allocation_mask == kWakeupMask) return kWakeupMask + 1;
  // Count the number of unset bits in the allocation mask.
  size_t unset_bits = 0;
  for (size_t i = 0; i < party_detail::kMaxParticipants; i++) {
    if (current_allocation_mask & (1ull << i)) continue;
    ++unset_bits;
  }
  CHECK_GT(unset_bits, 0);
  absl::BitGen bitgen;
  size_t selected = absl::Uniform<size_t>(bitgen, 0, unset_bits);
  for (size_t i = 0; i < party_detail::kMaxParticipants; i++) {
    if (current_allocation_mask & (1ull << i)) continue;
    if (selected == 0) return 1ull << i;
    --selected;
  }
  LOG(FATAL) << "unreachable";
}
#endif

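// Reserve a slot for the new participant (retrying the CAS if the state
// changes underneath us), publish the participant into that slot, and then
// wake the party so the new participant gets polled. A ref is taken as part
// of the same CAS so that a spurious wakeup cannot drop the party's refcount
// to zero before we are done. If every slot is taken, the add is retried
// later via DelayAddParticipant.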
void Party::AddParticipant(Participant* participant) {
  GRPC_LATENT_SEE_INNER_SCOPE("Party::AddParticipant");
  uint64_t state = state_.load(std::memory_order_acquire);
  uint64_t allocated;
  size_t slot;

  // Find the lowest available slot for the new participant, so that poll
  // ordering matches the order in which participants are presented to this
  // function.
  uint64_t wakeup_mask;
  uint64_t new_state;
  do {
    allocated = (state & kAllocatedMask) >> kAllocatedShift;
    wakeup_mask = NextAllocationMask(allocated);
    if (GPR_UNLIKELY((wakeup_mask & kWakeupMask) == 0)) {
      DelayAddParticipant(participant);
      return;
    }
    DCHECK_NE(wakeup_mask & kWakeupMask, 0u)
        << "No available slots for new participant; allocated=" << allocated
        << " state=" << state << " wakeup_mask=" << wakeup_mask;
    allocated |= wakeup_mask;
    slot = absl::countr_zero(wakeup_mask);
    // Try to allocate this slot and take a ref (atomically).
    // Ref needs to be taken because once we store the participant it could be
    // spuriously woken up and unref the party.
    new_state = (state | (allocated << kAllocatedShift)) + kOneRef;
  } while (!state_.compare_exchange_weak(
      state, new_state, std::memory_order_acq_rel, std::memory_order_acquire));
  LogStateChange("AddParticipantsAndRef", state, new_state);
  GRPC_TRACE_LOG(party_state, INFO)
      << "Party " << this << " AddParticipant: " << slot
      << " [participant=" << participant << "]";
  participants_[slot].store(participant, std::memory_order_release);
  // Now we need to wake up the party.
  WakeupFromState(new_state, wakeup_mask);
}

void Party::DelayAddParticipant(Participant* participant) {
  // We need to delay the addition of participants.
  IncrementRefCount();
  VLOG_EVERY_N_SEC(2, 10) << "Delaying addition of participant to party "
                          << this << " because it is full.";
  arena_->GetContext<grpc_event_engine::experimental::EventEngine>()->Run(
      [this, participant]() mutable {
        ApplicationCallbackExecCtx app_exec_ctx;
        ExecCtx exec_ctx;
        AddParticipant(participant);
        Unref();
      });
}

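// Schedule a wakeup without running the party inline. If we win the lock we
// record the wakeup bit and hand the party to the EventEngine to run; if the
// party is already locked we simply fold the wakeup bit into the state and
// drop the waker's ref, since the lock holder will observe the bit when it
// attempts to unlock.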
void Party::WakeupAsync(WakeupMask wakeup_mask) {
  // Or in the wakeup bit for the participant, and also the locked bit.
  uint64_t prev_state = state_.load(std::memory_order_relaxed);
  LogStateChange("ScheduleWakeup", prev_state,
                 prev_state | (wakeup_mask & kWakeupMask) | kLocked);
  while (true) {
    if ((prev_state & kLocked) == 0) {
      if (state_.compare_exchange_weak(prev_state, prev_state | kLocked,
                                       std::memory_order_acq_rel,
                                       std::memory_order_acquire)) {
        LogStateChange("WakeupAsync", prev_state, prev_state | kLocked);
        wakeup_mask_ |= wakeup_mask;
        arena_->GetContext<grpc_event_engine::experimental::EventEngine>()->Run(
            [this, prev_state]() {
              GRPC_LATENT_SEE_PARENT_SCOPE("Party::WakeupAsync");
              ApplicationCallbackExecCtx app_exec_ctx;
              ExecCtx exec_ctx;
              RunLockedAndUnref(this, prev_state);
            });
        return;
      }
    } else {
      if (state_.compare_exchange_weak(
              prev_state, (prev_state | wakeup_mask) - kOneRef,
              std::memory_order_acq_rel, std::memory_order_acquire)) {
        LogStateChange("WakeupAsync", prev_state, prev_state | wakeup_mask);
        return;
      }
    }
  }
}

void Party::Drop(WakeupMask) { Unref(); }

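// Tear the party down once the last ref has been dropped: destroy any
// remaining participants, then run the destructor in place. The party's
// storage is owned by its arena (hence the explicit destructor call rather
// than delete); moving the arena ref into a local keeps it alive until after
// the destructor has finished.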
void Party::PartyIsOver() {
  CancelRemainingParticipants();
  auto arena = std::move(arena_);
  this->~Party();
}

}  // namespace grpc_core