1 // Copyright 2021 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_FOR_EACH_H 16 #define GRPC_SRC_CORE_LIB_PROMISE_FOR_EACH_H 17 18 #include <grpc/support/port_platform.h> 19 #include <stdint.h> 20 21 #include <string> 22 #include <utility> 23 24 #include "absl/log/check.h" 25 #include "absl/log/log.h" 26 #include "absl/status/status.h" 27 #include "absl/strings/str_cat.h" 28 #include "src/core/lib/debug/trace.h" 29 #include "src/core/lib/promise/activity.h" 30 #include "src/core/lib/promise/detail/promise_factory.h" 31 #include "src/core/lib/promise/detail/status.h" 32 #include "src/core/lib/promise/poll.h" 33 #include "src/core/lib/promise/status_flag.h" 34 #include "src/core/util/construct_destruct.h" 35 36 namespace grpc_core { 37 38 namespace for_each_detail { 39 40 // Done creates statuses for the end of the iteration. It's templated on the 41 // type of the result of the ForEach loop, so that we can introduce new types 42 // easily. 43 template <typename T> 44 struct Done; 45 46 template <> 47 struct Done<absl::Status> { 48 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static absl::Status Make( 49 bool cancelled) { 50 return cancelled ? absl::CancelledError() : absl::OkStatus(); 51 } 52 }; 53 54 template <> 55 struct Done<StatusFlag> { 56 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static StatusFlag Make(bool cancelled) { 57 return StatusFlag(!cancelled); 58 } 59 }; 60 61 template <typename T, typename SfinaeVoid = void> 62 struct NextValueTraits; 63 64 enum class NextValueType { 65 kValue, 66 kEndOfStream, 67 kError, 68 }; 69 70 template <typename T> 71 struct NextValueTraits<T, absl::void_t<typename T::value_type>> { 72 using Value = typename T::value_type; 73 74 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static NextValueType Type(const T& t) { 75 if (t.has_value()) return NextValueType::kValue; 76 if (t.cancelled()) return NextValueType::kError; 77 return NextValueType::kEndOfStream; 78 } 79 80 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Value&& TakeValue(T& t) { 81 return std::move(*t); 82 } 83 }; 84 85 template <typename T> 86 struct NextValueTraits<ValueOrFailure<absl::optional<T>>> { 87 using Value = T; 88 89 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static NextValueType Type( 90 const ValueOrFailure<absl::optional<T>>& t) { 91 if (t.ok()) { 92 if (t.value().has_value()) return NextValueType::kValue; 93 return NextValueType::kEndOfStream; 94 } 95 return NextValueType::kError; 96 } 97 98 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION static Value&& TakeValue( 99 ValueOrFailure<absl::optional<T>>& t) { 100 return std::move(**t); 101 } 102 }; 103 104 template <typename Reader, typename Action> 105 class ForEach { 106 private: 107 using ReaderNext = decltype(std::declval<Reader>().Next()); 108 using ReaderResult = 109 typename PollTraits<decltype(std::declval<ReaderNext>()())>::Type; 110 using ReaderResultValue = typename NextValueTraits<ReaderResult>::Value; 111 using ActionFactory = 112 promise_detail::RepeatedPromiseFactory<ReaderResultValue, Action>; 113 using ActionPromise = typename ActionFactory::Promise; 114 115 public: 116 using Result = 117 typename PollTraits<decltype(std::declval<ActionPromise>()())>::Type; 118 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION ForEach(Reader reader, Action action, 119 DebugLocation whence = {}) 120 : reader_(std::move(reader)), 121 action_factory_(std::move(action)), 122 whence_(whence) { 123 Construct(&reader_next_, reader_.Next()); 124 } 125 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION ~ForEach() { 126 if (reading_next_) { 127 Destruct(&reader_next_); 128 } else { 129 Destruct(&in_action_); 130 } 131 } 132 133 ForEach(const ForEach&) = delete; 134 ForEach& operator=(const ForEach&) = delete; 135 ForEach(ForEach&& other) noexcept 136 : reader_(std::move(other.reader_)), 137 action_factory_(std::move(other.action_factory_)), 138 whence_(other.whence_) { 139 DCHECK(reading_next_); 140 DCHECK(other.reading_next_); 141 Construct(&reader_next_, std::move(other.reader_next_)); 142 } 143 ForEach& operator=(ForEach&& other) noexcept { 144 DCHECK(reading_next_); 145 DCHECK(other.reading_next_); 146 reader_ = std::move(other.reader_); 147 action_factory_ = std::move(other.action_factory_); 148 reader_next_ = std::move(other.reader_next_); 149 whence_ = other.whence_; 150 return *this; 151 } 152 153 Poll<Result> operator()() { 154 if (reading_next_) return PollReaderNext(); 155 return PollAction(); 156 } 157 158 private: 159 struct InAction { 160 InAction(ActionPromise promise, ReaderResult result) 161 : promise(std::move(promise)), result(std::move(result)) {} 162 ActionPromise promise; 163 ReaderResult result; 164 }; 165 166 std::string DebugTag() { 167 return absl::StrCat(GetContext<Activity>()->DebugTag(), " FOR_EACH[0x", 168 reinterpret_cast<uintptr_t>(this), "@", whence_.file(), 169 ":", whence_.line(), "]: "); 170 } 171 172 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION Poll<Result> PollReaderNext() { 173 GRPC_TRACE_LOG(promise_primitives, INFO) << DebugTag() << " PollReaderNext"; 174 auto r = reader_next_(); 175 if (auto* p = r.value_if_ready()) { 176 switch (NextValueTraits<ReaderResult>::Type(*p)) { 177 case NextValueType::kValue: { 178 GRPC_TRACE_LOG(promise_primitives, INFO) 179 << DebugTag() << " PollReaderNext: got value"; 180 Destruct(&reader_next_); 181 auto action = action_factory_.Make( 182 NextValueTraits<ReaderResult>::TakeValue(*p)); 183 Construct(&in_action_, std::move(action), std::move(*p)); 184 reading_next_ = false; 185 return PollAction(); 186 } 187 case NextValueType::kEndOfStream: { 188 GRPC_TRACE_LOG(promise_primitives, INFO) 189 << DebugTag() << " PollReaderNext: got end of stream"; 190 return Done<Result>::Make(false); 191 } 192 case NextValueType::kError: { 193 GRPC_TRACE_LOG(promise_primitives, INFO) 194 << DebugTag() << " PollReaderNext: got error"; 195 return Done<Result>::Make(true); 196 } 197 } 198 } 199 return Pending(); 200 } 201 202 Poll<Result> PollAction() { 203 GRPC_TRACE_LOG(promise_primitives, INFO) << DebugTag() << " PollAction"; 204 auto r = in_action_.promise(); 205 if (auto* p = r.value_if_ready()) { 206 if (IsStatusOk(*p)) { 207 Destruct(&in_action_); 208 Construct(&reader_next_, reader_.Next()); 209 reading_next_ = true; 210 return PollReaderNext(); 211 } else { 212 return std::move(*p); 213 } 214 } 215 return Pending(); 216 } 217 218 GPR_NO_UNIQUE_ADDRESS Reader reader_; 219 GPR_NO_UNIQUE_ADDRESS ActionFactory action_factory_; 220 GPR_NO_UNIQUE_ADDRESS DebugLocation whence_; 221 bool reading_next_ = true; 222 union { 223 ReaderNext reader_next_; 224 InAction in_action_; 225 }; 226 }; 227 228 } // namespace for_each_detail 229 230 /// For each item acquired by calling Reader::Next, run the promise Action. 231 template <typename Reader, typename Action> 232 GPR_ATTRIBUTE_ALWAYS_INLINE_FUNCTION inline for_each_detail::ForEach<Reader, 233 Action> 234 ForEach(Reader reader, Action action, DebugLocation whence = {}) { 235 return for_each_detail::ForEach<Reader, Action>(std::move(reader), 236 std::move(action), whence); 237 } 238 239 } // namespace grpc_core 240 241 #endif // GRPC_SRC_CORE_LIB_PROMISE_FOR_EACH_H 242