1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "test/core/event_engine/event_engine_test_utils.h"
16
17 #include <grpc/event_engine/event_engine.h>
18 #include <grpc/event_engine/memory_allocator.h>
19 #include <grpc/event_engine/slice.h>
20 #include <grpc/event_engine/slice_buffer.h>
21 #include <grpc/slice_buffer.h>
22 #include <stdlib.h>
23
24 #include <algorithm>
25 #include <chrono>
26 #include <memory>
27 #include <random>
28 #include <string>
29 #include <utility>
30
31 #include "absl/log/check.h"
32 #include "absl/log/log.h"
33 #include "absl/status/status.h"
34 #include "absl/status/statusor.h"
35 #include "absl/strings/str_cat.h"
36 #include "absl/time/clock.h"
37 #include "absl/time/time.h"
38 #include "src/core/lib/event_engine/channel_args_endpoint_config.h"
39 #include "src/core/lib/event_engine/tcp_socket_utils.h"
40 #include "src/core/lib/resource_quota/memory_quota.h"
41 #include "src/core/util/crash.h"
42 #include "src/core/util/notification.h"
43 #include "src/core/util/time.h"
44 #include "test/core/test_util/build.h"
45
46 // IWYU pragma: no_include <sys/socket.h>
47
48 namespace grpc_event_engine {
49 namespace experimental {
50
namespace {
// Bounds used by GetNextSendMessage() when choosing the length of a randomly
// generated message.
constexpr int kMinMessageSize{1024};
constexpr int kMaxMessageSize{4096};
}  // namespace
55
56 // Returns a random message with bounded length.
GetNextSendMessage()57 std::string GetNextSendMessage() {
58 static const char alphanum[] =
59 "0123456789"
60 "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
61 "abcdefghijklmnopqrstuvwxyz";
62 static std::random_device rd;
63 static std::seed_seq seed{rd()};
64 static std::mt19937 gen(seed);
65 static std::uniform_real_distribution<> dis(kMinMessageSize, kMaxMessageSize);
66 static grpc_core::Mutex g_mu;
67 std::string tmp_s;
68 int len;
69 {
70 grpc_core::MutexLock lock(&g_mu);
71 len = dis(gen);
72 }
73 tmp_s.reserve(len);
74 for (int i = 0; i < len; ++i) {
75 tmp_s += alphanum[rand() % (sizeof(alphanum) - 1)];
76 }
77 return tmp_s;
78 }
79
WaitForSingleOwner(std::shared_ptr<EventEngine> engine)80 void WaitForSingleOwner(std::shared_ptr<EventEngine> engine) {
81 WaitForSingleOwnerWithTimeout(std::move(engine), std::chrono::hours{24});
82 }
83
WaitForSingleOwnerWithTimeout(std::shared_ptr<EventEngine> engine,EventEngine::Duration timeout)84 void WaitForSingleOwnerWithTimeout(std::shared_ptr<EventEngine> engine,
85 EventEngine::Duration timeout) {
86 int n = 0;
87 auto start = std::chrono::system_clock::now();
88 while (engine.use_count() > 1) {
89 ++n;
90 if (n % 100 == 0) {
91 LOG(INFO) << "Checking for leaks...";
92 AsanAssertNoLeaks();
93 }
94 auto remaining = timeout - (std::chrono::system_clock::now() - start);
95 if (remaining < std::chrono::seconds{0}) {
96 grpc_core::Crash("Timed out waiting for a single EventEngine owner");
97 }
98 LOG_EVERY_N_SEC(INFO, 2)
99 << "engine.use_count() = " << engine.use_count()
100 << " timeout_remaining = "
101 << absl::FormatDuration(absl::Nanoseconds(remaining.count()));
102 absl::SleepFor(absl::Milliseconds(100));
103 }
104 }
105
AppendStringToSliceBuffer(SliceBuffer * buf,absl::string_view data)106 void AppendStringToSliceBuffer(SliceBuffer* buf, absl::string_view data) {
107 buf->Append(Slice::FromCopiedString(data));
108 }
109
ExtractSliceBufferIntoString(SliceBuffer * buf)110 std::string ExtractSliceBufferIntoString(SliceBuffer* buf) {
111 if (!buf->Length()) {
112 return std::string();
113 }
114 std::string tmp(buf->Length(), '\0');
115 char* bytes = const_cast<char*>(tmp.c_str());
116 grpc_slice_buffer_move_first_into_buffer(buf->c_slice_buffer(), buf->Length(),
117 bytes);
118 return tmp;
119 }
120
SendValidatePayload(absl::string_view data,EventEngine::Endpoint * send_endpoint,EventEngine::Endpoint * receive_endpoint)121 absl::Status SendValidatePayload(absl::string_view data,
122 EventEngine::Endpoint* send_endpoint,
123 EventEngine::Endpoint* receive_endpoint) {
124 CHECK_NE(receive_endpoint, nullptr);
125 CHECK_NE(send_endpoint, nullptr);
126 int num_bytes_written = data.size();
127 grpc_core::Notification read_signal;
128 grpc_core::Notification write_signal;
129 SliceBuffer read_slice_buf;
130 SliceBuffer read_store_buf;
131 SliceBuffer write_slice_buf;
132
133 read_slice_buf.Clear();
134 write_slice_buf.Clear();
135 read_store_buf.Clear();
136 // std::cout << "SendValidatePayload ... " << std::endl;
137 // fflush(stdout);
138
139 AppendStringToSliceBuffer(&write_slice_buf, data);
140 EventEngine::Endpoint::ReadArgs args = {num_bytes_written};
141 std::function<void(absl::Status)> read_cb;
142 read_cb = [receive_endpoint, &read_slice_buf, &read_store_buf, &read_cb,
143 &read_signal, &args](absl::Status status) {
144 CHECK_OK(status);
145 if (read_slice_buf.Length() == static_cast<size_t>(args.read_hint_bytes)) {
146 read_slice_buf.MoveFirstNBytesIntoSliceBuffer(read_slice_buf.Length(),
147 read_store_buf);
148 read_signal.Notify();
149 return;
150 }
151 args.read_hint_bytes -= read_slice_buf.Length();
152 read_slice_buf.MoveFirstNBytesIntoSliceBuffer(read_slice_buf.Length(),
153 read_store_buf);
154 if (receive_endpoint->Read(read_cb, &read_slice_buf, &args)) {
155 CHECK_NE(read_slice_buf.Length(), 0u);
156 read_cb(absl::OkStatus());
157 }
158 };
159 // Start asynchronous reading at the receive_endpoint.
160 if (receive_endpoint->Read(read_cb, &read_slice_buf, &args)) {
161 read_cb(absl::OkStatus());
162 }
163 // Start asynchronous writing at the send_endpoint.
164 if (send_endpoint->Write(
165 [&write_signal](absl::Status status) {
166 CHECK_OK(status);
167 write_signal.Notify();
168 },
169 &write_slice_buf, nullptr)) {
170 write_signal.Notify();
171 }
172 write_signal.WaitForNotification();
173 read_signal.WaitForNotification();
174 // Check if data written == data read
175 std::string data_read = ExtractSliceBufferIntoString(&read_store_buf);
176 if (data != data_read) {
177 LOG(INFO) << "Data written = " << data;
178 LOG(INFO) << "Data read = " << data_read;
179 return absl::CancelledError("Data read != Data written");
180 }
181 return absl::OkStatus();
182 }
183
BindAndStartListener(const std::vector<std::string> & addrs,bool listener_type_oracle)184 absl::Status ConnectionManager::BindAndStartListener(
185 const std::vector<std::string>& addrs, bool listener_type_oracle) {
186 grpc_core::MutexLock lock(&mu_);
187 if (addrs.empty()) {
188 return absl::InvalidArgumentError(
189 "Atleast one bind address must be specified");
190 }
191 for (auto& addr : addrs) {
192 if (listeners_.find(addr) != listeners_.end()) {
193 // There is already a listener at this address. Return error.
194 return absl::AlreadyExistsError(
195 absl::StrCat("Listener already existis for address: ", addr));
196 }
197 }
198 EventEngine::Listener::AcceptCallback accept_cb =
199 [this](std::unique_ptr<EventEngine::Endpoint> ep,
200 MemoryAllocator /*memory_allocator*/) {
201 last_in_progress_connection_.SetServerEndpoint(std::move(ep));
202 };
203
204 EventEngine* event_engine = listener_type_oracle ? oracle_event_engine_.get()
205 : test_event_engine_.get();
206
207 ChannelArgsEndpointConfig config;
208 auto status = event_engine->CreateListener(
209 std::move(accept_cb), [](absl::Status status) { CHECK_OK(status); },
210 config, std::make_unique<grpc_core::MemoryQuota>("foo"));
211 if (!status.ok()) {
212 return status.status();
213 }
214
215 std::shared_ptr<EventEngine::Listener> listener((*status).release());
216 for (auto& addr : addrs) {
217 auto bind_status = listener->Bind(*URIToResolvedAddress(addr));
218 if (!bind_status.ok()) {
219 LOG(ERROR) << "Binding listener failed: "
220 << bind_status.status().ToString();
221 return bind_status.status();
222 }
223 }
224 CHECK_OK(listener->Start());
225 // Insert same listener pointer for all bind addresses after the listener
226 // has started successfully.
227 for (auto& addr : addrs) {
228 listeners_.insert(std::make_pair(addr, listener));
229 }
230 return absl::OkStatus();
231 }
232
233 absl::StatusOr<std::tuple<std::unique_ptr<EventEngine::Endpoint>,
234 std::unique_ptr<EventEngine::Endpoint>>>
CreateConnection(std::string target_addr,EventEngine::Duration timeout,bool client_type_oracle)235 ConnectionManager::CreateConnection(std::string target_addr,
236 EventEngine::Duration timeout,
237 bool client_type_oracle) {
238 // Only allow one CreateConnection call to proceed at a time.
239 grpc_core::MutexLock lock(&mu_);
240 std::string conn_name =
241 absl::StrCat("connection-", std::to_string(num_processed_connections_++));
242 EventEngine* event_engine = client_type_oracle ? oracle_event_engine_.get()
243 : test_event_engine_.get();
244 ChannelArgsEndpointConfig config;
245 event_engine->Connect(
246 [this](absl::StatusOr<std::unique_ptr<EventEngine::Endpoint>> status) {
247 if (!status.ok()) {
248 LOG(ERROR) << "Connect failed: " << status.status().ToString();
249 last_in_progress_connection_.SetClientEndpoint(nullptr);
250 } else {
251 last_in_progress_connection_.SetClientEndpoint(std::move(*status));
252 }
253 },
254 *URIToResolvedAddress(target_addr), config,
255 memory_quota_->CreateMemoryAllocator(conn_name), timeout);
256
257 auto client_endpoint = last_in_progress_connection_.GetClientEndpoint();
258 if (client_endpoint != nullptr &&
259 listeners_.find(target_addr) != listeners_.end()) {
260 // There is a listener for the specified address. Wait until it
261 // creates a ServerEndpoint after accepting the connection.
262 auto server_endpoint = last_in_progress_connection_.GetServerEndpoint();
263 CHECK(server_endpoint != nullptr);
264 // Set last_in_progress_connection_ to nullptr
265 return std::make_tuple(std::move(client_endpoint),
266 std::move(server_endpoint));
267 }
268 return absl::CancelledError("Failed to create connection.");
269 }
270
271 } // namespace experimental
272 } // namespace grpc_event_engine
273