1 // Copyright 2023 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREADY_EVENT_ENGINE_THREADY_EVENT_ENGINE_H 16 #define GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREADY_EVENT_ENGINE_THREADY_EVENT_ENGINE_H 17 18 #include <grpc/event_engine/endpoint_config.h> 19 #include <grpc/event_engine/event_engine.h> 20 #include <grpc/event_engine/memory_allocator.h> 21 #include <grpc/support/port_platform.h> 22 23 #include <memory> 24 #include <utility> 25 26 #include "absl/functional/any_invocable.h" 27 #include "absl/status/status.h" 28 #include "absl/status/statusor.h" 29 #include "absl/strings/string_view.h" 30 31 namespace grpc_event_engine { 32 namespace experimental { 33 34 // An EventEngine that spawns a thread at every available opportunity: 35 // - Run() spawns a thread 36 // - RunAfter() schedules a timer that spawns a thread to run the callback 37 // - Endpoint operations spawn threads and then call the underlying event engine 38 // functions 39 // Implemented as a decorator around a complete EventEngine so that it need not 40 // deal with OS details. 41 // This event engine is intended to be used for testing with TSAN to maximize 42 // its visibility into race conditions in the calling code. 43 class ThreadyEventEngine final : public EventEngine { 44 public: ThreadyEventEngine(std::shared_ptr<EventEngine> impl)45 explicit ThreadyEventEngine(std::shared_ptr<EventEngine> impl) 46 : impl_(std::move(impl)) {} 47 48 absl::StatusOr<std::unique_ptr<Listener>> CreateListener( 49 Listener::AcceptCallback on_accept, 50 absl::AnyInvocable<void(absl::Status)> on_shutdown, 51 const EndpointConfig& config, 52 std::unique_ptr<MemoryAllocatorFactory> memory_allocator_factory) 53 override; 54 55 ConnectionHandle Connect(OnConnectCallback on_connect, 56 const ResolvedAddress& addr, 57 const EndpointConfig& args, 58 MemoryAllocator memory_allocator, 59 Duration timeout) override; 60 61 bool CancelConnect(ConnectionHandle handle) override; 62 63 bool IsWorkerThread() override; 64 65 absl::StatusOr<std::unique_ptr<DNSResolver>> GetDNSResolver( 66 const DNSResolver::ResolverOptions& options) override; 67 68 void Run(Closure* closure) override; 69 void Run(absl::AnyInvocable<void()> closure) override; 70 71 TaskHandle RunAfter(Duration when, Closure* closure) override; 72 TaskHandle RunAfter(Duration when, 73 absl::AnyInvocable<void()> closure) override; 74 75 bool Cancel(TaskHandle handle) override; 76 77 private: 78 class ThreadyDNSResolver final : public DNSResolver { 79 public: ThreadyDNSResolver(std::unique_ptr<DNSResolver> impl,std::shared_ptr<ThreadyEventEngine> engine)80 ThreadyDNSResolver(std::unique_ptr<DNSResolver> impl, 81 std::shared_ptr<ThreadyEventEngine> engine) 82 : impl_(std::move(impl)), engine_(std::move(engine)) {} 83 void LookupHostname(LookupHostnameCallback on_resolve, 84 absl::string_view name, 85 absl::string_view default_port) override; 86 void LookupSRV(LookupSRVCallback on_resolve, 87 absl::string_view name) override; 88 void LookupTXT(LookupTXTCallback on_resolve, 89 absl::string_view name) override; 90 91 private: 92 std::unique_ptr<DNSResolver> impl_; 93 std::shared_ptr<ThreadyEventEngine> engine_; 94 }; 95 96 void Asynchronously(absl::AnyInvocable<void()> fn); 97 98 std::shared_ptr<EventEngine> impl_; 99 }; 100 101 } // namespace experimental 102 } // namespace grpc_event_engine 103 104 #endif // GRPC_SRC_CORE_LIB_EVENT_ENGINE_THREADY_EVENT_ENGINE_THREADY_EVENT_ENGINE_H 105