1 // Copyright 2022 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_POSIX_ORACLE_EVENT_ENGINE_POSIX_H 16 #define GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_POSIX_ORACLE_EVENT_ENGINE_POSIX_H 17 18 #include <memory> 19 #include <string> 20 #include <utility> 21 #include <vector> 22 23 #include "absl/base/thread_annotations.h" 24 #include "absl/functional/any_invocable.h" 25 #include "absl/status/status.h" 26 #include "absl/status/statusor.h" 27 28 #include <grpc/event_engine/endpoint_config.h> 29 #include <grpc/event_engine/event_engine.h> 30 #include <grpc/event_engine/memory_allocator.h> 31 #include <grpc/event_engine/slice_buffer.h> 32 33 #include "src/core/lib/gprpp/crash.h" 34 #include "src/core/lib/gprpp/notification.h" 35 #include "src/core/lib/gprpp/sync.h" 36 #include "src/core/lib/gprpp/thd.h" 37 #include "test/core/event_engine/event_engine_test_utils.h" 38 39 namespace grpc_event_engine { 40 namespace experimental { 41 42 class PosixOracleEndpoint : public EventEngine::Endpoint { 43 public: 44 explicit PosixOracleEndpoint(int socket_fd); 45 static std::unique_ptr<PosixOracleEndpoint> Create(int socket_fd); 46 ~PosixOracleEndpoint() override; 47 bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer, 48 const ReadArgs* args) override; 49 bool Write(absl::AnyInvocable<void(absl::Status)> on_writable, 50 SliceBuffer* data, const WriteArgs* args) override; 51 void Shutdown(); GetPeerAddress()52 EventEngine::ResolvedAddress& GetPeerAddress() const override { 53 grpc_core::Crash("unimplemented"); 54 } GetLocalAddress()55 EventEngine::ResolvedAddress& GetLocalAddress() const override { 56 grpc_core::Crash("unimplemented"); 57 } 58 59 private: 60 // An internal helper class definition of Read operations to be performed 61 // by the TCPServerEndpoint. 62 class ReadOperation { 63 public: ReadOperation()64 ReadOperation() 65 : num_bytes_to_read_(-1), buffer_(nullptr), on_complete_(nullptr) {} ReadOperation(int num_bytes_to_read,SliceBuffer * buffer,absl::AnyInvocable<void (absl::Status)> && on_complete)66 ReadOperation(int num_bytes_to_read, SliceBuffer* buffer, 67 absl::AnyInvocable<void(absl::Status)>&& on_complete) 68 : num_bytes_to_read_(num_bytes_to_read), 69 buffer_(buffer), 70 on_complete_(std::move(on_complete)) {} IsValid()71 bool IsValid() { return num_bytes_to_read_ >= 0 && buffer_ != nullptr; } GetNumBytesToRead()72 int GetNumBytesToRead() const { return num_bytes_to_read_; } operator()73 void operator()(std::string read_data, absl::Status status) { 74 if (on_complete_ != nullptr) { 75 AppendStringToSliceBuffer(std::exchange(buffer_, nullptr), read_data); 76 std::exchange(on_complete_, nullptr)(status); 77 } 78 } 79 80 private: 81 int num_bytes_to_read_; 82 SliceBuffer* buffer_; 83 absl::AnyInvocable<void(absl::Status)> on_complete_; 84 }; 85 86 // An internal helper class definition of Write operations to be performed 87 // by the TCPServerEndpoint. 88 class WriteOperation { 89 public: WriteOperation()90 WriteOperation() : bytes_to_write_(std::string()), on_complete_(nullptr) {} WriteOperation(SliceBuffer * buffer,absl::AnyInvocable<void (absl::Status)> && on_complete)91 WriteOperation(SliceBuffer* buffer, 92 absl::AnyInvocable<void(absl::Status)>&& on_complete) 93 : bytes_to_write_(ExtractSliceBufferIntoString(buffer)), 94 on_complete_(std::move(on_complete)) {} IsValid()95 bool IsValid() { return bytes_to_write_.length() > 0; } GetBytesToWrite()96 std::string GetBytesToWrite() const { return bytes_to_write_; } operator()97 void operator()(absl::Status status) { 98 if (on_complete_ != nullptr) { 99 std::exchange(on_complete_, nullptr)(status); 100 } 101 } 102 103 private: 104 std::string bytes_to_write_; 105 absl::AnyInvocable<void(absl::Status)> on_complete_; 106 }; 107 108 void ProcessReadOperations(); 109 void ProcessWriteOperations(); 110 111 mutable grpc_core::Mutex mu_; 112 bool is_shutdown_ = false; 113 int socket_fd_; 114 ReadOperation read_ops_channel_; 115 WriteOperation write_ops_channel_; 116 std::unique_ptr<grpc_core::Notification> read_op_signal_{ 117 new grpc_core::Notification()}; 118 std::unique_ptr<grpc_core::Notification> write_op_signal_{ 119 new grpc_core::Notification()}; 120 grpc_core::Thread read_ops_ ABSL_GUARDED_BY(mu_); 121 grpc_core::Thread write_ops_ ABSL_GUARDED_BY(mu_); 122 }; 123 124 class PosixOracleListener : public EventEngine::Listener { 125 public: 126 PosixOracleListener( 127 EventEngine::Listener::AcceptCallback on_accept, 128 absl::AnyInvocable<void(absl::Status)> on_shutdown, 129 std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory); 130 ~PosixOracleListener() override; 131 absl::StatusOr<int> Bind(const EventEngine::ResolvedAddress& addr) override; 132 absl::Status Start() override; 133 134 private: 135 void HandleIncomingConnections(); 136 137 mutable grpc_core::Mutex mu_; 138 EventEngine::Listener::AcceptCallback on_accept_; 139 absl::AnyInvocable<void(absl::Status)> on_shutdown_; 140 std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory_; 141 grpc_core::Thread serve_; 142 int pipefd_[2]; 143 bool is_started_ = false; 144 std::vector<int> listener_fds_; 145 }; 146 147 // A posix based oracle EventEngine. 148 class PosixOracleEventEngine final : public EventEngine { 149 public: 150 PosixOracleEventEngine() = default; 151 ~PosixOracleEventEngine() override = default; 152 CreateListener(Listener::AcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,const EndpointConfig &,std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)153 absl::StatusOr<std::unique_ptr<Listener>> CreateListener( 154 Listener::AcceptCallback on_accept, 155 absl::AnyInvocable<void(absl::Status)> on_shutdown, 156 const EndpointConfig& /*config*/, 157 std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) 158 override { 159 return std::make_unique<PosixOracleListener>( 160 std::move(on_accept), std::move(on_shutdown), 161 std::move(memory_allocator_factory)); 162 } 163 164 ConnectionHandle Connect(OnConnectCallback on_connect, 165 const ResolvedAddress& addr, 166 const EndpointConfig& args, 167 MemoryAllocator memory_allocator, 168 EventEngine::Duration timeout) override; 169 CancelConnect(ConnectionHandle)170 bool CancelConnect(ConnectionHandle /*handle*/) override { 171 grpc_core::Crash("unimplemented"); 172 } IsWorkerThread()173 bool IsWorkerThread() override { return false; }; GetDNSResolver(const DNSResolver::ResolverOptions &)174 absl::StatusOr<std::unique_ptr<DNSResolver>> GetDNSResolver( 175 const DNSResolver::ResolverOptions& /*options*/) override { 176 grpc_core::Crash("unimplemented"); 177 } Run(Closure *)178 void Run(Closure* /*closure*/) override { grpc_core::Crash("unimplemented"); } Run(absl::AnyInvocable<void ()>)179 void Run(absl::AnyInvocable<void()> /*closure*/) override { 180 grpc_core::Crash("unimplemented"); 181 } RunAfter(EventEngine::Duration,Closure *)182 TaskHandle RunAfter(EventEngine::Duration /*duration*/, 183 Closure* /*closure*/) override { 184 grpc_core::Crash("unimplemented"); 185 } RunAfter(EventEngine::Duration,absl::AnyInvocable<void ()>)186 TaskHandle RunAfter(EventEngine::Duration /*duration*/, 187 absl::AnyInvocable<void()> /*closure*/) override { 188 grpc_core::Crash("unimplemented"); 189 } Cancel(TaskHandle)190 bool Cancel(TaskHandle /*handle*/) override { 191 grpc_core::Crash("unimplemented"); 192 } 193 }; 194 195 } // namespace experimental 196 } // namespace grpc_event_engine 197 198 #endif // GRPC_TEST_CORE_EVENT_ENGINE_TEST_SUITE_POSIX_ORACLE_EVENT_ENGINE_POSIX_H 199