// Copyright 2021 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef GRPC_SRC_CORE_LIB_PROMISE_JOIN_H #define GRPC_SRC_CORE_LIB_PROMISE_JOIN_H #include #include #include #include "absl/meta/type_traits.h" #include "src/core/lib/promise/detail/join_state.h" #include "src/core/lib/promise/detail/promise_factory.h" #include "src/core/lib/promise/map.h" namespace grpc_core { namespace promise_detail { // The Join promise combinator takes as inputs multiple promises. // When the Join promise is polled, these input promises will be executed // serially on the same thread. // Each promise being executed either returns a value or Pending{}. // Each subsequent execution of the Join will only execute the input promises // which returned Pending{} in any of the previous executions. This mechanism // ensures that no promise is executed after it resolves, which is an essential // requirement. // // Suppose you have three promises // 1. First promise returning type Poll // 2. Second promise returning type Poll // 3. Third promise returning type Poll // Then you poll the Join of theses three promises, the result will have the // type Poll> // // Polling this join promise will // 1. Return Pending{} if even one promise in the input list of promises // returns Pending{} // 2. Return the tuple if all promises are resolved. // // All promises in the input list will be executed irrespective of failure // status. If you want the promise execution to stop when there is a failure in // any one promise, consider using TryJoin promise combinator instead of the // Join combinator. // // Example of Join : // // { // int execution_order = 0; // auto first_promise = [&execution_order]() mutable -> Poll { // execution_order = (execution_order * 10) + 1; // return 1; // }; // auto second_promise = [&execution_order]() mutable -> Poll { // execution_order = (execution_order * 10) + 2; // return false; // }; // auto third_promise = [&execution_order, // once = false]() mutable -> Poll { // execution_order = (execution_order * 10) + 3; // if (once) return Success{}; // once = true; // return Pending{}; // }; // // auto join_1_2_3 = Join(first_promise, second_promise, third_promise); // // using JoinTuple = std::tuple; // Poll first_execution = join_1_2_3(); // EXPECT_FALSE(first_execution.ready()); // // Poll second_execution = join_1_2_3(); // EXPECT_TRUE(second_execution.ready()); // // JoinTuple& tuple = *(second_execution.value_if_ready()); // EXPECT_EQ(get<0>(tuple), 1); // EXPECT_EQ(get<1>(tuple), false); // EXPECT_EQ(get<2>(tuple), Success{}); // // EXPECT_EQ(execution_order, 1233); // Check the order of execution. // } struct JoinTraits { template using ResultType = absl::remove_reference_t; template GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static bool IsOk(const T&) { return true; } template GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static T Unwrapped(T x) { return x; } template static R EarlyReturn(T) { abort(); } template GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static std::tuple FinalReturn( A... a) { return std::make_tuple(std::move(a)...); } }; template class Join { public: GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION explicit Join(Promises... promises) : state_(std::move(promises)...) {} GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION auto operator()() { return state_.PollOnce(); } private: JoinState state_; }; struct WrapInTuple { template GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION std::tuple operator()(T x) { return std::make_tuple(std::move(x)); } }; } // namespace promise_detail template GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline promise_detail::Join Join(Promise... promises) { return promise_detail::Join(std::move(promises)...); } template GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline auto Join(F promise) { return Map(std::move(promise), promise_detail::WrapInTuple{}); } template inline auto JoinIter(Iter begin, Iter end, FactoryFn factory_fn) { using Factory = promise_detail::RepeatedPromiseFactory; Factory factory(std::move(factory_fn)); using Promise = typename Factory::Promise; using Result = typename Promise::Result; using State = absl::variant; std::vector state; for (Iter it = begin; it != end; ++it) { state.emplace_back(factory.Make(*it)); } return [state = std::move(state)]() mutable -> Poll> { bool still_working = false; for (auto& s : state) { if (auto* promise = absl::get_if(&s)) { auto p = (*promise)(); if (auto* r = p.value_if_ready()) { s.template emplace(std::move(*r)); } else { still_working = true; } } } if (!still_working) { std::vector output; for (auto& s : state) { output.emplace_back(std::move(absl::get(s))); } return output; } return Pending{}; }; } } // namespace grpc_core #endif // GRPC_SRC_CORE_LIB_PROMISE_JOIN_H