1 // Copyright 2023 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/support/port_platform.h>
16
17 #ifdef GPR_APPLE
18 #include <AvailabilityMacros.h>
19 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER
20
21 #include <CoreFoundation/CoreFoundation.h>
22 #include <grpc/support/cpu.h>
23
24 #include "absl/log/check.h"
25 #include "absl/log/log.h"
26 #include "src/core/lib/event_engine/cf_engine/cf_engine.h"
27 #include "src/core/lib/event_engine/cf_engine/cfstream_endpoint.h"
28 #include "src/core/lib/event_engine/cf_engine/dns_service_resolver.h"
29 #include "src/core/lib/event_engine/posix_engine/timer_manager.h"
30 #include "src/core/lib/event_engine/tcp_socket_utils.h"
31 #include "src/core/lib/event_engine/thread_pool/thread_pool.h"
32 #include "src/core/lib/event_engine/utils.h"
33 #include "src/core/util/crash.h"
34
35 namespace grpc_event_engine {
36 namespace experimental {
37
38 struct CFEventEngine::Closure final : public EventEngine::Closure {
39 absl::AnyInvocable<void()> cb;
40 Timer timer;
41 CFEventEngine* engine;
42 EventEngine::TaskHandle handle;
43
Rungrpc_event_engine::experimental::CFEventEngine::Closure44 void Run() override {
45 GRPC_TRACE_LOG(event_engine, INFO)
46 << "CFEventEngine:" << engine << " executing callback:" << handle;
47 {
48 grpc_core::MutexLock lock(&engine->task_mu_);
49 engine->known_handles_.erase(handle);
50 }
51 cb();
52 delete this;
53 }
54 };
55
CFEventEngine()56 CFEventEngine::CFEventEngine()
57 : thread_pool_(
58 MakeThreadPool(grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 16u))),
59 timer_manager_(thread_pool_) {}
60
~CFEventEngine()61 CFEventEngine::~CFEventEngine() {
62 {
63 grpc_core::MutexLock lock(&task_mu_);
64 if (GRPC_TRACE_FLAG_ENABLED(event_engine)) {
65 for (auto handle : known_handles_) {
66 LOG(ERROR) << "CFEventEngine:" << this
67 << " uncleared TaskHandle at shutdown:"
68 << HandleToString(handle);
69 }
70 }
71 CHECK(GPR_LIKELY(known_handles_.empty()));
72 timer_manager_.Shutdown();
73 }
74 thread_pool_->Quiesce();
75 }
76
77 absl::StatusOr<std::unique_ptr<EventEngine::Listener>>
CreateListener(Listener::AcceptCallback,absl::AnyInvocable<void (absl::Status)>,const EndpointConfig &,std::unique_ptr<MemoryAllocatorFactory>)78 CFEventEngine::CreateListener(
79 Listener::AcceptCallback /* on_accept */,
80 absl::AnyInvocable<void(absl::Status)> /* on_shutdown */,
81 const EndpointConfig& /* config */,
82 std::unique_ptr<MemoryAllocatorFactory> /* memory_allocator_factory */) {
83 grpc_core::Crash("unimplemented");
84 }
85
Connect(OnConnectCallback on_connect,const ResolvedAddress & addr,const EndpointConfig &,MemoryAllocator memory_allocator,Duration timeout)86 CFEventEngine::ConnectionHandle CFEventEngine::Connect(
87 OnConnectCallback on_connect, const ResolvedAddress& addr,
88 const EndpointConfig& /* args */, MemoryAllocator memory_allocator,
89 Duration timeout) {
90 auto endpoint_ptr = new CFStreamEndpoint(
91 std::static_pointer_cast<CFEventEngine>(shared_from_this()),
92 std::move(memory_allocator));
93
94 ConnectionHandle handle{reinterpret_cast<intptr_t>(endpoint_ptr), 0};
95 {
96 grpc_core::MutexLock lock(&conn_mu_);
97 conn_handles_.insert(handle);
98 }
99
100 auto deadline_timer =
101 RunAfter(timeout, [handle, that = std::static_pointer_cast<CFEventEngine>(
102 shared_from_this())]() {
103 that->CancelConnectInternal(
104 handle, absl::DeadlineExceededError("Connect timed out"));
105 });
106
107 auto on_connect2 =
108 [that = std::static_pointer_cast<CFEventEngine>(shared_from_this()),
109 deadline_timer, handle,
110 on_connect = std::move(on_connect)](absl::Status status) mutable {
111 // best effort canceling deadline timer
112 that->Cancel(deadline_timer);
113
114 {
115 grpc_core::MutexLock lock(&that->conn_mu_);
116 that->conn_handles_.erase(handle);
117 }
118
119 auto endpoint_ptr = reinterpret_cast<CFStreamEndpoint*>(handle.keys[0]);
120
121 if (!status.ok()) {
122 on_connect(std::move(status));
123 delete endpoint_ptr;
124 return;
125 }
126
127 on_connect(std::unique_ptr<EventEngine::Endpoint>(endpoint_ptr));
128 };
129
130 endpoint_ptr->Connect(std::move(on_connect2), addr);
131
132 return handle;
133 }
134
CancelConnect(ConnectionHandle handle)135 bool CFEventEngine::CancelConnect(ConnectionHandle handle) {
136 CancelConnectInternal(handle, absl::CancelledError("CancelConnect"));
137 // on_connect will always be called, even if cancellation is successful
138 return false;
139 }
140
CancelConnectInternal(ConnectionHandle handle,absl::Status status)141 bool CFEventEngine::CancelConnectInternal(ConnectionHandle handle,
142 absl::Status status) {
143 grpc_core::MutexLock lock(&conn_mu_);
144
145 if (!conn_handles_.contains(handle)) {
146 GRPC_TRACE_LOG(event_engine, INFO)
147 << "Unknown connection handle: " << handle;
148 return false;
149 }
150 conn_handles_.erase(handle);
151
152 // keep the `conn_mu_` lock to prevent endpoint_ptr from being deleted
153
154 auto endpoint_ptr = reinterpret_cast<CFStreamEndpoint*>(handle.keys[0]);
155 return endpoint_ptr->CancelConnect(status);
156 }
157
IsWorkerThread()158 bool CFEventEngine::IsWorkerThread() { grpc_core::Crash("unimplemented"); }
159
160 absl::StatusOr<std::unique_ptr<EventEngine::DNSResolver>>
GetDNSResolver(const DNSResolver::ResolverOptions & options)161 CFEventEngine::GetDNSResolver(const DNSResolver::ResolverOptions& options) {
162 if (!options.dns_server.empty()) {
163 return absl::InvalidArgumentError(
164 "CFEventEngine does not support custom DNS servers");
165 }
166
167 return std::make_unique<DNSServiceResolver>(
168 std::static_pointer_cast<CFEventEngine>(shared_from_this()));
169 }
170
Run(EventEngine::Closure * closure)171 void CFEventEngine::Run(EventEngine::Closure* closure) {
172 thread_pool_->Run(closure);
173 }
174
Run(absl::AnyInvocable<void ()> closure)175 void CFEventEngine::Run(absl::AnyInvocable<void()> closure) {
176 thread_pool_->Run(std::move(closure));
177 }
178
RunAfter(Duration when,EventEngine::Closure * closure)179 EventEngine::TaskHandle CFEventEngine::RunAfter(Duration when,
180 EventEngine::Closure* closure) {
181 return RunAfterInternal(when, [closure]() { closure->Run(); });
182 }
183
RunAfter(Duration when,absl::AnyInvocable<void ()> closure)184 EventEngine::TaskHandle CFEventEngine::RunAfter(
185 Duration when, absl::AnyInvocable<void()> closure) {
186 return RunAfterInternal(when, std::move(closure));
187 }
188
Cancel(TaskHandle handle)189 bool CFEventEngine::Cancel(TaskHandle handle) {
190 grpc_core::MutexLock lock(&task_mu_);
191 if (!known_handles_.contains(handle)) return false;
192 auto* cd = reinterpret_cast<Closure*>(handle.keys[0]);
193 bool r = timer_manager_.TimerCancel(&cd->timer);
194 known_handles_.erase(handle);
195 if (r) delete cd;
196 return r;
197 }
198
RunAfterInternal(Duration when,absl::AnyInvocable<void ()> cb)199 EventEngine::TaskHandle CFEventEngine::RunAfterInternal(
200 Duration when, absl::AnyInvocable<void()> cb) {
201 auto when_ts = ToTimestamp(timer_manager_.Now(), when);
202 auto* cd = new Closure;
203 cd->cb = std::move(cb);
204 cd->engine = this;
205 EventEngine::TaskHandle handle{reinterpret_cast<intptr_t>(cd),
206 aba_token_.fetch_add(1)};
207 grpc_core::MutexLock lock(&task_mu_);
208 known_handles_.insert(handle);
209 cd->handle = handle;
210 GRPC_TRACE_LOG(event_engine, INFO)
211 << "CFEventEngine:" << this << " scheduling callback:" << handle;
212 timer_manager_.TimerInit(&cd->timer, when_ts, cd);
213 return handle;
214 }
215
216 } // namespace experimental
217 } // namespace grpc_event_engine
218
219 #endif // AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER
220 #endif // GPR_APPLE
221