1 // Copyright 2024 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_CALL_REQUEST_BUFFER_H 16 #define GRPC_SRC_CORE_CALL_REQUEST_BUFFER_H 17 18 #include <utility> 19 20 #include "src/core/lib/transport/call_spine.h" 21 #include "src/core/lib/transport/message.h" 22 #include "src/core/lib/transport/metadata.h" 23 24 namespace grpc_core { 25 26 // Outbound request buffer. 27 // Collects client->server metadata and messages whilst in its initial buffering 28 // mode. In buffering mode it can have zero or more Reader objects attached to 29 // it. 30 // The buffer can later be switched to committed mode, at which point it 31 // will have exactly one Reader object attached to it. 32 // Callers can choose to switch to committed mode based upon policy of their 33 // choice. 34 class RequestBuffer { 35 public: 36 // One reader of the request buffer. 37 class Reader { 38 public: Reader(RequestBuffer * buffer)39 explicit Reader(RequestBuffer* buffer) ABSL_LOCKS_EXCLUDED(buffer->mu_) 40 : buffer_(buffer) { 41 buffer->AddReader(this); 42 } 43 ~Reader() ABSL_LOCKS_EXCLUDED(buffer_->mu_) { buffer_->RemoveReader(this); } 44 45 Reader(const Reader&) = delete; 46 Reader& operator=(const Reader&) = delete; 47 48 // Pull client initial metadata. Returns a promise that resolves to 49 // ValueOrFailure<ClientMetadataHandle>. PullClientInitialMetadata()50 GRPC_MUST_USE_RESULT auto PullClientInitialMetadata() { 51 return [this]() { return PollPullClientInitialMetadata(); }; 52 } 53 // Pull a message. Returns a promise that resolves to a 54 // ValueOrFailure<absl::optional<MessageHandle>>. PullMessage()55 GRPC_MUST_USE_RESULT auto PullMessage() { 56 return [this]() { return PollPullMessage(); }; 57 } 58 TakeError()59 absl::Status TakeError() { return std::move(error_); } 60 61 private: 62 friend class RequestBuffer; 63 64 Poll<ValueOrFailure<ClientMetadataHandle>> PollPullClientInitialMetadata(); 65 Poll<ValueOrFailure<absl::optional<MessageHandle>>> PollPullMessage(); 66 67 template <typename T> ClaimObject(T & object)68 T ClaimObject(T& object) ABSL_EXCLUSIVE_LOCKS_REQUIRED(buffer_->mu_) { 69 if (buffer_->winner_ == this) return std::move(object); 70 return CopyObject(object); 71 } 72 CopyObject(const ClientMetadataHandle & md)73 ClientMetadataHandle CopyObject(const ClientMetadataHandle& md) { 74 return Arena::MakePooled<ClientMetadata>(md->Copy()); 75 } 76 CopyObject(const MessageHandle & msg)77 MessageHandle CopyObject(const MessageHandle& msg) { 78 return Arena::MakePooled<Message>(msg->payload()->Copy(), msg->flags()); 79 } 80 81 RequestBuffer* const buffer_; 82 bool pulled_client_initial_metadata_ = false; 83 size_t message_index_ = 0; 84 absl::Status error_; 85 Waker pull_waker_; 86 }; 87 88 RequestBuffer(); 89 90 // Push ClientInitialMetadata into the buffer. 91 // This is instantaneous, and returns success with the amount of data 92 // buffered, or failure. 93 ValueOrFailure<size_t> PushClientInitialMetadata(ClientMetadataHandle md); 94 // Resolves to a ValueOrFailure<size_t> where the size_t is the amount of data 95 // buffered (or 0 if we're in committed mode). PushMessage(MessageHandle message)96 GRPC_MUST_USE_RESULT auto PushMessage(MessageHandle message) { 97 return [this, message = std::move(message)]() mutable { 98 return PollPushMessage(message); 99 }; 100 } 101 // Push end of stream (client half-closure). 102 StatusFlag FinishSends(); 103 // Cancel the request, propagate failure to all readers. 104 void Cancel(absl::Status error = absl::CancelledError()); 105 106 // Switch to committed mode - needs to be called exactly once with the winning 107 // reader. All other readers will see failure. 108 void Commit(Reader* winner); 109 committed()110 bool committed() const { 111 MutexLock lock(&mu_); 112 return winner_ != nullptr; 113 } 114 115 private: 116 // Buffering state: we're collecting metadata and messages. 117 struct Buffering { 118 Buffering(); 119 // Initial metadata, or nullptr if not yet received. 120 ClientMetadataHandle initial_metadata; 121 // Buffered messages. 122 absl::InlinedVector<MessageHandle, 1> messages; 123 // Amount of data buffered. 124 size_t buffered = 0; 125 }; 126 // Buffered state: all messages have been collected (the client has finished 127 // sending). 128 struct Buffered { BufferedBuffered129 Buffered(ClientMetadataHandle md, 130 absl::InlinedVector<MessageHandle, 1> msgs) 131 : initial_metadata(std::move(md)), messages(std::move(msgs)) {} 132 ClientMetadataHandle initial_metadata; 133 absl::InlinedVector<MessageHandle, 1> messages; 134 }; 135 // Streaming state: we're streaming messages to the server. 136 // This implies winner_ is set. 137 struct Streaming { 138 MessageHandle message; 139 bool end_of_stream = false; 140 }; 141 // Cancelled state: the request has been cancelled. 142 struct Cancelled { CancelledCancelled143 explicit Cancelled(absl::Status error) : error(std::move(error)) {} 144 absl::Status error; 145 }; 146 using State = absl::variant<Buffering, Buffered, Streaming, Cancelled>; 147 148 Poll<ValueOrFailure<size_t>> PollPushMessage(MessageHandle& message); PendingPull(Reader * reader)149 Pending PendingPull(Reader* reader) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { 150 reader->pull_waker_ = Activity::current()->MakeOwningWaker(); 151 return Pending{}; 152 } PendingPush()153 Pending PendingPush() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { 154 push_waker_ = Activity::current()->MakeOwningWaker(); 155 return Pending{}; 156 } MaybeSwitchToStreaming()157 void MaybeSwitchToStreaming() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { 158 auto& buffering = absl::get<Buffering>(state_); 159 if (winner_ == nullptr) return; 160 if (winner_->message_index_ < buffering.messages.size()) return; 161 state_.emplace<Streaming>(); 162 push_waker_.Wakeup(); 163 } 164 WakeupAsyncAllPullers()165 void WakeupAsyncAllPullers() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { 166 WakeupAsyncAllPullersExcept(nullptr); 167 } 168 void WakeupAsyncAllPullersExcept(Reader* except_reader) 169 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 170 AddReader(Reader * reader)171 void AddReader(Reader* reader) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { 172 readers_.insert(reader); 173 } 174 RemoveReader(Reader * reader)175 void RemoveReader(Reader* reader) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { 176 readers_.erase(reader); 177 } 178 179 std::string DebugString(Reader* caller) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 180 181 mutable Mutex mu_; ABSL_GUARDED_BY(mu_)182 Reader* winner_ ABSL_GUARDED_BY(mu_){nullptr}; 183 State state_ ABSL_GUARDED_BY(mu_); 184 // TODO(ctiller): change this to an intrusively linked list to avoid 185 // allocations. 186 absl::flat_hash_set<Reader*> readers_ ABSL_GUARDED_BY(mu_); 187 Waker push_waker_ ABSL_GUARDED_BY(mu_); 188 }; 189 190 } // namespace grpc_core 191 192 #endif // GRPC_SRC_CORE_CALL_REQUEST_BUFFER_H 193