1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_ACTIVITY_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_ACTIVITY_H
17
18 #include <grpc/support/port_platform.h>
19 #include <stdint.h>
20
21 #include <algorithm>
22 #include <atomic>
23 #include <memory>
24 #include <string>
25 #include <utility>
26
27 #include "absl/base/thread_annotations.h"
28 #include "absl/log/check.h"
29 #include "absl/status/status.h"
30 #include "absl/strings/str_cat.h"
31 #include "absl/types/optional.h"
32 #include "src/core/lib/debug/trace.h"
33 #include "src/core/lib/promise/context.h"
34 #include "src/core/lib/promise/detail/promise_factory.h"
35 #include "src/core/lib/promise/detail/status.h"
36 #include "src/core/lib/promise/poll.h"
37 #include "src/core/util/construct_destruct.h"
38 #include "src/core/util/dump_args.h"
39 #include "src/core/util/latent_see.h"
40 #include "src/core/util/no_destruct.h"
41 #include "src/core/util/orphanable.h"
42 #include "src/core/util/sync.h"
43
44 namespace grpc_core {
45
// Forward declaration; the Activity class is defined further down this header.
class Activity;

// WakeupMask is a bitfield representing which parts of an activity should be
// woken up. It is carried alongside a Wakeable pointer and handed back to the
// Wakeable on every call. The underlying type is a 16-bit unsigned integer.
using WakeupMask = uint16_t;
51
// A Wakeable object is used by queues to wake activities.
// This is a pure interface: every operation receives the WakeupMask that was
// paired with the Wakeable when the reference to it was created.
class Wakeable {
 public:
  // Wake up the underlying activity.
  // After calling, this Wakeable cannot be used again.
  // WakeupMask comes from the activity that created this Wakeable and specifies
  // the set of promises that should be awoken.
  virtual void Wakeup(WakeupMask wakeup_mask) = 0;
  // Per Wakeup, but guarantee that the activity will be woken up out-of-line.
  // Useful if there may be mutexes or the like held by the current thread.
  virtual void WakeupAsync(WakeupMask wakeup_mask) = 0;
  // Drop this wakeable without waking up the underlying activity.
  virtual void Drop(WakeupMask wakeup_mask) = 0;

  // Return the underlying activity debug tag, or "<unknown>" if not available.
  virtual std::string ActivityDebugTag(WakeupMask wakeup_mask) const = 0;

 protected:
  // Protected and non-virtual: code outside the class hierarchy cannot destroy
  // an object through a Wakeable pointer.
  inline ~Wakeable() {}
};
72
73 namespace promise_detail {
74 struct Unwakeable final : public Wakeable {
Wakeupfinal75 void Wakeup(WakeupMask) override {}
WakeupAsyncfinal76 void WakeupAsync(WakeupMask) override {}
Dropfinal77 void Drop(WakeupMask) override {}
78 std::string ActivityDebugTag(WakeupMask) const override;
79 };
unwakeable()80 static Unwakeable* unwakeable() {
81 return NoDestructSingleton<Unwakeable>::Get();
82 }
83 } // namespace promise_detail
84
85 // An owning reference to a Wakeable.
86 // This type is non-copyable but movable.
87 class Waker {
88 public:
Waker(Wakeable * wakeable,WakeupMask wakeup_mask)89 Waker(Wakeable* wakeable, WakeupMask wakeup_mask)
90 : wakeable_and_arg_{wakeable, wakeup_mask} {}
Waker()91 Waker() : Waker(promise_detail::unwakeable(), 0) {}
~Waker()92 ~Waker() { wakeable_and_arg_.Drop(); }
93 Waker(const Waker&) = delete;
94 Waker& operator=(const Waker&) = delete;
Waker(Waker && other)95 Waker(Waker&& other) noexcept : wakeable_and_arg_(other.Take()) {}
96 Waker& operator=(Waker&& other) noexcept {
97 std::swap(wakeable_and_arg_, other.wakeable_and_arg_);
98 return *this;
99 }
100
101 // Wake the underlying activity.
Wakeup()102 void Wakeup() { Take().Wakeup(); }
103
WakeupAsync()104 void WakeupAsync() { Take().WakeupAsync(); }
105
106 template <typename H>
AbslHashValue(H h,const Waker & w)107 friend H AbslHashValue(H h, const Waker& w) {
108 return H::combine(H::combine(std::move(h), w.wakeable_and_arg_.wakeable),
109 w.wakeable_and_arg_.wakeup_mask);
110 }
111
112 bool operator==(const Waker& other) const noexcept {
113 return wakeable_and_arg_ == other.wakeable_and_arg_;
114 }
115
116 bool operator!=(const Waker& other) const noexcept {
117 return !operator==(other);
118 }
119
ActivityDebugTag()120 std::string ActivityDebugTag() {
121 return wakeable_and_arg_.ActivityDebugTag();
122 }
123
DebugString()124 std::string DebugString() const {
125 return absl::StrFormat("Waker{%p, %d}", wakeable_and_arg_.wakeable,
126 wakeable_and_arg_.wakeup_mask);
127 }
128
129 template <typename Sink>
AbslStringify(Sink & sink,const Waker & waker)130 friend void AbslStringify(Sink& sink, const Waker& waker) {
131 sink.Append(waker.DebugString());
132 }
133
134 // This is for tests to assert that a waker is occupied or not.
is_unwakeable()135 bool is_unwakeable() const {
136 return wakeable_and_arg_.wakeable == promise_detail::unwakeable();
137 }
138
139 private:
140 struct WakeableAndArg {
141 Wakeable* wakeable;
142 WakeupMask wakeup_mask;
143
WakeupWakeableAndArg144 void Wakeup() { wakeable->Wakeup(wakeup_mask); }
WakeupAsyncWakeableAndArg145 void WakeupAsync() { wakeable->WakeupAsync(wakeup_mask); }
DropWakeableAndArg146 void Drop() { wakeable->Drop(wakeup_mask); }
ActivityDebugTagWakeableAndArg147 std::string ActivityDebugTag() const {
148 return wakeable == nullptr ? "<unknown>"
149 : wakeable->ActivityDebugTag(wakeup_mask);
150 }
151 bool operator==(const WakeableAndArg& other) const noexcept {
152 return wakeable == other.wakeable && wakeup_mask == other.wakeup_mask;
153 }
154 };
155
Take()156 WakeableAndArg Take() {
157 return std::exchange(wakeable_and_arg_, {promise_detail::unwakeable(), 0});
158 }
159
160 WakeableAndArg wakeable_and_arg_;
161 };
162
163 // Helper type to track wakeups between objects in the same activity.
164 // Can be fairly fast as no ref counting or locking needs to occur.
165 class IntraActivityWaiter {
166 public:
167 // Register for wakeup, return Pending(). If state is not ready to proceed,
168 // Promises should bottom out here.
169 Pending pending();
170 // Wake the activity
171 void Wake();
172
173 std::string DebugString() const;
174
175 template <typename Sink>
AbslStringify(Sink & sink,const IntraActivityWaiter & waker)176 friend void AbslStringify(Sink& sink, const IntraActivityWaiter& waker) {
177 sink.Append(waker.DebugString());
178 }
179
180 private:
181 WakeupMask wakeups_ = 0;
182 };
183
184 // An Activity tracks execution of a single promise.
185 // It executes the promise under a mutex.
186 // When the promise stalls, it registers the containing activity to be woken up
187 // later.
188 // The activity takes a callback, which will be called exactly once with the
189 // result of execution.
190 // Activity execution may be cancelled by simply deleting the activity. In such
191 // a case, if execution had not already finished, the done callback would be
192 // called with absl::CancelledError().
193 class Activity : public Orphanable {
194 public:
195 // Force wakeup from the outside.
196 // This should be rarely needed, and usages should be accompanied with a note
197 // on why it's not possible to wakeup with a Waker object.
198 // Nevertheless, it's sometimes useful for integrations with Activity to force
199 // an Activity to repoll.
ForceWakeup()200 void ForceWakeup() { MakeOwningWaker().Wakeup(); }
201
202 // Force the current activity to immediately repoll if it doesn't complete.
203 virtual void ForceImmediateRepoll(WakeupMask mask) = 0;
204 // Legacy version of ForceImmediateRepoll() that uses the current participant.
205 // Will go away once Party gets merged with Activity. New usage is banned.
ForceImmediateRepoll()206 void ForceImmediateRepoll() { ForceImmediateRepoll(CurrentParticipant()); }
207
208 // Return the current part of the activity as a bitmask
CurrentParticipant()209 virtual WakeupMask CurrentParticipant() const { return 1; }
210
211 // Return the current activity.
212 // Additionally:
213 // - assert that there is a current activity (and catch bugs if there's not)
214 // - indicate to thread safety analysis that the current activity is indeed
215 // locked
216 // - back up that assertion with a runtime check in debug builds (it's
217 // prohibitively expensive in non-debug builds)
current()218 static Activity* current() { return current_ref(); }
219
220 // Produce an activity-owning Waker. The produced waker will keep the activity
221 // alive until it's awoken or dropped.
222 virtual Waker MakeOwningWaker() = 0;
223
224 // Produce a non-owning Waker. The waker will own a small heap allocated weak
225 // pointer to this activity. This is more suitable for wakeups that may not be
226 // delivered until long after the activity should be destroyed.
227 virtual Waker MakeNonOwningWaker() = 0;
228
229 // Some descriptive text to add to log messages to identify this activity.
230 virtual std::string DebugTag() const;
231
232 protected:
233 // Check if this activity is the current activity executing on the current
234 // thread.
is_current()235 bool is_current() const { return this == current(); }
236 // Check if there is an activity executing on the current thread.
have_current()237 static bool have_current() { return current() != nullptr; }
238 // Set the current activity at construction, clean it up at destruction.
239 class ScopedActivity {
240 public:
ScopedActivity(Activity * activity)241 explicit ScopedActivity(Activity* activity) : prior_activity_(current()) {
242 current_ref() = activity;
243 }
~ScopedActivity()244 ~ScopedActivity() { current_ref() = prior_activity_; }
245 ScopedActivity(const ScopedActivity&) = delete;
246 ScopedActivity& operator=(const ScopedActivity&) = delete;
247
248 private:
249 Activity* const prior_activity_;
250 };
251
252 private:
current_ref()253 static Activity*& current_ref() {
254 #if !defined(_WIN32) || !defined(_DLL)
255 return g_current_activity_;
256 #else
257 // Set during RunLoop to the Activity that's executing.
258 // Being set implies that mu_ is held.
259 static thread_local Activity* current_activity;
260 return current_activity;
261 #endif
262 }
263 #if !defined(_WIN32) || !defined(_DLL)
264 // Set during RunLoop to the Activity that's executing.
265 // Being set implies that mu_ is held.
266 static thread_local Activity* g_current_activity_;
267 #endif
268 };
269
// Owned pointer to one Activity, expressed as an OrphanablePtr.
using ActivityPtr = OrphanablePtr<Activity>;
272
273 namespace promise_detail {
274
// ContextHolder stores one context for an activity and exposes it uniformly as
// a `Context*` through GetContext(). `ContextType` names the pointed-to type.
// Primary template: the context is held by value.
template <typename Context>
class ContextHolder {
 public:
  using ContextType = Context;

  explicit ContextHolder(Context value) : value_(std::move(value)) {}
  // Address of the stored value.
  Context* GetContext() { return &value_; }

 private:
  Context value_;
};

// Raw pointer specialization: the pointer is stored and returned as given.
template <typename Context>
class ContextHolder<Context*> {
 public:
  using ContextType = Context;

  explicit ContextHolder(Context* value) : value_(value) {}
  Context* GetContext() { return value_; }

 private:
  Context* value_;
};

// std::unique_ptr specialization: the holder keeps the unique_ptr and exposes
// the pointer it manages.
template <typename Context, typename Deleter>
class ContextHolder<std::unique_ptr<Context, Deleter>> {
 public:
  using ContextType = Context;

  explicit ContextHolder(std::unique_ptr<Context, Deleter> value)
      : value_(std::move(value)) {}
  Context* GetContext() { return value_.get(); }

 private:
  std::unique_ptr<Context, Deleter> value_;
};

// RefCountedPtr specialization: the holder keeps the RefCountedPtr and exposes
// the pointer it manages.
template <typename Context>
class ContextHolder<RefCountedPtr<Context>> {
 public:
  using ContextType = Context;

  explicit ContextHolder(RefCountedPtr<Context> value)
      : value_(std::move(value)) {}
  Context* GetContext() { return value_.get(); }

 private:
  RefCountedPtr<Context> value_;
};
324
// Context specialization for Activity: the "context" is simply whatever
// Activity::current() reports for the calling thread.
template <>
class Context<Activity> {
 public:
  static Activity* get() { return Activity::current(); }
};
330
// Maps a held context type (after stripping any reference) to the ContextType
// declared by the matching ContextHolder specialization.
template <typename HeldContext>
using ContextTypeFromHeld = typename ContextHolder<
    typename std::remove_reference<HeldContext>::type>::ContextType;
334
// Bundles the contexts owned by an activity: derives from one ContextHolder per
// entry of `Contexts` (with references stripped) and forwards each constructor
// argument to its holder.
template <typename... Contexts>
class ActivityContexts
    : public ContextHolder<typename std::remove_reference<Contexts>::type>... {
 public:
  explicit ActivityContexts(Contexts&&... contexts)
      : ContextHolder<typename std::remove_reference<Contexts>::type>(
            std::forward<Contexts>(contexts))... {}

  // Derives from one Context<T> per held context, constructing each from the
  // pointer returned by the corresponding holder's GetContext().
  // NOTE(review): presumably each Context<T> base installs that pointer as the
  // current context for the lifetime of this object; confirm in context.h.
  class ScopedContext : public Context<ContextTypeFromHeld<Contexts>>... {
   public:
    explicit ScopedContext(ActivityContexts* contexts)
        : Context<ContextTypeFromHeld<Contexts>>(
              static_cast<ContextHolder<
                  typename std::remove_reference<Contexts>::type>*>(contexts)
                  ->GetContext())... {
      // Silence `unused-but-set-parameter` in case of Contexts = {}
      (void)contexts;
    }
  };
};
355
356 // A free standing activity: an activity that owns its own synchronization and
357 // memory.
358 // The alternative is an activity that's somehow tied into another system, for
359 // instance the type seen in promise_based_filter.h as we're transitioning from
360 // the old filter stack to the new system.
361 // FreestandingActivity is-a Wakeable, but needs to increment a refcount before
362 // returning that Wakeable interface. Additionally, we want to keep
363 // FreestandingActivity as small as is possible, since it will be used
364 // everywhere. So we use inheritance to provide the Wakeable interface: this
365 // makes it zero sized, and we make the inheritance private to prevent
366 // accidental casting.
367 class FreestandingActivity : public Activity, private Wakeable {
368 public:
MakeOwningWaker()369 Waker MakeOwningWaker() final {
370 Ref();
371 return Waker(this, 0);
372 }
373 Waker MakeNonOwningWaker() final;
374
Orphan()375 void Orphan() final {
376 Cancel();
377 Unref();
378 }
379
ForceImmediateRepoll(WakeupMask)380 void ForceImmediateRepoll(WakeupMask) final {
381 mu_.AssertHeld();
382 SetActionDuringRun(ActionDuringRun::kWakeup);
383 }
384
385 protected:
386 // Action received during a run, in priority order.
387 // If more than one action is received during a run, we use max() to resolve
388 // which one to report (so Cancel overrides Wakeup).
389 enum class ActionDuringRun : uint8_t {
390 kNone, // No action occurred during run.
391 kWakeup, // A wakeup occurred during run.
392 kCancel, // Cancel was called during run.
393 };
394
~FreestandingActivity()395 inline ~FreestandingActivity() override {
396 if (handle_) {
397 DropHandle();
398 }
399 }
400
401 // Check if we got an internal wakeup since the last time this function was
402 // called.
GotActionDuringRun()403 ActionDuringRun GotActionDuringRun() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
404 return std::exchange(action_during_run_, ActionDuringRun::kNone);
405 }
406
407 // Implementors of Wakeable::Wakeup should call this after the wakeup has
408 // completed.
WakeupComplete()409 void WakeupComplete() { Unref(); }
410
411 // Set the action that occurred during this run.
412 // We use max to combine actions so that cancellation overrides wakeups.
SetActionDuringRun(ActionDuringRun action)413 void SetActionDuringRun(ActionDuringRun action)
414 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
415 action_during_run_ = std::max(action_during_run_, action);
416 }
417
mu()418 Mutex* mu() ABSL_LOCK_RETURNED(mu_) { return &mu_; }
419
ActivityDebugTag(WakeupMask)420 std::string ActivityDebugTag(WakeupMask) const override { return DebugTag(); }
421
422 private:
423 class Handle;
424
425 // Cancel execution of the underlying promise.
426 virtual void Cancel() = 0;
427
Ref()428 void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
Unref()429 void Unref() {
430 if (1 == refs_.fetch_sub(1, std::memory_order_acq_rel)) {
431 delete this;
432 }
433 }
434
435 // Return a Handle instance with a ref so that it can be stored waiting for
436 // some wakeup.
437 Handle* RefHandle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
438 // If our refcount is non-zero, ref and return true.
439 // Otherwise, return false.
440 bool RefIfNonzero();
441 // Drop the (proved existing) wait handle.
442 void DropHandle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
443
444 // All promise execution occurs under this mutex.
445 Mutex mu_;
446
447 // Current refcount.
448 std::atomic<uint32_t> refs_{1};
449 // If wakeup is called during Promise polling, we set this to Wakeup and
450 // repoll. If cancel is called during Promise polling, we set this to Cancel
451 // and cancel at the end of polling.
452 ActionDuringRun action_during_run_ ABSL_GUARDED_BY(mu_) =
453 ActionDuringRun::kNone;
454 // Handle for long waits. Allows a very small weak pointer type object to
455 // queue for wakeups while Activity may be deleted earlier.
456 Handle* handle_ ABSL_GUARDED_BY(mu_) = nullptr;
457 };
458
// Implementation details for an Activity of an arbitrary type of promise.
// The WakeupScheduler type must provide an inner template class
// `BoundScheduler` with the following interface:
// struct WakeupScheduler {
//   template <typename ActivityType>
//   class BoundScheduler {
//    public:
//     BoundScheduler(WakeupScheduler);
//     void ScheduleWakeup();
//   };
// };
// The ScheduleWakeup function should arrange that
// static_cast<ActivityType*>(this)->RunScheduledWakeup() be invoked at the
// earliest opportunity.
// It can assume that activity will remain live until RunScheduledWakeup() is
// invoked, and that a given activity will not be concurrently scheduled again
// until its RunScheduledWakeup() has been invoked.
// We use private inheritance here as a way of getting private members for each
// of the contexts.
// TODO(ctiller): We can probably reconsider the private inheritance here
// when we move away from C++11 and have more powerful template features.
template <class F, class WakeupScheduler, class OnDone, typename... Contexts>
class PromiseActivity final
    : public FreestandingActivity,
      public WakeupScheduler::template BoundScheduler<
          PromiseActivity<F, WakeupScheduler, OnDone, Contexts...>>,
      private ActivityContexts<Contexts...> {
 public:
  using Factory = OncePromiseFactory<void, F>;
  using ResultType = typename Factory::Promise::Result;

  // Builds the promise from `promise_factory` and polls it once right away.
  // If that first poll already yields a result, `on_done` is invoked from
  // within the constructor, after the mutex has been released.
  PromiseActivity(F promise_factory, WakeupScheduler wakeup_scheduler,
                  OnDone on_done, Contexts&&... contexts)
      : FreestandingActivity(),
        WakeupScheduler::template BoundScheduler<PromiseActivity>(
            std::move(wakeup_scheduler)),
        ActivityContexts<Contexts...>(std::forward<Contexts>(contexts)...),
        on_done_(std::move(on_done)) {
    // Lock, construct an initial promise from the factory, and step it.
    // This may hit a waiter, which could expose our this pointer to other
    // threads, meaning we do need to hold this mutex even though we're still
    // constructing.
    mu()->Lock();
    auto status = Start(Factory(std::move(promise_factory)));
    mu()->Unlock();
    // We may complete immediately.
    if (status.has_value()) {
      on_done_(std::move(*status));
    }
  }

  ~PromiseActivity() override {
    // We shouldn't destruct without calling Cancel() first, and that must get
    // us to be done_, so we assume that and have no logic to destruct the
    // promise here.
    CHECK(done_);
  }

  // Entry point for the bound scheduler (see the class comment). Clears the
  // "wakeup scheduled" flag (CHECK-failing if it was not set), steps the
  // promise, and then releases one ref through WakeupComplete().
  void RunScheduledWakeup() {
    CHECK(wakeup_scheduled_.exchange(false, std::memory_order_acq_rel));
    Step();
    WakeupComplete();
  }

 private:
  using typename ActivityContexts<Contexts...>::ScopedContext;

  // Cancel the promise. When called from inside this activity's own run, only
  // records kCancel so that StepLoop finishes the cancellation. Otherwise
  // takes the mutex, marks the activity done if it was not already, and then
  // reports absl::CancelledError() to on_done_ outside the lock.
  void Cancel() final {
    if (Activity::is_current()) {
      mu()->AssertHeld();
      SetActionDuringRun(ActionDuringRun::kCancel);
      return;
    }
    bool was_done;
    {
      MutexLock lock(mu());
      // Check if we were done, and flag done.
      was_done = done_;
      if (!done_) {
        // The promise is destroyed with this activity and its contexts
        // installed as current.
        ScopedActivity scoped_activity(this);
        ScopedContext contexts(this);
        MarkDone();
      }
    }
    // If we were not done, then call the on_done callback.
    if (!was_done) {
      on_done_(absl::CancelledError());
    }
  }

  // Wakeup this activity. Arrange to poll the activity again at a convenient
  // time: this could be inline if it's deemed safe, or it could be by passing
  // the activity to an external threadpool to run. If the activity is already
  // running on this thread, a note is taken of such and the activity is
  // repolled if it doesn't complete.
  void Wakeup(WakeupMask m) final {
    // If there is an active activity, but hey it's us, flag that and we'll loop
    // in RunLoop (that's calling from above here!).
    if (Activity::is_current()) {
      mu()->AssertHeld();
      SetActionDuringRun(ActionDuringRun::kWakeup);
      WakeupComplete();
      return;
    }
    WakeupAsync(m);
  }

  // Schedule a wakeup through the bound scheduler unless one is already
  // pending; in that case the wakeup is coalesced and its ref released.
  void WakeupAsync(WakeupMask) final {
    GRPC_LATENT_SEE_INNER_SCOPE("PromiseActivity::WakeupAsync");
    wakeup_flow_.Begin(GRPC_LATENT_SEE_METADATA("Activity::Wakeup"));
    if (!wakeup_scheduled_.exchange(true, std::memory_order_acq_rel)) {
      // Can't safely run, so ask to run later.
      this->ScheduleWakeup();
    } else {
      // Already a wakeup scheduled for later, drop ref.
      WakeupComplete();
    }
  }

  // Drop a wakeup
  void Drop(WakeupMask) final { this->WakeupComplete(); }

  // Notification that we're no longer executing - it's ok to destruct the
  // promise. CHECK-fails if the activity was already marked done.
  void MarkDone() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
    CHECK(!std::exchange(done_, true));
    ScopedContext contexts(this);
    Destruct(&promise_holder_.promise);
  }

  // In response to Wakeup, run the Promise state machine again until it
  // settles. Then check for completion, and if we have completed, call on_done.
  void Step() ABSL_LOCKS_EXCLUDED(mu()) {
    GRPC_LATENT_SEE_PARENT_SCOPE("PromiseActivity::Step");
    wakeup_flow_.End();
    // Poll the promise until things settle out under a lock.
    mu()->Lock();
    if (done_) {
      // We might get some spurious wakeups after finishing.
      mu()->Unlock();
      return;
    }
    auto status = RunStep();
    mu()->Unlock();
    // on_done_ is invoked only after the mutex has been released.
    if (status.has_value()) {
      on_done_(std::move(*status));
    }
  }

  // The main body of a step: set the current activity, and any contexts, and
  // then run the main polling loop. Contained in a function by itself in
  // order to keep the scoping rules a little easier in Step().
  absl::optional<ResultType> RunStep() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
    ScopedActivity scoped_activity(this);
    ScopedContext contexts(this);
    return StepLoop();
  }

  // Similarly to RunStep, but additionally construct the promise from a
  // promise factory before entering the main loop. Called once from the
  // constructor.
  absl::optional<ResultType> Start(Factory promise_factory)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
    ScopedActivity scoped_activity(this);
    ScopedContext contexts(this);
    Construct(&promise_holder_.promise, promise_factory.Make());
    return StepLoop();
  }

  // Until there are no wakeups from within and the promise is incomplete:
  // poll the promise.
  // Returns the final result once the promise completes or is cancelled, or an
  // empty optional when the promise is still pending with no action recorded.
  absl::optional<ResultType> StepLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) {
    CHECK(is_current());
    while (true) {
      // Run the promise.
      CHECK(!done_);
      auto r = promise_holder_.promise();
      if (auto* status = r.value_if_ready()) {
        // If complete, destroy the promise, flag done, and exit this loop.
        MarkDone();
        return IntoStatus(status);
      }
      // Continue looping til no wakeups occur.
      switch (GotActionDuringRun()) {
        case ActionDuringRun::kNone:
          return {};
        case ActionDuringRun::kWakeup:
          break;
        case ActionDuringRun::kCancel:
          MarkDone();
          return absl::CancelledError();
      }
    }
  }

  using Promise = typename Factory::Promise;
  // Callback on completion of the promise.
  GPR_NO_UNIQUE_ADDRESS OnDone on_done_;
  // Has execution completed?
  GPR_NO_UNIQUE_ADDRESS bool done_ ABSL_GUARDED_BY(mu()) = false;
  // Is there a wakeup scheduled?
  GPR_NO_UNIQUE_ADDRESS std::atomic<bool> wakeup_scheduled_{false};
  // We wrap the promise in a union to allow control over the construction
  // simultaneously with annotating mutex requirements and noting that the
  // promise contained may not use any memory.
  union PromiseHolder {
    // Both intentionally empty: the promise is constructed in Start() and
    // destructed in MarkDone().
    PromiseHolder() {}
    ~PromiseHolder() {}
    GPR_NO_UNIQUE_ADDRESS Promise promise;
  };
  GPR_NO_UNIQUE_ADDRESS PromiseHolder promise_holder_ ABSL_GUARDED_BY(mu());
  // Latent-see flow started in WakeupAsync() and ended in Step().
  GPR_NO_UNIQUE_ADDRESS latent_see::Flow wakeup_flow_;
};
672
673 } // namespace promise_detail
674
675 // Given a functor that returns a promise (a promise factory), a callback for
676 // completion, and a callback scheduler, construct an activity.
677 template <typename Factory, typename WakeupScheduler, typename OnDone,
678 typename... Contexts>
MakeActivity(Factory promise_factory,WakeupScheduler wakeup_scheduler,OnDone on_done,Contexts &&...contexts)679 ActivityPtr MakeActivity(Factory promise_factory,
680 WakeupScheduler wakeup_scheduler, OnDone on_done,
681 Contexts&&... contexts) {
682 return ActivityPtr(
683 new promise_detail::PromiseActivity<Factory, WakeupScheduler, OnDone,
684 Contexts...>(
685 std::move(promise_factory), std::move(wakeup_scheduler),
686 std::move(on_done), std::forward<Contexts>(contexts)...));
687 }
688
pending()689 inline Pending IntraActivityWaiter::pending() {
690 const auto new_wakeups = GetContext<Activity>()->CurrentParticipant();
691 GRPC_TRACE_LOG(promise_primitives, INFO)
692 << "IntraActivityWaiter::pending: "
693 << GRPC_DUMP_ARGS(this, new_wakeups, wakeups_);
694 wakeups_ |= new_wakeups;
695 return Pending();
696 }
697
Wake()698 inline void IntraActivityWaiter::Wake() {
699 if (wakeups_ == 0) return;
700 GetContext<Activity>()->ForceImmediateRepoll(std::exchange(wakeups_, 0));
701 }
702
703 } // namespace grpc_core
704
705 #endif // GRPC_SRC_CORE_LIB_PROMISE_ACTIVITY_H
706