• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_ACTIVITY_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_ACTIVITY_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include <stdint.h>
21 
22 #include <algorithm>
23 #include <atomic>
24 #include <memory>
25 #include <string>
26 #include <utility>
27 
28 #include "absl/base/thread_annotations.h"
29 #include "absl/status/status.h"
30 #include "absl/types/optional.h"
31 
32 #include <grpc/support/log.h>
33 
34 #include "src/core/lib/event_engine/event_engine_context.h"
35 #include "src/core/lib/gprpp/construct_destruct.h"
36 #include "src/core/lib/gprpp/no_destruct.h"
37 #include "src/core/lib/gprpp/orphanable.h"
38 #include "src/core/lib/gprpp/sync.h"
39 #include "src/core/lib/promise/context.h"
40 #include "src/core/lib/promise/detail/promise_factory.h"
41 #include "src/core/lib/promise/detail/status.h"
42 #include "src/core/lib/promise/poll.h"
43 
44 namespace grpc_core {
45 
46 class Activity;
47 
48 // WakeupMask is a bitfield representing which parts of an activity should be
49 // woken up.
50 using WakeupMask = uint16_t;
51 
52 // A Wakeable object is used by queues to wake activities.
53 class Wakeable {
54  public:
55   // Wake up the underlying activity.
56   // After calling, this Wakeable cannot be used again.
57   // WakeupMask comes from the activity that created this Wakeable and specifies
58   // the set of promises that should be awoken.
59   virtual void Wakeup(WakeupMask wakeup_mask) = 0;
60   // Per Wakeup, but guarantee that the activity will be woken up out-of-line.
61   // Useful if there may be mutexes or the like held by the current thread.
62   virtual void WakeupAsync(WakeupMask wakeup_mask) = 0;
63   // Drop this wakeable without waking up the underlying activity.
64   virtual void Drop(WakeupMask wakeup_mask) = 0;
65 
66   // Return the underlying activity debug tag, or "<unknown>" if not available.
67   virtual std::string ActivityDebugTag(WakeupMask wakeup_mask) const = 0;
68 
69  protected:
~Wakeable()70   inline ~Wakeable() {}
71 };
72 
73 namespace promise_detail {
74 struct Unwakeable final : public Wakeable {
Wakeupfinal75   void Wakeup(WakeupMask) override {}
WakeupAsyncfinal76   void WakeupAsync(WakeupMask) override {}
Dropfinal77   void Drop(WakeupMask) override {}
78   std::string ActivityDebugTag(WakeupMask) const override;
79 };
unwakeable()80 static Unwakeable* unwakeable() {
81   return NoDestructSingleton<Unwakeable>::Get();
82 }
83 }  // namespace promise_detail
84 
85 // An owning reference to a Wakeable.
86 // This type is non-copyable but movable.
87 class Waker {
88  public:
Waker(Wakeable * wakeable,WakeupMask wakeup_mask)89   Waker(Wakeable* wakeable, WakeupMask wakeup_mask)
90       : wakeable_and_arg_{wakeable, wakeup_mask} {}
Waker()91   Waker() : Waker(promise_detail::unwakeable(), 0) {}
~Waker()92   ~Waker() { wakeable_and_arg_.Drop(); }
93   Waker(const Waker&) = delete;
94   Waker& operator=(const Waker&) = delete;
Waker(Waker && other)95   Waker(Waker&& other) noexcept : wakeable_and_arg_(other.Take()) {}
96   Waker& operator=(Waker&& other) noexcept {
97     std::swap(wakeable_and_arg_, other.wakeable_and_arg_);
98     return *this;
99   }
100 
101   // Wake the underlying activity.
Wakeup()102   void Wakeup() { Take().Wakeup(); }
103 
WakeupAsync()104   void WakeupAsync() { Take().WakeupAsync(); }
105 
106   template <typename H>
AbslHashValue(H h,const Waker & w)107   friend H AbslHashValue(H h, const Waker& w) {
108     return H::combine(H::combine(std::move(h), w.wakeable_and_arg_.wakeable),
109                       w.wakeable_and_arg_.wakeup_mask);
110   }
111 
112   bool operator==(const Waker& other) const noexcept {
113     return wakeable_and_arg_ == other.wakeable_and_arg_;
114   }
115 
116   bool operator!=(const Waker& other) const noexcept {
117     return !operator==(other);
118   }
119 
ActivityDebugTag()120   std::string ActivityDebugTag() {
121     return wakeable_and_arg_.ActivityDebugTag();
122   }
123 
124   // This is for tests to assert that a waker is occupied or not.
is_unwakeable()125   bool is_unwakeable() const {
126     return wakeable_and_arg_.wakeable == promise_detail::unwakeable();
127   }
128 
129  private:
130   struct WakeableAndArg {
131     Wakeable* wakeable;
132     WakeupMask wakeup_mask;
133 
WakeupWakeableAndArg134     void Wakeup() { wakeable->Wakeup(wakeup_mask); }
WakeupAsyncWakeableAndArg135     void WakeupAsync() { wakeable->WakeupAsync(wakeup_mask); }
DropWakeableAndArg136     void Drop() { wakeable->Drop(wakeup_mask); }
ActivityDebugTagWakeableAndArg137     std::string ActivityDebugTag() const {
138       return wakeable == nullptr ? "<unknown>"
139                                  : wakeable->ActivityDebugTag(wakeup_mask);
140     }
141     bool operator==(const WakeableAndArg& other) const noexcept {
142       return wakeable == other.wakeable && wakeup_mask == other.wakeup_mask;
143     }
144   };
145 
Take()146   WakeableAndArg Take() {
147     return std::exchange(wakeable_and_arg_, {promise_detail::unwakeable(), 0});
148   }
149 
150   WakeableAndArg wakeable_and_arg_;
151 };
152 
153 // Helper type to track wakeups between objects in the same activity.
154 // Can be fairly fast as no ref counting or locking needs to occur.
155 class IntraActivityWaiter {
156  public:
157   // Register for wakeup, return Pending(). If state is not ready to proceed,
158   // Promises should bottom out here.
159   Pending pending();
160   // Wake the activity
161   void Wake();
162 
163   std::string DebugString() const;
164 
165  private:
166   WakeupMask wakeups_ = 0;
167 };
168 
169 // An Activity tracks execution of a single promise.
170 // It executes the promise under a mutex.
171 // When the promise stalls, it registers the containing activity to be woken up
172 // later.
173 // The activity takes a callback, which will be called exactly once with the
174 // result of execution.
175 // Activity execution may be cancelled by simply deleting the activity. In such
176 // a case, if execution had not already finished, the done callback would be
177 // called with absl::CancelledError().
178 class Activity : public Orphanable {
179  public:
180   // Force wakeup from the outside.
181   // This should be rarely needed, and usages should be accompanied with a note
182   // on why it's not possible to wakeup with a Waker object.
183   // Nevertheless, it's sometimes useful for integrations with Activity to force
184   // an Activity to repoll.
ForceWakeup()185   void ForceWakeup() { MakeOwningWaker().Wakeup(); }
186 
187   // Force the current activity to immediately repoll if it doesn't complete.
188   virtual void ForceImmediateRepoll(WakeupMask mask) = 0;
189   // Legacy version of ForceImmediateRepoll() that uses the current participant.
190   // Will go away once Party gets merged with Activity. New usage is banned.
ForceImmediateRepoll()191   void ForceImmediateRepoll() { ForceImmediateRepoll(CurrentParticipant()); }
192 
193   // Return the current part of the activity as a bitmask
CurrentParticipant()194   virtual WakeupMask CurrentParticipant() const { return 1; }
195 
196   // Return the current activity.
197   // Additionally:
198   // - assert that there is a current activity (and catch bugs if there's not)
199   // - indicate to thread safety analysis that the current activity is indeed
200   //   locked
201   // - back up that assertation with a runtime check in debug builds (it's
202   //   prohibitively expensive in non-debug builds)
current()203   static Activity* current() { return g_current_activity_; }
204 
205   // Produce an activity-owning Waker. The produced waker will keep the activity
206   // alive until it's awoken or dropped.
207   virtual Waker MakeOwningWaker() = 0;
208 
209   // Produce a non-owning Waker. The waker will own a small heap allocated weak
210   // pointer to this activity. This is more suitable for wakeups that may not be
211   // delivered until long after the activity should be destroyed.
212   virtual Waker MakeNonOwningWaker() = 0;
213 
214   // Some descriptive text to add to log messages to identify this activity.
215   virtual std::string DebugTag() const;
216 
217  protected:
218   // Check if this activity is the current activity executing on the current
219   // thread.
is_current()220   bool is_current() const { return this == g_current_activity_; }
221   // Check if there is an activity executing on the current thread.
have_current()222   static bool have_current() { return g_current_activity_ != nullptr; }
223   // Set the current activity at construction, clean it up at destruction.
224   class ScopedActivity {
225    public:
ScopedActivity(Activity * activity)226     explicit ScopedActivity(Activity* activity)
227         : prior_activity_(g_current_activity_) {
228       g_current_activity_ = activity;
229     }
~ScopedActivity()230     ~ScopedActivity() { g_current_activity_ = prior_activity_; }
231     ScopedActivity(const ScopedActivity&) = delete;
232     ScopedActivity& operator=(const ScopedActivity&) = delete;
233 
234    private:
235     Activity* const prior_activity_;
236   };
237 
238  private:
239   // Set during RunLoop to the Activity that's executing.
240   // Being set implies that mu_ is held.
241   static thread_local Activity* g_current_activity_;
242 };
243 
244 // Owned pointer to one Activity.
245 using ActivityPtr = OrphanablePtr<Activity>;
246 
247 namespace promise_detail {
248 
249 template <typename Context>
250 class ContextHolder {
251  public:
252   using ContextType = Context;
253 
ContextHolder(Context value)254   explicit ContextHolder(Context value) : value_(std::move(value)) {}
GetContext()255   Context* GetContext() { return &value_; }
256 
257  private:
258   Context value_;
259 };
260 
261 template <typename Context>
262 class ContextHolder<Context*> {
263  public:
264   using ContextType = Context;
265 
ContextHolder(Context * value)266   explicit ContextHolder(Context* value) : value_(value) {}
GetContext()267   Context* GetContext() { return value_; }
268 
269  private:
270   Context* value_;
271 };
272 
273 template <typename Context, typename Deleter>
274 class ContextHolder<std::unique_ptr<Context, Deleter>> {
275  public:
276   using ContextType = Context;
277 
ContextHolder(std::unique_ptr<Context,Deleter> value)278   explicit ContextHolder(std::unique_ptr<Context, Deleter> value)
279       : value_(std::move(value)) {}
GetContext()280   Context* GetContext() { return value_.get(); }
281 
282  private:
283   std::unique_ptr<Context, Deleter> value_;
284 };
285 
286 template <>
287 class Context<Activity> {
288  public:
get()289   static Activity* get() { return Activity::current(); }
290 };
291 
292 template <typename HeldContext>
293 using ContextTypeFromHeld = typename ContextHolder<HeldContext>::ContextType;
294 
295 template <typename... Contexts>
296 class ActivityContexts : public ContextHolder<Contexts>... {
297  public:
298   explicit ActivityContexts(Contexts&&... contexts)
299       : ContextHolder<Contexts>(std::forward<Contexts>(contexts))... {}
300 
301   class ScopedContext : public Context<ContextTypeFromHeld<Contexts>>... {
302    public:
303     explicit ScopedContext(ActivityContexts* contexts)
304         : Context<ContextTypeFromHeld<Contexts>>(
305               static_cast<ContextHolder<Contexts>*>(contexts)
306                   ->GetContext())... {
307       // Silence `unused-but-set-parameter` in case of Contexts = {}
308       (void)contexts;
309     }
310   };
311 };
312 
313 // A free standing activity: an activity that owns its own synchronization and
314 // memory.
315 // The alternative is an activity that's somehow tied into another system, for
316 // instance the type seen in promise_based_filter.h as we're transitioning from
317 // the old filter stack to the new system.
318 // FreestandingActivity is-a Wakeable, but needs to increment a refcount before
319 // returning that Wakeable interface. Additionally, we want to keep
320 // FreestandingActivity as small as is possible, since it will be used
321 // everywhere. So we use inheritance to provide the Wakeable interface: this
322 // makes it zero sized, and we make the inheritance private to prevent
323 // accidental casting.
324 class FreestandingActivity : public Activity, private Wakeable {
325  public:
MakeOwningWaker()326   Waker MakeOwningWaker() final {
327     Ref();
328     return Waker(this, 0);
329   }
330   Waker MakeNonOwningWaker() final;
331 
Orphan()332   void Orphan() final {
333     Cancel();
334     Unref();
335   }
336 
ForceImmediateRepoll(WakeupMask)337   void ForceImmediateRepoll(WakeupMask) final {
338     mu_.AssertHeld();
339     SetActionDuringRun(ActionDuringRun::kWakeup);
340   }
341 
342  protected:
343   // Action received during a run, in priority order.
344   // If more than one action is received during a run, we use max() to resolve
345   // which one to report (so Cancel overrides Wakeup).
346   enum class ActionDuringRun : uint8_t {
347     kNone,    // No action occured during run.
348     kWakeup,  // A wakeup occured during run.
349     kCancel,  // Cancel was called during run.
350   };
351 
~FreestandingActivity()352   inline ~FreestandingActivity() override {
353     if (handle_) {
354       DropHandle();
355     }
356   }
357 
358   // Check if we got an internal wakeup since the last time this function was
359   // called.
GotActionDuringRun()360   ActionDuringRun GotActionDuringRun() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
361     return std::exchange(action_during_run_, ActionDuringRun::kNone);
362   }
363 
364   // Implementors of Wakeable::Wakeup should call this after the wakeup has
365   // completed.
WakeupComplete()366   void WakeupComplete() { Unref(); }
367 
368   // Set the action that occured during this run.
369   // We use max to combine actions so that cancellation overrides wakeups.
SetActionDuringRun(ActionDuringRun action)370   void SetActionDuringRun(ActionDuringRun action)
371       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
372     action_during_run_ = std::max(action_during_run_, action);
373   }
374 
mu()375   Mutex* mu() ABSL_LOCK_RETURNED(mu_) { return &mu_; }
376 
ActivityDebugTag(WakeupMask)377   std::string ActivityDebugTag(WakeupMask) const override { return DebugTag(); }
378 
379  private:
380   class Handle;
381 
382   // Cancel execution of the underlying promise.
383   virtual void Cancel() = 0;
384 
Ref()385   void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
Unref()386   void Unref() {
387     if (1 == refs_.fetch_sub(1, std::memory_order_acq_rel)) {
388       delete this;
389     }
390   }
391 
392   // Return a Handle instance with a ref so that it can be stored waiting for
393   // some wakeup.
394   Handle* RefHandle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
395   // If our refcount is non-zero, ref and return true.
396   // Otherwise, return false.
397   bool RefIfNonzero();
398   // Drop the (proved existing) wait handle.
399   void DropHandle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
400 
401   // All promise execution occurs under this mutex.
402   Mutex mu_;
403 
404   // Current refcount.
405   std::atomic<uint32_t> refs_{1};
406   // If wakeup is called during Promise polling, we set this to Wakeup and
407   // repoll. If cancel is called during Promise polling, we set this to Cancel
408   // and cancel at the end of polling.
409   ActionDuringRun action_during_run_ ABSL_GUARDED_BY(mu_) =
410       ActionDuringRun::kNone;
411   // Handle for long waits. Allows a very small weak pointer type object to
412   // queue for wakeups while Activity may be deleted earlier.
413   Handle* handle_ ABSL_GUARDED_BY(mu_) = nullptr;
414 };
415 
416 // Implementation details for an Activity of an arbitrary type of promise.
417 // There should exist an inner template class `BoundScheduler` that provides
418 // the following interface:
419 // struct WakeupScheduler {
420 //   template <typename ActivityType>
421 //   class BoundScheduler {
422 //    public:
423 //     BoundScheduler(WakeupScheduler);
424 //     void ScheduleWakeup();
425 //   };
426 // };
427 // The ScheduleWakeup function should arrange that
428 // static_cast<ActivityType*>(this)->RunScheduledWakeup() be invoked at the
429 // earliest opportunity.
430 // It can assume that activity will remain live until RunScheduledWakeup() is
431 // invoked, and that a given activity will not be concurrently scheduled again
432 // until its RunScheduledWakeup() has been invoked.
433 // We use private inheritance here as a way of getting private members for each
434 // of the contexts.
435 // TODO(ctiller): We can probably reconsider the private inheritance here
436 // when we move away from C++11 and have more powerful template features.
437 template <class F, class WakeupScheduler, class OnDone, typename... Contexts>
438 class PromiseActivity final
439     : public FreestandingActivity,
440       public WakeupScheduler::template BoundScheduler<
441           PromiseActivity<F, WakeupScheduler, OnDone, Contexts...>>,
442       private ActivityContexts<Contexts...> {
443  public:
444   using Factory = OncePromiseFactory<void, F>;
445   using ResultType = typename Factory::Promise::Result;
446 
PromiseActivity(F promise_factory,WakeupScheduler wakeup_scheduler,OnDone on_done,Contexts &&...contexts)447   PromiseActivity(F promise_factory, WakeupScheduler wakeup_scheduler,
448                   OnDone on_done, Contexts&&... contexts)
449       : FreestandingActivity(),
450         WakeupScheduler::template BoundScheduler<PromiseActivity>(
451             std::move(wakeup_scheduler)),
452         ActivityContexts<Contexts...>(std::forward<Contexts>(contexts)...),
453         on_done_(std::move(on_done)) {
454     // Lock, construct an initial promise from the factory, and step it.
455     // This may hit a waiter, which could expose our this pointer to other
456     // threads, meaning we do need to hold this mutex even though we're still
457     // constructing.
458     mu()->Lock();
459     auto status = Start(Factory(std::move(promise_factory)));
460     mu()->Unlock();
461     // We may complete immediately.
462     if (status.has_value()) {
463       on_done_(std::move(*status));
464     }
465   }
466 
~PromiseActivity()467   ~PromiseActivity() override {
468     // We shouldn't destruct without calling Cancel() first, and that must get
469     // us to be done_, so we assume that and have no logic to destruct the
470     // promise here.
471     GPR_ASSERT(done_);
472   }
473 
RunScheduledWakeup()474   void RunScheduledWakeup() {
475     GPR_ASSERT(wakeup_scheduled_.exchange(false, std::memory_order_acq_rel));
476     Step();
477     WakeupComplete();
478   }
479 
480  private:
481   using typename ActivityContexts<Contexts...>::ScopedContext;
482 
Cancel()483   void Cancel() final {
484     if (Activity::is_current()) {
485       mu()->AssertHeld();
486       SetActionDuringRun(ActionDuringRun::kCancel);
487       return;
488     }
489     bool was_done;
490     {
491       MutexLock lock(mu());
492       // Check if we were done, and flag done.
493       was_done = done_;
494       if (!done_) {
495         ScopedActivity scoped_activity(this);
496         ScopedContext contexts(this);
497         MarkDone();
498       }
499     }
500     // If we were not done, then call the on_done callback.
501     if (!was_done) {
502       on_done_(absl::CancelledError());
503     }
504   }
505 
506   // Wakeup this activity. Arrange to poll the activity again at a convenient
507   // time: this could be inline if it's deemed safe, or it could be by passing
508   // the activity to an external threadpool to run. If the activity is already
509   // running on this thread, a note is taken of such and the activity is
510   // repolled if it doesn't complete.
Wakeup(WakeupMask m)511   void Wakeup(WakeupMask m) final {
512     // If there is an active activity, but hey it's us, flag that and we'll loop
513     // in RunLoop (that's calling from above here!).
514     if (Activity::is_current()) {
515       mu()->AssertHeld();
516       SetActionDuringRun(ActionDuringRun::kWakeup);
517       WakeupComplete();
518       return;
519     }
520     WakeupAsync(m);
521   }
522 
WakeupAsync(WakeupMask)523   void WakeupAsync(WakeupMask) final {
524     if (!wakeup_scheduled_.exchange(true, std::memory_order_acq_rel)) {
525       // Can't safely run, so ask to run later.
526       this->ScheduleWakeup();
527     } else {
528       // Already a wakeup scheduled for later, drop ref.
529       WakeupComplete();
530     }
531   }
532 
533   // Drop a wakeup
Drop(WakeupMask)534   void Drop(WakeupMask) final { this->WakeupComplete(); }
535 
536   // Notification that we're no longer executing - it's ok to destruct the
537   // promise.
MarkDone()538   void MarkDone() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
539     GPR_ASSERT(!std::exchange(done_, true));
540     ScopedContext contexts(this);
541     Destruct(&promise_holder_.promise);
542   }
543 
544   // In response to Wakeup, run the Promise state machine again until it
545   // settles. Then check for completion, and if we have completed, call on_done.
Step()546   void Step() ABSL_LOCKS_EXCLUDED(mu()) {
547     // Poll the promise until things settle out under a lock.
548     mu()->Lock();
549     if (done_) {
550       // We might get some spurious wakeups after finishing.
551       mu()->Unlock();
552       return;
553     }
554     auto status = RunStep();
555     mu()->Unlock();
556     if (status.has_value()) {
557       on_done_(std::move(*status));
558     }
559   }
560 
561   // The main body of a step: set the current activity, and any contexts, and
562   // then run the main polling loop. Contained in a function by itself in
563   // order to keep the scoping rules a little easier in Step().
RunStep()564   absl::optional<ResultType> RunStep() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
565     ScopedActivity scoped_activity(this);
566     ScopedContext contexts(this);
567     return StepLoop();
568   }
569 
570   // Similarly to RunStep, but additionally construct the promise from a
571   // promise factory before entering the main loop. Called once from the
572   // constructor.
Start(Factory promise_factory)573   absl::optional<ResultType> Start(Factory promise_factory)
574       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
575     ScopedActivity scoped_activity(this);
576     ScopedContext contexts(this);
577     Construct(&promise_holder_.promise, promise_factory.Make());
578     return StepLoop();
579   }
580 
581   // Until there are no wakeups from within and the promise is incomplete:
582   // poll the promise.
StepLoop()583   absl::optional<ResultType> StepLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
584     GPR_ASSERT(is_current());
585     while (true) {
586       // Run the promise.
587       GPR_ASSERT(!done_);
588       auto r = promise_holder_.promise();
589       if (auto* status = r.value_if_ready()) {
590         // If complete, destroy the promise, flag done, and exit this loop.
591         MarkDone();
592         return IntoStatus(status);
593       }
594       // Continue looping til no wakeups occur.
595       switch (GotActionDuringRun()) {
596         case ActionDuringRun::kNone:
597           return {};
598         case ActionDuringRun::kWakeup:
599           break;
600         case ActionDuringRun::kCancel:
601           MarkDone();
602           return absl::CancelledError();
603       }
604     }
605   }
606 
607   using Promise = typename Factory::Promise;
608   // Callback on completion of the promise.
609   GPR_NO_UNIQUE_ADDRESS OnDone on_done_;
610   // Has execution completed?
611   GPR_NO_UNIQUE_ADDRESS bool done_ ABSL_GUARDED_BY(mu()) = false;
612   // Is there a wakeup scheduled?
613   GPR_NO_UNIQUE_ADDRESS std::atomic<bool> wakeup_scheduled_{false};
614   // We wrap the promise in a union to allow control over the construction
615   // simultaneously with annotating mutex requirements and noting that the
616   // promise contained may not use any memory.
617   union PromiseHolder {
PromiseHolder()618     PromiseHolder() {}
~PromiseHolder()619     ~PromiseHolder() {}
620     GPR_NO_UNIQUE_ADDRESS Promise promise;
621   };
622   GPR_NO_UNIQUE_ADDRESS PromiseHolder promise_holder_ ABSL_GUARDED_BY(mu());
623 };
624 
625 }  // namespace promise_detail
626 
627 // Given a functor that returns a promise (a promise factory), a callback for
628 // completion, and a callback scheduler, construct an activity.
629 template <typename Factory, typename WakeupScheduler, typename OnDone,
630           typename... Contexts>
MakeActivity(Factory promise_factory,WakeupScheduler wakeup_scheduler,OnDone on_done,Contexts &&...contexts)631 ActivityPtr MakeActivity(Factory promise_factory,
632                          WakeupScheduler wakeup_scheduler, OnDone on_done,
633                          Contexts&&... contexts) {
634   return ActivityPtr(
635       new promise_detail::PromiseActivity<Factory, WakeupScheduler, OnDone,
636                                           Contexts...>(
637           std::move(promise_factory), std::move(wakeup_scheduler),
638           std::move(on_done), std::forward<Contexts>(contexts)...));
639 }
640 
pending()641 inline Pending IntraActivityWaiter::pending() {
642   wakeups_ |= GetContext<Activity>()->CurrentParticipant();
643   return Pending();
644 }
645 
Wake()646 inline void IntraActivityWaiter::Wake() {
647   if (wakeups_ == 0) return;
648   GetContext<Activity>()->ForceImmediateRepoll(std::exchange(wakeups_, 0));
649 }
650 
651 }  // namespace grpc_core
652 
653 #endif  // GRPC_SRC_CORE_LIB_PROMISE_ACTIVITY_H
654