• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "test/core/event_engine/event_engine_test_utils.h"
16 
17 #include <grpc/event_engine/event_engine.h>
18 #include <grpc/event_engine/memory_allocator.h>
19 #include <grpc/event_engine/slice.h>
20 #include <grpc/event_engine/slice_buffer.h>
21 #include <grpc/slice_buffer.h>
22 #include <stdlib.h>
23 
24 #include <algorithm>
25 #include <chrono>
26 #include <memory>
27 #include <random>
28 #include <string>
29 #include <utility>
30 
31 #include "absl/log/check.h"
32 #include "absl/log/log.h"
33 #include "absl/status/status.h"
34 #include "absl/status/statusor.h"
35 #include "absl/strings/str_cat.h"
36 #include "absl/time/clock.h"
37 #include "absl/time/time.h"
38 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
39 #include "src/core/lib/event_engine/tcp_socket_utils.h"
40 #include "src/core/lib/resource_quota/memory_quota.h"
41 #include "src/core/util/crash.h"
42 #include "src/core/util/notification.h"
43 #include "src/core/util/time.h"
44 #include "test/core/test_util/build.h"
45 
46 // IWYU pragma: no_include <sys/socket.h>
47 
48 namespace grpc_event_engine {
49 namespace experimental {
50 
51 namespace {
52 constexpr int kMinMessageSize = 1024;
53 constexpr int kMaxMessageSize = 4096;
54 }  // namespace
55 
56 // Returns a random message with bounded length.
GetNextSendMessage()57 std::string GetNextSendMessage() {
58   static const char alphanum[] =
59       "0123456789"
60       "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
61       "abcdefghijklmnopqrstuvwxyz";
62   static std::random_device rd;
63   static std::seed_seq seed{rd()};
64   static std::mt19937 gen(seed);
65   static std::uniform_real_distribution<> dis(kMinMessageSize, kMaxMessageSize);
66   static grpc_core::Mutex g_mu;
67   std::string tmp_s;
68   int len;
69   {
70     grpc_core::MutexLock lock(&g_mu);
71     len = dis(gen);
72   }
73   tmp_s.reserve(len);
74   for (int i = 0; i < len; ++i) {
75     tmp_s += alphanum[rand() % (sizeof(alphanum) - 1)];
76   }
77   return tmp_s;
78 }
79 
WaitForSingleOwner(std::shared_ptr<EventEngine> engine)80 void WaitForSingleOwner(std::shared_ptr<EventEngine> engine) {
81   WaitForSingleOwnerWithTimeout(std::move(engine), std::chrono::hours{24});
82 }
83 
WaitForSingleOwnerWithTimeout(std::shared_ptr<EventEngine> engine,EventEngine::Duration timeout)84 void WaitForSingleOwnerWithTimeout(std::shared_ptr<EventEngine> engine,
85                                    EventEngine::Duration timeout) {
86   int n = 0;
87   auto start = std::chrono::system_clock::now();
88   while (engine.use_count() > 1) {
89     ++n;
90     if (n % 100 == 0) {
91       LOG(INFO) << "Checking for leaks...";
92       AsanAssertNoLeaks();
93     }
94     auto remaining = timeout - (std::chrono::system_clock::now() - start);
95     if (remaining < std::chrono::seconds{0}) {
96       grpc_core::Crash("Timed out waiting for a single EventEngine owner");
97     }
98     LOG_EVERY_N_SEC(INFO, 2)
99         << "engine.use_count() = " << engine.use_count()
100         << " timeout_remaining = "
101         << absl::FormatDuration(absl::Nanoseconds(remaining.count()));
102     absl::SleepFor(absl::Milliseconds(100));
103   }
104 }
105 
AppendStringToSliceBuffer(SliceBuffer * buf,absl::string_view data)106 void AppendStringToSliceBuffer(SliceBuffer* buf, absl::string_view data) {
107   buf->Append(Slice::FromCopiedString(data));
108 }
109 
ExtractSliceBufferIntoString(SliceBuffer * buf)110 std::string ExtractSliceBufferIntoString(SliceBuffer* buf) {
111   if (!buf->Length()) {
112     return std::string();
113   }
114   std::string tmp(buf->Length(), '\0');
115   char* bytes = const_cast<char*>(tmp.c_str());
116   grpc_slice_buffer_move_first_into_buffer(buf->c_slice_buffer(), buf->Length(),
117                                            bytes);
118   return tmp;
119 }
120 
SendValidatePayload(absl::string_view data,EventEngine::Endpoint * send_endpoint,EventEngine::Endpoint * receive_endpoint)121 absl::Status SendValidatePayload(absl::string_view data,
122                                  EventEngine::Endpoint* send_endpoint,
123                                  EventEngine::Endpoint* receive_endpoint) {
124   CHECK_NE(receive_endpoint, nullptr);
125   CHECK_NE(send_endpoint, nullptr);
126   int num_bytes_written = data.size();
127   grpc_core::Notification read_signal;
128   grpc_core::Notification write_signal;
129   SliceBuffer read_slice_buf;
130   SliceBuffer read_store_buf;
131   SliceBuffer write_slice_buf;
132 
133   read_slice_buf.Clear();
134   write_slice_buf.Clear();
135   read_store_buf.Clear();
136   // std::cout << "SendValidatePayload ... " << std::endl;
137   // fflush(stdout);
138 
139   AppendStringToSliceBuffer(&write_slice_buf, data);
140   EventEngine::Endpoint::ReadArgs args = {num_bytes_written};
141   std::function<void(absl::Status)> read_cb;
142   read_cb = [receive_endpoint, &read_slice_buf, &read_store_buf, &read_cb,
143              &read_signal, &args](absl::Status status) {
144     CHECK_OK(status);
145     if (read_slice_buf.Length() == static_cast<size_t>(args.read_hint_bytes)) {
146       read_slice_buf.MoveFirstNBytesIntoSliceBuffer(read_slice_buf.Length(),
147                                                     read_store_buf);
148       read_signal.Notify();
149       return;
150     }
151     args.read_hint_bytes -= read_slice_buf.Length();
152     read_slice_buf.MoveFirstNBytesIntoSliceBuffer(read_slice_buf.Length(),
153                                                   read_store_buf);
154     if (receive_endpoint->Read(read_cb, &read_slice_buf, &args)) {
155       CHECK_NE(read_slice_buf.Length(), 0u);
156       read_cb(absl::OkStatus());
157     }
158   };
159   // Start asynchronous reading at the receive_endpoint.
160   if (receive_endpoint->Read(read_cb, &read_slice_buf, &args)) {
161     read_cb(absl::OkStatus());
162   }
163   // Start asynchronous writing at the send_endpoint.
164   if (send_endpoint->Write(
165           [&write_signal](absl::Status status) {
166             CHECK_OK(status);
167             write_signal.Notify();
168           },
169           &write_slice_buf, nullptr)) {
170     write_signal.Notify();
171   }
172   write_signal.WaitForNotification();
173   read_signal.WaitForNotification();
174   // Check if data written == data read
175   std::string data_read = ExtractSliceBufferIntoString(&read_store_buf);
176   if (data != data_read) {
177     LOG(INFO) << "Data written = " << data;
178     LOG(INFO) << "Data read = " << data_read;
179     return absl::CancelledError("Data read != Data written");
180   }
181   return absl::OkStatus();
182 }
183 
BindAndStartListener(const std::vector<std::string> & addrs,bool listener_type_oracle)184 absl::Status ConnectionManager::BindAndStartListener(
185     const std::vector<std::string>& addrs, bool listener_type_oracle) {
186   grpc_core::MutexLock lock(&mu_);
187   if (addrs.empty()) {
188     return absl::InvalidArgumentError(
189         "Atleast one bind address must be specified");
190   }
191   for (auto& addr : addrs) {
192     if (listeners_.find(addr) != listeners_.end()) {
193       // There is already a listener at this address. Return error.
194       return absl::AlreadyExistsError(
195           absl::StrCat("Listener already existis for address: ", addr));
196     }
197   }
198   EventEngine::Listener::AcceptCallback accept_cb =
199       [this](std::unique_ptr<EventEngine::Endpoint> ep,
200              MemoryAllocator /*memory_allocator*/) {
201         last_in_progress_connection_.SetServerEndpoint(std::move(ep));
202       };
203 
204   EventEngine* event_engine = listener_type_oracle ? oracle_event_engine_.get()
205                                                    : test_event_engine_.get();
206 
207   ChannelArgsEndpointConfig config;
208   auto status = event_engine->CreateListener(
209       std::move(accept_cb), [](absl::Status status) { CHECK_OK(status); },
210       config, std::make_unique<grpc_core::MemoryQuota>("foo"));
211   if (!status.ok()) {
212     return status.status();
213   }
214 
215   std::shared_ptr<EventEngine::Listener> listener((*status).release());
216   for (auto& addr : addrs) {
217     auto bind_status = listener->Bind(*URIToResolvedAddress(addr));
218     if (!bind_status.ok()) {
219       LOG(ERROR) << "Binding listener failed: "
220                  << bind_status.status().ToString();
221       return bind_status.status();
222     }
223   }
224   CHECK_OK(listener->Start());
225   // Insert same listener pointer for all bind addresses after the listener
226   // has started successfully.
227   for (auto& addr : addrs) {
228     listeners_.insert(std::make_pair(addr, listener));
229   }
230   return absl::OkStatus();
231 }
232 
233 absl::StatusOr<std::tuple<std::unique_ptr<EventEngine::Endpoint>,
234                           std::unique_ptr<EventEngine::Endpoint>>>
CreateConnection(std::string target_addr,EventEngine::Duration timeout,bool client_type_oracle)235 ConnectionManager::CreateConnection(std::string target_addr,
236                                     EventEngine::Duration timeout,
237                                     bool client_type_oracle) {
238   // Only allow one CreateConnection call to proceed at a time.
239   grpc_core::MutexLock lock(&mu_);
240   std::string conn_name =
241       absl::StrCat("connection-", std::to_string(num_processed_connections_++));
242   EventEngine* event_engine = client_type_oracle ? oracle_event_engine_.get()
243                                                  : test_event_engine_.get();
244   ChannelArgsEndpointConfig config;
245   event_engine->Connect(
246       [this](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> status) {
247         if (!status.ok()) {
248           LOG(ERROR) << "Connect failed: " << status.status().ToString();
249           last_in_progress_connection_.SetClientEndpoint(nullptr);
250         } else {
251           last_in_progress_connection_.SetClientEndpoint(std::move(*status));
252         }
253       },
254       *URIToResolvedAddress(target_addr), config,
255       memory_quota_->CreateMemoryAllocator(conn_name), timeout);
256 
257   auto client_endpoint = last_in_progress_connection_.GetClientEndpoint();
258   if (client_endpoint != nullptr &&
259       listeners_.find(target_addr) != listeners_.end()) {
260     // There is a listener for the specified address. Wait until it
261     // creates a ServerEndpoint after accepting the connection.
262     auto server_endpoint = last_in_progress_connection_.GetServerEndpoint();
263     CHECK(server_endpoint != nullptr);
264     // Set last_in_progress_connection_ to nullptr
265     return std::make_tuple(std::move(client_endpoint),
266                            std::move(server_endpoint));
267   }
268   return absl::CancelledError("Failed to create connection.");
269 }
270 
271 }  // namespace experimental
272 }  // namespace grpc_event_engine
273