• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.h"
16 
17 #include <grpc/event_engine/slice.h>
18 #include <grpc/support/time.h>
19 #include <inttypes.h>
20 #include <stdlib.h>
21 
22 #include <algorithm>
23 #include <atomic>
24 #include <chrono>
25 #include <limits>
26 #include <vector>
27 
28 #include "absl/log/check.h"
29 #include "absl/memory/memory.h"
30 #include "absl/strings/str_cat.h"
31 #include "src/core/lib/debug/trace.h"
32 #include "src/core/lib/event_engine/tcp_socket_utils.h"
33 #include "src/core/lib/iomgr/port.h"
34 #include "src/core/telemetry/stats.h"
35 #include "src/core/util/dump_args.h"
36 #include "src/core/util/time.h"
37 #include "src/core/util/useful.h"
38 #include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
39 #include "test/core/test_util/port.h"
40 
41 #if defined(GRPC_POSIX_SOCKET_TCP)
42 #include "src/core/lib/event_engine/posix_engine/native_posix_dns_resolver.h"
43 #else
44 #include "src/core/util/crash.h"
45 #endif
46 // IWYU pragma: no_include <sys/socket.h>
47 
48 extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
49 
50 using namespace std::chrono_literals;
51 
52 namespace grpc_event_engine {
53 namespace experimental {
54 
55 namespace {
56 
57 constexpr EventEngine::Duration kOneYear = 8760h;
58 
59 // Inside the fuzzing event engine we consider everything is bound to a single
60 // loopback device. It cannot reach any other devices, and shares all ports
61 // between ipv4 and ipv6.
62 
PortToAddress(int port)63 EventEngine::ResolvedAddress PortToAddress(int port) {
64   return URIToResolvedAddress(absl::StrCat("ipv4:127.0.0.1:", port)).value();
65 }
66 
67 }  // namespace
68 
// Out-of-line definitions of the class-level mutexes. Both are wrapped in
// grpc_core::NoDestruct and are dereferenced (`&*mu_`, `&*now_mu_`) wherever
// they are locked in this file. `now_mu_` is the lock taken around reads and
// updates of `now_` (see Now() and Tick()).
grpc_core::NoDestruct<grpc_core::Mutex> FuzzingEventEngine::mu_;
grpc_core::NoDestruct<grpc_core::Mutex> FuzzingEventEngine::now_mu_;
71 
namespace {
// Second key stored in every TaskHandle produced by RunAfterLocked(); Cancel()
// CHECKs that a handle carries it.
const intptr_t kTaskHandleSalt = 12345;
// The engine that currently owns the global hooks. Set by the constructor and
// cleared by UnsetGlobalHooks().
FuzzingEventEngine* g_fuzzing_event_engine = nullptr;
// The value gpr_now_impl held before the constructor replaced it; restored by
// UnsetGlobalHooks().
gpr_timespec (*g_orig_gpr_now_impl)(gpr_clock_type clock_type);
}  // namespace
77 
FuzzingEventEngine(Options options,const fuzzing_event_engine::Actions & actions)78 FuzzingEventEngine::FuzzingEventEngine(
79     Options options, const fuzzing_event_engine::Actions& actions)
80     : max_delay_{options.max_delay_write, options.max_delay_run_after} {
81   // Allow the fuzzer to assign ports.
82   // Once this list is exhausted, we fall back to a deterministic algorithm.
83   for (auto port : actions.assign_ports()) {
84     if (port == 0 || port > 65535) continue;
85     free_ports_.push(port);
86     fuzzer_mentioned_ports_.insert(port);
87   }
88 
89   // Fill the write sizes queue for future connections.
90   for (const auto& connection : actions.connections()) {
91     std::queue<size_t> write_sizes;
92     for (auto size : connection.write_size()) {
93       write_sizes.push(size);
94     }
95     write_sizes_for_future_connections_.emplace(std::move(write_sizes));
96   }
97 
98   // Whilst a fuzzing EventEngine is active we override grpc's now function.
99   g_orig_gpr_now_impl = gpr_now_impl;
100   gpr_now_impl = GlobalNowImpl;
101   CHECK_EQ(g_fuzzing_event_engine, nullptr);
102   g_fuzzing_event_engine = this;
103   grpc_core::TestOnlySetProcessEpoch(NowAsTimespec(GPR_CLOCK_MONOTONIC));
104 
105   for (const auto& delay_ns : actions.run_delay()) {
106     Duration delay = std::chrono::nanoseconds(delay_ns);
107     task_delays_.push(delay);
108   }
109 
110   previous_pick_port_functions_ = grpc_set_pick_port_functions(
__anond8da1ff50302() 111       grpc_pick_port_functions{+[]() -> int {
112                                  grpc_core::MutexLock lock(&*mu_);
113                                  return g_fuzzing_event_engine->AllocatePort();
114                                },
115                                +[](int) {}});
116 }
117 
FuzzingDone()118 void FuzzingEventEngine::FuzzingDone() {
119   grpc_core::MutexLock lock(&*mu_);
120   while (!task_delays_.empty()) task_delays_.pop();
121 }
122 
NowAsTimespec(gpr_clock_type clock_type)123 gpr_timespec FuzzingEventEngine::NowAsTimespec(gpr_clock_type clock_type) {
124   // TODO(ctiller): add a facility to track realtime and monotonic clocks
125   // separately to simulate divergence.
126   CHECK(clock_type != GPR_TIMESPAN);
127   const Duration d = now_.time_since_epoch();
128   auto secs = std::chrono::duration_cast<std::chrono::seconds>(d);
129   return {secs.count(), static_cast<int32_t>((d - secs).count()), clock_type};
130 }
131 
// Advances the engine's clock by at most `max_time` and runs every task whose
// scheduled time has been reached.
//
// Two strategies are used, selected by IsSaneTimerEnvironment():
//  - "sane" environment: jump straight to the earliest pending task (or by
//    `max_time` if that is sooner), collect expired tasks once, run them.
//  - otherwise: advance time once, limited by an exponentially growing gate
//    (`exponential_gate_time_increment_`) and by a 1ms minimum step, then keep
//    looping (without advancing time again) until a pass finds no expired
//    task to run.
//
// In both strategies the closures and OnClockIncremented() are invoked after
// the locks taken here have been released.
void FuzzingEventEngine::Tick(Duration max_time) {
  if (IsSaneTimerEnvironment()) {
    std::vector<absl::AnyInvocable<void()>> to_run;
    Duration incr = max_time;
    DCHECK_GT(incr.count(), Duration::zero().count());
    {
      // Lock order: mu_ first, then now_mu_.
      grpc_core::MutexLock lock(&*mu_);
      grpc_core::MutexLock now_lock(&*now_mu_);
      // Never step past the earliest scheduled task.
      if (!tasks_by_time_.empty()) {
        incr = std::min(incr, tasks_by_time_.begin()->first - now_);
      }
      now_ += incr;
      CHECK_GE(now_.time_since_epoch().count(), 0);
      // Find newly expired timers.
      while (!tasks_by_time_.empty() && tasks_by_time_.begin()->first <= now_) {
        auto& task = *tasks_by_time_.begin()->second;
        tasks_by_id_.erase(task.id);
        // A null closure means the task was cancelled (see Cancel()); it is
        // removed from both maps but not run.
        if (task.closure != nullptr) {
          to_run.push_back(std::move(task.closure));
        }
        tasks_by_time_.erase(tasks_by_time_.begin());
      }
    }
    OnClockIncremented(incr);
    if (to_run.empty()) return;
    for (auto& closure : to_run) {
      closure();
    }
  } else {
    // Time is advanced only on the first pass of the loop below.
    bool incremented_time = false;
    while (true) {
      std::vector<absl::AnyInvocable<void()>> to_run;
      // Stays zero on every pass after the first.
      Duration incr = Duration::zero();
      {
        // Lock order: mu_ first, then now_mu_.
        grpc_core::MutexLock lock(&*mu_);
        grpc_core::MutexLock now_lock(&*now_mu_);
        if (!incremented_time) {
          incr = max_time;
          // TODO(ctiller): look at tasks_by_time_ and jump forward (once iomgr
          // timers are gone)
          if (!tasks_by_time_.empty()) {
            incr = std::min(incr, tasks_by_time_.begin()->first - now_);
          }
          // If the desired step is below the gate, reset the gate to 1ms;
          // otherwise cap the step at the gate and grow the gate by 0.1%.
          if (incr < exponential_gate_time_increment_) {
            exponential_gate_time_increment_ = std::chrono::milliseconds(1);
          } else {
            incr = std::min(incr, exponential_gate_time_increment_);
            exponential_gate_time_increment_ +=
                exponential_gate_time_increment_ / 1000;
          }
          // Always move forward by at least one millisecond.
          incr = std::max(incr, std::chrono::duration_cast<Duration>(
                                    std::chrono::milliseconds(1)));
          now_ += incr;
          CHECK_GE(now_.time_since_epoch().count(), 0);
          ++current_tick_;
          incremented_time = true;
        }
        // Find newly expired timers.
        while (!tasks_by_time_.empty() &&
               tasks_by_time_.begin()->first <= now_) {
          auto& task = *tasks_by_time_.begin()->second;
          tasks_by_id_.erase(task.id);
          // Cancelled tasks (null closure) are dropped without running.
          if (task.closure != nullptr) {
            to_run.push_back(std::move(task.closure));
          }
          tasks_by_time_.erase(tasks_by_time_.begin());
        }
      }
      // Called on every pass; `incr` is zero on passes after the first.
      OnClockIncremented(incr);
      // Stop once a pass produced nothing to run.
      if (to_run.empty()) return;
      for (auto& closure : to_run) {
        closure();
      }
    }
  }
}
208 
TickUntilIdle()209 void FuzzingEventEngine::TickUntilIdle() {
210   while (true) {
211     {
212       grpc_core::MutexLock lock(&*mu_);
213       LOG_EVERY_N_SEC(INFO, 5)
214           << "TickUntilIdle: "
215           << GRPC_DUMP_ARGS(tasks_by_id_.size(), outstanding_reads_.load(),
216                             outstanding_writes_.load());
217       if (IsIdleLocked()) return;
218     }
219     Tick();
220   }
221 }
222 
IsIdle()223 bool FuzzingEventEngine::IsIdle() {
224   grpc_core::MutexLock lock(&*mu_);
225   return IsIdleLocked();
226 }
227 
IsIdleLocked()228 bool FuzzingEventEngine::IsIdleLocked() {
229   return tasks_by_id_.empty() &&
230          outstanding_writes_.load(std::memory_order_relaxed) == 0 &&
231          outstanding_reads_.load(std::memory_order_relaxed) == 0;
232 }
233 
TickUntil(Time t)234 void FuzzingEventEngine::TickUntil(Time t) {
235   while (true) {
236     auto now = Now();
237     if (now >= t) break;
238     Tick(t - now);
239   }
240 }
241 
TickForDuration(Duration d)242 void FuzzingEventEngine::TickForDuration(Duration d) { TickUntil(Now() + d); }
243 
SetRunAfterDurationCallback(absl::AnyInvocable<void (Duration)> callback)244 void FuzzingEventEngine::SetRunAfterDurationCallback(
245     absl::AnyInvocable<void(Duration)> callback) {
246   grpc_core::MutexLock lock(&run_after_duration_callback_mu_);
247   run_after_duration_callback_ = std::move(callback);
248 }
249 
Now()250 FuzzingEventEngine::Time FuzzingEventEngine::Now() {
251   grpc_core::MutexLock lock(&*now_mu_);
252   return now_;
253 }
254 
AllocatePort()255 int FuzzingEventEngine::AllocatePort() {
256   // If the fuzzer selected some port orderings, do that first.
257   if (!free_ports_.empty()) {
258     int p = free_ports_.front();
259     free_ports_.pop();
260     return p;
261   }
262   // Otherwise just scan through starting at one and skipping any ports
263   // that were in the fuzzers initial list.
264   while (true) {
265     int p = next_free_port_++;
266     if (fuzzer_mentioned_ports_.count(p) == 0) {
267       return p;
268     }
269   }
270 }
271 
272 absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
CreateListener(Listener::AcceptCallback on_accept,absl::AnyInvocable<void (absl::Status)> on_shutdown,const EndpointConfig &,std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)273 FuzzingEventEngine::CreateListener(
274     Listener::AcceptCallback on_accept,
275     absl::AnyInvocable<void(absl::Status)> on_shutdown, const EndpointConfig&,
276     std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) {
277   grpc_core::MutexLock lock(&*mu_);
278   // Create a listener and register it into the set of listener info in the
279   // event engine.
280   return absl::make_unique<FuzzingListener>(
281       *listeners_
282            .emplace(std::make_shared<ListenerInfo>(
283                std::move(on_accept), std::move(on_shutdown),
284                std::move(memory_allocator_factory)))
285            .first);
286 }
287 
~FuzzingListener()288 FuzzingEventEngine::FuzzingListener::~FuzzingListener() {
289   grpc_core::MutexLock lock(&*mu_);
290   g_fuzzing_event_engine->listeners_.erase(info_);
291 }
292 
IsPortUsed(int port)293 bool FuzzingEventEngine::IsPortUsed(int port) {
294   // Return true if a port is bound to a listener.
295   for (const auto& listener : listeners_) {
296     if (std::find(listener->ports.begin(), listener->ports.end(), port) !=
297         listener->ports.end()) {
298       return true;
299     }
300   }
301   return false;
302 }
303 
Bind(const ResolvedAddress & addr)304 absl::StatusOr<int> FuzzingEventEngine::FuzzingListener::Bind(
305     const ResolvedAddress& addr) {
306   // Extract the port from the address (or fail if non-localhost).
307   auto port = ResolvedAddressGetPort(addr);
308   grpc_core::MutexLock lock(&*mu_);
309   // Check that the listener hasn't already been started.
310   if (info_->started) return absl::InternalError("Already started");
311   if (port != 0) {
312     // If the port is non-zero, check that it's not already in use.
313     if (g_fuzzing_event_engine->IsPortUsed(port)) {
314       return absl::InternalError("Port in use");
315     }
316   } else {
317     // If the port is zero, allocate a new one.
318     do {
319       port = g_fuzzing_event_engine->AllocatePort();
320     } while (g_fuzzing_event_engine->IsPortUsed(port));
321   }
322   // Add the port to the listener.
323   info_->ports.push_back(port);
324   return port;
325 }
326 
Start()327 absl::Status FuzzingEventEngine::FuzzingListener::Start() {
328   // Start the listener or fail if it's already started.
329   grpc_core::MutexLock lock(&*mu_);
330   if (info_->started) return absl::InternalError("Already started");
331   info_->started = true;
332   return absl::OkStatus();
333 }
334 
Write(SliceBuffer * data,int index)335 bool FuzzingEventEngine::EndpointMiddle::Write(SliceBuffer* data, int index) {
336   CHECK(!closed[index]);
337   const int peer_index = 1 - index;
338   GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
339       << "WRITE[" << this << ":" << index << "]: entry "
340       << GRPC_DUMP_ARGS(data->Length());
341   if (data->Length() == 0) return true;
342   size_t write_len = std::numeric_limits<size_t>::max();
343   // Check the write_sizes queue for fuzzer imposed restrictions on this write
344   // size. This allows the fuzzer to force small writes to be seen by the
345   // reader.
346   if (!write_sizes[index].empty()) {
347     write_len = write_sizes[index].front();
348     write_sizes[index].pop();
349   }
350   if (write_len > data->Length()) {
351     write_len = data->Length();
352   }
353   // If the write_len is zero, we still need to write something, so we write one
354   // byte.
355   if (write_len == 0) write_len = 1;
356   GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
357       << "WRITE[" << this << ":" << index << "]: " << write_len << " bytes; "
358       << GRPC_DUMP_ARGS(pending_read[peer_index].has_value());
359   // Expand the pending buffer.
360   size_t prev_len = pending[index].size();
361   pending[index].resize(prev_len + write_len);
362   // Move bytes from the to-write data into the pending buffer.
363   data->MoveFirstNBytesIntoBuffer(write_len, pending[index].data() + prev_len);
364   GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
365       << "WRITE[" << this << ":" << index << "]: post-move "
366       << GRPC_DUMP_ARGS(data->Length());
367   // If there was a pending read, then we can fulfill it.
368   if (pending_read[peer_index].has_value()) {
369     pending_read[peer_index]->buffer->Append(
370         Slice::FromCopiedBuffer(pending[index]));
371     pending[index].clear();
372     g_fuzzing_event_engine->RunLocked(
373         RunType::kWrite,
374         [cb = std::move(pending_read[peer_index]->on_read), this, peer_index,
375          buffer = pending_read[peer_index]->buffer]() mutable {
376           GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
377               << "FINISH_READ[" << this << ":" << peer_index
378               << "]: " << GRPC_DUMP_ARGS(buffer->Length());
379           cb(absl::OkStatus());
380         });
381     pending_read[peer_index].reset();
382   }
383   return data->Length() == 0;
384 }
385 
Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const WriteArgs *)386 bool FuzzingEventEngine::FuzzingEndpoint::Write(
387     absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
388     const WriteArgs*) {
389   GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
390       << "START_WRITE[" << middle_.get() << ":" << my_index()
391       << "]: " << data->Length() << " bytes";
392   IoToken write_token(&g_fuzzing_event_engine->outstanding_writes_);
393   grpc_core::global_stats().IncrementSyscallWrite();
394   grpc_core::MutexLock lock(&*mu_);
395   CHECK(!middle_->closed[my_index()]);
396   CHECK(!middle_->writing[my_index()]);
397   // If the write succeeds immediately, then we return true.
398   if (middle_->Write(data, my_index())) return true;
399   middle_->writing[my_index()] = true;
400   ScheduleDelayedWrite(middle_, my_index(), std::move(on_writable), data,
401                        std::move(write_token));
402   return false;
403 }
404 
// Schedules a task (of type RunType::kWrite) that continues a write which
// could not be completed synchronously on side `index` of `middle`.
//
// When the task runs it either:
//  - finds the writing side closed: schedules `on_writable` with an
//    "Endpoint closed" error and also fails any read pending on the peer;
//  - finishes the write: clears the `writing` flag and invokes `on_writable`
//    with OkStatus (after releasing the lock); or
//  - makes partial progress: reschedules itself for the remainder.
//
// `data` is captured as a raw pointer; nothing here extends its lifetime.
// `write_token` is carried along through every reschedule.
// NOTE(review): presumably the token keeps outstanding_writes_ raised while
// the write is in flight - confirm against IoToken.
void FuzzingEventEngine::FuzzingEndpoint::ScheduleDelayedWrite(
    std::shared_ptr<EndpointMiddle> middle, int index,
    absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
    IoToken write_token) {
  g_fuzzing_event_engine->RunLocked(
      RunType::kWrite,
      [write_token = std::move(write_token), middle = std::move(middle), index,
       data, on_writable = std::move(on_writable)]() mutable {
        // Releasable so the success path can drop the lock before invoking
        // the user callback.
        grpc_core::ReleasableMutexLock lock(&*mu_);
        CHECK(middle->writing[index]);
        if (middle->closed[index]) {
          GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
              << "CLOSED[" << middle.get() << ":" << index << "]";
          // The writer was closed mid-write: report the failure
          // asynchronously.
          g_fuzzing_event_engine->RunLocked(
              RunType::kRunAfter,
              [on_writable = std::move(on_writable)]() mutable {
                on_writable(absl::InternalError("Endpoint closed"));
              });
          // Also fail a read that the peer still has pending.
          if (middle->pending_read[1 - index].has_value()) {
            g_fuzzing_event_engine->RunLocked(
                RunType::kRunAfter,
                [cb = std::move(
                     middle->pending_read[1 - index]->on_read)]() mutable {
                  cb(absl::InternalError("Endpoint closed"));
                });
            middle->pending_read[1 - index].reset();
          }
          return;
        }
        if (middle->Write(data, index)) {
          // Everything has been written: complete the write.
          middle->writing[index] = false;
          lock.Release();
          on_writable(absl::OkStatus());
          return;
        }
        // More data remains: schedule another attempt.
        ScheduleDelayedWrite(std::move(middle), index, std::move(on_writable),
                             data, std::move(write_token));
      });
}
444 
// Closes this side of the connection.
//
// Marks the side as closed, fails any read pending on this side, and - unless
// a write from this side is still in flight - fails any read pending on the
// peer. All failures are delivered asynchronously with an "Endpoint closed"
// InternalError.
FuzzingEventEngine::FuzzingEndpoint::~FuzzingEndpoint() {
  grpc_core::MutexLock lock(&*mu_);
  GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
      << "CLOSE[" << middle_.get() << ":" << my_index() << "]: "
      << GRPC_DUMP_ARGS(
             middle_->closed[my_index()], middle_->closed[peer_index()],
             middle_->pending_read[my_index()].has_value(),
             middle_->pending_read[peer_index()].has_value(),
             middle_->writing[my_index()], middle_->writing[peer_index()]);
  middle_->closed[my_index()] = true;
  // Fail our own outstanding read, if any.
  if (middle_->pending_read[my_index()].has_value()) {
    GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
        << "CLOSED_READING[" << middle_.get() << ":" << my_index() << "]";
    g_fuzzing_event_engine->RunLocked(
        RunType::kRunAfter,
        [cb = std::move(middle_->pending_read[my_index()]->on_read)]() mutable {
          cb(absl::InternalError("Endpoint closed"));
        });
    middle_->pending_read[my_index()].reset();
  }
  // Fail the peer's outstanding read only when we are not mid-write; when a
  // write is in flight, the delayed-write task observes `closed` and fails
  // the peer's read itself (see ScheduleDelayedWrite()).
  if (!middle_->writing[my_index()] &&
      middle_->pending_read[peer_index()].has_value()) {
    g_fuzzing_event_engine->RunLocked(
        RunType::kRunAfter,
        [cb = std::move(
             middle_->pending_read[peer_index()]->on_read)]() mutable {
          cb(absl::InternalError("Endpoint closed"));
        });
    middle_->pending_read[peer_index()].reset();
  }
}
476 
Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer,const ReadArgs *)477 bool FuzzingEventEngine::FuzzingEndpoint::Read(
478     absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
479     const ReadArgs*) {
480   GRPC_TRACE_LOG(fuzzing_ee_writes, INFO)
481       << "START_READ[" << middle_.get() << ":" << my_index() << "]";
482   buffer->Clear();
483   IoToken read_token(&g_fuzzing_event_engine->outstanding_reads_);
484   grpc_core::MutexLock lock(&*mu_);
485   CHECK(!middle_->closed[my_index()]);
486   if (middle_->pending[peer_index()].empty()) {
487     // If the endpoint is closed, fail asynchronously.
488     if (middle_->closed[peer_index()]) {
489       g_fuzzing_event_engine->RunLocked(
490           RunType::kRunAfter,
491           [read_token, on_read = std::move(on_read)]() mutable {
492             on_read(absl::InternalError("Endpoint closed"));
493           });
494       return false;
495     }
496     // If the endpoint has no pending data, then we need to wait for a write.
497     middle_->pending_read[my_index()] =
498         PendingRead{std::move(read_token), std::move(on_read), buffer};
499     return false;
500   } else {
501     // If the endpoint has pending data, then we can fulfill the read
502     // immediately.
503     buffer->Append(Slice::FromCopiedBuffer(middle_->pending[peer_index()]));
504     middle_->pending[peer_index()].clear();
505     return true;
506   }
507 }
508 
WriteSizesForConnection()509 std::queue<size_t> FuzzingEventEngine::WriteSizesForConnection() {
510   if (write_sizes_for_future_connections_.empty()) return std::queue<size_t>();
511   auto ret = std::move(write_sizes_for_future_connections_.front());
512   write_sizes_for_future_connections_.pop();
513   return ret;
514 }
515 
// Sets up the shared state for one connection. `addrs` holds the loopback
// addresses for the listener port and the client port, in that order, and
// each of the two sides takes the next write-size schedule from the active
// engine (two consecutive WriteSizesForConnection() calls).
FuzzingEventEngine::EndpointMiddle::EndpointMiddle(int listener_port,
                                                   int client_port)
    : addrs{PortToAddress(listener_port), PortToAddress(client_port)},
      write_sizes{g_fuzzing_event_engine->WriteSizesForConnection(),
                  g_fuzzing_event_engine->WriteSizesForConnection()} {}
