• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_TEST_CORE_TEST_UTIL_PASSTHROUGH_ENDPOINT_H
16 #define GRPC_TEST_CORE_TEST_UTIL_PASSTHROUGH_ENDPOINT_H
17 
18 #include <grpc/event_engine/event_engine.h>
19 
20 #include <memory>
21 
22 #include "src/core/lib/event_engine/default_event_engine.h"
23 #include "src/core/lib/event_engine/tcp_socket_utils.h"
24 #include "src/core/util/ref_counted.h"
25 
26 namespace grpc_event_engine {
27 namespace experimental {
28 
29 class PassthroughEndpoint final : public EventEngine::Endpoint {
30  public:
31   struct PassthroughEndpointPair {
32     std::unique_ptr<PassthroughEndpoint> client;
33     std::unique_ptr<PassthroughEndpoint> server;
34   };
35   // client_port, server_port are markers that are baked into the peer/local
36   // addresses for debug information.
37   // allow_inline_callbacks is a flag that allows the endpoint to call the
38   // on_read/on_write callbacks inline (but outside any PassthroughEndpoint
39   // locks)
40   static PassthroughEndpointPair MakePassthroughEndpoint(
41       int client_port, int server_port, bool allow_inline_callbacks);
42 
43   ~PassthroughEndpoint() override;
44 
45   bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
46             const ReadArgs* args) override;
47 
48   bool Write(absl::AnyInvocable<void(absl::Status)> on_write,
49              SliceBuffer* buffer, const WriteArgs* args) override;
50 
GetPeerAddress()51   const EventEngine::ResolvedAddress& GetPeerAddress() const override {
52     return recv_middle_->address;
53   }
GetLocalAddress()54   const EventEngine::ResolvedAddress& GetLocalAddress() const override {
55     return send_middle_->address;
56   }
57 
58  private:
59   class CallbackHelper;
60 
61   struct Middle : public grpc_core::RefCounted<Middle> {
MiddleMiddle62     explicit Middle(int port)
63         : address(URIToResolvedAddress(absl::StrCat("ipv4:127.0.0.1:", port))
64                       .value()) {}
65 
66     void Close(CallbackHelper& callback_helper);
67 
68     grpc_core::Mutex mu;
69     bool closed ABSL_GUARDED_BY(mu) = false;
70     SliceBuffer* read_buffer ABSL_GUARDED_BY(mu) = nullptr;
71     absl::AnyInvocable<void(absl::Status)> on_read ABSL_GUARDED_BY(mu) =
72         nullptr;
73     SliceBuffer* write_buffer ABSL_GUARDED_BY(mu) = nullptr;
74     absl::AnyInvocable<void(absl::Status)> on_write ABSL_GUARDED_BY(mu) =
75         nullptr;
76     EventEngine::ResolvedAddress address;
77   };
78 
PassthroughEndpoint(grpc_core::RefCountedPtr<Middle> send_middle,grpc_core::RefCountedPtr<Middle> recv_middle,bool allow_inline_callbacks)79   PassthroughEndpoint(grpc_core::RefCountedPtr<Middle> send_middle,
80                       grpc_core::RefCountedPtr<Middle> recv_middle,
81                       bool allow_inline_callbacks)
82       : send_middle_(std::move(send_middle)),
83         recv_middle_(std::move(recv_middle)),
84         allow_inline_callbacks_(allow_inline_callbacks) {}
85 
86   grpc_core::RefCountedPtr<Middle> send_middle_;
87   grpc_core::RefCountedPtr<Middle> recv_middle_;
88   std::shared_ptr<EventEngine> event_engine_ = GetDefaultEventEngine();
89   bool allow_inline_callbacks_;
90 };
91 
92 }  // namespace experimental
93 }  // namespace grpc_event_engine
94 
95 #endif  // GRPC_TEST_CORE_TEST_UTIL_PASSTHROUGH_ENDPOINT_H
96