// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "src/core/lib/promise/party.h"

#include <grpc/support/port_platform.h>

#include <atomic>
#include <cstdint>

#include "absl/base/thread_annotations.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/strings/str_format.h"
#include "src/core/lib/event_engine/event_engine_context.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/util/latent_see.h"
#include "src/core/util/sync.h"

#ifdef GRPC_MAXIMIZE_THREADYNESS
#include "absl/random/random.h"           // IWYU pragma: keep
#include "src/core/lib/iomgr/exec_ctx.h"  // IWYU pragma: keep
#include "src/core/util/thd.h"            // IWYU pragma: keep
#endif

namespace grpc_core {

///////////////////////////////////////////////////////////////////////////////
// PartySyncUsingAtomics

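// Take a ref on the party only if its refcount is still non-zero: this lets
// a handle wake the party without resurrecting one that is already being
// destroyed.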
GRPC_MUST_USE_RESULT bool Party::RefIfNonZero() {
  auto state = state_.load(std::memory_order_relaxed);
  do {
    // If zero, we are done (without an increment). If not, we must do a CAS
    // to maintain the contract: do not increment the counter if it is
    // already zero.
    if ((state & kRefMask) == 0) {
      return false;
    }
  } while (!state_.compare_exchange_weak(state, state + kOneRef,
                                         std::memory_order_acq_rel,
                                         std::memory_order_relaxed));
  LogStateChange("RefIfNonZero", state, state + kOneRef);
  return true;
}

///////////////////////////////////////////////////////////////////////////////
// Party::Handle

// Weak handle to a Party.
// Handle can persist while Party goes away.
class Party::Handle final : public Wakeable {
 public:
  explicit Handle(Party* party) : party_(party) {}

  // Ref the Handle (not the activity).
  void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }

  // Activity is going away... drop its reference and sever the connection
  // back.
  void DropActivity() ABSL_LOCKS_EXCLUDED(mu_) {
    mu_.Lock();
    CHECK_NE(party_, nullptr);
    party_ = nullptr;
    mu_.Unlock();
    Unref();
  }

  void WakeupGeneric(WakeupMask wakeup_mask,
                     void (Party::*wakeup_method)(WakeupMask))
      ABSL_LOCKS_EXCLUDED(mu_) {
    mu_.Lock();
    // Note that the activity's refcount can drop to zero, but we could win
    // the lock against DropActivity, so we need to only increase the
    // activity's refcount if it is non-zero.
    Party* party = party_;
    if (party != nullptr && party->RefIfNonZero()) {
      mu_.Unlock();
      // Activity still exists and we have a reference: wake it up, which will
      // drop the ref.
      (party->*wakeup_method)(wakeup_mask);
    } else {
      // Could not get the activity - it's either gone or going. No need to
      // wake it up!
      mu_.Unlock();
    }
    // Drop the ref to the handle (we have one ref = one wakeup semantics).
    Unref();
  }

  // Activity needs to wake up (if it still exists!) - wake it up, and drop
  // the ref that was kept for this handle.
  void Wakeup(WakeupMask wakeup_mask) override ABSL_LOCKS_EXCLUDED(mu_) {
    WakeupGeneric(wakeup_mask, &Party::Wakeup);
  }

  void WakeupAsync(WakeupMask wakeup_mask) override ABSL_LOCKS_EXCLUDED(mu_) {
    WakeupGeneric(wakeup_mask, &Party::WakeupAsync);
  }

  void Drop(WakeupMask) override { Unref(); }

  std::string ActivityDebugTag(WakeupMask) const override {
    MutexLock lock(&mu_);
    return party_ == nullptr ? "<unknown>" : party_->DebugTag();
  }

 private:
  // Unref the Handle (not the activity).
  void Unref() {
    if (1 == refs_.fetch_sub(1, std::memory_order_acq_rel)) {
      delete this;
    }
  }

  // Two initial refs: one for the waiter that caused instantiation, one for
  // the party.
  std::atomic<size_t> refs_{2};
  mutable Mutex mu_;
  Party* party_ ABSL_GUARDED_BY(mu_);
};

Wakeable* Party::Participant::MakeNonOwningWakeable(Party* party) {
  if (handle_ == nullptr) {
    handle_ = new Handle(party);
    return handle_;
  }
  handle_->Ref();
  return handle_;
}

Party::Participant::~Participant() {
  if (handle_ != nullptr) {
    handle_->DropActivity();
  }
}

Party::~Party() {}

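// Destroy any participants that are still registered and CAS their
// allocated bits out of the state word. Called as the party is torn down
// (see PartyIsOver).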
void Party::CancelRemainingParticipants() {
  uint64_t prev_state = state_.load(std::memory_order_relaxed);
  if ((prev_state & kAllocatedMask) == 0) return;
  ScopedActivity activity(this);
  promise_detail::Context<Arena> arena_ctx(arena_.get());
  uint64_t clear_state = 0;
  do {
    for (size_t i = 0; i < party_detail::kMaxParticipants; i++) {
      if (auto* p =
              participants_[i].exchange(nullptr, std::memory_order_acquire)) {
        clear_state |= 1ull << i << kAllocatedShift;
        p->Destroy();
      }
    }
    if (clear_state == 0) return;
  } while (!state_.compare_exchange_weak(prev_state, prev_state & ~clear_state,
                                         std::memory_order_acq_rel));
  LogStateChange("CancelRemainingParticipants", prev_state,
                 prev_state & ~clear_state);
}

std::string Party::ActivityDebugTag(WakeupMask wakeup_mask) const {
  return absl::StrFormat("%s [parts:%x]", DebugTag(), wakeup_mask);
}

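// An owning waker holds a strong ref to the party; the ref is released when
// the waker is used (or dropped).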
Waker Party::MakeOwningWaker() {
  DCHECK(currently_polling_ != kNotPolling);
  IncrementRefCount();
  return Waker(this, 1u << currently_polling_);
}

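// A non-owning waker routes its wakeups through a Party::Handle, which can
// outlive the party itself.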
Waker Party::MakeNonOwningWaker() {
  DCHECK(currently_polling_ != kNotPolling);
  return Waker(participants_[currently_polling_]
                   .load(std::memory_order_relaxed)
                   ->MakeNonOwningWakeable(this),
               1u << currently_polling_);
}

void Party::ForceImmediateRepoll(WakeupMask mask) {
  DCHECK(is_current());
  wakeup_mask_ |= mask;
}

void Party::RunLockedAndUnref(Party* party, uint64_t prev_state) {
  GRPC_LATENT_SEE_PARENT_SCOPE("Party::RunLocked");
#ifdef GRPC_MAXIMIZE_THREADYNESS
  Thread thd(
      "RunParty",
      [party, prev_state]() {
        ApplicationCallbackExecCtx app_exec_ctx;
        ExecCtx exec_ctx;
        party->RunPartyAndUnref(prev_state);
      },
      nullptr, Thread::Options().set_joinable(false));
  thd.Start();
#else
  struct RunState;
  static thread_local RunState* g_run_state = nullptr;
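  // Each thread runs at most one party at a time, with at most one more
  // queued behind it ('first' and 'next' below); any further wakeups are
  // offloaded to the EventEngine.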
  struct PartyWakeup {
    PartyWakeup() : party{nullptr} {}
    PartyWakeup(Party* party, uint64_t prev_state)
        : party{party}, prev_state{prev_state} {}
    Party* party;
    uint64_t prev_state;
  };
  struct RunState {
    explicit RunState(PartyWakeup first) : first{first}, next{} {}
    PartyWakeup first;
    PartyWakeup next;
    GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION void Run() {
      g_run_state = this;
      do {
        GRPC_LATENT_SEE_INNER_SCOPE("run_one_party");
        first.party->RunPartyAndUnref(first.prev_state);
        first = std::exchange(next, PartyWakeup{});
      } while (first.party != nullptr);
      DCHECK(g_run_state == this);
      g_run_state = nullptr;
    }
  };
  // If a party is already running on this thread, don't run this one
  // immediately; instead add it to the end of the list of parties to run.
  // This enables a fairly straightforward batching of work from a
  // call to a transport (or back again).
  if (GPR_UNLIKELY(g_run_state != nullptr)) {
    if (g_run_state->first.party == party) {
      g_run_state->first.prev_state = prev_state;
      party->Unref();
      return;
    }
    if (g_run_state->next.party == party) {
      g_run_state->next.prev_state = prev_state;
      party->Unref();
      return;
    }
    if (g_run_state->next.party != nullptr) {
      // If there's already a different party queued, we're better off asking
      // the event engine to run it so we can spread load.
      // We swap the oldest party to run on the event engine so that we don't
      // accidentally end up with a tail latency problem whereby one party
      // gets held for a really long time.
      auto wakeup =
          std::exchange(g_run_state->next, PartyWakeup{party, prev_state});
      auto arena = party->arena_.get();
      auto* event_engine =
          arena->GetContext<grpc_event_engine::experimental::EventEngine>();
      CHECK(event_engine != nullptr) << "; " << GRPC_DUMP_ARGS(party, arena);
      event_engine->Run([wakeup]() {
        GRPC_LATENT_SEE_PARENT_SCOPE("Party::RunLocked offload");
        ApplicationCallbackExecCtx app_exec_ctx;
        ExecCtx exec_ctx;
        RunState{wakeup}.Run();
      });
      return;
    }
    g_run_state->next = PartyWakeup{party, prev_state};
    return;
  }
  RunState{{party, prev_state}}.Run();
#endif
}

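// Called with the party locked and with a ref held (both encoded in
// prev_state). Polls woken participants until the lock can be CAS'd away,
// then releases the ref taken for this run.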
void Party::RunPartyAndUnref(uint64_t prev_state) {
  ScopedActivity activity(this);
  promise_detail::Context<Arena> arena_ctx(arena_.get());
  DCHECK_EQ(prev_state & kLocked, 0u)
      << "Party should be unlocked prior to first wakeup";
  DCHECK_GE(prev_state & kRefMask, kOneRef);
  // Now update prev_state to be what we want the CAS to see below.
  DCHECK_EQ(prev_state & ~(kRefMask | kAllocatedMask), 0u)
      << "Party should have contained no wakeups on lock";
  prev_state |= kLocked;
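  // Cache time lookups for the duration of this run where the platform and
  // experiment allow it.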
  absl::optional<ScopedTimeCache> time_cache;
#if !TARGET_OS_IPHONE
  if (IsTimeCachingInPartyEnabled()) {
    time_cache.emplace();
  }
#endif
  for (;;) {
    uint64_t keep_allocated_mask = kAllocatedMask;
    // For each wakeup bit...
    while (wakeup_mask_ != 0) {
      auto wakeup_mask = std::exchange(wakeup_mask_, 0);
      while (wakeup_mask != 0) {
        const uint64_t t = LowestOneBit(wakeup_mask);
        const int i = absl::countr_zero(t);
        wakeup_mask ^= t;
        // If the participant is null, skip.
        // This allows participants to complete whilst wakers still exist
        // somewhere.
        auto* participant = participants_[i].load(std::memory_order_acquire);
        if (GPR_UNLIKELY(participant == nullptr)) {
          GRPC_TRACE_LOG(promise_primitives, INFO)
              << "Party " << this << "                 Run:Wakeup " << i
              << " already complete";
          continue;
        }
        GRPC_TRACE_LOG(promise_primitives, INFO)
            << "Party " << this << "                 Run:Wakeup " << i;
        // Poll the participant.
        currently_polling_ = i;
        if (participant->PollParticipantPromise()) {
          participants_[i].store(nullptr, std::memory_order_relaxed);
          const uint64_t allocated_bit = (1u << i << kAllocatedShift);
          keep_allocated_mask &= ~allocated_bit;
        }
      }
    }
    currently_polling_ = kNotPolling;
    // Try to CAS the state we expected to have (with no wakeups or adds)
    // back to unlocked (by masking in only the ref mask - sans locked bit).
    // If this succeeds then no wakeups were added, no adds were added, and we
    // have successfully unlocked.
    // Otherwise, we need to loop again.
    // Note that if an owning waker is created or the weak CAS spuriously
    // fails we will also loop again, but in that case we will see no wakeups
    // or adds and so will get back here fairly quickly.
    // TODO(ctiller): consider mitigations for the accidental wakeup on owning
    // waker creation case -- I currently expect this will be more expensive
    // than this quick loop.
    if (state_.compare_exchange_weak(
            prev_state,
            (prev_state & (kRefMask | keep_allocated_mask)) - kOneRef,
            std::memory_order_acq_rel, std::memory_order_acquire)) {
      LogStateChange("Run:End", prev_state,
                     (prev_state & (kRefMask | keep_allocated_mask)) - kOneRef);
      if ((prev_state & kRefMask) == kOneRef) {
        // We're done with the party.
        PartyIsOver();
      }
      return;
    }
    // CAS out (but retrieve) any allocations and wakeups that occurred during
    // the run.
    while (!state_.compare_exchange_weak(
        prev_state, prev_state & (kRefMask | kLocked | keep_allocated_mask))) {
      // Nothing to do here.
    }
    LogStateChange("Run:Continue", prev_state,
                   prev_state & (kRefMask | kLocked | keep_allocated_mask));
    DCHECK(prev_state & kLocked)
        << "Party should be locked; prev_state=" << prev_state;
    DCHECK_GE(prev_state & kRefMask, kOneRef);
    // From the previous state, extract which participants we're to wakeup.
    wakeup_mask_ |= prev_state & kWakeupMask;
    // Now update prev_state to be what we want the CAS to see once wakeups
    // complete next iteration.
    prev_state &= kRefMask | kLocked | keep_allocated_mask;
  }
}

// Given a bitmask indicating the allocation status of participant slots,
// return a mask with a single bit set for the next slot to allocate.
// By default we use a deterministic and fast algorithm (first-fit), but we
// don't want to guarantee that this is the order of spawning -- if a promise
// is locked by another thread (for instance) a sequence of spawns may be
// reordered for initial execution.
// So for thready-tsan we provide an alternative implementation that
// additionally reorders promises.
#ifndef GRPC_MAXIMIZE_THREADYNESS
uint64_t Party::NextAllocationMask(uint64_t current_allocation_mask) {
  return LowestOneBit(~current_allocation_mask);
}
#else
uint64_t Party::NextAllocationMask(uint64_t current_allocation_mask) {
  CHECK_EQ(current_allocation_mask & ~kWakeupMask, 0);
  if (current_allocation_mask == kWakeupMask) return kWakeupMask + 1;
  // Count the number of unset bits in the allocation mask.
  size_t unset_bits = 0;
  for (size_t i = 0; i < party_detail::kMaxParticipants; i++) {
    if (current_allocation_mask & (1ull << i)) continue;
    ++unset_bits;
  }
  CHECK_GT(unset_bits, 0);
  absl::BitGen bitgen;
  size_t selected = absl::Uniform<size_t>(bitgen, 0, unset_bits);
  for (size_t i = 0; i < party_detail::kMaxParticipants; i++) {
    if (current_allocation_mask & (1ull << i)) continue;
    if (selected == 0) return 1ull << i;
    --selected;
  }
  LOG(FATAL) << "unreachable";
}
#endif

void Party::AddParticipant(Participant* participant) {
  GRPC_LATENT_SEE_INNER_SCOPE("Party::AddParticipant");
  uint64_t state = state_.load(std::memory_order_acquire);
  uint64_t allocated;
  size_t slot;

  // Find a slot for the new participant, preferring the lowest available
  // slot so that poll ordering matches the order in which participants were
  // presented to this function.
  uint64_t wakeup_mask;
  uint64_t new_state;
  do {
    allocated = (state & kAllocatedMask) >> kAllocatedShift;
    wakeup_mask = NextAllocationMask(allocated);
    if (GPR_UNLIKELY((wakeup_mask & kWakeupMask) == 0)) {
      DelayAddParticipant(participant);
      return;
    }
    DCHECK_NE(wakeup_mask & kWakeupMask, 0u)
        << "No available slots for new participant; allocated=" << allocated
        << " state=" << state << " wakeup_mask=" << wakeup_mask;
    allocated |= wakeup_mask;
    slot = absl::countr_zero(wakeup_mask);
    // Try to allocate this slot and take a ref (atomically).
    // The ref needs to be taken because once we store the participant it
    // could be spuriously woken up and unref the party.
    new_state = (state | (allocated << kAllocatedShift)) + kOneRef;
  } while (!state_.compare_exchange_weak(
      state, new_state, std::memory_order_acq_rel, std::memory_order_acquire));
  LogStateChange("AddParticipantsAndRef", state, new_state);
  GRPC_TRACE_LOG(party_state, INFO)
      << "Party " << this << "                 AddParticipant: " << slot
      << " [participant=" << participant << "]";
  participants_[slot].store(participant, std::memory_order_release);
  // Now we need to wake up the party.
  WakeupFromState(new_state, wakeup_mask);
}

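// All participant slots are in use: keep the party alive with a ref and
// retry the add from the EventEngine.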
void Party::DelayAddParticipant(Participant* participant) {
  // We need to delay the addition of participants.
  IncrementRefCount();
  VLOG_EVERY_N_SEC(2, 10) << "Delaying addition of participant to party "
                          << this << " because it is full.";
  arena_->GetContext<grpc_event_engine::experimental::EventEngine>()->Run(
      [this, participant]() mutable {
        ApplicationCallbackExecCtx app_exec_ctx;
        ExecCtx exec_ctx;
        AddParticipant(participant);
        Unref();
      });
}

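// Wake a participant without running the party on this thread. If we can
// take the lock, the run is scheduled onto the EventEngine (transferring the
// waker's ref); if the party is already locked, we just record the wakeup
// bit and drop the ref, since the lock holder will pick the wakeup up
// before unlocking.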
void Party::WakeupAsync(WakeupMask wakeup_mask) {
  // OR in the wakeup bit for the participant, as well as the locked bit.
  uint64_t prev_state = state_.load(std::memory_order_relaxed);
  LogStateChange("ScheduleWakeup", prev_state,
                 prev_state | (wakeup_mask & kWakeupMask) | kLocked);
  while (true) {
    if ((prev_state & kLocked) == 0) {
      if (state_.compare_exchange_weak(prev_state, prev_state | kLocked,
                                       std::memory_order_acq_rel,
                                       std::memory_order_acquire)) {
        LogStateChange("WakeupAsync", prev_state, prev_state | kLocked);
        wakeup_mask_ |= wakeup_mask;
        arena_->GetContext<grpc_event_engine::experimental::EventEngine>()->Run(
            [this, prev_state]() {
              GRPC_LATENT_SEE_PARENT_SCOPE("Party::WakeupAsync");
              ApplicationCallbackExecCtx app_exec_ctx;
              ExecCtx exec_ctx;
              RunLockedAndUnref(this, prev_state);
            });
        return;
      }
    } else {
      if (state_.compare_exchange_weak(
              prev_state, (prev_state | wakeup_mask) - kOneRef,
              std::memory_order_acq_rel, std::memory_order_acquire)) {
        LogStateChange("WakeupAsync", prev_state, prev_state | wakeup_mask);
        return;
      }
    }
  }
}

void Party::Drop(WakeupMask) { Unref(); }

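// Final teardown: destroy any remaining participants, then run the party's
// destructor while keeping the arena alive until destruction completes.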
void Party::PartyIsOver() {
  CancelRemainingParticipants();
  auto arena = std::move(arena_);
  this->~Party();
}

}  // namespace grpc_core