// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GRPC_SRC_CORE_LIB_TRANSPORT_PROMISE_ENDPOINT_H
#define GRPC_SRC_CORE_LIB_TRANSPORT_PROMISE_ENDPOINT_H

#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/slice.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/port_platform.h>
#include <stddef.h>
#include <stdint.h>

#include <atomic>
#include <cstring>
#include <functional>
#include <memory>
#include <utility>
#include "absl/base/thread_annotations.h"
#include "absl/log/check.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/types/optional.h"
#include "src/core/lib/event_engine/extensions/chaotic_good_extension.h"
#include "src/core/lib/event_engine/query_extensions.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/promise/activity.h"
#include "src/core/lib/promise/cancel_callback.h"
#include "src/core/lib/promise/if.h"
#include "src/core/lib/promise/map.h"
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_buffer.h"
// Assumed include paths for symbols used below (GRPC_LATENT_SEE_*,
// RefCounted, RefCountedPtr/MakeRefCounted); these may also arrive
// transitively.
#include "src/core/util/latent_see.h"
#include "src/core/util/ref_counted.h"
#include "src/core/util/ref_counted_ptr.h"
#include "src/core/util/sync.h"

namespace grpc_core {

// Wrapper around an event engine endpoint that provides a promise-like API.
class PromiseEndpoint {
 public:
  PromiseEndpoint(
      std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
          endpoint,
      SliceBuffer already_received);
  PromiseEndpoint() = default;
  ~PromiseEndpoint() = default;
  /// Prevent copying of PromiseEndpoint; moving is fine.
  PromiseEndpoint(const PromiseEndpoint&) = delete;
  PromiseEndpoint& operator=(const PromiseEndpoint&) = delete;
  PromiseEndpoint(PromiseEndpoint&&) = default;
  PromiseEndpoint& operator=(PromiseEndpoint&&) = default;
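
  // Usage sketch (illustrative only): a PromiseEndpoint is typically built
  // from the EventEngine endpoint obtained at connection establishment plus
  // any bytes already consumed during the handshake. The names `ee_endpoint`
  // and `leftover` below are hypothetical:
  //
  //   PromiseEndpoint pe(std::move(ee_endpoint), std::move(leftover));
  //   // `pe` is movable (but not copyable) and can be handed to the
  //   // transport that owns the connection.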

  // Returns a promise that resolves to an `absl::Status` indicating the result
  // of the write operation.
  //
  // Concurrent writes are not supported, which means callers should not call
  // `Write()` before the previous write finishes. Doing that results in
  // undefined behavior.
  auto Write(SliceBuffer data) {
    GRPC_LATENT_SEE_PARENT_SCOPE("GRPC:Write");
    // Start write and assert previous write finishes.
    auto prev = write_state_->state.exchange(WriteState::kWriting,
                                             std::memory_order_relaxed);
    CHECK(prev == WriteState::kIdle);
    bool completed;
    if (data.Length() == 0) {
      completed = true;
    } else {
      // TODO(ladynana): Replace this with `SliceBufferCast<>` when it is
      // available.
      grpc_slice_buffer_swap(write_state_->buffer.c_slice_buffer(),
                             data.c_slice_buffer());
      // If `Write()` returns true immediately, the callback will not be called.
      // We still need to call our callback to pick up the result.
      write_state_->waker = GetContext<Activity>()->MakeNonOwningWaker();
      completed = endpoint_->Write(
          [write_state = write_state_](absl::Status status) {
            ApplicationCallbackExecCtx callback_exec_ctx;
            ExecCtx exec_ctx;
            write_state->Complete(std::move(status));
          },
          &write_state_->buffer, nullptr /* uses default arguments */);
      if (completed) write_state_->waker = Waker();
    }
    return If(
        completed,
        [this]() {
          return [write_state = write_state_]() {
            auto prev = write_state->state.exchange(WriteState::kIdle,
                                                    std::memory_order_relaxed);
            CHECK(prev == WriteState::kWriting);
            return absl::OkStatus();
          };
        },
        GRPC_LATENT_SEE_PROMISE(
            "DelayedWrite", ([this]() {
              return [write_state = write_state_]() -> Poll<absl::Status> {
                // If current write isn't finished return `Pending()`, else
                // return write result.
                WriteState::State expected = WriteState::kWritten;
                if (write_state->state.compare_exchange_strong(
                        expected, WriteState::kIdle, std::memory_order_acquire,
                        std::memory_order_relaxed)) {
                  // State was Written, and we changed it to Idle. We can return
                  // the result.
                  return std::move(write_state->result);
                }
                // State was not Written; since we're polling it must be
                // Writing. Assert that and return Pending.
                CHECK(expected == WriteState::kWriting);
                return Pending();
              };
            })));
  }
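
  // Illustrative sketch (hypothetical names, not prescribed by this class):
  // `Write()` is meant to be composed with promise combinators; for example,
  // reacting to the resolved status with `Map()` from
  // "src/core/lib/promise/map.h", which this header already includes:
  //
  //   auto send = Map(ep.Write(std::move(frame)), [](absl::Status s) {
  //     // Inspect or log the (possibly failed) write result.
  //     return s;
  //   });
  //   // `send` is polled by the owning activity until it resolves.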

  // Returns a promise that resolves to an `absl::StatusOr<SliceBuffer>`
  // containing `num_bytes` bytes on success.
  //
  // Concurrent reads are not supported, which means callers should not call
  // `Read()` before the previous read finishes. Doing that results in
  // undefined behavior.
  auto Read(size_t num_bytes) {
    GRPC_LATENT_SEE_PARENT_SCOPE("GRPC:Read");
    // Assert previous read finishes.
    CHECK(!read_state_->complete.load(std::memory_order_relaxed));
    // Should not have pending reads.
    CHECK(read_state_->pending_buffer.Count() == 0u);
    bool complete = true;
    while (read_state_->buffer.Length() < num_bytes) {
      // Set read args with hinted bytes.
      grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs
          read_args = {
              static_cast<int64_t>(num_bytes - read_state_->buffer.Length())};
      // If `Read()` returns true immediately, the callback will not be
      // called.
      read_state_->waker = GetContext<Activity>()->MakeNonOwningWaker();
      if (endpoint_->Read(
              [read_state = read_state_, num_bytes](absl::Status status) {
                ApplicationCallbackExecCtx callback_exec_ctx;
                ExecCtx exec_ctx;
                read_state->Complete(std::move(status), num_bytes);
              },
              &read_state_->pending_buffer, &read_args)) {
        read_state_->waker = Waker();
        read_state_->pending_buffer.MoveFirstNBytesIntoSliceBuffer(
            read_state_->pending_buffer.Length(), read_state_->buffer);
        DCHECK(read_state_->pending_buffer.Count() == 0u);
      } else {
        complete = false;
        break;
      }
    }
    return If(
        complete,
        [this, num_bytes]() {
          SliceBuffer ret;
          grpc_slice_buffer_move_first_no_inline(
              read_state_->buffer.c_slice_buffer(), num_bytes,
              ret.c_slice_buffer());
          return [ret = std::move(
                      ret)]() mutable -> Poll<absl::StatusOr<SliceBuffer>> {
            return std::move(ret);
          };
        },
        GRPC_LATENT_SEE_PROMISE(
            "DelayedRead", ([this, num_bytes]() {
              return [read_state = read_state_,
                      num_bytes]() -> Poll<absl::StatusOr<SliceBuffer>> {
                if (!read_state->complete.load(std::memory_order_acquire)) {
                  return Pending();
                }
                // If read succeeds, return `SliceBuffer` with `num_bytes`
                // bytes.
                if (read_state->result.ok()) {
                  SliceBuffer ret;
                  grpc_slice_buffer_move_first_no_inline(
                      read_state->buffer.c_slice_buffer(), num_bytes,
                      ret.c_slice_buffer());
                  read_state->complete.store(false, std::memory_order_relaxed);
                  return std::move(ret);
                }
                read_state->complete.store(false, std::memory_order_relaxed);
                return std::move(read_state->result);
              };
            })));
  }
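
  // Illustrative sketch (hypothetical names): a framing layer would usually
  // sequence reads, e.g. read a fixed-size header and then decide what to do
  // next. `TrySeq` is assumed to come from "src/core/lib/promise/try_seq.h";
  // `ep` and `kHeaderSize` are placeholders:
  //
  //   auto recv = TrySeq(
  //       ep.Read(kHeaderSize),
  //       [](SliceBuffer header) {
  //         // Parse `header`, then issue further reads for the payload.
  //         return absl::OkStatus();
  //       });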

  // Returns a promise that resolves to an `absl::StatusOr<Slice>` containing
  // at least `num_bytes` bytes; `num_bytes` should be less than INT64_MAX.
  //
  // Concurrent reads are not supported, which means callers should not call
  // `ReadSlice()` before the previous read finishes. Doing that results in
  // undefined behavior.
  auto ReadSlice(size_t num_bytes) {
    return Map(Read(num_bytes),
               [](absl::StatusOr<SliceBuffer> buffer) -> absl::StatusOr<Slice> {
                 if (!buffer.ok()) return buffer.status();
                 return buffer->JoinIntoSlice();
               });
  }

  // Returns a promise that resolves to an `absl::StatusOr<uint8_t>` holding a
  // single byte.
  auto ReadByte() {
    return Map(ReadSlice(1),
               [](absl::StatusOr<Slice> slice) -> absl::StatusOr<uint8_t> {
                 if (!slice.ok()) return slice.status();
                 return (*slice)[0];
               });
  }
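
  // Illustrative sketch (hypothetical names): `ReadByte()` and `ReadSlice()`
  // are thin wrappers over `Read()`, convenient for pulling tag/length style
  // prefixes off the wire:
  //
  //   auto read_tag = Map(ep.ReadByte(), [](absl::StatusOr<uint8_t> tag) {
  //     // Dispatch on the tag value, or propagate the error.
  //     return tag;
  //   });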

  // Enables RPC receive coalescing and alignment of memory holding received
  // RPCs.
  void EnforceRxMemoryAlignmentAndCoalescing() {
    auto* chaotic_good_ext = grpc_event_engine::experimental::QueryExtension<
        grpc_event_engine::experimental::ChaoticGoodExtension>(endpoint_.get());
    if (chaotic_good_ext != nullptr) {
      chaotic_good_ext->EnforceRxMemoryAlignment();
      chaotic_good_ext->EnableRpcReceiveCoalescing();
      if (read_state_->buffer.Length() == 0) {
        return;
      }

      // Copy everything from read_state_->buffer into a single slice and
      // replace the contents of read_state_->buffer with that slice.
      grpc_slice slice = grpc_slice_malloc_large(read_state_->buffer.Length());
      CHECK(reinterpret_cast<uintptr_t>(GRPC_SLICE_START_PTR(slice)) % 64 == 0);
      size_t ofs = 0;
      for (size_t i = 0; i < read_state_->buffer.Count(); i++) {
        memcpy(
            GRPC_SLICE_START_PTR(slice) + ofs,
            GRPC_SLICE_START_PTR(
                read_state_->buffer.c_slice_buffer()->slices[i]),
            GRPC_SLICE_LENGTH(read_state_->buffer.c_slice_buffer()->slices[i]));
        ofs +=
            GRPC_SLICE_LENGTH(read_state_->buffer.c_slice_buffer()->slices[i]);
      }

      read_state_->buffer.Clear();
      read_state_->buffer.AppendIndexed(
          grpc_event_engine::experimental::Slice(slice));
      DCHECK(read_state_->buffer.Length() == ofs);
    }
  }
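
  // Illustrative note (not mandated by this header): a transport that wants
  // aligned, coalesced receive buffers would typically call this right after
  // construction, before issuing any reads:
  //
  //   PromiseEndpoint pe(std::move(ee_endpoint), std::move(leftover));
  //   pe.EnforceRxMemoryAlignmentAndCoalescing();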

  const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
  GetPeerAddress() const;
  const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
  GetLocalAddress() const;

  std::shared_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
  GetEventEngineEndpoint() const {
    return endpoint_;
  }

 private:
  std::shared_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
      endpoint_;

  struct ReadState : public RefCounted<ReadState> {
    std::atomic<bool> complete{false};
    // Read buffer used for storing successful reads given by
    // `EventEngine::Endpoint` but not yet requested by the caller.
    grpc_event_engine::experimental::SliceBuffer buffer;
    // Buffer used to accept data from `EventEngine::Endpoint`.
    // After every successful read from `EventEngine::Endpoint`, the data in
    // this buffer should be appended to `buffer`.
    grpc_event_engine::experimental::SliceBuffer pending_buffer;
    // Used to store the result from `EventEngine::Endpoint::Read()`.
    absl::Status result;
    Waker waker;
    // Backing endpoint: we keep this on ReadState as reads will need to
    // repeatedly read until the target size is hit, and we don't want to
    // access the main object during this dance (indeed the main object may be
    // deleted).
    std::weak_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
        endpoint;

    void Complete(absl::Status status, size_t num_bytes_requested);
  };

  struct WriteState : public RefCounted<WriteState> {
    enum State : uint8_t {
      kIdle,     // Not writing.
      kWriting,  // Write started, but not completed.
      kWritten,  // Write completed.
    };
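
    // Sketch of the intended transitions (derived from `Write()` above and
    // `Complete()` below):
    //   kIdle --`Write()` called--> kWriting --endpoint callback--> kWritten
    //   kWritten --poll picks up the result--> kIdle
    // A write that completes inline skips kWritten and goes straight back to
    // kIdle.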

    std::atomic<State> state{kIdle};
    // Write buffer used for `EventEngine::Endpoint::Write()` to ensure the
    // memory behind the buffer is not lost.
    grpc_event_engine::experimental::SliceBuffer buffer;
    // Used to store the result from `EventEngine::Endpoint::Write()`.
    absl::Status result;
    Waker waker;

    void Complete(absl::Status status) {
      result = std::move(status);
      auto w = std::move(waker);
      auto prev = state.exchange(kWritten, std::memory_order_release);
      // Previous state should be Writing. If we got anything else we've
      // entered the callback path twice.
      CHECK(prev == kWriting);
      w.Wakeup();
    }
  };

  RefCountedPtr<WriteState> write_state_ = MakeRefCounted<WriteState>();
  RefCountedPtr<ReadState> read_state_ = MakeRefCounted<ReadState>();
};

}  // namespace grpc_core

#endif  // GRPC_SRC_CORE_LIB_TRANSPORT_PROMISE_ENDPOINT_H