// Copyright 2022 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GRPC_TEST_CORE_EVENT_ENGINE_FUZZING_EVENT_ENGINE_FUZZING_EVENT_ENGINE_H
#define GRPC_TEST_CORE_EVENT_ENGINE_FUZZING_EVENT_ENGINE_FUZZING_EVENT_ENGINE_H

#include <grpc/event_engine/endpoint_config.h>
#include <grpc/event_engine/event_engine.h>
#include <grpc/event_engine/memory_allocator.h>
#include <grpc/event_engine/slice_buffer.h>
#include <grpc/support/time.h>
#include <stddef.h>

#include <atomic>
#include <chrono>
#include <cstdint>
#include <map>
#include <memory>
#include <queue>
#include <set>
#include <thread>
#include <utility>
#include <vector>

#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/types/optional.h"
#include "src/core/lib/event_engine/time_util.h"
#include "src/core/lib/experiments/experiments.h"
#include "src/core/util/no_destruct.h"
#include "src/core/util/sync.h"
#include "test/core/event_engine/fuzzing_event_engine/fuzzing_event_engine.pb.h"
#include "test/core/test_util/port.h"

namespace grpc_event_engine {
namespace experimental {

// EventEngine implementation to be used by fuzzers.
// Only one FuzzingEventEngine may be instantiated at a time.
class FuzzingEventEngine : public EventEngine {
 public:
  struct Options {
    Duration max_delay_run_after = std::chrono::seconds(30);
    Duration max_delay_write = std::chrono::seconds(30);
  };
  explicit FuzzingEventEngine(Options options,
                              const fuzzing_event_engine::Actions& actions);
  ~FuzzingEventEngine() override { UnsetGlobalHooks(); }

  using Time = std::chrono::time_point<FuzzingEventEngine, Duration>;

  // Once the fuzzing work is completed, this method should be called to speed
  // quiescence.
  void FuzzingDone() ABSL_LOCKS_EXCLUDED(mu_);
  // Increment time once and perform any scheduled work.
  void Tick(Duration max_time = std::chrono::seconds(600))
      ABSL_LOCKS_EXCLUDED(mu_);
  // Repeatedly call Tick() until there is no more work to do.
  void TickUntilIdle() ABSL_LOCKS_EXCLUDED(mu_);
  // Returns true if idle.
  bool IsIdle() ABSL_LOCKS_EXCLUDED(mu_);
  // Tick until some time.
  void TickUntil(Time t) ABSL_LOCKS_EXCLUDED(mu_);
  // Tick for some duration.
  void TickForDuration(Duration d) ABSL_LOCKS_EXCLUDED(mu_);
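
  // Illustrative sketch of how a test typically drives the engine; this is an
  // assumed usage pattern, not part of the API contract:
  //
  //   FuzzingEventEngine engine(FuzzingEventEngine::Options(),
  //                             fuzzing_event_engine::Actions());
  //   engine.Run([] { /* schedule the work under test */ });
  //   engine.TickUntilIdle();  // advance simulated time until no work remains
  //   engine.FuzzingDone();    // no further fuzz-driven work will arrive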

  // Sets a callback to be invoked any time RunAfter() is called.
  // Allows tests to verify the specified duration.
  void SetRunAfterDurationCallback(absl::AnyInvocable<void(Duration)> callback);

  absl::StatusOr<std::unique_ptr<Listener>> CreateListener(
      Listener::AcceptCallback on_accept,
      absl::AnyInvocable<void(absl::Status)> on_shutdown,
      const EndpointConfig& config,
      std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
      ABSL_LOCKS_EXCLUDED(mu_) override;

  ConnectionHandle Connect(OnConnectCallback on_connect,
                           const ResolvedAddress& addr,
                           const EndpointConfig& args,
                           MemoryAllocator memory_allocator, Duration timeout)
      ABSL_LOCKS_EXCLUDED(mu_) override;

  bool CancelConnect(ConnectionHandle handle) ABSL_LOCKS_EXCLUDED(mu_) override;

  bool IsWorkerThread() override;

  absl::StatusOr<std::unique_ptr<DNSResolver>> GetDNSResolver(
      const DNSResolver::ResolverOptions& options) override;

  void Run(Closure* closure) ABSL_LOCKS_EXCLUDED(mu_) override;
  void Run(absl::AnyInvocable<void()> closure)
      ABSL_LOCKS_EXCLUDED(mu_) override;
  TaskHandle RunAfter(Duration when, Closure* closure)
      ABSL_LOCKS_EXCLUDED(mu_) override;
  TaskHandle RunAfter(Duration when, absl::AnyInvocable<void()> closure)
      ABSL_LOCKS_EXCLUDED(mu_) override;
  bool Cancel(TaskHandle handle) ABSL_LOCKS_EXCLUDED(mu_) override;

  TaskHandle RunAfterExactly(Duration when, absl::AnyInvocable<void()> closure)
      ABSL_LOCKS_EXCLUDED(mu_);

  Time Now() ABSL_LOCKS_EXCLUDED(mu_);

  // Clear any global hooks installed by this event engine. Call prior to
  // destruction to ensure no overlap between tests if constructing/destructing
  // each test.
  void UnsetGlobalHooks() ABSL_LOCKS_EXCLUDED(mu_);

  Duration max_delay_write() const {
    return max_delay_[static_cast<int>(RunType::kWrite)];
  }

 private:
  // RAII token counting one outstanding I/O operation against the referenced
  // counter (e.g. outstanding_reads_ or outstanding_writes_).
  class IoToken {
   public:
    IoToken() : refs_(nullptr) {}
    explicit IoToken(std::atomic<size_t>* refs) : refs_(refs) {
      refs_->fetch_add(1, std::memory_order_relaxed);
    }
    ~IoToken() {
      if (refs_ != nullptr) refs_->fetch_sub(1, std::memory_order_relaxed);
    }
    IoToken(const IoToken& other) : refs_(other.refs_) {
      if (refs_ != nullptr) refs_->fetch_add(1, std::memory_order_relaxed);
    }
    IoToken& operator=(const IoToken& other) {
      IoToken copy(other);
      Swap(copy);
      return *this;
    }
    IoToken(IoToken&& other) noexcept
        : refs_(std::exchange(other.refs_, nullptr)) {}
    IoToken& operator=(IoToken&& other) noexcept {
      if (refs_ != nullptr) refs_->fetch_sub(1, std::memory_order_relaxed);
      refs_ = std::exchange(other.refs_, nullptr);
      return *this;
    }
    void Swap(IoToken& other) { std::swap(refs_, other.refs_); }

   private:
    std::atomic<size_t>* refs_;
  };

  enum class RunType {
    kWrite,
    kRunAfter,
    kExact,
  };

  // One pending task to be run.
  struct Task {
    Task(intptr_t id, absl::AnyInvocable<void()> closure)
        : id(id), closure(std::move(closure)) {}
    intptr_t id;
    absl::AnyInvocable<void()> closure;
  };
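
  // Illustrative note (assumed bookkeeping, inferred from the members below):
  // RunAfter() assigns the next next_task_id_, wraps the closure in a Task,
  // and records it in both tasks_by_id_ (so Cancel() can find it by handle)
  // and tasks_by_time_ (so Tick() can run whatever falls due as simulated
  // time advances).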

  // Per listener information.
  // We keep a shared_ptr to this, one reference held by the FuzzingListener
  // Listener implementation, and one reference in the event engine state, so
  // it may be iterated through and inspected - principally to discover the
  // ports on which this listener is listening.
  struct ListenerInfo {
    ListenerInfo(
        Listener::AcceptCallback on_accept,
        absl::AnyInvocable<void(absl::Status)> on_shutdown,
        std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory)
        : on_accept(std::move(on_accept)),
          on_shutdown(std::move(on_shutdown)),
          memory_allocator_factory(std::move(memory_allocator_factory)),
          started(false) {}
    ~ListenerInfo() ABSL_LOCKS_EXCLUDED(mu_);
    // The callback to invoke when a new connection is accepted.
    Listener::AcceptCallback on_accept;
    // The callback to invoke when the listener is shut down.
    absl::AnyInvocable<void(absl::Status)> on_shutdown;
    // The memory allocator factory to use for this listener.
    const std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory;
    // The ports on which this listener is listening.
    std::vector<int> ports ABSL_GUARDED_BY(mu_);
    // Has start been called on the listener?
    // Used to emulate the Bind/Start semantics demanded by the API.
    bool started ABSL_GUARDED_BY(mu_);
    // The status to return via on_shutdown.
    absl::Status shutdown_status ABSL_GUARDED_BY(mu_);
  };

  // Implementation of Listener.
  class FuzzingListener final : public Listener {
   public:
    explicit FuzzingListener(std::shared_ptr<ListenerInfo> info)
        : info_(std::move(info)) {}
    ~FuzzingListener() override;
    absl::StatusOr<int> Bind(const ResolvedAddress& addr) override;
    absl::Status Start() override;

   private:
    std::shared_ptr<ListenerInfo> info_;
  };

  // One read that's outstanding.
  struct PendingRead {
    // The associated io token.
    IoToken io_token;
    // Callback to invoke when the read completes.
    absl::AnyInvocable<void(absl::Status)> on_read;
    // The buffer to read into.
    SliceBuffer* buffer;
  };

  // The join between two Endpoint instances.
  struct EndpointMiddle {
    EndpointMiddle(int listener_port, int client_port)
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
    // Address of each side of the endpoint.
    const ResolvedAddress addrs[2];
    // Is the endpoint closed?
    bool closed[2] ABSL_GUARDED_BY(mu_) = {false, false};
    // Is the endpoint writing?
    bool writing[2] ABSL_GUARDED_BY(mu_) = {false, false};
    // Bytes written into each endpoint and awaiting a read.
    std::vector<uint8_t> pending[2] ABSL_GUARDED_BY(mu_);
    // The sizes of each accepted write, as determined by the fuzzer actions.
    std::queue<size_t> write_sizes[2] ABSL_GUARDED_BY(mu_);
    // The next read that's pending (or nullopt).
    absl::optional<PendingRead> pending_read[2] ABSL_GUARDED_BY(mu_);

    // Helper to take some bytes from data and queue them into pending[index].
    // Returns true if all bytes were consumed, false if more writes are
    // needed.
    bool Write(SliceBuffer* data, int index) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
  };
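
  // Illustrative example of the assumed Write() chunking behaviour described
  // above: with write_sizes[index] = {4, 2} and a 10-byte SliceBuffer, one
  // call moves at most 4 bytes into pending[index] and returns false (more
  // writes needed); later calls continue with the remaining bytes until all
  // of them have been consumed, at which point Write() returns true.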

  // Implementation of Endpoint.
  // When a connection is formed, we create two of these - one with index 0,
  // the other index 1, both pointing to the same EndpointMiddle.
  class FuzzingEndpoint final : public Endpoint {
   public:
    FuzzingEndpoint(std::shared_ptr<EndpointMiddle> middle, int index)
        : middle_(std::move(middle)), index_(index) {}
    ~FuzzingEndpoint() override;

    bool Read(absl::AnyInvocable<void(absl::Status)> on_read,
              SliceBuffer* buffer, const ReadArgs* args) override;
    bool Write(absl::AnyInvocable<void(absl::Status)> on_writable,
               SliceBuffer* data, const WriteArgs* args) override;
    const ResolvedAddress& GetPeerAddress() const override {
      return middle_->addrs[peer_index()];
    }
    const ResolvedAddress& GetLocalAddress() const override {
      return middle_->addrs[my_index()];
    }

   private:
    int my_index() const { return index_; }
    int peer_index() const { return 1 - index_; }
    // Schedule additional writes to be performed later.
    // Takes a ref to middle instead of holding this, so that should the
    // endpoint be destroyed we don't have to worry about use-after-free.
    // Instead that scheduled callback will see the middle is closed and
    // finally report completion to the caller.
    // Since there is no timeliness contract for the completion of writes
    // after endpoint shutdown, it's believed this is a legal implementation.
    static void ScheduleDelayedWrite(
        std::shared_ptr<EndpointMiddle> middle, int index,
        absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
        IoToken write_token) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
    const std::shared_ptr<EndpointMiddle> middle_;
    const int index_;
  };

  void RunLocked(RunType run_type, absl::AnyInvocable<void()> closure)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    RunAfterLocked(run_type, Duration::zero(), std::move(closure));
  }

  TaskHandle RunAfterLocked(RunType run_type, Duration when,
                            absl::AnyInvocable<void()> closure)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Allocate a port. Considers fuzzer selected port orderings first, and then
  // falls back to an exhaustive incremental search from port #1.
  int AllocatePort() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
  // Is the given port in use by any listener?
  bool IsPortUsed(int port) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  bool IsIdleLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Called whenever the time is incremented, and used by
  // ThreadedFuzzingEventEngine to insert a sleep so that real time passes at
  // approximately the same rate as FuzzingEventEngine time.
  // TODO(ctiller): This is very approximate and unsound and we should probably
  // evaluate whether we want to continue supporting ThreadedFuzzingEventEngine
  // at all.
  virtual void OnClockIncremented(Duration) {}

  // We need everything to use EventEngine to get reasonable timer steps --
  // without that we need to do a bunch of evil to make sure both timer
  // systems are ticking each step.
  static bool IsSaneTimerEnvironment() {
    return grpc_core::IsEventEngineClientEnabled() &&
           grpc_core::IsEventEngineListenerEnabled() &&
           grpc_core::IsEventEngineDnsEnabled();
  }
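
  // Illustrative note (assumed behaviour of AllocatePort()): fuzzer-selected
  // ports are handed out from free_ports_ first; once that queue is empty the
  // search walks upward from next_free_port_, presumably skipping anything in
  // fuzzer_mentioned_ports_ so that incremental allocations cannot collide
  // with fuzzer-chosen ones.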

  // For the next connection being built, query the list of fuzzer selected
  // write size limits.
  std::queue<size_t> WriteSizesForConnection()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

  gpr_timespec NowAsTimespec(gpr_clock_type clock_type)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(now_mu_);
  static gpr_timespec GlobalNowImpl(gpr_clock_type clock_type)
      ABSL_LOCKS_EXCLUDED(mu_);

  static grpc_core::NoDestruct<grpc_core::Mutex> mu_;
  static grpc_core::NoDestruct<grpc_core::Mutex> now_mu_
      ABSL_ACQUIRED_AFTER(mu_);

  const Duration max_delay_[2];
  intptr_t next_task_id_ ABSL_GUARDED_BY(mu_) = 1;
  // Start at 5 seconds after the epoch.
  // This needs to be more than 1, and otherwise is kind of arbitrary.
  // The grpc_core::Timer code special cases the zero second time period after
  // epoch to allow for some fancy atomic stuff.
  Time now_ ABSL_GUARDED_BY(now_mu_) = Time() + std::chrono::seconds(5);
  std::queue<Duration> task_delays_ ABSL_GUARDED_BY(mu_);
  std::map<intptr_t, std::shared_ptr<Task>> tasks_by_id_ ABSL_GUARDED_BY(mu_);
  std::multimap<Time, std::shared_ptr<Task>> tasks_by_time_
      ABSL_GUARDED_BY(mu_);
  std::set<std::shared_ptr<ListenerInfo>> listeners_ ABSL_GUARDED_BY(mu_);
  // Fuzzer selected port allocations.
  std::queue<int> free_ports_ ABSL_GUARDED_BY(mu_);
  // Next free port to allocate once fuzzer selections are exhausted.
  int next_free_port_ ABSL_GUARDED_BY(mu_) = 1;
  // Ports that were included in the fuzzer selected port orderings.
  std::set<int> fuzzer_mentioned_ports_ ABSL_GUARDED_BY(mu_);
  // Fuzzer selected write sizes for future connections - one picked off per
  // WriteSizesForConnection() call.
  std::queue<std::queue<size_t>> write_sizes_for_future_connections_
      ABSL_GUARDED_BY(mu_);
  grpc_pick_port_functions previous_pick_port_functions_;
  std::atomic<size_t> outstanding_writes_{0};
  std::atomic<size_t> outstanding_reads_{0};

  // TODO(ctiller): these can be removed when IsSaneTimerEnvironment() is
  // guaranteed to be true.
  Duration exponential_gate_time_increment_ ABSL_GUARDED_BY(mu_) =
      std::chrono::milliseconds(1);
  intptr_t current_tick_ ABSL_GUARDED_BY(now_mu_) = 0;

  grpc_core::Mutex run_after_duration_callback_mu_;
  absl::AnyInvocable<void(Duration)> run_after_duration_callback_
      ABSL_GUARDED_BY(run_after_duration_callback_mu_);
};

class ThreadedFuzzingEventEngine : public FuzzingEventEngine {
 public:
  ThreadedFuzzingEventEngine()
      : ThreadedFuzzingEventEngine(std::chrono::milliseconds(10)) {}

  explicit ThreadedFuzzingEventEngine(Duration max_time)
      : FuzzingEventEngine(FuzzingEventEngine::Options(),
                           fuzzing_event_engine::Actions()),
        main_([this, max_time]() {
          while (!done_.load()) {
            Tick(max_time);
          }
        }) {}

  ~ThreadedFuzzingEventEngine() override {
    done_.store(true);
    main_.join();
  }

 private:
  void OnClockIncremented(Duration duration) override {
    absl::SleepFor(absl::Milliseconds(
        1 + grpc_event_engine::experimental::Milliseconds(duration)));
  }

  std::atomic<bool> done_{false};
  std::thread main_;
};

}  // namespace experimental
}  // namespace grpc_event_engine

#endif  // GRPC_TEST_CORE_EVENT_ENGINE_FUZZING_EVENT_ENGINE_FUZZING_EVENT_ENGINE_H