1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_JOIN_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_JOIN_H
17
18 #include <grpc/support/port_platform.h>
19 #include <stdlib.h>
20
21 #include <tuple>
22
23 #include "absl/meta/type_traits.h"
24 #include "src/core/lib/promise/detail/join_state.h"
25 #include "src/core/lib/promise/detail/promise_factory.h"
26 #include "src/core/lib/promise/map.h"
27
28 namespace grpc_core {
29 namespace promise_detail {
30
31 // The Join promise combinator takes as inputs multiple promises.
32 // When the Join promise is polled, these input promises will be executed
33 // serially on the same thread.
34 // Each promise being executed either returns a value or Pending{}.
35 // Each subsequent execution of the Join will only execute the input promises
36 // which returned Pending{} in any of the previous executions. This mechanism
37 // ensures that no promise is executed after it resolves, which is an essential
38 // requirement.
39 //
40 // Suppose you have three promises
41 // 1. First promise returning type Poll<int>
42 // 2. Second promise returning type Poll<bool>
43 // 3. Third promise returning type Poll<double>
// When you poll the Join of these three promises, the result will have the
// type Poll<std::tuple<int, bool, double>>
46 //
47 // Polling this join promise will
48 // 1. Return Pending{} if even one promise in the input list of promises
49 // returns Pending{}
50 // 2. Return the tuple if all promises are resolved.
51 //
52 // All promises in the input list will be executed irrespective of failure
53 // status. If you want the promise execution to stop when there is a failure in
54 // any one promise, consider using TryJoin promise combinator instead of the
55 // Join combinator.
56 //
57 // Example of Join :
58 //
59 // {
60 // int execution_order = 0;
61 // auto first_promise = [&execution_order]() mutable -> Poll<int> {
62 // execution_order = (execution_order * 10) + 1;
63 // return 1;
64 // };
65 // auto second_promise = [&execution_order]() mutable -> Poll<bool> {
66 // execution_order = (execution_order * 10) + 2;
67 // return false;
68 // };
69 // auto third_promise = [&execution_order,
70 // once = false]() mutable -> Poll<StatusFlag> {
71 // execution_order = (execution_order * 10) + 3;
72 // if (once) return Success{};
73 // once = true;
74 // return Pending{};
75 // };
76 //
77 // auto join_1_2_3 = Join(first_promise, second_promise, third_promise);
78 //
79 // using JoinTuple = std::tuple<int, bool, StatusFlag>;
80 // Poll<JoinTuple> first_execution = join_1_2_3();
81 // EXPECT_FALSE(first_execution.ready());
82 //
83 // Poll<JoinTuple> second_execution = join_1_2_3();
84 // EXPECT_TRUE(second_execution.ready());
85 //
86 // JoinTuple& tuple = *(second_execution.value_if_ready());
87 // EXPECT_EQ(get<0>(tuple), 1);
88 // EXPECT_EQ(get<1>(tuple), false);
89 // EXPECT_EQ(get<2>(tuple), Success{});
90 //
91 // EXPECT_EQ(execution_order, 1233); // Check the order of execution.
92 // }
93
94 struct JoinTraits {
95 template <typename T>
96 using ResultType = absl::remove_reference_t<T>;
97 template <typename T>
IsOkJoinTraits98 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static bool IsOk(const T&) {
99 return true;
100 }
101 template <typename T>
UnwrappedJoinTraits102 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static T Unwrapped(T x) {
103 return x;
104 }
105 template <typename R, typename T>
EarlyReturnJoinTraits106 static R EarlyReturn(T) {
107 abort();
108 }
109 template <typename... A>
FinalReturnJoinTraits110 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static std::tuple<A...> FinalReturn(
111 A... a) {
112 return std::make_tuple(std::move(a)...);
113 }
114 };
115
116 template <typename... Promises>
117 class Join {
118 public:
Join(Promises...promises)119 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION explicit Join(Promises... promises)
120 : state_(std::move(promises)...) {}
operator()121 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION auto operator()() {
122 return state_.PollOnce();
123 }
124
125 private:
126 JoinState<JoinTraits, Promises...> state_;
127 };
128
129 struct WrapInTuple {
130 template <typename T>
operatorWrapInTuple131 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION std::tuple<T> operator()(T x) {
132 return std::make_tuple(std::move(x));
133 }
134 };
135
136 } // namespace promise_detail
137
138 template <typename... Promise>
139 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline promise_detail::Join<Promise...>
Join(Promise...promises)140 Join(Promise... promises) {
141 return promise_detail::Join<Promise...>(std::move(promises)...);
142 }
143
144 template <typename F>
Join(F promise)145 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline auto Join(F promise) {
146 return Map(std::move(promise), promise_detail::WrapInTuple{});
147 }
148
149 template <typename Iter, typename FactoryFn>
JoinIter(Iter begin,Iter end,FactoryFn factory_fn)150 inline auto JoinIter(Iter begin, Iter end, FactoryFn factory_fn) {
151 using Factory =
152 promise_detail::RepeatedPromiseFactory<decltype(*begin), FactoryFn>;
153 Factory factory(std::move(factory_fn));
154 using Promise = typename Factory::Promise;
155 using Result = typename Promise::Result;
156 using State = absl::variant<Promise, Result>;
157 std::vector<State> state;
158 for (Iter it = begin; it != end; ++it) {
159 state.emplace_back(factory.Make(*it));
160 }
161 return [state = std::move(state)]() mutable -> Poll<std::vector<Result>> {
162 bool still_working = false;
163 for (auto& s : state) {
164 if (auto* promise = absl::get_if<Promise>(&s)) {
165 auto p = (*promise)();
166 if (auto* r = p.value_if_ready()) {
167 s.template emplace<Result>(std::move(*r));
168 } else {
169 still_working = true;
170 }
171 }
172 }
173 if (!still_working) {
174 std::vector<Result> output;
175 for (auto& s : state) {
176 output.emplace_back(std::move(absl::get<Result>(s)));
177 }
178 return output;
179 }
180 return Pending{};
181 };
182 }
183
184 } // namespace grpc_core
185
186 #endif // GRPC_SRC_CORE_LIB_PROMISE_JOIN_H
187