• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_JOIN_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_JOIN_H
17 
18 #include <grpc/support/port_platform.h>
19 #include <stdlib.h>
20 
21 #include <tuple>
22 
23 #include "absl/meta/type_traits.h"
24 #include "src/core/lib/promise/detail/join_state.h"
25 #include "src/core/lib/promise/detail/promise_factory.h"
26 #include "src/core/lib/promise/map.h"
27 
28 namespace grpc_core {
29 namespace promise_detail {
30 
31 // The Join promise combinator takes as inputs multiple promises.
32 // When the Join promise is polled, these input promises will be executed
33 // serially on the same thread.
34 // Each promise being executed either returns a value or Pending{}.
// Each subsequent execution of the Join will only execute the input promises
// that have not yet resolved, i.e. those that returned Pending{} on every
// previous execution. This mechanism ensures that no promise is executed after
// it resolves, which is an essential requirement.
39 //
40 // Suppose you have three promises
41 // 1.  First promise returning type Poll<int>
42 // 2.  Second promise returning type Poll<bool>
43 // 3.  Third promise returning type Poll<double>
// When you poll the Join of these three promises, the result will have the
// type Poll<std::tuple<int, bool, double>>
46 //
47 // Polling this join promise will
48 // 1.  Return Pending{} if even one promise in the input list of promises
49 // returns Pending{}
50 // 2.  Return the tuple if all promises are resolved.
51 //
52 // All promises in the input list will be executed irrespective of failure
53 // status. If you want the promise execution to stop when there is a failure in
54 // any one promise, consider using TryJoin promise combinator instead of the
55 // Join combinator.
56 //
57 // Example of Join :
58 //
59 // {
60 //   int execution_order = 0;
61 //   auto first_promise = [&execution_order]() mutable -> Poll<int> {
62 //     execution_order = (execution_order * 10) + 1;
63 //     return 1;
64 //   };
65 //   auto second_promise = [&execution_order]() mutable -> Poll<bool> {
66 //     execution_order = (execution_order * 10) + 2;
67 //     return false;
68 //   };
69 //   auto third_promise = [&execution_order,
70 //                         once = false]() mutable -> Poll<StatusFlag> {
71 //     execution_order = (execution_order * 10) + 3;
72 //     if (once) return Success{};
73 //     once = true;
74 //     return Pending{};
75 //   };
76 //
77 //   auto join_1_2_3 = Join(first_promise, second_promise, third_promise);
78 //
79 //   using JoinTuple = std::tuple<int, bool, StatusFlag>;
80 //   Poll<JoinTuple> first_execution = join_1_2_3();
81 //   EXPECT_FALSE(first_execution.ready());
82 //
83 //   Poll<JoinTuple> second_execution = join_1_2_3();
84 //   EXPECT_TRUE(second_execution.ready());
85 //
86 //   JoinTuple& tuple = *(second_execution.value_if_ready());
87 //   EXPECT_EQ(get<0>(tuple), 1);
88 //   EXPECT_EQ(get<1>(tuple), false);
89 //   EXPECT_EQ(get<2>(tuple), Success{});
90 //
91 //   EXPECT_EQ(execution_order, 1233);  // Check the order of execution.
92 // }
93 
94 struct JoinTraits {
95   template <typename T>
96   using ResultType = absl::remove_reference_t<T>;
97   template <typename T>
IsOkJoinTraits98   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static bool IsOk(const T&) {
99     return true;
100   }
101   template <typename T>
UnwrappedJoinTraits102   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static T Unwrapped(T x) {
103     return x;
104   }
105   template <typename R, typename T>
EarlyReturnJoinTraits106   static R EarlyReturn(T) {
107     abort();
108   }
109   template <typename... A>
FinalReturnJoinTraits110   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static std::tuple<A...> FinalReturn(
111       A... a) {
112     return std::make_tuple(std::move(a)...);
113   }
114 };
115 
116 template <typename... Promises>
117 class Join {
118  public:
Join(Promises...promises)119   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION explicit Join(Promises... promises)
120       : state_(std::move(promises)...) {}
operator()121   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION auto operator()() {
122     return state_.PollOnce();
123   }
124 
125  private:
126   JoinState<JoinTraits, Promises...> state_;
127 };
128 
129 struct WrapInTuple {
130   template <typename T>
operatorWrapInTuple131   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION std::tuple<T> operator()(T x) {
132     return std::make_tuple(std::move(x));
133   }
134 };
135 
136 }  // namespace promise_detail
137 
138 template <typename... Promise>
139 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline promise_detail::Join<Promise...>
Join(Promise...promises)140 Join(Promise... promises) {
141   return promise_detail::Join<Promise...>(std::move(promises)...);
142 }
143 
144 template <typename F>
Join(F promise)145 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline auto Join(F promise) {
146   return Map(std::move(promise), promise_detail::WrapInTuple{});
147 }
148 
149 template <typename Iter, typename FactoryFn>
JoinIter(Iter begin,Iter end,FactoryFn factory_fn)150 inline auto JoinIter(Iter begin, Iter end, FactoryFn factory_fn) {
151   using Factory =
152       promise_detail::RepeatedPromiseFactory<decltype(*begin), FactoryFn>;
153   Factory factory(std::move(factory_fn));
154   using Promise = typename Factory::Promise;
155   using Result = typename Promise::Result;
156   using State = absl::variant<Promise, Result>;
157   std::vector<State> state;
158   for (Iter it = begin; it != end; ++it) {
159     state.emplace_back(factory.Make(*it));
160   }
161   return [state = std::move(state)]() mutable -> Poll<std::vector<Result>> {
162     bool still_working = false;
163     for (auto& s : state) {
164       if (auto* promise = absl::get_if<Promise>(&s)) {
165         auto p = (*promise)();
166         if (auto* r = p.value_if_ready()) {
167           s.template emplace<Result>(std::move(*r));
168         } else {
169           still_working = true;
170         }
171       }
172     }
173     if (!still_working) {
174       std::vector<Result> output;
175       for (auto& s : state) {
176         output.emplace_back(std::move(absl::get<Result>(s)));
177       }
178       return output;
179     }
180     return Pending{};
181   };
182 }
183 
184 }  // namespace grpc_core
185 
186 #endif  // GRPC_SRC_CORE_LIB_PROMISE_JOIN_H
187