1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_ACTIVITY_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_ACTIVITY_H
17
18 #include <grpc/support/port_platform.h>
19
20 #include <stdint.h>
21
22 #include <algorithm>
23 #include <atomic>
24 #include <memory>
25 #include <string>
26 #include <utility>
27
28 #include "absl/base/thread_annotations.h"
29 #include "absl/status/status.h"
30 #include "absl/types/optional.h"
31
32 #include <grpc/support/log.h>
33
34 #include "src/core/lib/event_engine/event_engine_context.h"
35 #include "src/core/lib/gprpp/construct_destruct.h"
36 #include "src/core/lib/gprpp/no_destruct.h"
37 #include "src/core/lib/gprpp/orphanable.h"
38 #include "src/core/lib/gprpp/sync.h"
39 #include "src/core/lib/promise/context.h"
40 #include "src/core/lib/promise/detail/promise_factory.h"
41 #include "src/core/lib/promise/detail/status.h"
42 #include "src/core/lib/promise/poll.h"
43
44 namespace grpc_core {
45
46 class Activity;
47
48 // WakeupMask is a bitfield representing which parts of an activity should be
49 // woken up.
50 using WakeupMask = uint16_t;
51
52 // A Wakeable object is used by queues to wake activities.
53 class Wakeable {
54 public:
55 // Wake up the underlying activity.
56 // After calling, this Wakeable cannot be used again.
57 // WakeupMask comes from the activity that created this Wakeable and specifies
58 // the set of promises that should be awoken.
59 virtual void Wakeup(WakeupMask wakeup_mask) = 0;
60 // Per Wakeup, but guarantee that the activity will be woken up out-of-line.
61 // Useful if there may be mutexes or the like held by the current thread.
62 virtual void WakeupAsync(WakeupMask wakeup_mask) = 0;
63 // Drop this wakeable without waking up the underlying activity.
64 virtual void Drop(WakeupMask wakeup_mask) = 0;
65
66 // Return the underlying activity debug tag, or "<unknown>" if not available.
67 virtual std::string ActivityDebugTag(WakeupMask wakeup_mask) const = 0;
68
69 protected:
~Wakeable()70 inline ~Wakeable() {}
71 };
72
73 namespace promise_detail {
74 struct Unwakeable final : public Wakeable {
Wakeupfinal75 void Wakeup(WakeupMask) override {}
WakeupAsyncfinal76 void WakeupAsync(WakeupMask) override {}
Dropfinal77 void Drop(WakeupMask) override {}
78 std::string ActivityDebugTag(WakeupMask) const override;
79 };
unwakeable()80 static Unwakeable* unwakeable() {
81 return NoDestructSingleton<Unwakeable>::Get();
82 }
83 } // namespace promise_detail
84
85 // An owning reference to a Wakeable.
86 // This type is non-copyable but movable.
87 class Waker {
88 public:
Waker(Wakeable * wakeable,WakeupMask wakeup_mask)89 Waker(Wakeable* wakeable, WakeupMask wakeup_mask)
90 : wakeable_and_arg_{wakeable, wakeup_mask} {}
Waker()91 Waker() : Waker(promise_detail::unwakeable(), 0) {}
~Waker()92 ~Waker() { wakeable_and_arg_.Drop(); }
93 Waker(const Waker&) = delete;
94 Waker& operator=(const Waker&) = delete;
Waker(Waker && other)95 Waker(Waker&& other) noexcept : wakeable_and_arg_(other.Take()) {}
96 Waker& operator=(Waker&& other) noexcept {
97 std::swap(wakeable_and_arg_, other.wakeable_and_arg_);
98 return *this;
99 }
100
101 // Wake the underlying activity.
Wakeup()102 void Wakeup() { Take().Wakeup(); }
103
WakeupAsync()104 void WakeupAsync() { Take().WakeupAsync(); }
105
106 template <typename H>
AbslHashValue(H h,const Waker & w)107 friend H AbslHashValue(H h, const Waker& w) {
108 return H::combine(H::combine(std::move(h), w.wakeable_and_arg_.wakeable),
109 w.wakeable_and_arg_.wakeup_mask);
110 }
111
112 bool operator==(const Waker& other) const noexcept {
113 return wakeable_and_arg_ == other.wakeable_and_arg_;
114 }
115
116 bool operator!=(const Waker& other) const noexcept {
117 return !operator==(other);
118 }
119
ActivityDebugTag()120 std::string ActivityDebugTag() {
121 return wakeable_and_arg_.ActivityDebugTag();
122 }
123
124 // This is for tests to assert that a waker is occupied or not.
is_unwakeable()125 bool is_unwakeable() const {
126 return wakeable_and_arg_.wakeable == promise_detail::unwakeable();
127 }
128
129 private:
130 struct WakeableAndArg {
131 Wakeable* wakeable;
132 WakeupMask wakeup_mask;
133
WakeupWakeableAndArg134 void Wakeup() { wakeable->Wakeup(wakeup_mask); }
WakeupAsyncWakeableAndArg135 void WakeupAsync() { wakeable->WakeupAsync(wakeup_mask); }
DropWakeableAndArg136 void Drop() { wakeable->Drop(wakeup_mask); }
ActivityDebugTagWakeableAndArg137 std::string ActivityDebugTag() const {
138 return wakeable == nullptr ? "<unknown>"
139 : wakeable->ActivityDebugTag(wakeup_mask);
140 }
141 bool operator==(const WakeableAndArg& other) const noexcept {
142 return wakeable == other.wakeable && wakeup_mask == other.wakeup_mask;
143 }
144 };
145
Take()146 WakeableAndArg Take() {
147 return std::exchange(wakeable_and_arg_, {promise_detail::unwakeable(), 0});
148 }
149
150 WakeableAndArg wakeable_and_arg_;
151 };
152
153 // Helper type to track wakeups between objects in the same activity.
154 // Can be fairly fast as no ref counting or locking needs to occur.
155 class IntraActivityWaiter {
156 public:
157 // Register for wakeup, return Pending(). If state is not ready to proceed,
158 // Promises should bottom out here.
159 Pending pending();
160 // Wake the activity
161 void Wake();
162
163 std::string DebugString() const;
164
165 private:
166 WakeupMask wakeups_ = 0;
167 };
168
169 // An Activity tracks execution of a single promise.
170 // It executes the promise under a mutex.
171 // When the promise stalls, it registers the containing activity to be woken up
172 // later.
173 // The activity takes a callback, which will be called exactly once with the
174 // result of execution.
175 // Activity execution may be cancelled by simply deleting the activity. In such
176 // a case, if execution had not already finished, the done callback would be
177 // called with absl::CancelledError().
178 class Activity : public Orphanable {
179 public:
180 // Force wakeup from the outside.
181 // This should be rarely needed, and usages should be accompanied with a note
182 // on why it's not possible to wakeup with a Waker object.
183 // Nevertheless, it's sometimes useful for integrations with Activity to force
184 // an Activity to repoll.
ForceWakeup()185 void ForceWakeup() { MakeOwningWaker().Wakeup(); }
186
187 // Force the current activity to immediately repoll if it doesn't complete.
188 virtual void ForceImmediateRepoll(WakeupMask mask) = 0;
189 // Legacy version of ForceImmediateRepoll() that uses the current participant.
190 // Will go away once Party gets merged with Activity. New usage is banned.
ForceImmediateRepoll()191 void ForceImmediateRepoll() { ForceImmediateRepoll(CurrentParticipant()); }
192
193 // Return the current part of the activity as a bitmask
CurrentParticipant()194 virtual WakeupMask CurrentParticipant() const { return 1; }
195
196 // Return the current activity.
197 // Additionally:
198 // - assert that there is a current activity (and catch bugs if there's not)
199 // - indicate to thread safety analysis that the current activity is indeed
200 // locked
201 // - back up that assertation with a runtime check in debug builds (it's
202 // prohibitively expensive in non-debug builds)
current()203 static Activity* current() { return g_current_activity_; }
204
205 // Produce an activity-owning Waker. The produced waker will keep the activity
206 // alive until it's awoken or dropped.
207 virtual Waker MakeOwningWaker() = 0;
208
209 // Produce a non-owning Waker. The waker will own a small heap allocated weak
210 // pointer to this activity. This is more suitable for wakeups that may not be
211 // delivered until long after the activity should be destroyed.
212 virtual Waker MakeNonOwningWaker() = 0;
213
214 // Some descriptive text to add to log messages to identify this activity.
215 virtual std::string DebugTag() const;
216
217 protected:
218 // Check if this activity is the current activity executing on the current
219 // thread.
is_current()220 bool is_current() const { return this == g_current_activity_; }
221 // Check if there is an activity executing on the current thread.
have_current()222 static bool have_current() { return g_current_activity_ != nullptr; }
223 // Set the current activity at construction, clean it up at destruction.
224 class ScopedActivity {
225 public:
ScopedActivity(Activity * activity)226 explicit ScopedActivity(Activity* activity)
227 : prior_activity_(g_current_activity_) {
228 g_current_activity_ = activity;
229 }
~ScopedActivity()230 ~ScopedActivity() { g_current_activity_ = prior_activity_; }
231 ScopedActivity(const ScopedActivity&) = delete;
232 ScopedActivity& operator=(const ScopedActivity&) = delete;
233
234 private:
235 Activity* const prior_activity_;
236 };
237
238 private:
239 // Set during RunLoop to the Activity that's executing.
240 // Being set implies that mu_ is held.
241 static thread_local Activity* g_current_activity_;
242 };
243
244 // Owned pointer to one Activity.
245 using ActivityPtr = OrphanablePtr<Activity>;
246
247 namespace promise_detail {
248
249 template <typename Context>
250 class ContextHolder {
251 public:
252 using ContextType = Context;
253
ContextHolder(Context value)254 explicit ContextHolder(Context value) : value_(std::move(value)) {}
GetContext()255 Context* GetContext() { return &value_; }
256
257 private:
258 Context value_;
259 };
260
261 template <typename Context>
262 class ContextHolder<Context*> {
263 public:
264 using ContextType = Context;
265
ContextHolder(Context * value)266 explicit ContextHolder(Context* value) : value_(value) {}
GetContext()267 Context* GetContext() { return value_; }
268
269 private:
270 Context* value_;
271 };
272
273 template <typename Context, typename Deleter>
274 class ContextHolder<std::unique_ptr<Context, Deleter>> {
275 public:
276 using ContextType = Context;
277
ContextHolder(std::unique_ptr<Context,Deleter> value)278 explicit ContextHolder(std::unique_ptr<Context, Deleter> value)
279 : value_(std::move(value)) {}
GetContext()280 Context* GetContext() { return value_.get(); }
281
282 private:
283 std::unique_ptr<Context, Deleter> value_;
284 };
285
286 template <>
287 class Context<Activity> {
288 public:
get()289 static Activity* get() { return Activity::current(); }
290 };
291
292 template <typename HeldContext>
293 using ContextTypeFromHeld = typename ContextHolder<HeldContext>::ContextType;
294
295 template <typename... Contexts>
296 class ActivityContexts : public ContextHolder<Contexts>... {
297 public:
298 explicit ActivityContexts(Contexts&&... contexts)
299 : ContextHolder<Contexts>(std::forward<Contexts>(contexts))... {}
300
301 class ScopedContext : public Context<ContextTypeFromHeld<Contexts>>... {
302 public:
303 explicit ScopedContext(ActivityContexts* contexts)
304 : Context<ContextTypeFromHeld<Contexts>>(
305 static_cast<ContextHolder<Contexts>*>(contexts)
306 ->GetContext())... {
307 // Silence `unused-but-set-parameter` in case of Contexts = {}
308 (void)contexts;
309 }
310 };
311 };
312
313 // A free standing activity: an activity that owns its own synchronization and
314 // memory.
315 // The alternative is an activity that's somehow tied into another system, for
316 // instance the type seen in promise_based_filter.h as we're transitioning from
317 // the old filter stack to the new system.
318 // FreestandingActivity is-a Wakeable, but needs to increment a refcount before
319 // returning that Wakeable interface. Additionally, we want to keep
320 // FreestandingActivity as small as is possible, since it will be used
321 // everywhere. So we use inheritance to provide the Wakeable interface: this
322 // makes it zero sized, and we make the inheritance private to prevent
323 // accidental casting.
324 class FreestandingActivity : public Activity, private Wakeable {
325 public:
MakeOwningWaker()326 Waker MakeOwningWaker() final {
327 Ref();
328 return Waker(this, 0);
329 }
330 Waker MakeNonOwningWaker() final;
331
Orphan()332 void Orphan() final {
333 Cancel();
334 Unref();
335 }
336
ForceImmediateRepoll(WakeupMask)337 void ForceImmediateRepoll(WakeupMask) final {
338 mu_.AssertHeld();
339 SetActionDuringRun(ActionDuringRun::kWakeup);
340 }
341
342 protected:
343 // Action received during a run, in priority order.
344 // If more than one action is received during a run, we use max() to resolve
345 // which one to report (so Cancel overrides Wakeup).
346 enum class ActionDuringRun : uint8_t {
347 kNone, // No action occured during run.
348 kWakeup, // A wakeup occured during run.
349 kCancel, // Cancel was called during run.
350 };
351
~FreestandingActivity()352 inline ~FreestandingActivity() override {
353 if (handle_) {
354 DropHandle();
355 }
356 }
357
358 // Check if we got an internal wakeup since the last time this function was
359 // called.
GotActionDuringRun()360 ActionDuringRun GotActionDuringRun() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
361 return std::exchange(action_during_run_, ActionDuringRun::kNone);
362 }
363
364 // Implementors of Wakeable::Wakeup should call this after the wakeup has
365 // completed.
WakeupComplete()366 void WakeupComplete() { Unref(); }
367
368 // Set the action that occured during this run.
369 // We use max to combine actions so that cancellation overrides wakeups.
SetActionDuringRun(ActionDuringRun action)370 void SetActionDuringRun(ActionDuringRun action)
371 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
372 action_during_run_ = std::max(action_during_run_, action);
373 }
374
mu()375 Mutex* mu() ABSL_LOCK_RETURNED(mu_) { return &mu_; }
376
ActivityDebugTag(WakeupMask)377 std::string ActivityDebugTag(WakeupMask) const override { return DebugTag(); }
378
379 private:
380 class Handle;
381
382 // Cancel execution of the underlying promise.
383 virtual void Cancel() = 0;
384
Ref()385 void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
Unref()386 void Unref() {
387 if (1 == refs_.fetch_sub(1, std::memory_order_acq_rel)) {
388 delete this;
389 }
390 }
391
392 // Return a Handle instance with a ref so that it can be stored waiting for
393 // some wakeup.
394 Handle* RefHandle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
395 // If our refcount is non-zero, ref and return true.
396 // Otherwise, return false.
397 bool RefIfNonzero();
398 // Drop the (proved existing) wait handle.
399 void DropHandle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
400
401 // All promise execution occurs under this mutex.
402 Mutex mu_;
403
404 // Current refcount.
405 std::atomic<uint32_t> refs_{1};
406 // If wakeup is called during Promise polling, we set this to Wakeup and
407 // repoll. If cancel is called during Promise polling, we set this to Cancel
408 // and cancel at the end of polling.
409 ActionDuringRun action_during_run_ ABSL_GUARDED_BY(mu_) =
410 ActionDuringRun::kNone;
411 // Handle for long waits. Allows a very small weak pointer type object to
412 // queue for wakeups while Activity may be deleted earlier.
413 Handle* handle_ ABSL_GUARDED_BY(mu_) = nullptr;
414 };
415
416 // Implementation details for an Activity of an arbitrary type of promise.
417 // There should exist an inner template class `BoundScheduler` that provides
418 // the following interface:
419 // struct WakeupScheduler {
420 // template <typename ActivityType>
421 // class BoundScheduler {
422 // public:
423 // BoundScheduler(WakeupScheduler);
424 // void ScheduleWakeup();
425 // };
426 // };
427 // The ScheduleWakeup function should arrange that
428 // static_cast<ActivityType*>(this)->RunScheduledWakeup() be invoked at the
429 // earliest opportunity.
430 // It can assume that activity will remain live until RunScheduledWakeup() is
431 // invoked, and that a given activity will not be concurrently scheduled again
432 // until its RunScheduledWakeup() has been invoked.
433 // We use private inheritance here as a way of getting private members for each
434 // of the contexts.
435 // TODO(ctiller): We can probably reconsider the private inheritance here
436 // when we move away from C++11 and have more powerful template features.
437 template <class F, class WakeupScheduler, class OnDone, typename... Contexts>
438 class PromiseActivity final
439 : public FreestandingActivity,
440 public WakeupScheduler::template BoundScheduler<
441 PromiseActivity<F, WakeupScheduler, OnDone, Contexts...>>,
442 private ActivityContexts<Contexts...> {
443 public:
444 using Factory = OncePromiseFactory<void, F>;
445 using ResultType = typename Factory::Promise::Result;
446
PromiseActivity(F promise_factory,WakeupScheduler wakeup_scheduler,OnDone on_done,Contexts &&...contexts)447 PromiseActivity(F promise_factory, WakeupScheduler wakeup_scheduler,
448 OnDone on_done, Contexts&&... contexts)
449 : FreestandingActivity(),
450 WakeupScheduler::template BoundScheduler<PromiseActivity>(
451 std::move(wakeup_scheduler)),
452 ActivityContexts<Contexts...>(std::forward<Contexts>(contexts)...),
453 on_done_(std::move(on_done)) {
454 // Lock, construct an initial promise from the factory, and step it.
455 // This may hit a waiter, which could expose our this pointer to other
456 // threads, meaning we do need to hold this mutex even though we're still
457 // constructing.
458 mu()->Lock();
459 auto status = Start(Factory(std::move(promise_factory)));
460 mu()->Unlock();
461 // We may complete immediately.
462 if (status.has_value()) {
463 on_done_(std::move(*status));
464 }
465 }
466
~PromiseActivity()467 ~PromiseActivity() override {
468 // We shouldn't destruct without calling Cancel() first, and that must get
469 // us to be done_, so we assume that and have no logic to destruct the
470 // promise here.
471 GPR_ASSERT(done_);
472 }
473
RunScheduledWakeup()474 void RunScheduledWakeup() {
475 GPR_ASSERT(wakeup_scheduled_.exchange(false, std::memory_order_acq_rel));
476 Step();
477 WakeupComplete();
478 }
479
480 private:
481 using typename ActivityContexts<Contexts...>::ScopedContext;
482
Cancel()483 void Cancel() final {
484 if (Activity::is_current()) {
485 mu()->AssertHeld();
486 SetActionDuringRun(ActionDuringRun::kCancel);
487 return;
488 }
489 bool was_done;
490 {
491 MutexLock lock(mu());
492 // Check if we were done, and flag done.
493 was_done = done_;
494 if (!done_) {
495 ScopedActivity scoped_activity(this);
496 ScopedContext contexts(this);
497 MarkDone();
498 }
499 }
500 // If we were not done, then call the on_done callback.
501 if (!was_done) {
502 on_done_(absl::CancelledError());
503 }
504 }
505
506 // Wakeup this activity. Arrange to poll the activity again at a convenient
507 // time: this could be inline if it's deemed safe, or it could be by passing
508 // the activity to an external threadpool to run. If the activity is already
509 // running on this thread, a note is taken of such and the activity is
510 // repolled if it doesn't complete.
Wakeup(WakeupMask m)511 void Wakeup(WakeupMask m) final {
512 // If there is an active activity, but hey it's us, flag that and we'll loop
513 // in RunLoop (that's calling from above here!).
514 if (Activity::is_current()) {
515 mu()->AssertHeld();
516 SetActionDuringRun(ActionDuringRun::kWakeup);
517 WakeupComplete();
518 return;
519 }
520 WakeupAsync(m);
521 }
522
WakeupAsync(WakeupMask)523 void WakeupAsync(WakeupMask) final {
524 if (!wakeup_scheduled_.exchange(true, std::memory_order_acq_rel)) {
525 // Can't safely run, so ask to run later.
526 this->ScheduleWakeup();
527 } else {
528 // Already a wakeup scheduled for later, drop ref.
529 WakeupComplete();
530 }
531 }
532
533 // Drop a wakeup
Drop(WakeupMask)534 void Drop(WakeupMask) final { this->WakeupComplete(); }
535
536 // Notification that we're no longer executing - it's ok to destruct the
537 // promise.
MarkDone()538 void MarkDone() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
539 GPR_ASSERT(!std::exchange(done_, true));
540 ScopedContext contexts(this);
541 Destruct(&promise_holder_.promise);
542 }
543
544 // In response to Wakeup, run the Promise state machine again until it
545 // settles. Then check for completion, and if we have completed, call on_done.
Step()546 void Step() ABSL_LOCKS_EXCLUDED(mu()) {
547 // Poll the promise until things settle out under a lock.
548 mu()->Lock();
549 if (done_) {
550 // We might get some spurious wakeups after finishing.
551 mu()->Unlock();
552 return;
553 }
554 auto status = RunStep();
555 mu()->Unlock();
556 if (status.has_value()) {
557 on_done_(std::move(*status));
558 }
559 }
560
561 // The main body of a step: set the current activity, and any contexts, and
562 // then run the main polling loop. Contained in a function by itself in
563 // order to keep the scoping rules a little easier in Step().
RunStep()564 absl::optional<ResultType> RunStep() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
565 ScopedActivity scoped_activity(this);
566 ScopedContext contexts(this);
567 return StepLoop();
568 }
569
570 // Similarly to RunStep, but additionally construct the promise from a
571 // promise factory before entering the main loop. Called once from the
572 // constructor.
Start(Factory promise_factory)573 absl::optional<ResultType> Start(Factory promise_factory)
574 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
575 ScopedActivity scoped_activity(this);
576 ScopedContext contexts(this);
577 Construct(&promise_holder_.promise, promise_factory.Make());
578 return StepLoop();
579 }
580
581 // Until there are no wakeups from within and the promise is incomplete:
582 // poll the promise.
StepLoop()583 absl::optional<ResultType> StepLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
584 GPR_ASSERT(is_current());
585 while (true) {
586 // Run the promise.
587 GPR_ASSERT(!done_);
588 auto r = promise_holder_.promise();
589 if (auto* status = r.value_if_ready()) {
590 // If complete, destroy the promise, flag done, and exit this loop.
591 MarkDone();
592 return IntoStatus(status);
593 }
594 // Continue looping til no wakeups occur.
595 switch (GotActionDuringRun()) {
596 case ActionDuringRun::kNone:
597 return {};
598 case ActionDuringRun::kWakeup:
599 break;
600 case ActionDuringRun::kCancel:
601 MarkDone();
602 return absl::CancelledError();
603 }
604 }
605 }
606
607 using Promise = typename Factory::Promise;
608 // Callback on completion of the promise.
609 GPR_NO_UNIQUE_ADDRESS OnDone on_done_;
610 // Has execution completed?
611 GPR_NO_UNIQUE_ADDRESS bool done_ ABSL_GUARDED_BY(mu()) = false;
612 // Is there a wakeup scheduled?
613 GPR_NO_UNIQUE_ADDRESS std::atomic<bool> wakeup_scheduled_{false};
614 // We wrap the promise in a union to allow control over the construction
615 // simultaneously with annotating mutex requirements and noting that the
616 // promise contained may not use any memory.
617 union PromiseHolder {
PromiseHolder()618 PromiseHolder() {}
~PromiseHolder()619 ~PromiseHolder() {}
620 GPR_NO_UNIQUE_ADDRESS Promise promise;
621 };
622 GPR_NO_UNIQUE_ADDRESS PromiseHolder promise_holder_ ABSL_GUARDED_BY(mu());
623 };
624
625 } // namespace promise_detail
626
627 // Given a functor that returns a promise (a promise factory), a callback for
628 // completion, and a callback scheduler, construct an activity.
629 template <typename Factory, typename WakeupScheduler, typename OnDone,
630 typename... Contexts>
MakeActivity(Factory promise_factory,WakeupScheduler wakeup_scheduler,OnDone on_done,Contexts &&...contexts)631 ActivityPtr MakeActivity(Factory promise_factory,
632 WakeupScheduler wakeup_scheduler, OnDone on_done,
633 Contexts&&... contexts) {
634 return ActivityPtr(
635 new promise_detail::PromiseActivity<Factory, WakeupScheduler, OnDone,
636 Contexts...>(
637 std::move(promise_factory), std::move(wakeup_scheduler),
638 std::move(on_done), std::forward<Contexts>(contexts)...));
639 }
640
pending()641 inline Pending IntraActivityWaiter::pending() {
642 wakeups_ |= GetContext<Activity>()->CurrentParticipant();
643 return Pending();
644 }
645
Wake()646 inline void IntraActivityWaiter::Wake() {
647 if (wakeups_ == 0) return;
648 GetContext<Activity>()->ForceImmediateRepoll(std::exchange(wakeups_, 0));
649 }
650
651 } // namespace grpc_core
652
653 #endif // GRPC_SRC_CORE_LIB_PROMISE_ACTIVITY_H
654