1 // Copyright 2024 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_TEST_CORE_TEST_UTIL_PASSTHROUGH_ENDPOINT_H 16 #define GRPC_TEST_CORE_TEST_UTIL_PASSTHROUGH_ENDPOINT_H 17 18 #include <grpc/event_engine/event_engine.h> 19 20 #include <memory> 21 22 #include "src/core/lib/event_engine/default_event_engine.h" 23 #include "src/core/lib/event_engine/tcp_socket_utils.h" 24 #include "src/core/util/ref_counted.h" 25 26 namespace grpc_event_engine { 27 namespace experimental { 28 29 class PassthroughEndpoint final : public EventEngine::Endpoint { 30 public: 31 struct PassthroughEndpointPair { 32 std::unique_ptr<PassthroughEndpoint> client; 33 std::unique_ptr<PassthroughEndpoint> server; 34 }; 35 // client_port, server_port are markers that are baked into the peer/local 36 // addresses for debug information. 37 // allow_inline_callbacks is a flag that allows the endpoint to call the 38 // on_read/on_write callbacks inline (but outside any PassthroughEndpoint 39 // locks) 40 static PassthroughEndpointPair MakePassthroughEndpoint( 41 int client_port, int server_port, bool allow_inline_callbacks); 42 43 ~PassthroughEndpoint() override; 44 45 bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer, 46 const ReadArgs* args) override; 47 48 bool Write(absl::AnyInvocable<void(absl::Status)> on_write, 49 SliceBuffer* buffer, const WriteArgs* args) override; 50 GetPeerAddress()51 const EventEngine::ResolvedAddress& GetPeerAddress() const override { 52 return recv_middle_->address; 53 } GetLocalAddress()54 const EventEngine::ResolvedAddress& GetLocalAddress() const override { 55 return send_middle_->address; 56 } 57 58 private: 59 class CallbackHelper; 60 61 struct Middle : public grpc_core::RefCounted<Middle> { MiddleMiddle62 explicit Middle(int port) 63 : address(URIToResolvedAddress(absl::StrCat("ipv4:127.0.0.1:", port)) 64 .value()) {} 65 66 void Close(CallbackHelper& callback_helper); 67 68 grpc_core::Mutex mu; 69 bool closed ABSL_GUARDED_BY(mu) = false; 70 SliceBuffer* read_buffer ABSL_GUARDED_BY(mu) = nullptr; 71 absl::AnyInvocable<void(absl::Status)> on_read ABSL_GUARDED_BY(mu) = 72 nullptr; 73 SliceBuffer* write_buffer ABSL_GUARDED_BY(mu) = nullptr; 74 absl::AnyInvocable<void(absl::Status)> on_write ABSL_GUARDED_BY(mu) = 75 nullptr; 76 EventEngine::ResolvedAddress address; 77 }; 78 PassthroughEndpoint(grpc_core::RefCountedPtr<Middle> send_middle,grpc_core::RefCountedPtr<Middle> recv_middle,bool allow_inline_callbacks)79 PassthroughEndpoint(grpc_core::RefCountedPtr<Middle> send_middle, 80 grpc_core::RefCountedPtr<Middle> recv_middle, 81 bool allow_inline_callbacks) 82 : send_middle_(std::move(send_middle)), 83 recv_middle_(std::move(recv_middle)), 84 allow_inline_callbacks_(allow_inline_callbacks) {} 85 86 grpc_core::RefCountedPtr<Middle> send_middle_; 87 grpc_core::RefCountedPtr<Middle> recv_middle_; 88 std::shared_ptr<EventEngine> event_engine_ = GetDefaultEventEngine(); 89 bool allow_inline_callbacks_; 90 }; 91 92 } // namespace experimental 93 } // namespace grpc_event_engine 94 95 #endif // GRPC_TEST_CORE_TEST_UTIL_PASSTHROUGH_ENDPOINT_H 96