1 // Copyright 2023 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_TRANSPORT_PROMISE_ENDPOINT_H 16 #define GRPC_SRC_CORE_LIB_TRANSPORT_PROMISE_ENDPOINT_H 17 18 #include <grpc/event_engine/event_engine.h> 19 #include <grpc/event_engine/slice.h> 20 #include <grpc/event_engine/slice_buffer.h> 21 #include <grpc/slice_buffer.h> 22 #include <grpc/support/port_platform.h> 23 #include <stddef.h> 24 #include <stdint.h> 25 26 #include <atomic> 27 #include <cstring> 28 #include <functional> 29 #include <memory> 30 #include <utility> 31 32 #include "absl/base/thread_annotations.h" 33 #include "absl/log/check.h" 34 #include "absl/status/status.h" 35 #include "absl/status/statusor.h" 36 #include "absl/types/optional.h" 37 #include "src/core/lib/event_engine/extensions/chaotic_good_extension.h" 38 #include "src/core/lib/event_engine/query_extensions.h" 39 #include "src/core/lib/iomgr/exec_ctx.h" 40 #include "src/core/lib/promise/activity.h" 41 #include "src/core/lib/promise/cancel_callback.h" 42 #include "src/core/lib/promise/if.h" 43 #include "src/core/lib/promise/map.h" 44 #include "src/core/lib/promise/poll.h" 45 #include "src/core/lib/slice/slice.h" 46 #include "src/core/lib/slice/slice_buffer.h" 47 #include "src/core/util/sync.h" 48 49 namespace grpc_core { 50 51 // Wrapper around event engine endpoint that provides a promise like API. 52 class PromiseEndpoint { 53 public: 54 PromiseEndpoint( 55 std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint> 56 endpoint, 57 SliceBuffer already_received); 58 PromiseEndpoint() = default; 59 ~PromiseEndpoint() = default; 60 /// Prevent copying of PromiseEndpoint; moving is fine. 61 PromiseEndpoint(const PromiseEndpoint&) = delete; 62 PromiseEndpoint& operator=(const PromiseEndpoint&) = delete; 63 PromiseEndpoint(PromiseEndpoint&&) = default; 64 PromiseEndpoint& operator=(PromiseEndpoint&&) = default; 65 66 // Returns a promise that resolves to a `absl::Status` indicating the result 67 // of the write operation. 68 // 69 // Concurrent writes are not supported, which means callers should not call 70 // `Write()` before the previous write finishes. Doing that results in 71 // undefined behavior. Write(SliceBuffer data)72 auto Write(SliceBuffer data) { 73 GRPC_LATENT_SEE_PARENT_SCOPE("GRPC:Write"); 74 // Start write and assert previous write finishes. 75 auto prev = write_state_->state.exchange(WriteState::kWriting, 76 std::memory_order_relaxed); 77 CHECK(prev == WriteState::kIdle); 78 bool completed; 79 if (data.Length() == 0) { 80 completed = true; 81 } else { 82 // TODO(ladynana): Replace this with `SliceBufferCast<>` when it is 83 // available. 84 grpc_slice_buffer_swap(write_state_->buffer.c_slice_buffer(), 85 data.c_slice_buffer()); 86 // If `Write()` returns true immediately, the callback will not be called. 87 // We still need to call our callback to pick up the result. 88 write_state_->waker = GetContext<Activity>()->MakeNonOwningWaker(); 89 completed = endpoint_->Write( 90 [write_state = write_state_](absl::Status status) { 91 ApplicationCallbackExecCtx callback_exec_ctx; 92 ExecCtx exec_ctx; 93 write_state->Complete(std::move(status)); 94 }, 95 &write_state_->buffer, nullptr /* uses default arguments */); 96 if (completed) write_state_->waker = Waker(); 97 } 98 return If( 99 completed, 100 [this]() { 101 return [write_state = write_state_]() { 102 auto prev = write_state->state.exchange(WriteState::kIdle, 103 std::memory_order_relaxed); 104 CHECK(prev == WriteState::kWriting); 105 return absl::OkStatus(); 106 }; 107 }, 108 GRPC_LATENT_SEE_PROMISE( 109 "DelayedWrite", ([this]() { 110 return [write_state = write_state_]() -> Poll<absl::Status> { 111 // If current write isn't finished return `Pending()`, else 112 // return write result. 113 WriteState::State expected = WriteState::kWritten; 114 if (write_state->state.compare_exchange_strong( 115 expected, WriteState::kIdle, std::memory_order_acquire, 116 std::memory_order_relaxed)) { 117 // State was Written, and we changed it to Idle. We can return 118 // the result. 119 return std::move(write_state->result); 120 } 121 // State was not Written; since we're polling it must be 122 // Writing. Assert that and return Pending. 123 CHECK(expected == WriteState::kWriting); 124 return Pending(); 125 }; 126 }))); 127 } 128 129 // Returns a promise that resolves to `SliceBuffer` with 130 // `num_bytes` bytes. 131 // 132 // Concurrent reads are not supported, which means callers should not call 133 // `Read()` before the previous read finishes. Doing that results in 134 // undefined behavior. Read(size_t num_bytes)135 auto Read(size_t num_bytes) { 136 GRPC_LATENT_SEE_PARENT_SCOPE("GRPC:Read"); 137 // Assert previous read finishes. 138 CHECK(!read_state_->complete.load(std::memory_order_relaxed)); 139 // Should not have pending reads. 140 CHECK(read_state_->pending_buffer.Count() == 0u); 141 bool complete = true; 142 while (read_state_->buffer.Length() < num_bytes) { 143 // Set read args with hinted bytes. 144 grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs 145 read_args = { 146 static_cast<int64_t>(num_bytes - read_state_->buffer.Length())}; 147 // If `Read()` returns true immediately, the callback will not be 148 // called. 149 read_state_->waker = GetContext<Activity>()->MakeNonOwningWaker(); 150 if (endpoint_->Read( 151 [read_state = read_state_, num_bytes](absl::Status status) { 152 ApplicationCallbackExecCtx callback_exec_ctx; 153 ExecCtx exec_ctx; 154 read_state->Complete(std::move(status), num_bytes); 155 }, 156 &read_state_->pending_buffer, &read_args)) { 157 read_state_->waker = Waker(); 158 read_state_->pending_buffer.MoveFirstNBytesIntoSliceBuffer( 159 read_state_->pending_buffer.Length(), read_state_->buffer); 160 DCHECK(read_state_->pending_buffer.Count() == 0u); 161 } else { 162 complete = false; 163 break; 164 } 165 } 166 return If( 167 complete, 168 [this, num_bytes]() { 169 SliceBuffer ret; 170 grpc_slice_buffer_move_first_no_inline( 171 read_state_->buffer.c_slice_buffer(), num_bytes, 172 ret.c_slice_buffer()); 173 return [ret = std::move( 174 ret)]() mutable -> Poll<absl::StatusOr<SliceBuffer>> { 175 return std::move(ret); 176 }; 177 }, 178 GRPC_LATENT_SEE_PROMISE( 179 "DelayedRead", ([this, num_bytes]() { 180 return [read_state = read_state_, 181 num_bytes]() -> Poll<absl::StatusOr<SliceBuffer>> { 182 if (!read_state->complete.load(std::memory_order_acquire)) { 183 return Pending(); 184 } 185 // If read succeeds, return `SliceBuffer` with `num_bytes` 186 // bytes. 187 if (read_state->result.ok()) { 188 SliceBuffer ret; 189 grpc_slice_buffer_move_first_no_inline( 190 read_state->buffer.c_slice_buffer(), num_bytes, 191 ret.c_slice_buffer()); 192 read_state->complete.store(false, std::memory_order_relaxed); 193 return std::move(ret); 194 } 195 read_state->complete.store(false, std::memory_order_relaxed); 196 return std::move(read_state->result); 197 }; 198 }))); 199 } 200 201 // Returns a promise that resolves to `Slice` with at least 202 // `num_bytes` bytes which should be less than INT64_MAX bytes. 203 // 204 // Concurrent reads are not supported, which means callers should not call 205 // `ReadSlice()` before the previous read finishes. Doing that results in 206 // undefined behavior. ReadSlice(size_t num_bytes)207 auto ReadSlice(size_t num_bytes) { 208 return Map(Read(num_bytes), 209 [](absl::StatusOr<SliceBuffer> buffer) -> absl::StatusOr<Slice> { 210 if (!buffer.ok()) return buffer.status(); 211 return buffer->JoinIntoSlice(); 212 }); 213 } 214 215 // Returns a promise that resolves to a byte with type `uint8_t`. ReadByte()216 auto ReadByte() { 217 return Map(ReadSlice(1), 218 [](absl::StatusOr<Slice> slice) -> absl::StatusOr<uint8_t> { 219 if (!slice.ok()) return slice.status(); 220 return (*slice)[0]; 221 }); 222 } 223 224 // Enables RPC receive coalescing and alignment of memory holding received 225 // RPCs. EnforceRxMemoryAlignmentAndCoalescing()226 void EnforceRxMemoryAlignmentAndCoalescing() { 227 auto* chaotic_good_ext = grpc_event_engine::experimental::QueryExtension< 228 grpc_event_engine::experimental::ChaoticGoodExtension>(endpoint_.get()); 229 if (chaotic_good_ext != nullptr) { 230 chaotic_good_ext->EnforceRxMemoryAlignment(); 231 chaotic_good_ext->EnableRpcReceiveCoalescing(); 232 if (read_state_->buffer.Length() == 0) { 233 return; 234 } 235 236 // Copy everything from read_state_->buffer into a single slice and 237 // replace the contents of read_state_->buffer with that slice. 238 grpc_slice slice = grpc_slice_malloc_large(read_state_->buffer.Length()); 239 CHECK(reinterpret_cast<uintptr_t>(GRPC_SLICE_START_PTR(slice)) % 64 == 0); 240 size_t ofs = 0; 241 for (size_t i = 0; i < read_state_->buffer.Count(); i++) { 242 memcpy( 243 GRPC_SLICE_START_PTR(slice) + ofs, 244 GRPC_SLICE_START_PTR( 245 read_state_->buffer.c_slice_buffer()->slices[i]), 246 GRPC_SLICE_LENGTH(read_state_->buffer.c_slice_buffer()->slices[i])); 247 ofs += 248 GRPC_SLICE_LENGTH(read_state_->buffer.c_slice_buffer()->slices[i]); 249 } 250 251 read_state_->buffer.Clear(); 252 read_state_->buffer.AppendIndexed( 253 grpc_event_engine::experimental::Slice(slice)); 254 DCHECK(read_state_->buffer.Length() == ofs); 255 } 256 } 257 258 const grpc_event_engine::experimental::EventEngine::ResolvedAddress& 259 GetPeerAddress() const; 260 const grpc_event_engine::experimental::EventEngine::ResolvedAddress& 261 GetLocalAddress() const; 262 263 std::shared_ptr<grpc_event_engine::experimental::EventEngine::Endpoint> GetEventEngineEndpoint()264 GetEventEngineEndpoint() const { 265 return endpoint_; 266 } 267 268 private: 269 std::shared_ptr<grpc_event_engine::experimental::EventEngine::Endpoint> 270 endpoint_; 271 272 struct ReadState : public RefCounted<ReadState> { 273 std::atomic<bool> complete{false}; 274 // Read buffer used for storing successful reads given by 275 // `EventEngine::Endpoint` but not yet requested by the caller. 276 grpc_event_engine::experimental::SliceBuffer buffer; 277 // Buffer used to accept data from `EventEngine::Endpoint`. 278 // Every time after a successful read from `EventEngine::Endpoint`, the data 279 // in this buffer should be appended to `buffer`. 280 grpc_event_engine::experimental::SliceBuffer pending_buffer; 281 // Used for store the result from `EventEngine::Endpoint::Read()`. 282 absl::Status result; 283 Waker waker; 284 // Backing endpoint: we keep this on ReadState as reads will need to 285 // repeatedly read until the target size is hit, and we don't want to access 286 // the main object during this dance (indeed the main object may be 287 // deleted). 288 std::weak_ptr<grpc_event_engine::experimental::EventEngine::Endpoint> 289 endpoint; 290 291 void Complete(absl::Status status, size_t num_bytes_requested); 292 }; 293 294 struct WriteState : public RefCounted<WriteState> { 295 enum State : uint8_t { 296 kIdle, // Not writing. 297 kWriting, // Write started, but not completed. 298 kWritten, // Write completed. 299 }; 300 301 std::atomic<State> state{kIdle}; 302 // Write buffer used for `EventEngine::Endpoint::Write()` to ensure the 303 // memory behind the buffer is not lost. 304 grpc_event_engine::experimental::SliceBuffer buffer; 305 // Used for store the result from `EventEngine::Endpoint::Write()`. 306 absl::Status result; 307 Waker waker; 308 CompleteWriteState309 void Complete(absl::Status status) { 310 result = std::move(status); 311 auto w = std::move(waker); 312 auto prev = state.exchange(kWritten, std::memory_order_release); 313 // Previous state should be Writing. If we got anything else we've entered 314 // the callback path twice. 315 CHECK(prev == kWriting); 316 w.Wakeup(); 317 } 318 }; 319 320 RefCountedPtr<WriteState> write_state_ = MakeRefCounted<WriteState>(); 321 RefCountedPtr<ReadState> read_state_ = MakeRefCounted<ReadState>(); 322 }; 323 324 } // namespace grpc_core 325 326 #endif // GRPC_SRC_CORE_LIB_TRANSPORT_PROMISE_ENDPOINT_H 327