1 // Copyright 2022 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_TEST_CORE_EVENT_ENGINE_EVENT_ENGINE_TEST_UTILS_H 16 #define GRPC_TEST_CORE_EVENT_ENGINE_EVENT_ENGINE_TEST_UTILS_H 17 18 #include <grpc/event_engine/event_engine.h> 19 #include <grpc/event_engine/slice_buffer.h> 20 21 #include <functional> 22 #include <map> 23 #include <memory> 24 #include <string> 25 #include <thread> 26 #include <tuple> 27 #include <utility> 28 #include <vector> 29 30 #include "absl/status/status.h" 31 #include "absl/status/statusor.h" 32 #include "absl/strings/string_view.h" 33 #include "src/core/lib/resource_quota/memory_quota.h" 34 #include "src/core/util/notification.h" 35 #include "src/core/util/sync.h" 36 37 using EventEngineFactory = std::function< 38 std::unique_ptr<grpc_event_engine::experimental::EventEngine>()>; 39 40 namespace grpc_event_engine { 41 namespace experimental { 42 43 std::string ExtractSliceBufferIntoString(SliceBuffer* buf); 44 45 // Returns a random message with bounded length. 46 std::string GetNextSendMessage(); 47 48 // Waits until the use_count of the EventEngine shared_ptr has reached 1 49 // and returns. 50 // Callers must give up their ref, or this method will block forever. 51 // Usage: WaitForSingleOwner(std::move(engine)) 52 void WaitForSingleOwner(std::shared_ptr<EventEngine> engine); 53 54 // Waits until the use_count of the EventEngine shared_ptr has reached 1 55 // and returns. 56 // Callers must give up their ref, or this method will block forever. 57 // This version will CRASH after the given timeout 58 // Usage: WaitForSingleOwner(std::move(engine), 30s) 59 void WaitForSingleOwnerWithTimeout(std::shared_ptr<EventEngine> engine, 60 EventEngine::Duration timeout); 61 62 // A helper method to exchange data between two endpoints. It is assumed 63 // that both endpoints are connected. The data (specified as a string) is 64 // written by the sender_endpoint and read by the receiver_endpoint. It 65 // returns OK status only if data written == data read. It also blocks the 66 // calling thread until said Write and Read operations are complete. 67 absl::Status SendValidatePayload(absl::string_view data, 68 EventEngine::Endpoint* send_endpoint, 69 EventEngine::Endpoint* receive_endpoint); 70 71 // A helper class to create clients/listeners and connections between them. 72 // The clients and listeners can be created by the oracle EventEngine 73 // or the EventEngine under test. The class provides handles into the 74 // connections that are created. Individual tests can test expected behavior by 75 // exchanging arbitrary data over these connections. 76 class ConnectionManager { 77 public: ConnectionManager(std::unique_ptr<EventEngine> test_event_engine,std::unique_ptr<EventEngine> oracle_event_engine)78 ConnectionManager(std::unique_ptr<EventEngine> test_event_engine, 79 std::unique_ptr<EventEngine> oracle_event_engine) 80 : memory_quota_(std::make_unique<grpc_core::MemoryQuota>("foo")), 81 test_event_engine_(std::move(test_event_engine)), 82 oracle_event_engine_(std::move(oracle_event_engine)) {} 83 ~ConnectionManager() = default; 84 85 // It creates and starts a listener bound to all the specified list of 86 // addresses. If successful, return OK status. The type of the listener is 87 // determined by the 2nd argument. 88 absl::Status BindAndStartListener(const std::vector<std::string>& addrs, 89 bool listener_type_oracle = true); 90 91 // If connection is successful, returns a tuple containing: 92 // 1. a pointer to the client side endpoint of the connection. 93 // 2. a pointer to the server side endpoint of the connection. 94 // If un-successful it returns a non-OK status containing the error 95 // encountered. 96 absl::StatusOr<std::tuple<std::unique_ptr<EventEngine::Endpoint>, 97 std::unique_ptr<EventEngine::Endpoint>>> 98 CreateConnection(std::string target_addr, EventEngine::Duration timeout, 99 bool client_type_oracle); 100 101 private: 102 class Connection { 103 public: 104 Connection() = default; 105 ~Connection() = default; 106 SetClientEndpoint(std::unique_ptr<EventEngine::Endpoint> && client_endpoint)107 void SetClientEndpoint( 108 std::unique_ptr<EventEngine::Endpoint>&& client_endpoint) { 109 client_endpoint_ = std::move(client_endpoint); 110 client_signal_.Notify(); 111 } SetServerEndpoint(std::unique_ptr<EventEngine::Endpoint> && server_endpoint)112 void SetServerEndpoint( 113 std::unique_ptr<EventEngine::Endpoint>&& server_endpoint) { 114 server_endpoint_ = std::move(server_endpoint); 115 server_signal_.Notify(); 116 } GetClientEndpoint()117 std::unique_ptr<EventEngine::Endpoint> GetClientEndpoint() { 118 auto client_endpoint = std::move(client_endpoint_); 119 client_endpoint_.reset(); 120 return client_endpoint; 121 } GetServerEndpoint()122 std::unique_ptr<EventEngine::Endpoint> GetServerEndpoint() { 123 auto server_endpoint = std::move(server_endpoint_); 124 server_endpoint_.reset(); 125 return server_endpoint; 126 } 127 128 private: 129 std::unique_ptr<EventEngine::Endpoint> client_endpoint_; 130 std::unique_ptr<EventEngine::Endpoint> server_endpoint_; 131 grpc_core::Notification client_signal_; 132 grpc_core::Notification server_signal_; 133 }; 134 135 grpc_core::Mutex mu_; 136 std::unique_ptr<grpc_core::MemoryQuota> memory_quota_; 137 int num_processed_connections_ = 0; 138 Connection last_in_progress_connection_; 139 std::map<std::string, std::shared_ptr<EventEngine::Listener>> listeners_; 140 std::unique_ptr<EventEngine> test_event_engine_; 141 std::unique_ptr<EventEngine> oracle_event_engine_; 142 }; 143 144 void AppendStringToSliceBuffer(SliceBuffer* buf, absl::string_view data); 145 146 class NotifyOnDelete { 147 public: NotifyOnDelete(grpc_core::Notification * signal)148 explicit NotifyOnDelete(grpc_core::Notification* signal) : signal_(signal) {} 149 NotifyOnDelete(const NotifyOnDelete&) = delete; 150 NotifyOnDelete& operator=(const NotifyOnDelete&) = delete; NotifyOnDelete(NotifyOnDelete && other)151 NotifyOnDelete(NotifyOnDelete&& other) noexcept { 152 signal_ = other.signal_; 153 other.signal_ = nullptr; 154 } 155 NotifyOnDelete& operator=(NotifyOnDelete&& other) noexcept { 156 signal_ = other.signal_; 157 other.signal_ = nullptr; 158 return *this; 159 } ~NotifyOnDelete()160 ~NotifyOnDelete() { 161 if (signal_ != nullptr) { 162 signal_->Notify(); 163 } 164 } 165 166 private: 167 grpc_core::Notification* signal_; 168 }; 169 170 // An endpoint implementation that supports Read and Write via std::threads. 171 // Passing a grpc_core::Notification will allow owners to know when all 172 // in-flight callbacks have been run, and all endpoint state has been destroyed. 173 class ThreadedNoopEndpoint : public EventEngine::Endpoint { 174 public: ThreadedNoopEndpoint(grpc_core::Notification * destroyed)175 explicit ThreadedNoopEndpoint(grpc_core::Notification* destroyed) 176 : state_(std::make_shared<EndpointState>(destroyed)) {} ~ThreadedNoopEndpoint()177 ~ThreadedNoopEndpoint() override { 178 std::thread deleter([state = state_]() { 179 CleanupThread(state->read); 180 CleanupThread(state->write); 181 }); 182 deleter.detach(); 183 } 184 Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer,const ReadArgs *)185 bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer, 186 const ReadArgs* /* args */) override { 187 buffer->Clear(); 188 CleanupThread(state_->read); 189 state_->read = new std::thread([cb = std::move(on_read)]() mutable { 190 cb(absl::UnknownError("test")); 191 }); 192 return false; 193 } 194 Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const WriteArgs *)195 bool Write(absl::AnyInvocable<void(absl::Status)> on_writable, 196 SliceBuffer* data, const WriteArgs* /* args */) override { 197 data->Clear(); 198 CleanupThread(state_->write); 199 state_->write = new std::thread([cb = std::move(on_writable)]() mutable { 200 cb(absl::UnknownError("test")); 201 }); 202 return false; 203 } 204 GetPeerAddress()205 const EventEngine::ResolvedAddress& GetPeerAddress() const override { 206 return peer_; 207 } 208 GetLocalAddress()209 const EventEngine::ResolvedAddress& GetLocalAddress() const override { 210 return local_; 211 } 212 213 private: 214 struct EndpointState { EndpointStateEndpointState215 explicit EndpointState(grpc_core::Notification* deleter) 216 : delete_notifier_(deleter) {} 217 std::thread* read = nullptr; 218 std::thread* write = nullptr; 219 NotifyOnDelete delete_notifier_; 220 }; 221 CleanupThread(std::thread * thd)222 static void CleanupThread(std::thread* thd) { 223 if (thd != nullptr) { 224 thd->join(); 225 delete thd; 226 } 227 } 228 229 std::shared_ptr<EndpointState> state_; 230 EventEngine::ResolvedAddress peer_; 231 EventEngine::ResolvedAddress local_; 232 }; 233 234 } // namespace experimental 235 } // namespace grpc_event_engine 236 237 #endif // GRPC_TEST_CORE_EVENT_ENGINE_EVENT_ENGINE_TEST_UTILS_H 238