1 // Copyright 2023 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "src/core/lib/transport/promise_endpoint.h"
16
17 #include <grpc/event_engine/event_engine.h>
18 #include <grpc/event_engine/slice_buffer.h>
19 #include <grpc/slice_buffer.h>
20 #include <grpc/support/port_platform.h>
21
22 #include <atomic>
23 #include <functional>
24 #include <memory>
25 #include <utility>
26
27 #include "absl/log/check.h"
28 #include "absl/status/status.h"
29 #include "absl/types/optional.h"
30 #include "src/core/lib/slice/slice_buffer.h"
31 #include "src/core/util/sync.h"
32
33 namespace grpc_core {
34
PromiseEndpoint(std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint> endpoint,SliceBuffer already_received)35 PromiseEndpoint::PromiseEndpoint(
36 std::unique_ptr<grpc_event_engine::experimental::EventEngine::Endpoint>
37 endpoint,
38 SliceBuffer already_received)
39 : endpoint_(std::move(endpoint)) {
40 CHECK_NE(endpoint_, nullptr);
41 read_state_->endpoint = endpoint_;
42 // TODO(ladynana): Replace this with `SliceBufferCast<>` when it is
43 // available.
44 grpc_slice_buffer_swap(read_state_->buffer.c_slice_buffer(),
45 already_received.c_slice_buffer());
46 }
47
48 const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
GetPeerAddress() const49 PromiseEndpoint::GetPeerAddress() const {
50 return endpoint_->GetPeerAddress();
51 }
52
53 const grpc_event_engine::experimental::EventEngine::ResolvedAddress&
GetLocalAddress() const54 PromiseEndpoint::GetLocalAddress() const {
55 return endpoint_->GetLocalAddress();
56 }
57
Complete(absl::Status status,const size_t num_bytes_requested)58 void PromiseEndpoint::ReadState::Complete(absl::Status status,
59 const size_t num_bytes_requested) {
60 while (true) {
61 if (!status.ok()) {
62 // Invalidates all previous reads.
63 pending_buffer.Clear();
64 buffer.Clear();
65 result = status;
66 auto w = std::move(waker);
67 complete.store(true, std::memory_order_release);
68 w.Wakeup();
69 return;
70 }
71 // Appends `pending_buffer` to `buffer`.
72 pending_buffer.MoveFirstNBytesIntoSliceBuffer(pending_buffer.Length(),
73 buffer);
74 DCHECK(pending_buffer.Count() == 0u);
75 if (buffer.Length() < num_bytes_requested) {
76 // A further read is needed.
77 // Set read args with number of bytes needed as hint.
78 grpc_event_engine::experimental::EventEngine::Endpoint::ReadArgs
79 read_args = {
80 static_cast<int64_t>(num_bytes_requested - buffer.Length())};
81 // If `Read()` returns true immediately, the callback will not be
82 // called. We still need to call our callback to pick up the result and
83 // maybe do further reads.
84 auto ep = endpoint.lock();
85 if (ep == nullptr) {
86 status = absl::UnavailableError("Endpoint closed during read.");
87 continue;
88 }
89 if (ep->Read(
90 [self = Ref(), num_bytes_requested](absl::Status status) {
91 ApplicationCallbackExecCtx callback_exec_ctx;
92 ExecCtx exec_ctx;
93 self->Complete(std::move(status), num_bytes_requested);
94 },
95 &pending_buffer, &read_args)) {
96 continue;
97 }
98 return;
99 }
100 result = status;
101 auto w = std::move(waker);
102 complete.store(true, std::memory_order_release);
103 w.Wakeup();
104 return;
105 }
106 }
107
108 } // namespace grpc_core
109