1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
16
17 #include <grpc/event_engine/slice.h>
18 #include <grpc/support/time.h>
19 #include <inttypes.h>
20 #include <stdlib.h>
21
22 #include <algorithm>
23 #include <atomic>
24 #include <chrono>
25 #include <limits>
26 #include <vector>
27
28 #include "absl/log/check.h"
29 #include "absl/memory/memory.h"
30 #include "absl/strings/str_cat.h"
31 #include "src/core/lib/debug/trace.h"
32 #include "src/core/lib/event_engine/tcp_socket_utils.h"
33 #include "src/core/lib/iomgr/port.h"
34 #include "src/core/telemetry/stats.h"
35 #include "src/core/util/dump_args.h"
36 #include "src/core/util/time.h"
37 #include "src/core/util/useful.h"
38 #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
39 #include "test/core/test_util/port.h"
40
41 #if defined(GRPC_POSIX_SOCKET_TCP)
42 #include "src/core/lib/event_engine/posix_engine/native_posix_dns_resolver.h"
43 #else
44 #include "src/core/util/crash.h"
45 #endif
46 // IWYU pragma: no_include <sys/socket.h>
47
48 extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
49
50 using namespace std::chrono_literals;
51
52 namespace grpc_event_engine {
53 namespace experimental {
54
55 namespace {
56
57 constexpr EventEngine::Duration kOneYear = 8760h;
58
59 // Inside the fuzzing event engine we consider everything is bound to a single
60 // loopback device. It cannot reach any other devices, and shares all ports
61 // between ipv4 and ipv6.
62
PortToAddress(int port)63 EventEngine::ResolvedAddress PortToAddress(int port) {
64 return URIToResolvedAddress(absl::StrCat("ipv4:127.0.0.1:", port)).value();
65 }
66
67 } // namespace
68
69 grpc_core::NoDestruct<grpc_core::Mutex> FuzzingEventEngine::mu_;
70 grpc_core::NoDestruct<grpc_core::Mutex> FuzzingEventEngine::now_mu_;
71
72 namespace {
73 const intptr_t kTaskHandleSalt = 12345;
74 FuzzingEventEngine* g_fuzzing_event_engine = nullptr;
75 gpr_timespec (*g_orig_gpr_now_impl)(gpr_clock_type clock_type);
76 } // namespace
77
FuzzingEventEngine(Options options,const fuzzing_event_engine::Actions & actions)78 FuzzingEventEngine::FuzzingEventEngine(
79 Options options, const fuzzing_event_engine::Actions& actions)
80 : max_delay_{options.max_delay_write, options.max_delay_run_after} {
81 // Allow the fuzzer to assign ports.
82 // Once this list is exhausted, we fall back to a deterministic algorithm.
83 for (auto port : actions.assign_ports()) {
84 if (port == 0 || port > 65535) continue;
85 free_ports_.push(port);
86 fuzzer_mentioned_ports_.insert(port);
87 }
88
89 // Fill the write sizes queue for future connections.
90 for (const auto& connection : actions.connections()) {
91 std::queue<size_t> write_sizes;
92 for (auto size : connection.write_size()) {
93 write_sizes.push(size);
94 }
95 write_sizes_for_future_connections_.emplace(std::move(write_sizes));
96 }
97
98 // Whilst a fuzzing EventEngine is active we override grpc's now function.
99 g_orig_gpr_now_impl = gpr_now_impl;
100 gpr_now_impl = GlobalNowImpl;
101 CHECK_EQ(g_fuzzing_event_engine, nullptr);
102 g_fuzzing_event_engine = this;
103 grpc_core::TestOnlySetProcessEpoch(NowAsTimespec(GPR_CLOCK_MONOTONIC));
104
105 for (const auto& delay_ns : actions.run_delay()) {
106 Duration delay = std::chrono::nanoseconds(delay_ns);
107 task_delays_.push(delay);
108 }
109
110 previous_pick_port_functions_ = grpc_set_pick_port_functions(
__anond8da1ff50302() 111 grpc_pick_port_functions{+[]() -> int {
112 grpc_core::MutexLock lock(&*mu_);
113 return g_fuzzing_event_engine->AllocatePort();
114 },
115 +[](int) {}});
116 }
117
FuzzingDone()118 void FuzzingEventEngine::FuzzingDone() {
119 grpc_core::MutexLock lock(&*mu_);
120 while (!task_delays_.empty()) task_delays_.pop();
121 }
122
NowAsTimespec(gpr_clock_type clock_type)123 gpr_timespec FuzzingEventEngine::NowAsTimespec(gpr_clock_type clock_type) {
124 // TODO(ctiller): add a facility to track realtime and monotonic clocks
125 // separately to simulate divergence.
126 CHECK(clock_type != GPR_TIMESPAN);
127 const Duration d = now_.time_since_epoch();
128 auto secs = std::chrono::duration_cast<std::chrono::seconds>(d);
129 return {secs.count(), static_cast<int32_t>((d - secs).count()), clock_type};
130 }
131
Tick(Duration max_time)132 void FuzzingEventEngine::Tick(Duration max_time) {
133 if (IsSaneTimerEnvironment()) {
134 std::vector<absl::AnyInvocable<void()>> to_run;
135 Duration incr = max_time;
136 DCHECK_GT(incr.count(), Duration::zero().count());
137 {
138 grpc_core::MutexLock lock(&*mu_);
139 grpc_core::MutexLock now_lock(&*now_mu_);
140 if (!tasks_by_time_.empty()) {
141 incr = std::min(incr, tasks_by_time_.begin()->first - now_);
142 }
143 now_ += incr;
144 CHECK_GE(now_.time_since_epoch().count(), 0);
145 // Find newly expired timers.
146 while (!tasks_by_time_.empty() && tasks_by_time_.begin()->first <= now_) {
147 auto& task = *tasks_by_time_.begin()->second;
148 tasks_by_id_.erase(task.id);
149 if (task.closure != nullptr) {
150 to_run.push_back(std::move(task.closure));
151 }
152 tasks_by_time_.erase(tasks_by_time_.begin());
153 }
154 }
155 OnClockIncremented(incr);
156 if (to_run.empty()) return;
157 for (auto& closure : to_run) {
158 closure();
159 }
160 } else {
161 bool incremented_time = false;
162 while (true) {
163 std::vector<absl::AnyInvocable<void()>> to_run;
164 Duration incr = Duration::zero();
165 {
166 grpc_core::MutexLock lock(&*mu_);
167 grpc_core::MutexLock now_lock(&*now_mu_);
168 if (!incremented_time) {
169 incr = max_time;
170 // TODO(ctiller): look at tasks_by_time_ and jump forward (once iomgr
171 // timers are gone)
172 if (!tasks_by_time_.empty()) {
173 incr = std::min(incr, tasks_by_time_.begin()->first - now_);
174 }
175 if (incr < exponential_gate_time_increment_) {
176 exponential_gate_time_increment_ = std::chrono::milliseconds(1);
177 } else {
178 incr = std::min(incr, exponential_gate_time_increment_);
179 exponential_gate_time_increment_ +=
180 exponential_gate_time_increment_ / 1000;
181 }
182 incr = std::max(incr, std::chrono::duration_cast<Duration>(
183 std::chrono::milliseconds(1)));
184 now_ += incr;
185 CHECK_GE(now_.time_since_epoch().count(), 0);
186 ++current_tick_;
187 incremented_time = true;
188 }
189 // Find newly expired timers.
190 while (!tasks_by_time_.empty() &&
191 tasks_by_time_.begin()->first <= now_) {
192 auto& task = *tasks_by_time_.begin()->second;
193 tasks_by_id_.erase(task.id);
194 if (task.closure != nullptr) {
195 to_run.push_back(std::move(task.closure));
196 }
197 tasks_by_time_.erase(tasks_by_time_.begin());
198 }
199 }
200 OnClockIncremented(incr);
201 if (to_run.empty()) return;
202 for (auto& closure : to_run) {
203 closure();
204 }
205 }
206 }
207 }
208
TickUntilIdle()209 void FuzzingEventEngine::TickUntilIdle() {
210 while (true) {
211 {
212 grpc_core::MutexLock lock(&*mu_);
213 LOG_EVERY_N_SEC(INFO, 5)
214 << "TickUntilIdle: "
215 << GRPC_DUMP_ARGS(tasks_by_id_.size(), outstanding_reads_.load(),
216 outstanding_writes_.load());
217 if (IsIdleLocked()) return;
218 }
219 Tick();
220 }
221 }
222
IsIdle()223 bool FuzzingEventEngine::IsIdle() {
224 grpc_core::MutexLock lock(&*mu_);
225 return IsIdleLocked();
226 }
227
IsIdleLocked()228 bool FuzzingEventEngine::IsIdleLocked() {
229 return tasks_by_id_.empty() &&
230 outstanding_writes_.load(std::memory_order_relaxed) == 0 &&
231 outstanding_reads_.load(std::memory_order_relaxed) == 0;
232 }
233
TickUntil(Time t)234 void FuzzingEventEngine::TickUntil(Time t) {
235 while (true) {
236 auto now = Now();
237 if (now >= t) break;
238 Tick(t - now);
239 }
240 }
241
TickForDuration(Duration d)242 void FuzzingEventEngine::TickForDuration(Duration d) { TickUntil(Now() + d); }
243
SetRunAfterDurationCallback(absl::AnyInvocable<void (Duration)> callback)244 void FuzzingEventEngine::SetRunAfterDurationCallback(
245 absl::AnyInvocable<void(Duration)> callback) {
246 grpc_core::MutexLock lock(&run_after_duration_callback_mu_);
247 run_after_duration_callback_ = std::move(callback);
248 }
249
Now()250 FuzzingEventEngine::Time FuzzingEventEngine::Now() {
251 grpc_core::MutexLock lock(&*now_mu_);
252 return now_;
253 }
254
AllocatePort()255 int FuzzingEventEngine::AllocatePort() {
256 // If the fuzzer selected some port orderings, do that first.
257 if (!free_ports_.empty()) {
258 int p = free_ports_.front();
259 free_ports_.pop();
260 return p;
261 }
262 // Otherwise just scan through starting at one and skipping any ports
263 // that were in the fuzzers initial list.
264 while (true) {
265 int p = next_free_port_++;
266 if (fuzzer_mentioned_ports_.count(p) == 0) {
267 return p;
268 }
269 }
270 }
271
272 absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
CreateListener(Listener::AcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,const EndpointConfig &,std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)273 FuzzingEventEngine::CreateListener(
274 Listener::AcceptCallback on_accept,
275 absl::AnyInvocable<void(absl::Status)> on_shutdown, const EndpointConfig&,
276 std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
277 grpc_core::MutexLock lock(&*mu_);
278 // Create a listener and register it into the set of listener info in the
279 // event engine.
280 return absl::make_unique<FuzzingListener>(
281 *listeners_
282 .emplace(std::make_shared<ListenerInfo>(
283 std::move(on_accept), std::move(on_shutdown),
284 std::move(memory_allocator_factory)))
285 .first);
286 }
287
~FuzzingListener()288 FuzzingEventEngine::FuzzingListener::~FuzzingListener() {
289 grpc_core::MutexLock lock(&*mu_);
290 g_fuzzing_event_engine->listeners_.erase(info_);
291 }
292
IsPortUsed(int port)293 bool FuzzingEventEngine::IsPortUsed(int port) {
294 // Return true if a port is bound to a listener.
295 for (const auto& listener : listeners_) {
296 if (std::find(listener->ports.begin(), listener->ports.end(), port) !=
297 listener->ports.end()) {
298 return true;
299 }
300 }
301 return false;
302 }
303
Bind(const ResolvedAddress & addr)304 absl::StatusOr<int> FuzzingEventEngine::FuzzingListener::Bind(
305 const ResolvedAddress& addr) {
306 // Extract the port from the address (or fail if non-localhost).
307 auto port = ResolvedAddressGetPort(addr);
308 grpc_core::MutexLock lock(&*mu_);
309 // Check that the listener hasn't already been started.
310 if (info_->started) return absl::InternalError("Already started");
311 if (port != 0) {
312 // If the port is non-zero, check that it's not already in use.
313 if (g_fuzzing_event_engine->IsPortUsed(port)) {
314 return absl::InternalError("Port in use");
315 }
316 } else {
317 // If the port is zero, allocate a new one.
318 do {
319 port = g_fuzzing_event_engine->AllocatePort();
320 } while (g_fuzzing_event_engine->IsPortUsed(port));
321 }
322 // Add the port to the listener.
323 info_->ports.push_back(port);
324 return port;
325 }
326
Start()327 absl::Status FuzzingEventEngine::FuzzingListener::Start() {
328 // Start the listener or fail if it's already started.
329 grpc_core::MutexLock lock(&*mu_);
330 if (info_->started) return absl::InternalError("Already started");
331 info_->started = true;
332 return absl::OkStatus();
333 }
334
Write(SliceBuffer * data,int index)335 bool FuzzingEventEngine::EndpointMiddle::Write(SliceBuffer* data, int index) {
336 CHECK(!closed[index]);
337 const int peer_index = 1 - index;
338 GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
339 << "WRITE[" << this << ":" << index << "]: entry "
340 << GRPC_DUMP_ARGS(data->Length());
341 if (data->Length() == 0) return true;
342 size_t write_len = std::numeric_limits<size_t>::max();
343 // Check the write_sizes queue for fuzzer imposed restrictions on this write
344 // size. This allows the fuzzer to force small writes to be seen by the
345 // reader.
346 if (!write_sizes[index].empty()) {
347 write_len = write_sizes[index].front();
348 write_sizes[index].pop();
349 }
350 if (write_len > data->Length()) {
351 write_len = data->Length();
352 }
353 // If the write_len is zero, we still need to write something, so we write one
354 // byte.
355 if (write_len == 0) write_len = 1;
356 GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
357 << "WRITE[" << this << ":" << index << "]: " << write_len << " bytes; "
358 << GRPC_DUMP_ARGS(pending_read[peer_index].has_value());
359 // Expand the pending buffer.
360 size_t prev_len = pending[index].size();
361 pending[index].resize(prev_len + write_len);
362 // Move bytes from the to-write data into the pending buffer.
363 data->MoveFirstNBytesIntoBuffer(write_len, pending[index].data() + prev_len);
364 GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
365 << "WRITE[" << this << ":" << index << "]: post-move "
366 << GRPC_DUMP_ARGS(data->Length());
367 // If there was a pending read, then we can fulfill it.
368 if (pending_read[peer_index].has_value()) {
369 pending_read[peer_index]->buffer->Append(
370 Slice::FromCopiedBuffer(pending[index]));
371 pending[index].clear();
372 g_fuzzing_event_engine->RunLocked(
373 RunType::kWrite,
374 [cb = std::move(pending_read[peer_index]->on_read), this, peer_index,
375 buffer = pending_read[peer_index]->buffer]() mutable {
376 GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
377 << "FINISH_READ[" << this << ":" << peer_index
378 << "]: " << GRPC_DUMP_ARGS(buffer->Length());
379 cb(absl::OkStatus());
380 });
381 pending_read[peer_index].reset();
382 }
383 return data->Length() == 0;
384 }
385
Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const WriteArgs *)386 bool FuzzingEventEngine::FuzzingEndpoint::Write(
387 absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
388 const WriteArgs*) {
389 GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
390 << "START_WRITE[" << middle_.get() << ":" << my_index()
391 << "]: " << data->Length() << " bytes";
392 IoToken write_token(&g_fuzzing_event_engine->outstanding_writes_);
393 grpc_core::global_stats().IncrementSyscallWrite();
394 grpc_core::MutexLock lock(&*mu_);
395 CHECK(!middle_->closed[my_index()]);
396 CHECK(!middle_->writing[my_index()]);
397 // If the write succeeds immediately, then we return true.
398 if (middle_->Write(data, my_index())) return true;
399 middle_->writing[my_index()] = true;
400 ScheduleDelayedWrite(middle_, my_index(), std::move(on_writable), data,
401 std::move(write_token));
402 return false;
403 }
404
ScheduleDelayedWrite(std::shared_ptr<EndpointMiddle> middle,int index,absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,IoToken write_token)405 void FuzzingEventEngine::FuzzingEndpoint::ScheduleDelayedWrite(
406 std::shared_ptr<EndpointMiddle> middle, int index,
407 absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
408 IoToken write_token) {
409 g_fuzzing_event_engine->RunLocked(
410 RunType::kWrite,
411 [write_token = std::move(write_token), middle = std::move(middle), index,
412 data, on_writable = std::move(on_writable)]() mutable {
413 grpc_core::ReleasableMutexLock lock(&*mu_);
414 CHECK(middle->writing[index]);
415 if (middle->closed[index]) {
416 GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
417 << "CLOSED[" << middle.get() << ":" << index << "]";
418 g_fuzzing_event_engine->RunLocked(
419 RunType::kRunAfter,
420 [on_writable = std::move(on_writable)]() mutable {
421 on_writable(absl::InternalError("Endpoint closed"));
422 });
423 if (middle->pending_read[1 - index].has_value()) {
424 g_fuzzing_event_engine->RunLocked(
425 RunType::kRunAfter,
426 [cb = std::move(
427 middle->pending_read[1 - index]->on_read)]() mutable {
428 cb(absl::InternalError("Endpoint closed"));
429 });
430 middle->pending_read[1 - index].reset();
431 }
432 return;
433 }
434 if (middle->Write(data, index)) {
435 middle->writing[index] = false;
436 lock.Release();
437 on_writable(absl::OkStatus());
438 return;
439 }
440 ScheduleDelayedWrite(std::move(middle), index, std::move(on_writable),
441 data, std::move(write_token));
442 });
443 }
444
~FuzzingEndpoint()445 FuzzingEventEngine::FuzzingEndpoint::~FuzzingEndpoint() {
446 grpc_core::MutexLock lock(&*mu_);
447 GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
448 << "CLOSE[" << middle_.get() << ":" << my_index() << "]: "
449 << GRPC_DUMP_ARGS(
450 middle_->closed[my_index()], middle_->closed[peer_index()],
451 middle_->pending_read[my_index()].has_value(),
452 middle_->pending_read[peer_index()].has_value(),
453 middle_->writing[my_index()], middle_->writing[peer_index()]);
454 middle_->closed[my_index()] = true;
455 if (middle_->pending_read[my_index()].has_value()) {
456 GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
457 << "CLOSED_READING[" << middle_.get() << ":" << my_index() << "]";
458 g_fuzzing_event_engine->RunLocked(
459 RunType::kRunAfter,
460 [cb = std::move(middle_->pending_read[my_index()]->on_read)]() mutable {
461 cb(absl::InternalError("Endpoint closed"));
462 });
463 middle_->pending_read[my_index()].reset();
464 }
465 if (!middle_->writing[my_index()] &&
466 middle_->pending_read[peer_index()].has_value()) {
467 g_fuzzing_event_engine->RunLocked(
468 RunType::kRunAfter,
469 [cb = std::move(
470 middle_->pending_read[peer_index()]->on_read)]() mutable {
471 cb(absl::InternalError("Endpoint closed"));
472 });
473 middle_->pending_read[peer_index()].reset();
474 }
475 }
476
Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer,const ReadArgs *)477 bool FuzzingEventEngine::FuzzingEndpoint::Read(
478 absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
479 const ReadArgs*) {
480 GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
481 << "START_READ[" << middle_.get() << ":" << my_index() << "]";
482 buffer->Clear();
483 IoToken read_token(&g_fuzzing_event_engine->outstanding_reads_);
484 grpc_core::MutexLock lock(&*mu_);
485 CHECK(!middle_->closed[my_index()]);
486 if (middle_->pending[peer_index()].empty()) {
487 // If the endpoint is closed, fail asynchronously.
488 if (middle_->closed[peer_index()]) {
489 g_fuzzing_event_engine->RunLocked(
490 RunType::kRunAfter,
491 [read_token, on_read = std::move(on_read)]() mutable {
492 on_read(absl::InternalError("Endpoint closed"));
493 });
494 return false;
495 }
496 // If the endpoint has no pending data, then we need to wait for a write.
497 middle_->pending_read[my_index()] =
498 PendingRead{std::move(read_token), std::move(on_read), buffer};
499 return false;
500 } else {
501 // If the endpoint has pending data, then we can fulfill the read
502 // immediately.
503 buffer->Append(Slice::FromCopiedBuffer(middle_->pending[peer_index()]));
504 middle_->pending[peer_index()].clear();
505 return true;
506 }
507 }
508
WriteSizesForConnection()509 std::queue<size_t> FuzzingEventEngine::WriteSizesForConnection() {
510 if (write_sizes_for_future_connections_.empty()) return std::queue<size_t>();
511 auto ret = std::move(write_sizes_for_future_connections_.front());
512 write_sizes_for_future_connections_.pop();
513 return ret;
514 }
515
EndpointMiddle(int listener_port,int client_port)516 FuzzingEventEngine::EndpointMiddle::EndpointMiddle(int listener_port,
517 int client_port)
518 : addrs{PortToAddress(listener_port), PortToAddress(client_port)},
519 write_sizes{g_fuzzing_event_engine->WriteSizesForConnection(),
520 g_fuzzing_event_engine->WriteSizesForConnection()} {}
521
Connect(OnConnectCallback on_connect,const ResolvedAddress & addr,const EndpointConfig &,MemoryAllocator,Duration)522 EventEngine::ConnectionHandle FuzzingEventEngine::Connect(
523 OnConnectCallback on_connect, const ResolvedAddress& addr,
524 const EndpointConfig&, MemoryAllocator, Duration) {
525 // TODO(ctiller): do something with the timeout
526 // Schedule a timer to run (with some fuzzer selected delay) the on_connect
527 // callback.
528 grpc_core::MutexLock lock(&*mu_);
529 auto task_handle = RunAfterLocked(
530 RunType::kRunAfter, Duration(0),
531 [this, addr, on_connect = std::move(on_connect)]() mutable {
532 // Check for a legal address and extract the target port number.
533 auto port = ResolvedAddressGetPort(addr);
534 grpc_core::MutexLock lock(&*mu_);
535 // Find the listener that is listening on the target port.
536 for (auto it = listeners_.begin(); it != listeners_.end(); ++it) {
537 const auto& listener = *it;
538 // Listener must be started.
539 if (!listener->started) continue;
540 for (int listener_port : listener->ports) {
541 if (port == listener_port) {
542 // Port matches on a started listener: create an endpoint, call
543 // on_accept for the listener and on_connect for the client.
544 auto middle = std::make_shared<EndpointMiddle>(
545 listener_port, g_fuzzing_event_engine->AllocatePort());
546 auto ep1 = std::make_unique<FuzzingEndpoint>(middle, 0);
547 auto ep2 = std::make_unique<FuzzingEndpoint>(middle, 1);
548 RunLocked(RunType::kRunAfter, [listener,
549 ep1 = std::move(ep1)]() mutable {
550 listener->on_accept(
551 std::move(ep1),
552 listener->memory_allocator_factory->CreateMemoryAllocator(
553 "fuzzing"));
554 });
555 RunLocked(RunType::kRunAfter, [on_connect = std::move(on_connect),
556 ep2 = std::move(ep2)]() mutable {
557 on_connect(std::move(ep2));
558 });
559 return;
560 }
561 }
562 }
563 // Fail: no such listener.
564 RunLocked(RunType::kRunAfter,
565 [on_connect = std::move(on_connect)]() mutable {
566 on_connect(absl::InvalidArgumentError("No listener found"));
567 });
568 });
569 return ConnectionHandle{{task_handle.keys[0], task_handle.keys[1]}};
570 }
571
CancelConnect(ConnectionHandle connection_handle)572 bool FuzzingEventEngine::CancelConnect(ConnectionHandle connection_handle) {
573 return Cancel(
574 TaskHandle{{connection_handle.keys[0], connection_handle.keys[1]}});
575 }
576
IsWorkerThread()577 bool FuzzingEventEngine::IsWorkerThread() { abort(); }
578
579 absl::StatusOr<std::unique_ptr<EventEngine::DNSResolver>>
GetDNSResolver(const DNSResolver::ResolverOptions &)580 FuzzingEventEngine::GetDNSResolver(const DNSResolver::ResolverOptions&) {
581 #if defined(GRPC_POSIX_SOCKET_TCP)
582 return std::make_unique<NativePosixDNSResolver>(shared_from_this());
583 #else
584 grpc_core::Crash("FuzzingEventEngine::GetDNSResolver Not implemented");
585 #endif
586 }
587
Run(Closure * closure)588 void FuzzingEventEngine::Run(Closure* closure) {
589 grpc_core::MutexLock lock(&*mu_);
590 RunAfterLocked(RunType::kRunAfter, Duration::zero(),
591 [closure]() { closure->Run(); });
592 }
593
Run(absl::AnyInvocable<void ()> closure)594 void FuzzingEventEngine::Run(absl::AnyInvocable<void()> closure) {
595 grpc_core::MutexLock lock(&*mu_);
596 RunAfterLocked(RunType::kRunAfter, Duration::zero(), std::move(closure));
597 }
598
RunAfter(Duration when,Closure * closure)599 EventEngine::TaskHandle FuzzingEventEngine::RunAfter(Duration when,
600 Closure* closure) {
601 return RunAfter(when, [closure]() { closure->Run(); });
602 }
603
RunAfter(Duration when,absl::AnyInvocable<void ()> closure)604 EventEngine::TaskHandle FuzzingEventEngine::RunAfter(
605 Duration when, absl::AnyInvocable<void()> closure) {
606 {
607 grpc_core::MutexLock lock(&run_after_duration_callback_mu_);
608 if (run_after_duration_callback_ != nullptr) {
609 run_after_duration_callback_(when);
610 }
611 }
612 grpc_core::MutexLock lock(&*mu_);
613 // (b/258949216): Cap it to one year to avoid integer overflow errors.
614 return RunAfterLocked(RunType::kRunAfter, std::min(when, kOneYear),
615 std::move(closure));
616 }
617
RunAfterExactly(Duration when,absl::AnyInvocable<void ()> closure)618 EventEngine::TaskHandle FuzzingEventEngine::RunAfterExactly(
619 Duration when, absl::AnyInvocable<void()> closure) {
620 grpc_core::MutexLock lock(&*mu_);
621 // (b/258949216): Cap it to one year to avoid integer overflow errors.
622 return RunAfterLocked(RunType::kExact, std::min(when, kOneYear),
623 std::move(closure));
624 }
625
RunAfterLocked(RunType run_type,Duration when,absl::AnyInvocable<void ()> closure)626 EventEngine::TaskHandle FuzzingEventEngine::RunAfterLocked(
627 RunType run_type, Duration when, absl::AnyInvocable<void()> closure) {
628 const intptr_t id = next_task_id_;
629 ++next_task_id_;
630 Duration delay_taken = Duration::zero();
631 if (run_type != RunType::kExact) {
632 if (!task_delays_.empty()) {
633 delay_taken = grpc_core::Clamp(task_delays_.front(), Duration::zero(),
634 max_delay_[static_cast<int>(run_type)]);
635 task_delays_.pop();
636 } else if (run_type != RunType::kWrite && when == Duration::zero()) {
637 // For zero-duration events, if there is no more delay input from
638 // the test case, we default to a small non-zero value to avoid
639 // busy loops that prevent us from making forward progress.
640 delay_taken = std::chrono::microseconds(1);
641 }
642 when += delay_taken;
643 }
644 auto task = std::make_shared<Task>(id, std::move(closure));
645 tasks_by_id_.emplace(id, task);
646 Time final_time;
647 Time now;
648 {
649 grpc_core::MutexLock lock(&*now_mu_);
650 final_time = now_ + when;
651 now = now_;
652 tasks_by_time_.emplace(final_time, std::move(task));
653 }
654 GRPC_TRACE_LOG(fuzzing_ee_timers, INFO)
655 << "Schedule timer " << id << " @ "
656 << static_cast<uint64_t>(final_time.time_since_epoch().count())
657 << " (now=" << now.time_since_epoch().count()
658 << "; delay=" << when.count() << "; fuzzing_added=" << delay_taken.count()
659 << "; type=" << static_cast<int>(run_type) << ")";
660 return TaskHandle{id, kTaskHandleSalt};
661 }
662
Cancel(TaskHandle handle)663 bool FuzzingEventEngine::Cancel(TaskHandle handle) {
664 grpc_core::MutexLock lock(&*mu_);
665 CHECK(handle.keys[1] == kTaskHandleSalt);
666 const intptr_t id = handle.keys[0];
667 auto it = tasks_by_id_.find(id);
668 if (it == tasks_by_id_.end()) {
669 return false;
670 }
671 if (it->second->closure == nullptr) {
672 return false;
673 }
674 GRPC_TRACE_LOG(fuzzing_ee_timers, INFO) << "Cancel timer " << id;
675 it->second->closure = nullptr;
676 return true;
677 }
678
GlobalNowImpl(gpr_clock_type clock_type)679 gpr_timespec FuzzingEventEngine::GlobalNowImpl(gpr_clock_type clock_type) {
680 if (g_fuzzing_event_engine == nullptr) {
681 return gpr_inf_future(clock_type);
682 }
683 CHECK_NE(g_fuzzing_event_engine, nullptr);
684 grpc_core::MutexLock lock(&*now_mu_);
685 return g_fuzzing_event_engine->NowAsTimespec(clock_type);
686 }
687
UnsetGlobalHooks()688 void FuzzingEventEngine::UnsetGlobalHooks() {
689 if (g_fuzzing_event_engine != this) return;
690 g_fuzzing_event_engine = nullptr;
691 gpr_now_impl = g_orig_gpr_now_impl;
692 g_orig_gpr_now_impl = nullptr;
693 grpc_set_pick_port_functions(previous_pick_port_functions_);
694 }
695
~ListenerInfo()696 FuzzingEventEngine::ListenerInfo::~ListenerInfo() {
697 CHECK_NE(g_fuzzing_event_engine, nullptr);
698 g_fuzzing_event_engine->Run(
699 [on_shutdown = std::move(on_shutdown),
700 shutdown_status = std::move(shutdown_status)]() mutable {
701 on_shutdown(std::move(shutdown_status));
702 });
703 }
704
705 } // namespace experimental
706 } // namespace grpc_event_engine
707