• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_ACTIVITY_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_ACTIVITY_H
17 
18 #include <grpc/support/port_platform.h>
19 #include <stdint.h>
20 
21 #include <algorithm>
22 #include <atomic>
23 #include <memory>
24 #include <string>
25 #include <utility>
26 
27 #include "absl/base/thread_annotations.h"
28 #include "absl/log/check.h"
29 #include "absl/status/status.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/types/optional.h"
32 #include "src/core/lib/debug/trace.h"
33 #include "src/core/lib/promise/context.h"
34 #include "src/core/lib/promise/detail/promise_factory.h"
35 #include "src/core/lib/promise/detail/status.h"
36 #include "src/core/lib/promise/poll.h"
37 #include "src/core/util/construct_destruct.h"
38 #include "src/core/util/dump_args.h"
39 #include "src/core/util/latent_see.h"
40 #include "src/core/util/no_destruct.h"
41 #include "src/core/util/orphanable.h"
42 #include "src/core/util/sync.h"
43 
44 namespace grpc_core {
45 
46 class Activity;
47 
// WakeupMask is a bitfield representing which parts of an activity should be
// woken up. It is 16 bits wide.
using WakeupMask = uint16_t;
51 
52 // A Wakeable object is used by queues to wake activities.
53 class Wakeable {
54  public:
55   // Wake up the underlying activity.
56   // After calling, this Wakeable cannot be used again.
57   // WakeupMask comes from the activity that created this Wakeable and specifies
58   // the set of promises that should be awoken.
59   virtual void Wakeup(WakeupMask wakeup_mask) = 0;
60   // Per Wakeup, but guarantee that the activity will be woken up out-of-line.
61   // Useful if there may be mutexes or the like held by the current thread.
62   virtual void WakeupAsync(WakeupMask wakeup_mask) = 0;
63   // Drop this wakeable without waking up the underlying activity.
64   virtual void Drop(WakeupMask wakeup_mask) = 0;
65 
66   // Return the underlying activity debug tag, or "<unknown>" if not available.
67   virtual std::string ActivityDebugTag(WakeupMask wakeup_mask) const = 0;
68 
69  protected:
~Wakeable()70   inline ~Wakeable() {}
71 };
72 
73 namespace promise_detail {
74 struct Unwakeable final : public Wakeable {
Wakeupfinal75   void Wakeup(WakeupMask) override {}
WakeupAsyncfinal76   void WakeupAsync(WakeupMask) override {}
Dropfinal77   void Drop(WakeupMask) override {}
78   std::string ActivityDebugTag(WakeupMask) const override;
79 };
unwakeable()80 static Unwakeable* unwakeable() {
81   return NoDestructSingleton<Unwakeable>::Get();
82 }
83 }  // namespace promise_detail
84 
85 // An owning reference to a Wakeable.
86 // This type is non-copyable but movable.
87 class Waker {
88  public:
Waker(Wakeable * wakeable,WakeupMask wakeup_mask)89   Waker(Wakeable* wakeable, WakeupMask wakeup_mask)
90       : wakeable_and_arg_{wakeable, wakeup_mask} {}
Waker()91   Waker() : Waker(promise_detail::unwakeable(), 0) {}
~Waker()92   ~Waker() { wakeable_and_arg_.Drop(); }
93   Waker(const Waker&) = delete;
94   Waker& operator=(const Waker&) = delete;
Waker(Waker && other)95   Waker(Waker&& other) noexcept : wakeable_and_arg_(other.Take()) {}
96   Waker& operator=(Waker&& other) noexcept {
97     std::swap(wakeable_and_arg_, other.wakeable_and_arg_);
98     return *this;
99   }
100 
101   // Wake the underlying activity.
Wakeup()102   void Wakeup() { Take().Wakeup(); }
103 
WakeupAsync()104   void WakeupAsync() { Take().WakeupAsync(); }
105 
106   template <typename H>
AbslHashValue(H h,const Waker & w)107   friend H AbslHashValue(H h, const Waker& w) {
108     return H::combine(H::combine(std::move(h), w.wakeable_and_arg_.wakeable),
109                       w.wakeable_and_arg_.wakeup_mask);
110   }
111 
112   bool operator==(const Waker& other) const noexcept {
113     return wakeable_and_arg_ == other.wakeable_and_arg_;
114   }
115 
116   bool operator!=(const Waker& other) const noexcept {
117     return !operator==(other);
118   }
119 
ActivityDebugTag()120   std::string ActivityDebugTag() {
121     return wakeable_and_arg_.ActivityDebugTag();
122   }
123 
DebugString()124   std::string DebugString() const {
125     return absl::StrFormat("Waker{%p, %d}", wakeable_and_arg_.wakeable,
126                            wakeable_and_arg_.wakeup_mask);
127   }
128 
129   template <typename Sink>
AbslStringify(Sink & sink,const Waker & waker)130   friend void AbslStringify(Sink& sink, const Waker& waker) {
131     sink.Append(waker.DebugString());
132   }
133 
134   // This is for tests to assert that a waker is occupied or not.
is_unwakeable()135   bool is_unwakeable() const {
136     return wakeable_and_arg_.wakeable == promise_detail::unwakeable();
137   }
138 
139  private:
140   struct WakeableAndArg {
141     Wakeable* wakeable;
142     WakeupMask wakeup_mask;
143 
WakeupWakeableAndArg144     void Wakeup() { wakeable->Wakeup(wakeup_mask); }
WakeupAsyncWakeableAndArg145     void WakeupAsync() { wakeable->WakeupAsync(wakeup_mask); }
DropWakeableAndArg146     void Drop() { wakeable->Drop(wakeup_mask); }
ActivityDebugTagWakeableAndArg147     std::string ActivityDebugTag() const {
148       return wakeable == nullptr ? "<unknown>"
149                                  : wakeable->ActivityDebugTag(wakeup_mask);
150     }
151     bool operator==(const WakeableAndArg& other) const noexcept {
152       return wakeable == other.wakeable && wakeup_mask == other.wakeup_mask;
153     }
154   };
155 
Take()156   WakeableAndArg Take() {
157     return std::exchange(wakeable_and_arg_, {promise_detail::unwakeable(), 0});
158   }
159 
160   WakeableAndArg wakeable_and_arg_;
161 };
162 
163 // Helper type to track wakeups between objects in the same activity.
164 // Can be fairly fast as no ref counting or locking needs to occur.
165 class IntraActivityWaiter {
166  public:
167   // Register for wakeup, return Pending(). If state is not ready to proceed,
168   // Promises should bottom out here.
169   Pending pending();
170   // Wake the activity
171   void Wake();
172 
173   std::string DebugString() const;
174 
175   template <typename Sink>
AbslStringify(Sink & sink,const IntraActivityWaiter & waker)176   friend void AbslStringify(Sink& sink, const IntraActivityWaiter& waker) {
177     sink.Append(waker.DebugString());
178   }
179 
180  private:
181   WakeupMask wakeups_ = 0;
182 };
183 
184 // An Activity tracks execution of a single promise.
185 // It executes the promise under a mutex.
186 // When the promise stalls, it registers the containing activity to be woken up
187 // later.
188 // The activity takes a callback, which will be called exactly once with the
189 // result of execution.
190 // Activity execution may be cancelled by simply deleting the activity. In such
191 // a case, if execution had not already finished, the done callback would be
192 // called with absl::CancelledError().
193 class Activity : public Orphanable {
194  public:
195   // Force wakeup from the outside.
196   // This should be rarely needed, and usages should be accompanied with a note
197   // on why it's not possible to wakeup with a Waker object.
198   // Nevertheless, it's sometimes useful for integrations with Activity to force
199   // an Activity to repoll.
ForceWakeup()200   void ForceWakeup() { MakeOwningWaker().Wakeup(); }
201 
202   // Force the current activity to immediately repoll if it doesn't complete.
203   virtual void ForceImmediateRepoll(WakeupMask mask) = 0;
204   // Legacy version of ForceImmediateRepoll() that uses the current participant.
205   // Will go away once Party gets merged with Activity. New usage is banned.
ForceImmediateRepoll()206   void ForceImmediateRepoll() { ForceImmediateRepoll(CurrentParticipant()); }
207 
208   // Return the current part of the activity as a bitmask
CurrentParticipant()209   virtual WakeupMask CurrentParticipant() const { return 1; }
210 
211   // Return the current activity.
212   // Additionally:
213   // - assert that there is a current activity (and catch bugs if there's not)
214   // - indicate to thread safety analysis that the current activity is indeed
215   //   locked
216   // - back up that assertion with a runtime check in debug builds (it's
217   //   prohibitively expensive in non-debug builds)
current()218   static Activity* current() { return current_ref(); }
219 
220   // Produce an activity-owning Waker. The produced waker will keep the activity
221   // alive until it's awoken or dropped.
222   virtual Waker MakeOwningWaker() = 0;
223 
224   // Produce a non-owning Waker. The waker will own a small heap allocated weak
225   // pointer to this activity. This is more suitable for wakeups that may not be
226   // delivered until long after the activity should be destroyed.
227   virtual Waker MakeNonOwningWaker() = 0;
228 
229   // Some descriptive text to add to log messages to identify this activity.
230   virtual std::string DebugTag() const;
231 
232  protected:
233   // Check if this activity is the current activity executing on the current
234   // thread.
is_current()235   bool is_current() const { return this == current(); }
236   // Check if there is an activity executing on the current thread.
have_current()237   static bool have_current() { return current() != nullptr; }
238   // Set the current activity at construction, clean it up at destruction.
239   class ScopedActivity {
240    public:
ScopedActivity(Activity * activity)241     explicit ScopedActivity(Activity* activity) : prior_activity_(current()) {
242       current_ref() = activity;
243     }
~ScopedActivity()244     ~ScopedActivity() { current_ref() = prior_activity_; }
245     ScopedActivity(const ScopedActivity&) = delete;
246     ScopedActivity& operator=(const ScopedActivity&) = delete;
247 
248    private:
249     Activity* const prior_activity_;
250   };
251 
252  private:
current_ref()253   static Activity*& current_ref() {
254 #if !defined(_WIN32) || !defined(_DLL)
255     return g_current_activity_;
256 #else
257     // Set during RunLoop to the Activity that's executing.
258     // Being set implies that mu_ is held.
259     static thread_local Activity* current_activity;
260     return current_activity;
261 #endif
262   }
263 #if !defined(_WIN32) || !defined(_DLL)
264   // Set during RunLoop to the Activity that's executing.
265   // Being set implies that mu_ is held.
266   static thread_local Activity* g_current_activity_;
267 #endif
268 };
269 
270 // Owned pointer to one Activity.
271 using ActivityPtr = OrphanablePtr<Activity>;
272 
273 namespace promise_detail {
274 
// Holds a context object by value and hands out a pointer to it.
template <typename Context>
class ContextHolder {
 public:
  using ContextType = Context;

  explicit ContextHolder(Context value) : value_(std::move(value)) {}
  // Pointer to the stored value; valid for the lifetime of this holder.
  Context* GetContext() { return &value_; }

 private:
  Context value_;
};
286 
287 template <typename Context>
288 class ContextHolder<Context*> {
289  public:
290   using ContextType = Context;
291 
ContextHolder(Context * value)292   explicit ContextHolder(Context* value) : value_(value) {}
GetContext()293   Context* GetContext() { return value_; }
294 
295  private:
296   Context* value_;
297 };
298 
299 template <typename Context, typename Deleter>
300 class ContextHolder<std::unique_ptr<Context, Deleter>> {
301  public:
302   using ContextType = Context;
303 
ContextHolder(std::unique_ptr<Context,Deleter> value)304   explicit ContextHolder(std::unique_ptr<Context, Deleter> value)
305       : value_(std::move(value)) {}
GetContext()306   Context* GetContext() { return value_.get(); }
307 
308  private:
309   std::unique_ptr<Context, Deleter> value_;
310 };
311 
312 template <typename Context>
313 class ContextHolder<RefCountedPtr<Context>> {
314  public:
315   using ContextType = Context;
316 
ContextHolder(RefCountedPtr<Context> value)317   explicit ContextHolder(RefCountedPtr<Context> value)
318       : value_(std::move(value)) {}
GetContext()319   Context* GetContext() { return value_.get(); }
320 
321  private:
322   RefCountedPtr<Context> value_;
323 };
324 
325 template <>
326 class Context<Activity> {
327  public:
get()328   static Activity* get() { return Activity::current(); }
329 };
330 
331 template <typename HeldContext>
332 using ContextTypeFromHeld = typename ContextHolder<
333     typename std::remove_reference<HeldContext>::type>::ContextType;
334 
335 template <typename... Contexts>
336 class ActivityContexts
337     : public ContextHolder<typename std::remove_reference<Contexts>::type>... {
338  public:
339   explicit ActivityContexts(Contexts&&... contexts)
340       : ContextHolder<typename std::remove_reference<Contexts>::type>(
341             std::forward<Contexts>(contexts))... {}
342 
343   class ScopedContext : public Context<ContextTypeFromHeld<Contexts>>... {
344    public:
345     explicit ScopedContext(ActivityContexts* contexts)
346         : Context<ContextTypeFromHeld<Contexts>>(
347               static_cast<ContextHolder<
348                   typename std::remove_reference<Contexts>::type>*>(contexts)
349                   ->GetContext())... {
350       // Silence `unused-but-set-parameter` in case of Contexts = {}
351       (void)contexts;
352     }
353   };
354 };
355 
356 // A free standing activity: an activity that owns its own synchronization and
357 // memory.
358 // The alternative is an activity that's somehow tied into another system, for
359 // instance the type seen in promise_based_filter.h as we're transitioning from
360 // the old filter stack to the new system.
361 // FreestandingActivity is-a Wakeable, but needs to increment a refcount before
362 // returning that Wakeable interface. Additionally, we want to keep
363 // FreestandingActivity as small as is possible, since it will be used
364 // everywhere. So we use inheritance to provide the Wakeable interface: this
365 // makes it zero sized, and we make the inheritance private to prevent
366 // accidental casting.
367 class FreestandingActivity : public Activity, private Wakeable {
368  public:
MakeOwningWaker()369   Waker MakeOwningWaker() final {
370     Ref();
371     return Waker(this, 0);
372   }
373   Waker MakeNonOwningWaker() final;
374 
Orphan()375   void Orphan() final {
376     Cancel();
377     Unref();
378   }
379 
ForceImmediateRepoll(WakeupMask)380   void ForceImmediateRepoll(WakeupMask) final {
381     mu_.AssertHeld();
382     SetActionDuringRun(ActionDuringRun::kWakeup);
383   }
384 
385  protected:
386   // Action received during a run, in priority order.
387   // If more than one action is received during a run, we use max() to resolve
388   // which one to report (so Cancel overrides Wakeup).
389   enum class ActionDuringRun : uint8_t {
390     kNone,    // No action occurred during run.
391     kWakeup,  // A wakeup occurred during run.
392     kCancel,  // Cancel was called during run.
393   };
394 
~FreestandingActivity()395   inline ~FreestandingActivity() override {
396     if (handle_) {
397       DropHandle();
398     }
399   }
400 
401   // Check if we got an internal wakeup since the last time this function was
402   // called.
GotActionDuringRun()403   ActionDuringRun GotActionDuringRun() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
404     return std::exchange(action_during_run_, ActionDuringRun::kNone);
405   }
406 
407   // Implementors of Wakeable::Wakeup should call this after the wakeup has
408   // completed.
WakeupComplete()409   void WakeupComplete() { Unref(); }
410 
411   // Set the action that occurred during this run.
412   // We use max to combine actions so that cancellation overrides wakeups.
SetActionDuringRun(ActionDuringRun action)413   void SetActionDuringRun(ActionDuringRun action)
414       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
415     action_during_run_ = std::max(action_during_run_, action);
416   }
417 
mu()418   Mutex* mu() ABSL_LOCK_RETURNED(mu_) { return &mu_; }
419 
ActivityDebugTag(WakeupMask)420   std::string ActivityDebugTag(WakeupMask) const override { return DebugTag(); }
421 
422  private:
423   class Handle;
424 
425   // Cancel execution of the underlying promise.
426   virtual void Cancel() = 0;
427 
Ref()428   void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
Unref()429   void Unref() {
430     if (1 == refs_.fetch_sub(1, std::memory_order_acq_rel)) {
431       delete this;
432     }
433   }
434 
435   // Return a Handle instance with a ref so that it can be stored waiting for
436   // some wakeup.
437   Handle* RefHandle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
438   // If our refcount is non-zero, ref and return true.
439   // Otherwise, return false.
440   bool RefIfNonzero();
441   // Drop the (proved existing) wait handle.
442   void DropHandle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
443 
444   // All promise execution occurs under this mutex.
445   Mutex mu_;
446 
447   // Current refcount.
448   std::atomic<uint32_t> refs_{1};
449   // If wakeup is called during Promise polling, we set this to Wakeup and
450   // repoll. If cancel is called during Promise polling, we set this to Cancel
451   // and cancel at the end of polling.
452   ActionDuringRun action_during_run_ ABSL_GUARDED_BY(mu_) =
453       ActionDuringRun::kNone;
454   // Handle for long waits. Allows a very small weak pointer type object to
455   // queue for wakeups while Activity may be deleted earlier.
456   Handle* handle_ ABSL_GUARDED_BY(mu_) = nullptr;
457 };
458 
459 // Implementation details for an Activity of an arbitrary type of promise.
460 // There should exist an inner template class `BoundScheduler` that provides
461 // the following interface:
462 // struct WakeupScheduler {
463 //   template <typename ActivityType>
464 //   class BoundScheduler {
465 //    public:
466 //     BoundScheduler(WakeupScheduler);
467 //     void ScheduleWakeup();
468 //   };
469 // };
470 // The ScheduleWakeup function should arrange that
471 // static_cast<ActivityType*>(this)->RunScheduledWakeup() be invoked at the
472 // earliest opportunity.
473 // It can assume that activity will remain live until RunScheduledWakeup() is
474 // invoked, and that a given activity will not be concurrently scheduled again
475 // until its RunScheduledWakeup() has been invoked.
476 // We use private inheritance here as a way of getting private members for each
477 // of the contexts.
478 // TODO(ctiller): We can probably reconsider the private inheritance here
479 // when we move away from C++11 and have more powerful template features.
480 template <class F, class WakeupScheduler, class OnDone, typename... Contexts>
481 class PromiseActivity final
482     : public FreestandingActivity,
483       public WakeupScheduler::template BoundScheduler<
484           PromiseActivity<F, WakeupScheduler, OnDone, Contexts...>>,
485       private ActivityContexts<Contexts...> {
486  public:
487   using Factory = OncePromiseFactory<void, F>;
488   using ResultType = typename Factory::Promise::Result;
489 
PromiseActivity(F promise_factory,WakeupScheduler wakeup_scheduler,OnDone on_done,Contexts &&...contexts)490   PromiseActivity(F promise_factory, WakeupScheduler wakeup_scheduler,
491                   OnDone on_done, Contexts&&... contexts)
492       : FreestandingActivity(),
493         WakeupScheduler::template BoundScheduler<PromiseActivity>(
494             std::move(wakeup_scheduler)),
495         ActivityContexts<Contexts...>(std::forward<Contexts>(contexts)...),
496         on_done_(std::move(on_done)) {
497     // Lock, construct an initial promise from the factory, and step it.
498     // This may hit a waiter, which could expose our this pointer to other
499     // threads, meaning we do need to hold this mutex even though we're still
500     // constructing.
501     mu()->Lock();
502     auto status = Start(Factory(std::move(promise_factory)));
503     mu()->Unlock();
504     // We may complete immediately.
505     if (status.has_value()) {
506       on_done_(std::move(*status));
507     }
508   }
509 
~PromiseActivity()510   ~PromiseActivity() override {
511     // We shouldn't destruct without calling Cancel() first, and that must get
512     // us to be done_, so we assume that and have no logic to destruct the
513     // promise here.
514     CHECK(done_);
515   }
516 
RunScheduledWakeup()517   void RunScheduledWakeup() {
518     CHECK(wakeup_scheduled_.exchange(false, std::memory_order_acq_rel));
519     Step();
520     WakeupComplete();
521   }
522 
523  private:
524   using typename ActivityContexts<Contexts...>::ScopedContext;
525 
Cancel()526   void Cancel() final {
527     if (Activity::is_current()) {
528       mu()->AssertHeld();
529       SetActionDuringRun(ActionDuringRun::kCancel);
530       return;
531     }
532     bool was_done;
533     {
534       MutexLock lock(mu());
535       // Check if we were done, and flag done.
536       was_done = done_;
537       if (!done_) {
538         ScopedActivity scoped_activity(this);
539         ScopedContext contexts(this);
540         MarkDone();
541       }
542     }
543     // If we were not done, then call the on_done callback.
544     if (!was_done) {
545       on_done_(absl::CancelledError());
546     }
547   }
548 
549   // Wakeup this activity. Arrange to poll the activity again at a convenient
550   // time: this could be inline if it's deemed safe, or it could be by passing
551   // the activity to an external threadpool to run. If the activity is already
552   // running on this thread, a note is taken of such and the activity is
553   // repolled if it doesn't complete.
Wakeup(WakeupMask m)554   void Wakeup(WakeupMask m) final {
555     // If there is an active activity, but hey it's us, flag that and we'll loop
556     // in RunLoop (that's calling from above here!).
557     if (Activity::is_current()) {
558       mu()->AssertHeld();
559       SetActionDuringRun(ActionDuringRun::kWakeup);
560       WakeupComplete();
561       return;
562     }
563     WakeupAsync(m);
564   }
565 
WakeupAsync(WakeupMask)566   void WakeupAsync(WakeupMask) final {
567     GRPC_LATENT_SEE_INNER_SCOPE("PromiseActivity::WakeupAsync");
568     wakeup_flow_.Begin(GRPC_LATENT_SEE_METADATA("Activity::Wakeup"));
569     if (!wakeup_scheduled_.exchange(true, std::memory_order_acq_rel)) {
570       // Can't safely run, so ask to run later.
571       this->ScheduleWakeup();
572     } else {
573       // Already a wakeup scheduled for later, drop ref.
574       WakeupComplete();
575     }
576   }
577 
578   // Drop a wakeup
Drop(WakeupMask)579   void Drop(WakeupMask) final { this->WakeupComplete(); }
580 
581   // Notification that we're no longer executing - it's ok to destruct the
582   // promise.
MarkDone()583   void MarkDone() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
584     CHECK(!std::exchange(done_, true));
585     ScopedContext contexts(this);
586     Destruct(&promise_holder_.promise);
587   }
588 
589   // In response to Wakeup, run the Promise state machine again until it
590   // settles. Then check for completion, and if we have completed, call on_done.
Step()591   void Step() ABSL_LOCKS_EXCLUDED(mu()) {
592     GRPC_LATENT_SEE_PARENT_SCOPE("PromiseActivity::Step");
593     wakeup_flow_.End();
594     // Poll the promise until things settle out under a lock.
595     mu()->Lock();
596     if (done_) {
597       // We might get some spurious wakeups after finishing.
598       mu()->Unlock();
599       return;
600     }
601     auto status = RunStep();
602     mu()->Unlock();
603     if (status.has_value()) {
604       on_done_(std::move(*status));
605     }
606   }
607 
608   // The main body of a step: set the current activity, and any contexts, and
609   // then run the main polling loop. Contained in a function by itself in
610   // order to keep the scoping rules a little easier in Step().
RunStep()611   absl::optional<ResultType> RunStep() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
612     ScopedActivity scoped_activity(this);
613     ScopedContext contexts(this);
614     return StepLoop();
615   }
616 
617   // Similarly to RunStep, but additionally construct the promise from a
618   // promise factory before entering the main loop. Called once from the
619   // constructor.
Start(Factory promise_factory)620   absl::optional<ResultType> Start(Factory promise_factory)
621       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
622     ScopedActivity scoped_activity(this);
623     ScopedContext contexts(this);
624     Construct(&promise_holder_.promise, promise_factory.Make());
625     return StepLoop();
626   }
627 
628   // Until there are no wakeups from within and the promise is incomplete:
629   // poll the promise.
StepLoop()630   absl::optional<ResultType> StepLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
631     CHECK(is_current());
632     while (true) {
633       // Run the promise.
634       CHECK(!done_);
635       auto r = promise_holder_.promise();
636       if (auto* status = r.value_if_ready()) {
637         // If complete, destroy the promise, flag done, and exit this loop.
638         MarkDone();
639         return IntoStatus(status);
640       }
641       // Continue looping til no wakeups occur.
642       switch (GotActionDuringRun()) {
643         case ActionDuringRun::kNone:
644           return {};
645         case ActionDuringRun::kWakeup:
646           break;
647         case ActionDuringRun::kCancel:
648           MarkDone();
649           return absl::CancelledError();
650       }
651     }
652   }
653 
654   using Promise = typename Factory::Promise;
655   // Callback on completion of the promise.
656   GPR_NO_UNIQUE_ADDRESS OnDone on_done_;
657   // Has execution completed?
658   GPR_NO_UNIQUE_ADDRESS bool done_ ABSL_GUARDED_BY(mu()) = false;
659   // Is there a wakeup scheduled?
660   GPR_NO_UNIQUE_ADDRESS std::atomic<bool> wakeup_scheduled_{false};
661   // We wrap the promise in a union to allow control over the construction
662   // simultaneously with annotating mutex requirements and noting that the
663   // promise contained may not use any memory.
664   union PromiseHolder {
PromiseHolder()665     PromiseHolder() {}
~PromiseHolder()666     ~PromiseHolder() {}
667     GPR_NO_UNIQUE_ADDRESS Promise promise;
668   };
669   GPR_NO_UNIQUE_ADDRESS PromiseHolder promise_holder_ ABSL_GUARDED_BY(mu());
670   GPR_NO_UNIQUE_ADDRESS latent_see::Flow wakeup_flow_;
671 };
672 
673 }  // namespace promise_detail
674 
675 // Given a functor that returns a promise (a promise factory), a callback for
676 // completion, and a callback scheduler, construct an activity.
677 template <typename Factory, typename WakeupScheduler, typename OnDone,
678           typename... Contexts>
MakeActivity(Factory promise_factory,WakeupScheduler wakeup_scheduler,OnDone on_done,Contexts &&...contexts)679 ActivityPtr MakeActivity(Factory promise_factory,
680                          WakeupScheduler wakeup_scheduler, OnDone on_done,
681                          Contexts&&... contexts) {
682   return ActivityPtr(
683       new promise_detail::PromiseActivity<Factory, WakeupScheduler, OnDone,
684                                           Contexts...>(
685           std::move(promise_factory), std::move(wakeup_scheduler),
686           std::move(on_done), std::forward<Contexts>(contexts)...));
687 }
688 
pending()689 inline Pending IntraActivityWaiter::pending() {
690   const auto new_wakeups = GetContext<Activity>()->CurrentParticipant();
691   GRPC_TRACE_LOG(promise_primitives, INFO)
692       << "IntraActivityWaiter::pending: "
693       << GRPC_DUMP_ARGS(this, new_wakeups, wakeups_);
694   wakeups_ |= new_wakeups;
695   return Pending();
696 }
697 
Wake()698 inline void IntraActivityWaiter::Wake() {
699   if (wakeups_ == 0) return;
700   GetContext<Activity>()->ForceImmediateRepoll(std::exchange(wakeups_, 0));
701 }
702 
703 }  // namespace grpc_core
704 
705 #endif  // GRPC_SRC_CORE_LIB_PROMISE_ACTIVITY_H
706