1 // Copyright 2022 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_POSIX_ORACLE_EVENT_ENGINE_POSIX_H 16 #define GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_POSIX_ORACLE_EVENT_ENGINE_POSIX_H 17 18 #include <grpc/event_engine/endpoint_config.h> 19 #include <grpc/event_engine/event_engine.h> 20 #include <grpc/event_engine/memory_allocator.h> 21 #include <grpc/event_engine/slice_buffer.h> 22 23 #include <memory> 24 #include <string> 25 #include <utility> 26 #include <vector> 27 28 #include "absl/base/thread_annotations.h" 29 #include "absl/functional/any_invocable.h" 30 #include "absl/status/status.h" 31 #include "absl/status/statusor.h" 32 #include "src/core/util/crash.h" 33 #include "src/core/util/notification.h" 34 #include "src/core/util/sync.h" 35 #include "src/core/util/thd.h" 36 #include "test/core/event_engine/event_engine_test_utils.h" 37 38 namespace grpc_event_engine { 39 namespace experimental { 40 41 class PosixOracleEndpoint : public EventEngine::Endpoint { 42 public: 43 explicit PosixOracleEndpoint(int socket_fd); 44 static std::unique_ptr<PosixOracleEndpoint> Create(int socket_fd); 45 ~PosixOracleEndpoint() override; 46 bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer, 47 const ReadArgs* args) override; 48 bool Write(absl::AnyInvocable<void(absl::Status)> on_writable, 49 SliceBuffer* data, const WriteArgs* args) override; 50 void Shutdown(); GetPeerAddress()51 EventEngine::ResolvedAddress& GetPeerAddress() const override { 52 grpc_core::Crash("unimplemented"); 53 } GetLocalAddress()54 EventEngine::ResolvedAddress& GetLocalAddress() const override { 55 grpc_core::Crash("unimplemented"); 56 } 57 58 private: 59 // An internal helper class definition of Read operations to be performed 60 // by the TCPServerEndpoint. 61 class ReadOperation { 62 public: ReadOperation()63 ReadOperation() 64 : num_bytes_to_read_(-1), buffer_(nullptr), on_complete_(nullptr) {} ReadOperation(int num_bytes_to_read,SliceBuffer * buffer,absl::AnyInvocable<void (absl::Status)> && on_complete)65 ReadOperation(int num_bytes_to_read, SliceBuffer* buffer, 66 absl::AnyInvocable<void(absl::Status)>&& on_complete) 67 : num_bytes_to_read_(num_bytes_to_read), 68 buffer_(buffer), 69 on_complete_(std::move(on_complete)) {} IsValid()70 bool IsValid() { return num_bytes_to_read_ >= 0 && buffer_ != nullptr; } GetNumBytesToRead()71 int GetNumBytesToRead() const { return num_bytes_to_read_; } operator()72 void operator()(std::string read_data, absl::Status status) { 73 if (on_complete_ != nullptr) { 74 AppendStringToSliceBuffer(std::exchange(buffer_, nullptr), read_data); 75 std::exchange(on_complete_, nullptr)(status); 76 } 77 } 78 79 private: 80 int num_bytes_to_read_; 81 SliceBuffer* buffer_; 82 absl::AnyInvocable<void(absl::Status)> on_complete_; 83 }; 84 85 // An internal helper class definition of Write operations to be performed 86 // by the TCPServerEndpoint. 87 class WriteOperation { 88 public: WriteOperation()89 WriteOperation() : bytes_to_write_(std::string()), on_complete_(nullptr) {} WriteOperation(SliceBuffer * buffer,absl::AnyInvocable<void (absl::Status)> && on_complete)90 WriteOperation(SliceBuffer* buffer, 91 absl::AnyInvocable<void(absl::Status)>&& on_complete) 92 : bytes_to_write_(ExtractSliceBufferIntoString(buffer)), 93 on_complete_(std::move(on_complete)) {} IsValid()94 bool IsValid() { return !bytes_to_write_.empty(); } GetBytesToWrite()95 std::string GetBytesToWrite() const { return bytes_to_write_; } operator()96 void operator()(absl::Status status) { 97 if (on_complete_ != nullptr) { 98 std::exchange(on_complete_, nullptr)(status); 99 } 100 } 101 102 private: 103 std::string bytes_to_write_; 104 absl::AnyInvocable<void(absl::Status)> on_complete_; 105 }; 106 107 void ProcessReadOperations(); 108 void ProcessWriteOperations(); 109 110 mutable grpc_core::Mutex mu_; 111 bool is_shutdown_ = false; 112 int socket_fd_; 113 ReadOperation read_ops_channel_; 114 WriteOperation write_ops_channel_; 115 std::unique_ptr<grpc_core::Notification> read_op_signal_{ 116 new grpc_core::Notification()}; 117 std::unique_ptr<grpc_core::Notification> write_op_signal_{ 118 new grpc_core::Notification()}; 119 grpc_core::Thread read_ops_ ABSL_GUARDED_BY(mu_); 120 grpc_core::Thread write_ops_ ABSL_GUARDED_BY(mu_); 121 }; 122 123 class PosixOracleListener : public EventEngine::Listener { 124 public: 125 PosixOracleListener( 126 EventEngine::Listener::AcceptCallback on_accept, 127 absl::AnyInvocable<void(absl::Status)> on_shutdown, 128 std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory); 129 ~PosixOracleListener() override; 130 absl::StatusOr<int> Bind(const EventEngine::ResolvedAddress& addr) override; 131 absl::Status Start() override; 132 133 private: 134 void HandleIncomingConnections(); 135 136 mutable grpc_core::Mutex mu_; 137 EventEngine::Listener::AcceptCallback on_accept_; 138 absl::AnyInvocable<void(absl::Status)> on_shutdown_; 139 std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory_; 140 grpc_core::Thread serve_; 141 int pipefd_[2]; 142 bool is_started_ = false; 143 std::vector<int> listener_fds_; 144 }; 145 146 // A posix based oracle EventEngine. 147 class PosixOracleEventEngine final : public EventEngine { 148 public: 149 PosixOracleEventEngine() = default; 150 ~PosixOracleEventEngine() override = default; 151 CreateListener(Listener::AcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,const EndpointConfig &,std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)152 absl::StatusOr<std::unique_ptr<Listener>> CreateListener( 153 Listener::AcceptCallback on_accept, 154 absl::AnyInvocable<void(absl::Status)> on_shutdown, 155 const EndpointConfig& /*config*/, 156 std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) 157 override { 158 return std::make_unique<PosixOracleListener>( 159 std::move(on_accept), std::move(on_shutdown), 160 std::move(memory_allocator_factory)); 161 } 162 163 ConnectionHandle Connect(OnConnectCallback on_connect, 164 const ResolvedAddress& addr, 165 const EndpointConfig& args, 166 MemoryAllocator memory_allocator, 167 EventEngine::Duration timeout) override; 168 CancelConnect(ConnectionHandle)169 bool CancelConnect(ConnectionHandle /*handle*/) override { 170 grpc_core::Crash("unimplemented"); 171 } IsWorkerThread()172 bool IsWorkerThread() override { return false; }; GetDNSResolver(const DNSResolver::ResolverOptions &)173 absl::StatusOr<std::unique_ptr<DNSResolver>> GetDNSResolver( 174 const DNSResolver::ResolverOptions& /*options*/) override { 175 grpc_core::Crash("unimplemented"); 176 } Run(Closure *)177 void Run(Closure* /*closure*/) override { grpc_core::Crash("unimplemented"); } Run(absl::AnyInvocable<void ()>)178 void Run(absl::AnyInvocable<void()> /*closure*/) override { 179 grpc_core::Crash("unimplemented"); 180 } RunAfter(EventEngine::Duration,Closure *)181 TaskHandle RunAfter(EventEngine::Duration /*duration*/, 182 Closure* /*closure*/) override { 183 grpc_core::Crash("unimplemented"); 184 } RunAfter(EventEngine::Duration,absl::AnyInvocable<void ()>)185 TaskHandle RunAfter(EventEngine::Duration /*duration*/, 186 absl::AnyInvocable<void()> /*closure*/) override { 187 grpc_core::Crash("unimplemented"); 188 } Cancel(TaskHandle)189 bool Cancel(TaskHandle /*handle*/) override { 190 grpc_core::Crash("unimplemented"); 191 } 192 }; 193 194 } // namespace experimental 195 } // namespace grpc_event_engine 196 197 #endif // GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_POSIX_ORACLE_EVENT_ENGINE_POSIX_H 198