• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_TEST_CORE_EVENT_ENGINE_EVENT_ENGINE_TEST_UTILS_H
16 #define GRPC_TEST_CORE_EVENT_ENGINE_EVENT_ENGINE_TEST_UTILS_H
17 
18 #include <grpc/event_engine/event_engine.h>
19 #include <grpc/event_engine/slice_buffer.h>
20 
21 #include <functional>
22 #include <map>
23 #include <memory>
24 #include <string>
25 #include <thread>
26 #include <tuple>
27 #include <utility>
28 #include <vector>
29 
30 #include "absl/status/status.h"
31 #include "absl/status/statusor.h"
32 #include "absl/strings/string_view.h"
33 #include "src/core/lib/resource_quota/memory_quota.h"
34 #include "src/core/util/notification.h"
35 #include "src/core/util/sync.h"
36 
// A callable taking no arguments that returns an EventEngine owned through a
// std::unique_ptr. Note that this alias lives at global scope, outside the
// grpc_event_engine::experimental namespace opened below.
// NOTE(review): presumably each invocation yields a fresh engine instance --
// confirm against the factories passed in by callers.
using EventEngineFactory = std::function<
    std::unique_ptr<grpc_event_engine::experimental::EventEngine>()>;
39 
40 namespace grpc_event_engine {
41 namespace experimental {
42 
// Produces a std::string from the given SliceBuffer.
// NOTE(review): the definition is not visible in this header. Judging by the
// name, the slices are presumably removed from `buf` as they are copied into
// the returned string -- confirm before reusing `buf` after this call.
std::string ExtractSliceBufferIntoString(SliceBuffer* buf);
44 
// Returns a random message with bounded length.
// NOTE(review): the length bound and the source of randomness are chosen by
// the definition, which is not visible in this header.
std::string GetNextSendMessage();
47 
// Waits until the use_count of the EventEngine shared_ptr has reached 1,
// i.e. until the `engine` argument is the only remaining owner, and then
// returns.
// Callers must give up their own ref (hence the by-value parameter and the
// std::move in the usage below), or this method will block forever.
// Usage: WaitForSingleOwner(std::move(engine))
void WaitForSingleOwner(std::shared_ptr<EventEngine> engine);
53 
// Waits until the use_count of the EventEngine shared_ptr has reached 1
// and returns.
// Callers must give up their ref; unlike WaitForSingleOwner, this version
// does not block forever but will CRASH after the given timeout.
// Usage: WaitForSingleOwnerWithTimeout(std::move(engine), 30s)
void WaitForSingleOwnerWithTimeout(std::shared_ptr<EventEngine> engine,
                                   EventEngine::Duration timeout);
61 
// A helper method to exchange data between two endpoints. It is assumed
// that both endpoints are connected. The data (specified as a string) is
// written by send_endpoint and read by receive_endpoint. It returns OK status
// only if data written == data read. It also blocks the calling thread until
// said Write and Read operations are complete.
absl::Status SendValidatePayload(absl::string_view data,
                                 EventEngine::Endpoint* send_endpoint,
                                 EventEngine::Endpoint* receive_endpoint);
70 
71 // A helper class to create clients/listeners and connections between them.
72 // The clients and listeners can be created by the oracle EventEngine
73 // or the EventEngine under test. The class provides handles into the
74 // connections that are created. Individual tests can test expected behavior by
75 // exchanging arbitrary data over these connections.
76 class ConnectionManager {
77  public:
ConnectionManager(std::unique_ptr<EventEngine> test_event_engine,std::unique_ptr<EventEngine> oracle_event_engine)78   ConnectionManager(std::unique_ptr<EventEngine> test_event_engine,
79                     std::unique_ptr<EventEngine> oracle_event_engine)
80       : memory_quota_(std::make_unique<grpc_core::MemoryQuota>("foo")),
81         test_event_engine_(std::move(test_event_engine)),
82         oracle_event_engine_(std::move(oracle_event_engine)) {}
83   ~ConnectionManager() = default;
84 
85   // It creates and starts a listener bound to all the specified list of
86   //  addresses.  If successful, return OK status. The type of the listener is
87   //  determined by the 2nd argument.
88   absl::Status BindAndStartListener(const std::vector<std::string>& addrs,
89                                     bool listener_type_oracle = true);
90 
91   // If connection is successful, returns a tuple containing:
92   //    1. a pointer to the client side endpoint of the connection.
93   //    2. a pointer to the server side endpoint of the connection.
94   // If un-successful it returns a non-OK  status containing the error
95   // encountered.
96   absl::StatusOr<std::tuple<std::unique_ptr<EventEngine::Endpoint>,
97                             std::unique_ptr<EventEngine::Endpoint>>>
98   CreateConnection(std::string target_addr, EventEngine::Duration timeout,
99                    bool client_type_oracle);
100 
101  private:
102   class Connection {
103    public:
104     Connection() = default;
105     ~Connection() = default;
106 
SetClientEndpoint(std::unique_ptr<EventEngine::Endpoint> && client_endpoint)107     void SetClientEndpoint(
108         std::unique_ptr<EventEngine::Endpoint>&& client_endpoint) {
109       client_endpoint_ = std::move(client_endpoint);
110       client_signal_.Notify();
111     }
SetServerEndpoint(std::unique_ptr<EventEngine::Endpoint> && server_endpoint)112     void SetServerEndpoint(
113         std::unique_ptr<EventEngine::Endpoint>&& server_endpoint) {
114       server_endpoint_ = std::move(server_endpoint);
115       server_signal_.Notify();
116     }
GetClientEndpoint()117     std::unique_ptr<EventEngine::Endpoint> GetClientEndpoint() {
118       auto client_endpoint = std::move(client_endpoint_);
119       client_endpoint_.reset();
120       return client_endpoint;
121     }
GetServerEndpoint()122     std::unique_ptr<EventEngine::Endpoint> GetServerEndpoint() {
123       auto server_endpoint = std::move(server_endpoint_);
124       server_endpoint_.reset();
125       return server_endpoint;
126     }
127 
128    private:
129     std::unique_ptr<EventEngine::Endpoint> client_endpoint_;
130     std::unique_ptr<EventEngine::Endpoint> server_endpoint_;
131     grpc_core::Notification client_signal_;
132     grpc_core::Notification server_signal_;
133   };
134 
135   grpc_core::Mutex mu_;
136   std::unique_ptr<grpc_core::MemoryQuota> memory_quota_;
137   int num_processed_connections_ = 0;
138   Connection last_in_progress_connection_;
139   std::map<std::string, std::shared_ptr<EventEngine::Listener>> listeners_;
140   std::unique_ptr<EventEngine> test_event_engine_;
141   std::unique_ptr<EventEngine> oracle_event_engine_;
142 };
143 
// Adds the bytes of `data` to `buf`.
// NOTE(review): the definition is not visible in this header; presumably the
// data is copied into a new slice appended at the end of `buf` -- confirm.
void AppendStringToSliceBuffer(SliceBuffer* buf, absl::string_view data);
145 
146 class NotifyOnDelete {
147  public:
NotifyOnDelete(grpc_core::Notification * signal)148   explicit NotifyOnDelete(grpc_core::Notification* signal) : signal_(signal) {}
149   NotifyOnDelete(const NotifyOnDelete&) = delete;
150   NotifyOnDelete& operator=(const NotifyOnDelete&) = delete;
NotifyOnDelete(NotifyOnDelete && other)151   NotifyOnDelete(NotifyOnDelete&& other) noexcept {
152     signal_ = other.signal_;
153     other.signal_ = nullptr;
154   }
155   NotifyOnDelete& operator=(NotifyOnDelete&& other) noexcept {
156     signal_ = other.signal_;
157     other.signal_ = nullptr;
158     return *this;
159   }
~NotifyOnDelete()160   ~NotifyOnDelete() {
161     if (signal_ != nullptr) {
162       signal_->Notify();
163     }
164   }
165 
166  private:
167   grpc_core::Notification* signal_;
168 };
169 
170 // An endpoint implementation that supports Read and Write via std::threads.
171 // Passing a grpc_core::Notification will allow owners to know when all
172 // in-flight callbacks have been run, and all endpoint state has been destroyed.
173 class ThreadedNoopEndpoint : public EventEngine::Endpoint {
174  public:
ThreadedNoopEndpoint(grpc_core::Notification * destroyed)175   explicit ThreadedNoopEndpoint(grpc_core::Notification* destroyed)
176       : state_(std::make_shared<EndpointState>(destroyed)) {}
~ThreadedNoopEndpoint()177   ~ThreadedNoopEndpoint() override {
178     std::thread deleter([state = state_]() {
179       CleanupThread(state->read);
180       CleanupThread(state->write);
181     });
182     deleter.detach();
183   }
184 
Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer,const ReadArgs *)185   bool Read(absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
186             const ReadArgs* /* args */) override {
187     buffer->Clear();
188     CleanupThread(state_->read);
189     state_->read = new std::thread([cb = std::move(on_read)]() mutable {
190       cb(absl::UnknownError("test"));
191     });
192     return false;
193   }
194 
Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const WriteArgs *)195   bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
196              SliceBuffer* data, const WriteArgs* /* args */) override {
197     data->Clear();
198     CleanupThread(state_->write);
199     state_->write = new std::thread([cb = std::move(on_writable)]() mutable {
200       cb(absl::UnknownError("test"));
201     });
202     return false;
203   }
204 
GetPeerAddress()205   const EventEngine::ResolvedAddress& GetPeerAddress() const override {
206     return peer_;
207   }
208 
GetLocalAddress()209   const EventEngine::ResolvedAddress& GetLocalAddress() const override {
210     return local_;
211   }
212 
213  private:
214   struct EndpointState {
EndpointStateEndpointState215     explicit EndpointState(grpc_core::Notification* deleter)
216         : delete_notifier_(deleter) {}
217     std::thread* read = nullptr;
218     std::thread* write = nullptr;
219     NotifyOnDelete delete_notifier_;
220   };
221 
CleanupThread(std::thread * thd)222   static void CleanupThread(std::thread* thd) {
223     if (thd != nullptr) {
224       thd->join();
225       delete thd;
226     }
227   }
228 
229   std::shared_ptr<EndpointState> state_;
230   EventEngine::ResolvedAddress peer_;
231   EventEngine::ResolvedAddress local_;
232 };
233 
234 }  // namespace experimental
235 }  // namespace grpc_event_engine
236 
237 #endif  // GRPC_TEST_CORE_EVENT_ENGINE_EVENT_ENGINE_TEST_UTILS_H
238