• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_CALL_REQUEST_BUFFER_H
16 #define GRPC_SRC_CORE_CALL_REQUEST_BUFFER_H
17 
18 #include <utility>
19 
20 #include "src/core/lib/transport/call_spine.h"
21 #include "src/core/lib/transport/message.h"
22 #include "src/core/lib/transport/metadata.h"
23 
24 namespace grpc_core {
25 
26 // Outbound request buffer.
27 // Collects client->server metadata and messages whilst in its initial buffering
28 // mode. In buffering mode it can have zero or more Reader objects attached to
29 // it.
30 // The buffer can later be switched to committed mode, at which point it
31 // will have exactly one Reader object attached to it.
32 // Callers can choose to switch to committed mode based upon policy of their
33 // choice.
34 class RequestBuffer {
35  public:
36   // One reader of the request buffer.
37   class Reader {
38    public:
Reader(RequestBuffer * buffer)39     explicit Reader(RequestBuffer* buffer) ABSL_LOCKS_EXCLUDED(buffer->mu_)
40         : buffer_(buffer) {
41       buffer->AddReader(this);
42     }
43     ~Reader() ABSL_LOCKS_EXCLUDED(buffer_->mu_) { buffer_->RemoveReader(this); }
44 
45     Reader(const Reader&) = delete;
46     Reader& operator=(const Reader&) = delete;
47 
48     // Pull client initial metadata. Returns a promise that resolves to
49     // ValueOrFailure<ClientMetadataHandle>.
PullClientInitialMetadata()50     GRPC_MUST_USE_RESULT auto PullClientInitialMetadata() {
51       return [this]() { return PollPullClientInitialMetadata(); };
52     }
53     // Pull a message. Returns a promise that resolves to a
54     // ValueOrFailure<absl::optional<MessageHandle>>.
PullMessage()55     GRPC_MUST_USE_RESULT auto PullMessage() {
56       return [this]() { return PollPullMessage(); };
57     }
58 
TakeError()59     absl::Status TakeError() { return std::move(error_); }
60 
61    private:
62     friend class RequestBuffer;
63 
64     Poll<ValueOrFailure<ClientMetadataHandle>> PollPullClientInitialMetadata();
65     Poll<ValueOrFailure<absl::optional<MessageHandle>>> PollPullMessage();
66 
67     template <typename T>
ClaimObject(T & object)68     T ClaimObject(T& object) ABSL_EXCLUSIVE_LOCKS_REQUIRED(buffer_->mu_) {
69       if (buffer_->winner_ == this) return std::move(object);
70       return CopyObject(object);
71     }
72 
CopyObject(const ClientMetadataHandle & md)73     ClientMetadataHandle CopyObject(const ClientMetadataHandle& md) {
74       return Arena::MakePooled<ClientMetadata>(md->Copy());
75     }
76 
CopyObject(const MessageHandle & msg)77     MessageHandle CopyObject(const MessageHandle& msg) {
78       return Arena::MakePooled<Message>(msg->payload()->Copy(), msg->flags());
79     }
80 
81     RequestBuffer* const buffer_;
82     bool pulled_client_initial_metadata_ = false;
83     size_t message_index_ = 0;
84     absl::Status error_;
85     Waker pull_waker_;
86   };
87 
88   RequestBuffer();
89 
90   // Push ClientInitialMetadata into the buffer.
91   // This is instantaneous, and returns success with the amount of data
92   // buffered, or failure.
93   ValueOrFailure<size_t> PushClientInitialMetadata(ClientMetadataHandle md);
94   // Resolves to a ValueOrFailure<size_t> where the size_t is the amount of data
95   // buffered (or 0 if we're in committed mode).
PushMessage(MessageHandle message)96   GRPC_MUST_USE_RESULT auto PushMessage(MessageHandle message) {
97     return [this, message = std::move(message)]() mutable {
98       return PollPushMessage(message);
99     };
100   }
101   // Push end of stream (client half-closure).
102   StatusFlag FinishSends();
103   // Cancel the request, propagate failure to all readers.
104   void Cancel(absl::Status error = absl::CancelledError());
105 
106   // Switch to committed mode - needs to be called exactly once with the winning
107   // reader. All other readers will see failure.
108   void Commit(Reader* winner);
109 
committed()110   bool committed() const {
111     MutexLock lock(&mu_);
112     return winner_ != nullptr;
113   }
114 
115  private:
116   // Buffering state: we're collecting metadata and messages.
117   struct Buffering {
118     Buffering();
119     // Initial metadata, or nullptr if not yet received.
120     ClientMetadataHandle initial_metadata;
121     // Buffered messages.
122     absl::InlinedVector<MessageHandle, 1> messages;
123     // Amount of data buffered.
124     size_t buffered = 0;
125   };
126   // Buffered state: all messages have been collected (the client has finished
127   // sending).
128   struct Buffered {
BufferedBuffered129     Buffered(ClientMetadataHandle md,
130              absl::InlinedVector<MessageHandle, 1> msgs)
131         : initial_metadata(std::move(md)), messages(std::move(msgs)) {}
132     ClientMetadataHandle initial_metadata;
133     absl::InlinedVector<MessageHandle, 1> messages;
134   };
135   // Streaming state: we're streaming messages to the server.
136   // This implies winner_ is set.
137   struct Streaming {
138     MessageHandle message;
139     bool end_of_stream = false;
140   };
141   // Cancelled state: the request has been cancelled.
142   struct Cancelled {
CancelledCancelled143     explicit Cancelled(absl::Status error) : error(std::move(error)) {}
144     absl::Status error;
145   };
146   using State = absl::variant<Buffering, Buffered, Streaming, Cancelled>;
147 
148   Poll<ValueOrFailure<size_t>> PollPushMessage(MessageHandle& message);
PendingPull(Reader * reader)149   Pending PendingPull(Reader* reader) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
150     reader->pull_waker_ = Activity::current()->MakeOwningWaker();
151     return Pending{};
152   }
PendingPush()153   Pending PendingPush() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
154     push_waker_ = Activity::current()->MakeOwningWaker();
155     return Pending{};
156   }
MaybeSwitchToStreaming()157   void MaybeSwitchToStreaming() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
158     auto& buffering = absl::get<Buffering>(state_);
159     if (winner_ == nullptr) return;
160     if (winner_->message_index_ < buffering.messages.size()) return;
161     state_.emplace<Streaming>();
162     push_waker_.Wakeup();
163   }
164 
WakeupAsyncAllPullers()165   void WakeupAsyncAllPullers() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
166     WakeupAsyncAllPullersExcept(nullptr);
167   }
168   void WakeupAsyncAllPullersExcept(Reader* except_reader)
169       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
170 
AddReader(Reader * reader)171   void AddReader(Reader* reader) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
172     readers_.insert(reader);
173   }
174 
RemoveReader(Reader * reader)175   void RemoveReader(Reader* reader) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
176     readers_.erase(reader);
177   }
178 
179   std::string DebugString(Reader* caller) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
180 
181   mutable Mutex mu_;
ABSL_GUARDED_BY(mu_)182   Reader* winner_ ABSL_GUARDED_BY(mu_){nullptr};
183   State state_ ABSL_GUARDED_BY(mu_);
184   // TODO(ctiller): change this to an intrusively linked list to avoid
185   // allocations.
186   absl::flat_hash_set<Reader*> readers_ ABSL_GUARDED_BY(mu_);
187   Waker push_waker_ ABSL_GUARDED_BY(mu_);
188 };
189 
190 }  // namespace grpc_core
191 
192 #endif  // GRPC_SRC_CORE_CALL_REQUEST_BUFFER_H
193