1 // Copyright 2023 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 #include <grpc/support/port_platform.h>
16
17 #ifdef GPR_APPLE
18 #include <AvailabilityMacros.h>
19 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER
20
21 #include "absl/status/status.h"
22 #include "absl/strings/str_cat.h"
23 #include "absl/strings/str_format.h"
24 #include "src/core/lib/event_engine/cf_engine/cfstream_endpoint.h"
25 #include "src/core/util/strerror.h"
26
27 namespace grpc_event_engine {
28 namespace experimental {
29
30 namespace {
31
// Size in bytes of the scratch slice allocated for each CFReadStreamRead call.
// Declared constexpr so the value cannot be modified at runtime.
constexpr int kDefaultReadBufferSize = 8192;
33
CFErrorToStatus(CFTypeUniqueRef<CFErrorRef> cf_error)34 absl::Status CFErrorToStatus(CFTypeUniqueRef<CFErrorRef> cf_error) {
35 if (cf_error == nullptr) {
36 return absl::OkStatus();
37 }
38 CFErrorDomain cf_domain = CFErrorGetDomain((cf_error));
39 CFIndex code = CFErrorGetCode((cf_error));
40 CFTypeUniqueRef<CFStringRef> cf_desc = CFErrorCopyDescription((cf_error));
41 char domain_buf[256];
42 char desc_buf[256];
43 CFStringGetCString(cf_domain, domain_buf, 256, kCFStringEncodingUTF8);
44 CFStringGetCString(cf_desc, desc_buf, 256, kCFStringEncodingUTF8);
45 return absl::Status(absl::StatusCode::kUnknown,
46 absl::StrFormat("(domain:%s, code:%ld, description:%s)",
47 domain_buf, code, desc_buf));
48 }
49
CFReadStreamLocallAddress(CFReadStreamRef stream)50 absl::StatusOr<EventEngine::ResolvedAddress> CFReadStreamLocallAddress(
51 CFReadStreamRef stream) {
52 CFTypeUniqueRef<CFDataRef> cf_native_handle = static_cast<CFDataRef>(
53 CFReadStreamCopyProperty(stream, kCFStreamPropertySocketNativeHandle));
54 CFSocketNativeHandle socket;
55 CFDataGetBytes(cf_native_handle, CFRangeMake(0, sizeof(CFSocketNativeHandle)),
56 (UInt8*)&socket);
57 EventEngine::ResolvedAddress addr;
58 socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
59 if (getsockname(socket, const_cast<sockaddr*>(addr.address()), &len) < 0) {
60 return absl::InternalError(
61 absl::StrCat("getsockname:", grpc_core::StrError(errno)));
62 }
63 return EventEngine::ResolvedAddress(addr.address(), len);
64 }
65
66 } // namespace
67
CancelConnect(absl::Status status)68 bool CFStreamEndpointImpl::CancelConnect(absl::Status status) {
69 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
70 << "CFStreamEndpointImpl::CancelConnect: status: " << status
71 << ", this: " << this;
72
73 return open_event_.SetShutdown(std::move(status));
74 }
75
Connect(absl::AnyInvocable<void (absl::Status)> on_connect,EventEngine::ResolvedAddress addr)76 void CFStreamEndpointImpl::Connect(
77 absl::AnyInvocable<void(absl::Status)> on_connect,
78 EventEngine::ResolvedAddress addr) {
79 auto addr_uri = ResolvedAddressToURI(addr);
80
81 if (!addr_uri.ok()) {
82 on_connect(std::move(addr_uri).status());
83 return;
84 }
85
86 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
87 << "CFStreamEndpointImpl::Connect: " << addr_uri.value();
88
89 peer_address_ = std::move(addr);
90 auto host_port = ResolvedAddressToNormalizedString(peer_address_);
91 if (!host_port.ok()) {
92 on_connect(std::move(host_port).status());
93 return;
94 }
95
96 peer_address_string_ = host_port.value();
97 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
98 << "CFStreamEndpointImpl::Connect, host_port: " << peer_address_string_;
99
100 std::string host_string;
101 std::string port_string;
102 grpc_core::SplitHostPort(host_port.value(), &host_string, &port_string);
103 CFTypeUniqueRef<CFStringRef> host = CFStringCreateWithCString(
104 NULL, host_string.c_str(), kCFStringEncodingUTF8);
105 int port = ResolvedAddressGetPort(peer_address_);
106 CFStreamCreatePairWithSocketToHost(NULL, host, port, &cf_read_stream_,
107 &cf_write_stream_);
108
109 CFStreamClientContext cf_context = {0, this, Retain, Release, nullptr};
110 CFReadStreamSetClient(
111 cf_read_stream_,
112 kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
113 kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
114 ReadCallback, &cf_context);
115 CFWriteStreamSetClient(
116 cf_write_stream_,
117 kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
118 kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
119 WriteCallback, &cf_context);
120 CFReadStreamSetDispatchQueue(cf_read_stream_,
121 dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0));
122 CFWriteStreamSetDispatchQueue(
123 cf_write_stream_, dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0));
124
125 if (!CFReadStreamOpen(cf_read_stream_)) {
126 auto status = CFErrorToStatus(CFReadStreamCopyError(cf_read_stream_));
127 on_connect(std::move(status));
128 return;
129 }
130
131 if (!CFWriteStreamOpen(cf_write_stream_)) {
132 auto status = CFErrorToStatus(CFWriteStreamCopyError(cf_write_stream_));
133 on_connect(std::move(status));
134 return;
135 }
136
137 open_event_.NotifyOn(new PosixEngineClosure(
138 [that = Ref(),
139 on_connect = std::move(on_connect)](absl::Status status) mutable {
140 if (!status.ok()) {
141 on_connect(std::move(status));
142 return;
143 }
144
145 auto local_addr = CFReadStreamLocallAddress(that->cf_read_stream_);
146 if (!local_addr.ok()) {
147 on_connect(std::move(local_addr).status());
148 return;
149 }
150
151 that->local_address_ = local_addr.value();
152 that->local_address_string_ =
153 *ResolvedAddressToURI(that->local_address_);
154 on_connect(absl::OkStatus());
155 },
156 false /* is_permanent */));
157 }
158
ReadCallback(CFReadStreamRef stream,CFStreamEventType type,void * client_callback_info)159 /* static */ void CFStreamEndpointImpl::ReadCallback(
160 CFReadStreamRef stream, CFStreamEventType type,
161 void* client_callback_info) {
162 auto self = static_cast<CFStreamEndpointImpl*>(client_callback_info);
163
164 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
165 << "CFStreamEndpointImpl::ReadCallback, type: " << type
166 << ", this: " << self;
167
168 switch (type) {
169 case kCFStreamEventOpenCompleted:
170 // wait for write stream open completed to signal connection ready
171 break;
172 case kCFStreamEventHasBytesAvailable:
173 ABSL_FALLTHROUGH_INTENDED;
174 case kCFStreamEventEndEncountered:
175 self->read_event_.SetReady();
176 break;
177 case kCFStreamEventErrorOccurred: {
178 auto status = CFErrorToStatus(CFReadStreamCopyError(stream));
179 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
180 << "CFStream Read error: " << status;
181
182 self->open_event_.SetShutdown(status);
183 self->read_event_.SetShutdown(status);
184 self->write_event_.SetShutdown(status);
185 } break;
186 default:
187 GPR_UNREACHABLE_CODE(return);
188 }
189 }
190
191 /* static */
WriteCallback(CFWriteStreamRef stream,CFStreamEventType type,void * client_callback_info)192 void CFStreamEndpointImpl::WriteCallback(CFWriteStreamRef stream,
193 CFStreamEventType type,
194 void* client_callback_info) {
195 auto self = static_cast<CFStreamEndpointImpl*>(client_callback_info);
196 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
197 << "CFStreamEndpointImpl::WriteCallback, type: " << type
198 << ", this: " << self;
199
200 switch (type) {
201 case kCFStreamEventOpenCompleted:
202 self->open_event_.SetReady();
203 break;
204 case kCFStreamEventCanAcceptBytes:
205 ABSL_FALLTHROUGH_INTENDED;
206 case kCFStreamEventEndEncountered:
207 self->write_event_.SetReady();
208 break;
209 case kCFStreamEventErrorOccurred: {
210 auto status = CFErrorToStatus(CFWriteStreamCopyError(stream));
211 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
212 << "CFStream Write error: " << status;
213
214 self->open_event_.SetShutdown(status);
215 self->read_event_.SetShutdown(status);
216 self->write_event_.SetShutdown(status);
217 } break;
218 default:
219 GPR_UNREACHABLE_CODE(return);
220 }
221 }
222
CFStreamEndpointImpl(std::shared_ptr<CFEventEngine> engine,MemoryAllocator memory_allocator)223 CFStreamEndpointImpl::CFStreamEndpointImpl(
224 std::shared_ptr<CFEventEngine> engine, MemoryAllocator memory_allocator)
225 : engine_(std::move(engine)),
226 memory_allocator_(std::move(memory_allocator)),
227 open_event_(engine_.get()),
228 read_event_(engine_.get()),
229 write_event_(engine_.get()) {
230 open_event_.InitEvent();
231 read_event_.InitEvent();
232 write_event_.InitEvent();
233 }
234
~CFStreamEndpointImpl()235 CFStreamEndpointImpl::~CFStreamEndpointImpl() {
236 open_event_.DestroyEvent();
237 read_event_.DestroyEvent();
238 write_event_.DestroyEvent();
239 }
240
Shutdown()241 void CFStreamEndpointImpl::Shutdown() {
242 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
243 << "CFStreamEndpointImpl::Shutdown: this: " << this;
244
245 auto shutdownStatus =
246 absl::Status(absl::StatusCode::kUnknown,
247 absl::StrFormat("Shutting down CFStreamEndpointImpl"));
248 open_event_.SetShutdown(shutdownStatus);
249 read_event_.SetShutdown(shutdownStatus);
250 write_event_.SetShutdown(shutdownStatus);
251
252 CFReadStreamSetDispatchQueue(cf_read_stream_, nullptr);
253 CFWriteStreamSetDispatchQueue(cf_write_stream_, nullptr);
254
255 CFReadStreamClose(cf_read_stream_);
256 CFWriteStreamClose(cf_write_stream_);
257 }
258
Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer,const EventEngine::Endpoint::ReadArgs *)259 bool CFStreamEndpointImpl::Read(
260 absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
261 const EventEngine::Endpoint::ReadArgs* /* args */) {
262 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
263 << "CFStreamEndpointImpl::Read, this: " << this;
264
265 read_event_.NotifyOn(new PosixEngineClosure(
266 [that = Ref(), on_read = std::move(on_read),
267 buffer](absl::Status status) mutable {
268 if (status.ok()) {
269 that->DoRead(std::move(on_read), buffer);
270 } else {
271 on_read(status);
272 }
273 },
274 false /* is_permanent*/));
275
276 return false;
277 }
278
DoRead(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer)279 void CFStreamEndpointImpl::DoRead(
280 absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer) {
281 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
282 << "CFStreamEndpointImpl::DoRead, this: " << this;
283
284 auto buffer_index = buffer->AppendIndexed(
285 Slice(memory_allocator_.MakeSlice(kDefaultReadBufferSize)));
286
287 CFIndex read_size = CFReadStreamRead(
288 cf_read_stream_,
289 internal::SliceCast<MutableSlice>(buffer->MutableSliceAt(buffer_index))
290 .begin(),
291 kDefaultReadBufferSize);
292
293 if (read_size < 0) {
294 auto status = CFErrorToStatus(CFReadStreamCopyError(cf_read_stream_));
295 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
296 << "CFStream read error: " << status << ", read_size: " << read_size;
297 on_read(status);
298 return;
299 }
300
301 buffer->RemoveLastNBytes(buffer->Length() - read_size);
302 on_read(absl::OkStatus());
303 }
304
Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const EventEngine::Endpoint::WriteArgs *)305 bool CFStreamEndpointImpl::Write(
306 absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
307 const EventEngine::Endpoint::WriteArgs* /* args */) {
308 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
309 << "CFStreamEndpointImpl::Write, this: " << this;
310
311 write_event_.NotifyOn(new PosixEngineClosure(
312 [that = Ref(), on_writable = std::move(on_writable),
313 data](absl::Status status) mutable {
314 if (status.ok()) {
315 that->DoWrite(std::move(on_writable), data);
316 } else {
317 on_writable(status);
318 }
319 },
320 false /* is_permanent*/));
321
322 return false;
323 }
324
DoWrite(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data)325 void CFStreamEndpointImpl::DoWrite(
326 absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data) {
327 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
328 << "CFStreamEndpointImpl::DoWrite, this: " << this;
329
330 size_t total_written_size = 0;
331 for (size_t i = 0; i < data->Count(); i++) {
332 auto slice = data->RefSlice(i);
333 if (slice.size() == 0) {
334 continue;
335 }
336
337 CFIndex written_size =
338 CFWriteStreamWrite(cf_write_stream_, slice.begin(), slice.size());
339
340 if (written_size < 0) {
341 auto status = CFErrorToStatus(CFWriteStreamCopyError(cf_write_stream_));
342 GRPC_TRACE_LOG(event_engine_endpoint, INFO)
343 << "CFStream write error: " << status
344 << ", written_size: " << written_size;
345 on_writable(status);
346 return;
347 }
348
349 total_written_size += written_size;
350 if (written_size < slice.size()) {
351 SliceBuffer written;
352 data->MoveFirstNBytesIntoSliceBuffer(total_written_size, written);
353
354 write_event_.NotifyOn(new PosixEngineClosure(
355 [that = Ref(), on_writable = std::move(on_writable),
356 data](absl::Status status) mutable {
357 if (status.ok()) {
358 that->DoWrite(std::move(on_writable), data);
359 } else {
360 on_writable(status);
361 }
362 },
363 false /* is_permanent*/));
364 return;
365 }
366 }
367 on_writable(absl::OkStatus());
368 }
369
370 } // namespace experimental
371 } // namespace grpc_event_engine
372
373 #endif // AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER
374 #endif // GPR_APPLE
375