• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/support/port_platform.h>
16 
17 #ifdef GPR_APPLE
18 #include <AvailabilityMacros.h>
19 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER
20 
21 #include <CoreFoundation/CoreFoundation.h>
22 #include <grpc/support/cpu.h>
23 
24 #include "absl/log/check.h"
25 #include "absl/log/log.h"
26 #include "src/core/lib/event_engine/cf_engine/cf_engine.h"
27 #include "src/core/lib/event_engine/cf_engine/cfstream_endpoint.h"
28 #include "src/core/lib/event_engine/cf_engine/dns_service_resolver.h"
29 #include "src/core/lib/event_engine/posix_engine/timer_manager.h"
30 #include "src/core/lib/event_engine/tcp_socket_utils.h"
31 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
32 #include "src/core/lib/event_engine/utils.h"
33 #include "src/core/util/crash.h"
34 
35 namespace grpc_event_engine {
36 namespace experimental {
37 
38 struct CFEventEngine::Closure final : public EventEngine::Closure {
39   absl::AnyInvocable<void()> cb;
40   Timer timer;
41   CFEventEngine* engine;
42   EventEngine::TaskHandle handle;
43 
Rungrpc_event_engine::experimental::CFEventEngine::Closure44   void Run() override {
45     GRPC_TRACE_LOG(event_engine, INFO)
46         << "CFEventEngine:" << engine << " executing callback:" << handle;
47     {
48       grpc_core::MutexLock lock(&engine->task_mu_);
49       engine->known_handles_.erase(handle);
50     }
51     cb();
52     delete this;
53   }
54 };
55 
CFEventEngine()56 CFEventEngine::CFEventEngine()
57     : thread_pool_(
58           MakeThreadPool(grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 16u))),
59       timer_manager_(thread_pool_) {}
60 
~CFEventEngine()61 CFEventEngine::~CFEventEngine() {
62   {
63     grpc_core::MutexLock lock(&task_mu_);
64     if (GRPC_TRACE_FLAG_ENABLED(event_engine)) {
65       for (auto handle : known_handles_) {
66         LOG(ERROR) << "CFEventEngine:" << this
67                    << " uncleared TaskHandle at shutdown:"
68                    << HandleToString(handle);
69       }
70     }
71     CHECK(GPR_LIKELY(known_handles_.empty()));
72     timer_manager_.Shutdown();
73   }
74   thread_pool_->Quiesce();
75 }
76 
77 absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
CreateListener(Listener::AcceptCallback,absl::AnyInvocable<void (absl::Status)>,const EndpointConfig &,std::unique_ptr<MemoryAllocatorFactory>)78 CFEventEngine::CreateListener(
79     Listener::AcceptCallback /* on_accept */,
80     absl::AnyInvocable<void(absl::Status)> /* on_shutdown */,
81     const EndpointConfig& /* config */,
82     std::unique_ptr<MemoryAllocatorFactory> /* memory_allocator_factory */) {
83   grpc_core::Crash("unimplemented");
84 }
85 
Connect(OnConnectCallback on_connect,const ResolvedAddress & addr,const EndpointConfig &,MemoryAllocator memory_allocator,Duration timeout)86 CFEventEngine::ConnectionHandle CFEventEngine::Connect(
87     OnConnectCallback on_connect, const ResolvedAddress& addr,
88     const EndpointConfig& /* args */, MemoryAllocator memory_allocator,
89     Duration timeout) {
90   auto endpoint_ptr = new CFStreamEndpoint(
91       std::static_pointer_cast<CFEventEngine>(shared_from_this()),
92       std::move(memory_allocator));
93 
94   ConnectionHandle handle{reinterpret_cast<intptr_t>(endpoint_ptr), 0};
95   {
96     grpc_core::MutexLock lock(&conn_mu_);
97     conn_handles_.insert(handle);
98   }
99 
100   auto deadline_timer =
101       RunAfter(timeout, [handle, that = std::static_pointer_cast<CFEventEngine>(
102                                      shared_from_this())]() {
103         that->CancelConnectInternal(
104             handle, absl::DeadlineExceededError("Connect timed out"));
105       });
106 
107   auto on_connect2 =
108       [that = std::static_pointer_cast<CFEventEngine>(shared_from_this()),
109        deadline_timer, handle,
110        on_connect = std::move(on_connect)](absl::Status status) mutable {
111         // best effort canceling deadline timer
112         that->Cancel(deadline_timer);
113 
114         {
115           grpc_core::MutexLock lock(&that->conn_mu_);
116           that->conn_handles_.erase(handle);
117         }
118 
119         auto endpoint_ptr = reinterpret_cast<CFStreamEndpoint*>(handle.keys[0]);
120 
121         if (!status.ok()) {
122           on_connect(std::move(status));
123           delete endpoint_ptr;
124           return;
125         }
126 
127         on_connect(std::unique_ptr<EventEngine::Endpoint>(endpoint_ptr));
128       };
129 
130   endpoint_ptr->Connect(std::move(on_connect2), addr);
131 
132   return handle;
133 }
134 
CancelConnect(ConnectionHandle handle)135 bool CFEventEngine::CancelConnect(ConnectionHandle handle) {
136   CancelConnectInternal(handle, absl::CancelledError("CancelConnect"));
137   // on_connect will always be called, even if cancellation is successful
138   return false;
139 }
140 
CancelConnectInternal(ConnectionHandle handle,absl::Status status)141 bool CFEventEngine::CancelConnectInternal(ConnectionHandle handle,
142                                           absl::Status status) {
143   grpc_core::MutexLock lock(&conn_mu_);
144 
145   if (!conn_handles_.contains(handle)) {
146     GRPC_TRACE_LOG(event_engine, INFO)
147         << "Unknown connection handle: " << handle;
148     return false;
149   }
150   conn_handles_.erase(handle);
151 
152   // keep the `conn_mu_` lock to prevent endpoint_ptr from being deleted
153 
154   auto endpoint_ptr = reinterpret_cast<CFStreamEndpoint*>(handle.keys[0]);
155   return endpoint_ptr->CancelConnect(status);
156 }
157 
IsWorkerThread()158 bool CFEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); }
159 
160 absl::StatusOr<std::unique_ptr<EventEngine::DNSResolver>>
GetDNSResolver(const DNSResolver::ResolverOptions & options)161 CFEventEngine::GetDNSResolver(const DNSResolver::ResolverOptions& options) {
162   if (!options.dns_server.empty()) {
163     return absl::InvalidArgumentError(
164         "CFEventEngine does not support custom DNS servers");
165   }
166 
167   return std::make_unique<DNSServiceResolver>(
168       std::static_pointer_cast<CFEventEngine>(shared_from_this()));
169 }
170 
Run(EventEngine::Closure * closure)171 void CFEventEngine::Run(EventEngine::Closure* closure) {
172   thread_pool_->Run(closure);
173 }
174 
Run(absl::AnyInvocable<void ()> closure)175 void CFEventEngine::Run(absl::AnyInvocable<void()> closure) {
176   thread_pool_->Run(std::move(closure));
177 }
178 
RunAfter(Duration when,EventEngine::Closure * closure)179 EventEngine::TaskHandle CFEventEngine::RunAfter(Duration when,
180                                                 EventEngine::Closure* closure) {
181   return RunAfterInternal(when, [closure]() { closure->Run(); });
182 }
183 
RunAfter(Duration when,absl::AnyInvocable<void ()> closure)184 EventEngine::TaskHandle CFEventEngine::RunAfter(
185     Duration when, absl::AnyInvocable<void()> closure) {
186   return RunAfterInternal(when, std::move(closure));
187 }
188 
Cancel(TaskHandle handle)189 bool CFEventEngine::Cancel(TaskHandle handle) {
190   grpc_core::MutexLock lock(&task_mu_);
191   if (!known_handles_.contains(handle)) return false;
192   auto* cd = reinterpret_cast<Closure*>(handle.keys[0]);
193   bool r = timer_manager_.TimerCancel(&cd->timer);
194   known_handles_.erase(handle);
195   if (r) delete cd;
196   return r;
197 }
198 
RunAfterInternal(Duration when,absl::AnyInvocable<void ()> cb)199 EventEngine::TaskHandle CFEventEngine::RunAfterInternal(
200     Duration when, absl::AnyInvocable<void()> cb) {
201   auto when_ts = ToTimestamp(timer_manager_.Now(), when);
202   auto* cd = new Closure;
203   cd->cb = std::move(cb);
204   cd->engine = this;
205   EventEngine::TaskHandle handle{reinterpret_cast<intptr_t>(cd),
206                                  aba_token_.fetch_add(1)};
207   grpc_core::MutexLock lock(&task_mu_);
208   known_handles_.insert(handle);
209   cd->handle = handle;
210   GRPC_TRACE_LOG(event_engine, INFO)
211       << "CFEventEngine:" << this << " scheduling callback:" << handle;
212   timer_manager_.TimerInit(&cd->timer, when_ts, cd);
213   return handle;
214 }
215 
216 }  // namespace experimental
217 }  // namespace grpc_event_engine
218 
219 #endif  // AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER
220 #endif  // GPR_APPLE
221