• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_FOR_EACH_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_FOR_EACH_H
17 
18 #include <grpc/support/port_platform.h>
19 #include <stdint.h>
20 
21 #include <string>
22 #include <utility>
23 
24 #include "absl/log/check.h"
25 #include "absl/log/log.h"
26 #include "absl/status/status.h"
27 #include "absl/strings/str_cat.h"
28 #include "src/core/lib/debug/trace.h"
29 #include "src/core/lib/promise/activity.h"
30 #include "src/core/lib/promise/detail/promise_factory.h"
31 #include "src/core/lib/promise/detail/status.h"
32 #include "src/core/lib/promise/poll.h"
33 #include "src/core/lib/promise/status_flag.h"
34 #include "src/core/util/construct_destruct.h"
35 
36 namespace grpc_core {
37 
38 namespace for_each_detail {
39 
40 // Done creates statuses for the end of the iteration. It's templated on the
41 // type of the result of the ForEach loop, so that we can introduce new types
42 // easily.
43 template <typename T>
44 struct Done;
45 
46 template <>
47 struct Done<absl::Status> {
48   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static absl::Status Make(
49       bool cancelled) {
50     return cancelled ? absl::CancelledError() : absl::OkStatus();
51   }
52 };
53 
54 template <>
55 struct Done<StatusFlag> {
56   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static StatusFlag Make(bool cancelled) {
57     return StatusFlag(!cancelled);
58   }
59 };
60 
// Classification of a single result produced by polling a reader's Next()
// promise.
enum class NextValueType {
  // The result carries a value to hand to the action.
  kValue,
  // The result carries no value and no error was reported.
  kEndOfStream,
  // The result reports a failure or cancellation.
  kError,
};

// NextValueTraits adapts a reader result type to what ForEach needs: a
// `Value` alias, a static Type() that classifies a result as a NextValueType,
// and a static TakeValue() that extracts the carried value. The second
// template parameter exists only so that specializations can be selected via
// SFINAE.
template <typename T, typename SfinaeVoid = void>
struct NextValueTraits;
69 
70 template <typename T>
71 struct NextValueTraits<T, absl::void_t<typename T::value_type>> {
72   using Value = typename T::value_type;
73 
74   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static NextValueType Type(const T& t) {
75     if (t.has_value()) return NextValueType::kValue;
76     if (t.cancelled()) return NextValueType::kError;
77     return NextValueType::kEndOfStream;
78   }
79 
80   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Value&& TakeValue(T& t) {
81     return std::move(*t);
82   }
83 };
84 
85 template <typename T>
86 struct NextValueTraits<ValueOrFailure<absl::optional<T>>> {
87   using Value = T;
88 
89   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static NextValueType Type(
90       const ValueOrFailure<absl::optional<T>>& t) {
91     if (t.ok()) {
92       if (t.value().has_value()) return NextValueType::kValue;
93       return NextValueType::kEndOfStream;
94     }
95     return NextValueType::kError;
96   }
97 
98   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Value&& TakeValue(
99       ValueOrFailure<absl::optional<T>>& t) {
100     return std::move(**t);
101   }
102 };
103 
104 template <typename Reader, typename Action>
105 class ForEach {
106  private:
107   using ReaderNext = decltype(std::declval<Reader>().Next());
108   using ReaderResult =
109       typename PollTraits<decltype(std::declval<ReaderNext>()())>::Type;
110   using ReaderResultValue = typename NextValueTraits<ReaderResult>::Value;
111   using ActionFactory =
112       promise_detail::RepeatedPromiseFactory<ReaderResultValue, Action>;
113   using ActionPromise = typename ActionFactory::Promise;
114 
115  public:
116   using Result =
117       typename PollTraits<decltype(std::declval<ActionPromise>()())>::Type;
118   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION ForEach(Reader reader, Action action,
119                                                DebugLocation whence = {})
120       : reader_(std::move(reader)),
121         action_factory_(std::move(action)),
122         whence_(whence) {
123     Construct(&reader_next_, reader_.Next());
124   }
125   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION ~ForEach() {
126     if (reading_next_) {
127       Destruct(&reader_next_);
128     } else {
129       Destruct(&in_action_);
130     }
131   }
132 
133   ForEach(const ForEach&) = delete;
134   ForEach& operator=(const ForEach&) = delete;
135   ForEach(ForEach&& other) noexcept
136       : reader_(std::move(other.reader_)),
137         action_factory_(std::move(other.action_factory_)),
138         whence_(other.whence_) {
139     DCHECK(reading_next_);
140     DCHECK(other.reading_next_);
141     Construct(&reader_next_, std::move(other.reader_next_));
142   }
143   ForEach& operator=(ForEach&& other) noexcept {
144     DCHECK(reading_next_);
145     DCHECK(other.reading_next_);
146     reader_ = std::move(other.reader_);
147     action_factory_ = std::move(other.action_factory_);
148     reader_next_ = std::move(other.reader_next_);
149     whence_ = other.whence_;
150     return *this;
151   }
152 
153   Poll<Result> operator()() {
154     if (reading_next_) return PollReaderNext();
155     return PollAction();
156   }
157 
158  private:
159   struct InAction {
160     InAction(ActionPromise promise, ReaderResult result)
161         : promise(std::move(promise)), result(std::move(result)) {}
162     ActionPromise promise;
163     ReaderResult result;
164   };
165 
166   std::string DebugTag() {
167     return absl::StrCat(GetContext<Activity>()->DebugTag(), " FOR_EACH[0x",
168                         reinterpret_cast<uintptr_t>(this), "@", whence_.file(),
169                         ":", whence_.line(), "]: ");
170   }
171 
172   GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION Poll<Result> PollReaderNext() {
173     GRPC_TRACE_LOG(promise_primitives, INFO) << DebugTag() << " PollReaderNext";
174     auto r = reader_next_();
175     if (auto* p = r.value_if_ready()) {
176       switch (NextValueTraits<ReaderResult>::Type(*p)) {
177         case NextValueType::kValue: {
178           GRPC_TRACE_LOG(promise_primitives, INFO)
179               << DebugTag() << " PollReaderNext: got value";
180           Destruct(&reader_next_);
181           auto action = action_factory_.Make(
182               NextValueTraits<ReaderResult>::TakeValue(*p));
183           Construct(&in_action_, std::move(action), std::move(*p));
184           reading_next_ = false;
185           return PollAction();
186         }
187         case NextValueType::kEndOfStream: {
188           GRPC_TRACE_LOG(promise_primitives, INFO)
189               << DebugTag() << " PollReaderNext: got end of stream";
190           return Done<Result>::Make(false);
191         }
192         case NextValueType::kError: {
193           GRPC_TRACE_LOG(promise_primitives, INFO)
194               << DebugTag() << " PollReaderNext: got error";
195           return Done<Result>::Make(true);
196         }
197       }
198     }
199     return Pending();
200   }
201 
202   Poll<Result> PollAction() {
203     GRPC_TRACE_LOG(promise_primitives, INFO) << DebugTag() << " PollAction";
204     auto r = in_action_.promise();
205     if (auto* p = r.value_if_ready()) {
206       if (IsStatusOk(*p)) {
207         Destruct(&in_action_);
208         Construct(&reader_next_, reader_.Next());
209         reading_next_ = true;
210         return PollReaderNext();
211       } else {
212         return std::move(*p);
213       }
214     }
215     return Pending();
216   }
217 
218   GPR_NO_UNIQUE_ADDRESS Reader reader_;
219   GPR_NO_UNIQUE_ADDRESS ActionFactory action_factory_;
220   GPR_NO_UNIQUE_ADDRESS DebugLocation whence_;
221   bool reading_next_ = true;
222   union {
223     ReaderNext reader_next_;
224     InAction in_action_;
225   };
226 };
227 
228 }  // namespace for_each_detail
229 
230 /// For each item acquired by calling Reader::Next, run the promise Action.
231 template <typename Reader, typename Action>
232 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline for_each_detail::ForEach<Reader,
233                                                                      Action>
234 ForEach(Reader reader, Action action, DebugLocation whence = {}) {
235   return for_each_detail::ForEach<Reader, Action>(std::move(reader),
236                                                   std::move(action), whence);
237 }
238 
239 }  // namespace grpc_core
240 
241 #endif  // GRPC_SRC_CORE_LIB_PROMISE_FOR_EACH_H
242