521 
// Simulates an outbound connection to `addr`.
//
// A task is scheduled that looks for a started listener bound to the port of
// `addr`. If one is found, a pair of endpoints sharing one EndpointMiddle is
// created; the listener's on_accept receives one and `on_connect` receives
// the other, each from its own scheduled task. If no listener matches,
// `on_connect` is scheduled with an InvalidArgumentError.
//
// The EndpointConfig, MemoryAllocator and timeout arguments are ignored.
// The returned ConnectionHandle carries the keys of the scheduled task, so
// CancelConnect() can cancel it via Cancel().
EventEngine::ConnectionHandle FuzzingEventEngine::Connect(
    OnConnectCallback on_connect, const ResolvedAddress& addr,
    const EndpointConfig&, MemoryAllocator, Duration) {
  // TODO(ctiller): do something with the timeout
  // Schedule a timer to run (with some fuzzer selected delay) the on_connect
  // callback.
  grpc_core::MutexLock lock(&*mu_);
  auto task_handle = RunAfterLocked(
      RunType::kRunAfter, Duration(0),
      [this, addr, on_connect = std::move(on_connect)]() mutable {
        // Extract the target port number from the address.
        auto port = ResolvedAddressGetPort(addr);
        grpc_core::MutexLock lock(&*mu_);
        // Find the listener that is listening on the target port.
        for (auto it = listeners_.begin(); it != listeners_.end(); ++it) {
          const auto& listener = *it;
          // Listener must be started.
          if (!listener->started) continue;
          for (int listener_port : listener->ports) {
            if (port == listener_port) {
              // Port matches on a started listener: create an endpoint, call
              // on_accept for the listener and on_connect for the client.
              // The client side of the connection gets a freshly allocated
              // port.
              auto middle = std::make_shared<EndpointMiddle>(
                  listener_port, g_fuzzing_event_engine->AllocatePort());
              auto ep1 = std::make_unique<FuzzingEndpoint>(middle, 0);
              auto ep2 = std::make_unique<FuzzingEndpoint>(middle, 1);
              // The listener (a shared_ptr) is captured by value by this task.
              RunLocked(RunType::kRunAfter, [listener,
                                             ep1 = std::move(ep1)]() mutable {
                listener->on_accept(
                    std::move(ep1),
                    listener->memory_allocator_factory->CreateMemoryAllocator(
                        "fuzzing"));
              });
              RunLocked(RunType::kRunAfter, [on_connect = std::move(on_connect),
                                             ep2 = std::move(ep2)]() mutable {
                on_connect(std::move(ep2));
              });
              // on_connect has been moved from; return so the failure path
              // below is not reached.
              return;
            }
          }
        }
        // Fail: no such listener.
        RunLocked(RunType::kRunAfter,
                  [on_connect = std::move(on_connect)]() mutable {
                    on_connect(absl::InvalidArgumentError("No listener found"));
                  });
      });
  return ConnectionHandle{{task_handle.keys[0], task_handle.keys[1]}};
}
571 
CancelConnect(ConnectionHandle connection_handle)572 bool FuzzingEventEngine::CancelConnect(ConnectionHandle connection_handle) {
573   return Cancel(
574       TaskHandle{{connection_handle.keys[0], connection_handle.keys[1]}});
575 }
576 
IsWorkerThread()577 bool FuzzingEventEngine::IsWorkerThread() { abort(); }
578 
// Returns a DNS resolver for this engine. The resolver options are ignored.
//
// When GRPC_POSIX_SOCKET_TCP is defined, a NativePosixDNSResolver bound to
// this engine is returned; on other builds the call crashes, since no
// resolver is implemented there.
absl::StatusOr<std::unique_ptr<EventEngine::DNSResolver>>
FuzzingEventEngine::GetDNSResolver(const DNSResolver::ResolverOptions&) {
#if defined(GRPC_POSIX_SOCKET_TCP)
  return std::make_unique<NativePosixDNSResolver>(shared_from_this());
#else
  grpc_core::Crash("FuzzingEventEngine::GetDNSResolver Not implemented");
#endif
}
587 
Run(Closure * closure)588 void FuzzingEventEngine::Run(Closure* closure) {
589   grpc_core::MutexLock lock(&*mu_);
590   RunAfterLocked(RunType::kRunAfter, Duration::zero(),
591                  [closure]() { closure->Run(); });
592 }
593 
Run(absl::AnyInvocable<void ()> closure)594 void FuzzingEventEngine::Run(absl::AnyInvocable<void()> closure) {
595   grpc_core::MutexLock lock(&*mu_);
596   RunAfterLocked(RunType::kRunAfter, Duration::zero(), std::move(closure));
597 }
598 
RunAfter(Duration when,Closure * closure)599 EventEngine::TaskHandle FuzzingEventEngine::RunAfter(Duration when,
600                                                      Closure* closure) {
601   return RunAfter(when, [closure]() { closure->Run(); });
602 }
603 
RunAfter(Duration when,absl::AnyInvocable<void ()> closure)604 EventEngine::TaskHandle FuzzingEventEngine::RunAfter(
605     Duration when, absl::AnyInvocable<void()> closure) {
606   {
607     grpc_core::MutexLock lock(&run_after_duration_callback_mu_);
608     if (run_after_duration_callback_ != nullptr) {
609       run_after_duration_callback_(when);
610     }
611   }
612   grpc_core::MutexLock lock(&*mu_);
613   // (b/258949216): Cap it to one year to avoid integer overflow errors.
614   return RunAfterLocked(RunType::kRunAfter, std::min(when, kOneYear),
615                         std::move(closure));
616 }
617 
RunAfterExactly(Duration when,absl::AnyInvocable<void ()> closure)618 EventEngine::TaskHandle FuzzingEventEngine::RunAfterExactly(
619     Duration when, absl::AnyInvocable<void()> closure) {
620   grpc_core::MutexLock lock(&*mu_);
621   // (b/258949216): Cap it to one year to avoid integer overflow errors.
622   return RunAfterLocked(RunType::kExact, std::min(when, kOneYear),
623                         std::move(closure));
624 }
625 
RunAfterLocked(RunType run_type,Duration when,absl::AnyInvocable<void ()> closure)626 EventEngine::TaskHandle FuzzingEventEngine::RunAfterLocked(
627     RunType run_type, Duration when, absl::AnyInvocable<void()> closure) {
628   const intptr_t id = next_task_id_;
629   ++next_task_id_;
630   Duration delay_taken = Duration::zero();
631   if (run_type != RunType::kExact) {
632     if (!task_delays_.empty()) {
633       delay_taken = grpc_core::Clamp(task_delays_.front(), Duration::zero(),
634                                      max_delay_[static_cast<int>(run_type)]);
635       task_delays_.pop();
636     } else if (run_type != RunType::kWrite && when == Duration::zero()) {
637       // For zero-duration events, if there is no more delay input from
638       // the test case, we default to a small non-zero value to avoid
639       // busy loops that prevent us from making forward progress.
640       delay_taken = std::chrono::microseconds(1);
641     }
642     when += delay_taken;
643   }
644   auto task = std::make_shared<Task>(id, std::move(closure));
645   tasks_by_id_.emplace(id, task);
646   Time final_time;
647   Time now;
648   {
649     grpc_core::MutexLock lock(&*now_mu_);
650     final_time = now_ + when;
651     now = now_;
652     tasks_by_time_.emplace(final_time, std::move(task));
653   }
654   GRPC_TRACE_LOG(fuzzing_ee_timers, INFO)
655       << "Schedule timer " << id << " @ "
656       << static_cast<uint64_t>(final_time.time_since_epoch().count())
657       << " (now=" << now.time_since_epoch().count()
658       << "; delay=" << when.count() << "; fuzzing_added=" << delay_taken.count()
659       << "; type=" << static_cast<int>(run_type) << ")";
660   return TaskHandle{id, kTaskHandleSalt};
661 }
662 
Cancel(TaskHandle handle)663 bool FuzzingEventEngine::Cancel(TaskHandle handle) {
664   grpc_core::MutexLock lock(&*mu_);
665   CHECK(handle.keys[1] == kTaskHandleSalt);
666   const intptr_t id = handle.keys[0];
667   auto it = tasks_by_id_.find(id);
668   if (it == tasks_by_id_.end()) {
669     return false;
670   }
671   if (it->second->closure == nullptr) {
672     return false;
673   }
674   GRPC_TRACE_LOG(fuzzing_ee_timers, INFO) << "Cancel timer " << id;
675   it->second->closure = nullptr;
676   return true;
677 }
678 
GlobalNowImpl(gpr_clock_type clock_type)679 gpr_timespec FuzzingEventEngine::GlobalNowImpl(gpr_clock_type clock_type) {
680   if (g_fuzzing_event_engine == nullptr) {
681     return gpr_inf_future(clock_type);
682   }
683   CHECK_NE(g_fuzzing_event_engine, nullptr);
684   grpc_core::MutexLock lock(&*now_mu_);
685   return g_fuzzing_event_engine->NowAsTimespec(clock_type);
686 }
687 
UnsetGlobalHooks()688 void FuzzingEventEngine::UnsetGlobalHooks() {
689   if (g_fuzzing_event_engine != this) return;
690   g_fuzzing_event_engine = nullptr;
691   gpr_now_impl = g_orig_gpr_now_impl;
692   g_orig_gpr_now_impl = nullptr;
693   grpc_set_pick_port_functions(previous_pick_port_functions_);
694 }
695 
~ListenerInfo()696 FuzzingEventEngine::ListenerInfo::~ListenerInfo() {
697   CHECK_NE(g_fuzzing_event_engine, nullptr);
698   g_fuzzing_event_engine->Run(
699       [on_shutdown = std::move(on_shutdown),
700        shutdown_status = std::move(shutdown_status)]() mutable {
701         on_shutdown(std::move(shutdown_status));
702       });
703 }
704 
705 }  // namespace experimental
706 }  // namespace grpc_event_engine
707