• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_TEST_CORE_EVENT_ENGINE_FUZZING_EVENT_ENGINE_FUZZING_EVENT_ENGINE_H
16 #define GRPC_TEST_CORE_EVENT_ENGINE_FUZZING_EVENT_ENGINE_FUZZING_EVENT_ENGINE_H
17 
#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/support/time.h>
#include <stddef.h>

#include <atomic>
#include <chrono>
#include <cstdint>
#include <map>
#include <memory>
#include <queue>
#include <set>
#include <thread>
#include <utility>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "absl/types/optional.h"
#include "src/core/lib/event_engine/time_util.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/util/no_destruct.h"
#include "src/core/util/sync.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
#include "test/core/test_util/port.h"
48 
49 namespace grpc_event_engine {
50 namespace experimental {
51 
52 // EventEngine implementation to be used by fuzzers.
53 // It's only allowed to have one FuzzingEventEngine instantiated at a time.
54 class FuzzingEventEngine : public EventEngine {
55  public:
56   struct Options {
57     Duration max_delay_run_after = std::chrono::seconds(30);
58     Duration max_delay_write = std::chrono::seconds(30);
59   };
60   explicit FuzzingEventEngine(Options options,
61                               const fuzzing_event_engine::Actions& actions);
~FuzzingEventEngine()62   ~FuzzingEventEngine() override { UnsetGlobalHooks(); }
63 
64   using Time = std::chrono::time_point<FuzzingEventEngine, Duration>;
65 
66   // Once the fuzzing work is completed, this method should be called to speed
67   // quiescence.
68   void FuzzingDone() ABSL_LOCKS_EXCLUDED(mu_);
69   // Increment time once and perform any scheduled work.
70   void Tick(Duration max_time = std::chrono::seconds(600))
71       ABSL_LOCKS_EXCLUDED(mu_);
72   // Repeatedly call Tick() until there is no more work to do.
73   void TickUntilIdle() ABSL_LOCKS_EXCLUDED(mu_);
74   // Returns true if idle.
75   bool IsIdle() ABSL_LOCKS_EXCLUDED(mu_);
76   // Tick until some time
77   void TickUntil(Time t) ABSL_LOCKS_EXCLUDED(mu_);
78   // Tick for some duration
79   void TickForDuration(Duration d) ABSL_LOCKS_EXCLUDED(mu_);
80 
81   // Sets a callback to be invoked any time RunAfter() is called.
82   // Allows tests to verify the specified duration.
83   void SetRunAfterDurationCallback(absl::AnyInvocable<void(Duration)> callback);
84 
85   absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
86       Listener::AcceptCallback on_accept,
87       absl::AnyInvocable<void(absl::Status)> on_shutdown,
88       const EndpointConfig& config,
89       std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
90       ABSL_LOCKS_EXCLUDED(mu_) override;
91 
92   ConnectionHandle Connect(OnConnectCallback on_connect,
93                            const ResolvedAddress& addr,
94                            const EndpointConfig& args,
95                            MemoryAllocator memory_allocator, Duration timeout)
96       ABSL_LOCKS_EXCLUDED(mu_) override;
97 
98   bool CancelConnect(ConnectionHandle handle) ABSL_LOCKS_EXCLUDED(mu_) override;
99 
100   bool IsWorkerThread() override;
101 
102   absl::StatusOr<std::unique_ptr<DNSResolver>> GetDNSResolver(
103       const DNSResolver::ResolverOptions& options) override;
104 
105   void Run(Closure* closure) ABSL_LOCKS_EXCLUDED(mu_) override;
106   void Run(absl::AnyInvocable<void()> closure)
107       ABSL_LOCKS_EXCLUDED(mu_) override;
108   TaskHandle RunAfter(Duration when, Closure* closure)
109       ABSL_LOCKS_EXCLUDED(mu_) override;
110   TaskHandle RunAfter(Duration when, absl::AnyInvocable<void()> closure)
111       ABSL_LOCKS_EXCLUDED(mu_) override;
112   bool Cancel(TaskHandle handle) ABSL_LOCKS_EXCLUDED(mu_) override;
113 
114   TaskHandle RunAfterExactly(Duration when, absl::AnyInvocable<void()> closure)
115       ABSL_LOCKS_EXCLUDED(mu_);
116 
117   Time Now() ABSL_LOCKS_EXCLUDED(mu_);
118 
119   // Clear any global hooks installed by this event engine. Call prior to
120   // destruction to ensure no overlap between tests if constructing/destructing
121   // each test.
122   void UnsetGlobalHooks() ABSL_LOCKS_EXCLUDED(mu_);
123 
max_delay_write()124   Duration max_delay_write() const {
125     return max_delay_[static_cast<int>(RunType::kWrite)];
126   }
127 
128  private:
129   class IoToken {
130    public:
IoToken()131     IoToken() : refs_(nullptr) {}
IoToken(std::atomic<size_t> * refs)132     explicit IoToken(std::atomic<size_t>* refs) : refs_(refs) {
133       refs_->fetch_add(1, std::memory_order_relaxed);
134     }
~IoToken()135     ~IoToken() {
136       if (refs_ != nullptr) refs_->fetch_sub(1, std::memory_order_relaxed);
137     }
IoToken(const IoToken & other)138     IoToken(const IoToken& other) : refs_(other.refs_) {
139       if (refs_ != nullptr) refs_->fetch_add(1, std::memory_order_relaxed);
140     }
141     IoToken& operator=(const IoToken& other) {
142       IoToken copy(other);
143       Swap(copy);
144       return *this;
145     }
IoToken(IoToken && other)146     IoToken(IoToken&& other) noexcept
147         : refs_(std::exchange(other.refs_, nullptr)) {}
148     IoToken& operator=(IoToken&& other) noexcept {
149       if (refs_ != nullptr) refs_->fetch_sub(1, std::memory_order_relaxed);
150       refs_ = std::exchange(other.refs_, nullptr);
151       return *this;
152     }
Swap(IoToken & other)153     void Swap(IoToken& other) { std::swap(refs_, other.refs_); }
154 
155    private:
156     std::atomic<size_t>* refs_;
157   };
158 
159   enum class RunType {
160     kWrite,
161     kRunAfter,
162     kExact,
163   };
164 
165   // One pending task to be run.
166   struct Task {
TaskTask167     Task(intptr_t id, absl::AnyInvocable<void()> closure)
168         : id(id), closure(std::move(closure)) {}
169     intptr_t id;
170     absl::AnyInvocable<void()> closure;
171   };
172 
173   // Per listener information.
174   // We keep a shared_ptr to this, one reference held by the FuzzingListener
175   // Listener implementation, and one reference in the event engine state, so it
176   // may be iterated through and inspected - principally to discover the ports
177   // on which this listener is listening.
178   struct ListenerInfo {
ListenerInfoListenerInfo179     ListenerInfo(
180         Listener::AcceptCallback on_accept,
181         absl::AnyInvocable<void(absl::Status)> on_shutdown,
182         std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
183         : on_accept(std::move(on_accept)),
184           on_shutdown(std::move(on_shutdown)),
185           memory_allocator_factory(std::move(memory_allocator_factory)),
186           started(false) {}
187     ~ListenerInfo() ABSL_LOCKS_EXCLUDED(mu_);
188     // The callback to invoke when a new connection is accepted.
189     Listener::AcceptCallback on_accept;
190     // The callback to invoke when the listener is shut down.
191     absl::AnyInvocable<void(absl::Status)> on_shutdown;
192     // The memory allocator factory to use for this listener.
193     const std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory;
194     // The ports on which this listener is listening.
195     std::vector<int> ports ABSL_GUARDED_BY(mu_);
196     // Has start been called on the listener?
197     // Used to emulate the Bind/Start semantics demanded by the API.
198     bool started ABSL_GUARDED_BY(mu_);
199     // The status to return via on_shutdown.
200     absl::Status shutdown_status ABSL_GUARDED_BY(mu_);
201   };
202 
203   // Implementation of Listener.
204   class FuzzingListener final : public Listener {
205    public:
FuzzingListener(std::shared_ptr<ListenerInfo> info)206     explicit FuzzingListener(std::shared_ptr<ListenerInfo> info)
207         : info_(std::move(info)) {}
208     ~FuzzingListener() override;
209     absl::StatusOr<int> Bind(const ResolvedAddress& addr) override;
210     absl::Status Start() override;
211 
212    private:
213     std::shared_ptr<ListenerInfo> info_;
214   };
215 
216   // One read that's outstanding.
217   struct PendingRead {
218     // The associated io token
219     IoToken io_token;
220     // Callback to invoke when the read completes.
221     absl::AnyInvocable<void(absl::Status)> on_read;
222     // The buffer to read into.
223     SliceBuffer* buffer;
224   };
225 
226   // The join between two Endpoint instances.
227   struct EndpointMiddle {
228     EndpointMiddle(int listener_port, int client_port)
229         ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
230     // Address of each side of the endpoint.
231     const ResolvedAddress addrs[2];
232     // Is the endpoint closed?
233     bool closed[2] ABSL_GUARDED_BY(mu_) = {false, false};
234     // Is the endpoint writing?
235     bool writing[2] ABSL_GUARDED_BY(mu_) = {false, false};
236     // Bytes written into each endpoint and awaiting a read.
237     std::vector<uint8_t> pending[2] ABSL_GUARDED_BY(mu_);
238     // The sizes of each accepted write, as determined by the fuzzer actions.
239     std::queue<size_t> write_sizes[2] ABSL_GUARDED_BY(mu_);
240     // The next read that's pending (or nullopt).
241     absl::optional<PendingRead> pending_read[2] ABSL_GUARDED_BY(mu_);
242 
243     // Helper to take some bytes from data and queue them into pending[index].
244     // Returns true if all bytes were consumed, false if more writes are needed.
245     bool Write(SliceBuffer* data, int index) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
246   };
247 
248   // Implementation of Endpoint.
249   // When a connection is formed, we create two of these - one with index 0, the
250   // other index 1, both pointing to the same EndpointMiddle.
251   class FuzzingEndpoint final : public Endpoint {
252    public:
FuzzingEndpoint(std::shared_ptr<EndpointMiddle> middle,int index)253     FuzzingEndpoint(std::shared_ptr<EndpointMiddle> middle, int index)
254         : middle_(std::move(middle)), index_(index) {}
255     ~FuzzingEndpoint() override;
256 
257     bool Read(absl::AnyInvocable<void(absl::Status)> on_read,
258               SliceBuffer* buffer, const ReadArgs* args) override;
259     bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
260                SliceBuffer* data, const WriteArgs* args) override;
GetPeerAddress()261     const ResolvedAddress& GetPeerAddress() const override {
262       return middle_->addrs[peer_index()];
263     }
GetLocalAddress()264     const ResolvedAddress& GetLocalAddress() const override {
265       return middle_->addrs[my_index()];
266     }
267 
268    private:
my_index()269     int my_index() const { return index_; }
peer_index()270     int peer_index() const { return 1 - index_; }
271     // Schedule additional writes to be performed later.
272     // Takes a ref to middle instead of holding this, so that should the
273     // endpoint be destroyed we don't have to worry about use-after-free.
274     // Instead that scheduled callback will see the middle is closed and finally
275     // report completion to the caller.
276     // Since there is no timeliness contract for the completion of writes after
277     // endpoint shutdown, it's believed this is a legal implementation.
278     static void ScheduleDelayedWrite(
279         std::shared_ptr<EndpointMiddle> middle, int index,
280         absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
281         IoToken write_token) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
282     const std::shared_ptr<EndpointMiddle> middle_;
283     const int index_;
284   };
285 
RunLocked(RunType run_type,absl::AnyInvocable<void ()> closure)286   void RunLocked(RunType run_type, absl::AnyInvocable<void()> closure)
287       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
288     RunAfterLocked(run_type, Duration::zero(), std::move(closure));
289   }
290 
291   TaskHandle RunAfterLocked(RunType run_type, Duration when,
292                             absl::AnyInvocable<void()> closure)
293       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
294 
295   // Allocate a port. Considered fuzzer selected port orderings first, and then
296   // falls back to an exhaustive incremental search from port #1.
297   int AllocatePort() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
298   // Is the given port in use by any listener?
299   bool IsPortUsed(int port) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
300 
301   bool IsIdleLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
302 
303   // Called whenever the time is incremented, and used by
304   // ThreadedFuzzingEventEngine to insert a sleep so that real time passes at
305   // approximately the same rate as FuzzingEventEngine time.
306   // TODO(ctiller): This is very approximate and unsound and we should probably
307   // evaluate whether we want to continue supporting ThreadedFuzzingEventEngine
308   // at all.
OnClockIncremented(Duration)309   virtual void OnClockIncremented(Duration) {}
310 
311   // We need everything EventEngine to do reasonable timer steps -- without it
312   // we need to do a bunch of evil to make sure both timer systems are ticking
313   // each step.
IsSaneTimerEnvironment()314   static bool IsSaneTimerEnvironment() {
315     return grpc_core::IsEventEngineClientEnabled() &&
316            grpc_core::IsEventEngineListenerEnabled() &&
317            grpc_core::IsEventEngineDnsEnabled();
318   }
319 
320   // For the next connection being built, query the list of fuzzer selected
321   // write size limits.
322   std::queue<size_t> WriteSizesForConnection()
323       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
324 
325   gpr_timespec NowAsTimespec(gpr_clock_type clock_type)
326       ABSL_EXCLUSIVE_LOCKS_REQUIRED(now_mu_);
327   static gpr_timespec GlobalNowImpl(gpr_clock_type clock_type)
328       ABSL_LOCKS_EXCLUDED(mu_);
329 
330   static grpc_core::NoDestruct<grpc_core::Mutex> mu_;
331   static grpc_core::NoDestruct<grpc_core::Mutex> now_mu_
332       ABSL_ACQUIRED_AFTER(mu_);
333 
334   const Duration max_delay_[2];
335   intptr_t next_task_id_ ABSL_GUARDED_BY(mu_) = 1;
336   // Start at 5 seconds after the epoch.
337   // This needs to be more than 1, and otherwise is kind of arbitrary.
338   // The grpc_core::Timer code special cases the zero second time period after
339   // epoch to allow for some fancy atomic stuff.
340   Time now_ ABSL_GUARDED_BY(now_mu_) = Time() + std::chrono::seconds(5);
341   std::queue<Duration> task_delays_ ABSL_GUARDED_BY(mu_);
342   std::map<intptr_t, std::shared_ptr<Task>> tasks_by_id_ ABSL_GUARDED_BY(mu_);
343   std::multimap<Time, std::shared_ptr<Task>> tasks_by_time_
344       ABSL_GUARDED_BY(mu_);
345   std::set<std::shared_ptr<ListenerInfo>> listeners_ ABSL_GUARDED_BY(mu_);
346   // Fuzzer selected port allocations.
347   std::queue<int> free_ports_ ABSL_GUARDED_BY(mu_);
348   // Next free port to allocate once fuzzer selections are exhausted.
349   int next_free_port_ ABSL_GUARDED_BY(mu_) = 1;
350   // Ports that were included in the fuzzer selected port orderings.
351   std::set<int> fuzzer_mentioned_ports_ ABSL_GUARDED_BY(mu_);
352   // Fuzzer selected write sizes for future connections - one picked off per
353   // WriteSizesForConnection() call.
354   std::queue<std::queue<size_t>> write_sizes_for_future_connections_
355       ABSL_GUARDED_BY(mu_);
356   grpc_pick_port_functions previous_pick_port_functions_;
357   std::atomic<size_t> outstanding_writes_{0};
358   std::atomic<size_t> outstanding_reads_{0};
359 
360   // TODO(ctiller): these can be removed when IsSaneTimerEnvironment() is
361   // guaranteed to be true.
362   Duration exponential_gate_time_increment_ ABSL_GUARDED_BY(mu_) =
363       std::chrono::milliseconds(1);
364   intptr_t current_tick_ ABSL_GUARDED_BY(now_mu_) = 0;
365 
366   grpc_core::Mutex run_after_duration_callback_mu_;
367   absl::AnyInvocable<void(Duration)> run_after_duration_callback_
368       ABSL_GUARDED_BY(run_after_duration_callback_mu_);
369 };
370 
371 class ThreadedFuzzingEventEngine : public FuzzingEventEngine {
372  public:
ThreadedFuzzingEventEngine()373   ThreadedFuzzingEventEngine()
374       : ThreadedFuzzingEventEngine(std::chrono::milliseconds(10)) {}
375 
ThreadedFuzzingEventEngine(Duration max_time)376   explicit ThreadedFuzzingEventEngine(Duration max_time)
377       : FuzzingEventEngine(FuzzingEventEngine::Options(),
378                            fuzzing_event_engine::Actions()),
379         main_([this, max_time]() {
380           while (!done_.load()) {
381             Tick(max_time);
382           }
383         }) {}
384 
~ThreadedFuzzingEventEngine()385   ~ThreadedFuzzingEventEngine() override {
386     done_.store(true);
387     main_.join();
388   }
389 
390  private:
OnClockIncremented(Duration duration)391   void OnClockIncremented(Duration duration) override {
392     absl::SleepFor(absl::Milliseconds(
393         1 + grpc_event_engine::experimental::Milliseconds(duration)));
394   }
395 
396   std::atomic<bool> done_{false};
397   std::thread main_;
398 };
399 
400 }  // namespace experimental
401 }  // namespace grpc_event_engine
402 
403 #endif  // GRPC_TEST_CORE_EVENT_ENGINE_FUZZING_EVENT_ENGINE_FUZZING_EVENT_ENGINE_H
404