• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 The gRPC Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/support/port_platform.h>
16 
17 #ifdef GPR_APPLE
18 #include <AvailabilityMacros.h>
19 #ifdef AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER
20 
21 #include "absl/status/status.h"
22 #include "absl/strings/str_cat.h"
23 #include "absl/strings/str_format.h"
24 #include "src/core/lib/event_engine/cf_engine/cfstream_endpoint.h"
25 #include "src/core/util/strerror.h"
26 
27 namespace grpc_event_engine {
28 namespace experimental {
29 
30 namespace {
31 
32 int kDefaultReadBufferSize = 8192;
33 
CFErrorToStatus(CFTypeUniqueRef<CFErrorRef> cf_error)34 absl::Status CFErrorToStatus(CFTypeUniqueRef<CFErrorRef> cf_error) {
35   if (cf_error == nullptr) {
36     return absl::OkStatus();
37   }
38   CFErrorDomain cf_domain = CFErrorGetDomain((cf_error));
39   CFIndex code = CFErrorGetCode((cf_error));
40   CFTypeUniqueRef<CFStringRef> cf_desc = CFErrorCopyDescription((cf_error));
41   char domain_buf[256];
42   char desc_buf[256];
43   CFStringGetCString(cf_domain, domain_buf, 256, kCFStringEncodingUTF8);
44   CFStringGetCString(cf_desc, desc_buf, 256, kCFStringEncodingUTF8);
45   return absl::Status(absl::StatusCode::kUnknown,
46                       absl::StrFormat("(domain:%s, code:%ld, description:%s)",
47                                       domain_buf, code, desc_buf));
48 }
49 
CFReadStreamLocallAddress(CFReadStreamRef stream)50 absl::StatusOr<EventEngine::ResolvedAddress> CFReadStreamLocallAddress(
51     CFReadStreamRef stream) {
52   CFTypeUniqueRef<CFDataRef> cf_native_handle = static_cast<CFDataRef>(
53       CFReadStreamCopyProperty(stream, kCFStreamPropertySocketNativeHandle));
54   CFSocketNativeHandle socket;
55   CFDataGetBytes(cf_native_handle, CFRangeMake(0, sizeof(CFSocketNativeHandle)),
56                  (UInt8*)&socket);
57   EventEngine::ResolvedAddress addr;
58   socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES;
59   if (getsockname(socket, const_cast<sockaddr*>(addr.address()), &len) < 0) {
60     return absl::InternalError(
61         absl::StrCat("getsockname:", grpc_core::StrError(errno)));
62   }
63   return EventEngine::ResolvedAddress(addr.address(), len);
64 }
65 
66 }  // namespace
67 
CancelConnect(absl::Status status)68 bool CFStreamEndpointImpl::CancelConnect(absl::Status status) {
69   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
70       << "CFStreamEndpointImpl::CancelConnect: status: " << status
71       << ", this: " << this;
72 
73   return open_event_.SetShutdown(std::move(status));
74 }
75 
Connect(absl::AnyInvocable<void (absl::Status)> on_connect,EventEngine::ResolvedAddress addr)76 void CFStreamEndpointImpl::Connect(
77     absl::AnyInvocable<void(absl::Status)> on_connect,
78     EventEngine::ResolvedAddress addr) {
79   auto addr_uri = ResolvedAddressToURI(addr);
80 
81   if (!addr_uri.ok()) {
82     on_connect(std::move(addr_uri).status());
83     return;
84   }
85 
86   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
87       << "CFStreamEndpointImpl::Connect: " << addr_uri.value();
88 
89   peer_address_ = std::move(addr);
90   auto host_port = ResolvedAddressToNormalizedString(peer_address_);
91   if (!host_port.ok()) {
92     on_connect(std::move(host_port).status());
93     return;
94   }
95 
96   peer_address_string_ = host_port.value();
97   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
98       << "CFStreamEndpointImpl::Connect, host_port: " << peer_address_string_;
99 
100   std::string host_string;
101   std::string port_string;
102   grpc_core::SplitHostPort(host_port.value(), &host_string, &port_string);
103   CFTypeUniqueRef<CFStringRef> host = CFStringCreateWithCString(
104       NULL, host_string.c_str(), kCFStringEncodingUTF8);
105   int port = ResolvedAddressGetPort(peer_address_);
106   CFStreamCreatePairWithSocketToHost(NULL, host, port, &cf_read_stream_,
107                                      &cf_write_stream_);
108 
109   CFStreamClientContext cf_context = {0, this, Retain, Release, nullptr};
110   CFReadStreamSetClient(
111       cf_read_stream_,
112       kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
113           kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
114       ReadCallback, &cf_context);
115   CFWriteStreamSetClient(
116       cf_write_stream_,
117       kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
118           kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
119       WriteCallback, &cf_context);
120   CFReadStreamSetDispatchQueue(cf_read_stream_,
121                                dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0));
122   CFWriteStreamSetDispatchQueue(
123       cf_write_stream_, dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0));
124 
125   if (!CFReadStreamOpen(cf_read_stream_)) {
126     auto status = CFErrorToStatus(CFReadStreamCopyError(cf_read_stream_));
127     on_connect(std::move(status));
128     return;
129   }
130 
131   if (!CFWriteStreamOpen(cf_write_stream_)) {
132     auto status = CFErrorToStatus(CFWriteStreamCopyError(cf_write_stream_));
133     on_connect(std::move(status));
134     return;
135   }
136 
137   open_event_.NotifyOn(new PosixEngineClosure(
138       [that = Ref(),
139        on_connect = std::move(on_connect)](absl::Status status) mutable {
140         if (!status.ok()) {
141           on_connect(std::move(status));
142           return;
143         }
144 
145         auto local_addr = CFReadStreamLocallAddress(that->cf_read_stream_);
146         if (!local_addr.ok()) {
147           on_connect(std::move(local_addr).status());
148           return;
149         }
150 
151         that->local_address_ = local_addr.value();
152         that->local_address_string_ =
153             *ResolvedAddressToURI(that->local_address_);
154         on_connect(absl::OkStatus());
155       },
156       false /* is_permanent */));
157 }
158 
ReadCallback(CFReadStreamRef stream,CFStreamEventType type,void * client_callback_info)159 /* static */ void CFStreamEndpointImpl::ReadCallback(
160     CFReadStreamRef stream, CFStreamEventType type,
161     void* client_callback_info) {
162   auto self = static_cast<CFStreamEndpointImpl*>(client_callback_info);
163 
164   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
165       << "CFStreamEndpointImpl::ReadCallback, type: " << type
166       << ", this: " << self;
167 
168   switch (type) {
169     case kCFStreamEventOpenCompleted:
170       // wait for write stream open completed to signal connection ready
171       break;
172     case kCFStreamEventHasBytesAvailable:
173       ABSL_FALLTHROUGH_INTENDED;
174     case kCFStreamEventEndEncountered:
175       self->read_event_.SetReady();
176       break;
177     case kCFStreamEventErrorOccurred: {
178       auto status = CFErrorToStatus(CFReadStreamCopyError(stream));
179       GRPC_TRACE_LOG(event_engine_endpoint, INFO)
180           << "CFStream Read error: " << status;
181 
182       self->open_event_.SetShutdown(status);
183       self->read_event_.SetShutdown(status);
184       self->write_event_.SetShutdown(status);
185     } break;
186     default:
187       GPR_UNREACHABLE_CODE(return);
188   }
189 }
190 
191 /* static */
WriteCallback(CFWriteStreamRef stream,CFStreamEventType type,void * client_callback_info)192 void CFStreamEndpointImpl::WriteCallback(CFWriteStreamRef stream,
193                                          CFStreamEventType type,
194                                          void* client_callback_info) {
195   auto self = static_cast<CFStreamEndpointImpl*>(client_callback_info);
196   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
197       << "CFStreamEndpointImpl::WriteCallback, type: " << type
198       << ", this: " << self;
199 
200   switch (type) {
201     case kCFStreamEventOpenCompleted:
202       self->open_event_.SetReady();
203       break;
204     case kCFStreamEventCanAcceptBytes:
205       ABSL_FALLTHROUGH_INTENDED;
206     case kCFStreamEventEndEncountered:
207       self->write_event_.SetReady();
208       break;
209     case kCFStreamEventErrorOccurred: {
210       auto status = CFErrorToStatus(CFWriteStreamCopyError(stream));
211       GRPC_TRACE_LOG(event_engine_endpoint, INFO)
212           << "CFStream Write error: " << status;
213 
214       self->open_event_.SetShutdown(status);
215       self->read_event_.SetShutdown(status);
216       self->write_event_.SetShutdown(status);
217     } break;
218     default:
219       GPR_UNREACHABLE_CODE(return);
220   }
221 }
222 
CFStreamEndpointImpl(std::shared_ptr<CFEventEngine> engine,MemoryAllocator memory_allocator)223 CFStreamEndpointImpl::CFStreamEndpointImpl(
224     std::shared_ptr<CFEventEngine> engine, MemoryAllocator memory_allocator)
225     : engine_(std::move(engine)),
226       memory_allocator_(std::move(memory_allocator)),
227       open_event_(engine_.get()),
228       read_event_(engine_.get()),
229       write_event_(engine_.get()) {
230   open_event_.InitEvent();
231   read_event_.InitEvent();
232   write_event_.InitEvent();
233 }
234 
~CFStreamEndpointImpl()235 CFStreamEndpointImpl::~CFStreamEndpointImpl() {
236   open_event_.DestroyEvent();
237   read_event_.DestroyEvent();
238   write_event_.DestroyEvent();
239 }
240 
Shutdown()241 void CFStreamEndpointImpl::Shutdown() {
242   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
243       << "CFStreamEndpointImpl::Shutdown: this: " << this;
244 
245   auto shutdownStatus =
246       absl::Status(absl::StatusCode::kUnknown,
247                    absl::StrFormat("Shutting down CFStreamEndpointImpl"));
248   open_event_.SetShutdown(shutdownStatus);
249   read_event_.SetShutdown(shutdownStatus);
250   write_event_.SetShutdown(shutdownStatus);
251 
252   CFReadStreamSetDispatchQueue(cf_read_stream_, nullptr);
253   CFWriteStreamSetDispatchQueue(cf_write_stream_, nullptr);
254 
255   CFReadStreamClose(cf_read_stream_);
256   CFWriteStreamClose(cf_write_stream_);
257 }
258 
Read(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer,const EventEngine::Endpoint::ReadArgs *)259 bool CFStreamEndpointImpl::Read(
260     absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer,
261     const EventEngine::Endpoint::ReadArgs* /* args */) {
262   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
263       << "CFStreamEndpointImpl::Read, this: " << this;
264 
265   read_event_.NotifyOn(new PosixEngineClosure(
266       [that = Ref(), on_read = std::move(on_read),
267        buffer](absl::Status status) mutable {
268         if (status.ok()) {
269           that->DoRead(std::move(on_read), buffer);
270         } else {
271           on_read(status);
272         }
273       },
274       false /* is_permanent*/));
275 
276   return false;
277 }
278 
DoRead(absl::AnyInvocable<void (absl::Status)> on_read,SliceBuffer * buffer)279 void CFStreamEndpointImpl::DoRead(
280     absl::AnyInvocable<void(absl::Status)> on_read, SliceBuffer* buffer) {
281   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
282       << "CFStreamEndpointImpl::DoRead, this: " << this;
283 
284   auto buffer_index = buffer->AppendIndexed(
285       Slice(memory_allocator_.MakeSlice(kDefaultReadBufferSize)));
286 
287   CFIndex read_size = CFReadStreamRead(
288       cf_read_stream_,
289       internal::SliceCast<MutableSlice>(buffer->MutableSliceAt(buffer_index))
290           .begin(),
291       kDefaultReadBufferSize);
292 
293   if (read_size < 0) {
294     auto status = CFErrorToStatus(CFReadStreamCopyError(cf_read_stream_));
295     GRPC_TRACE_LOG(event_engine_endpoint, INFO)
296         << "CFStream read error: " << status << ", read_size: " << read_size;
297     on_read(status);
298     return;
299   }
300 
301   buffer->RemoveLastNBytes(buffer->Length() - read_size);
302   on_read(absl::OkStatus());
303 }
304 
Write(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data,const EventEngine::Endpoint::WriteArgs *)305 bool CFStreamEndpointImpl::Write(
306     absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data,
307     const EventEngine::Endpoint::WriteArgs* /* args */) {
308   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
309       << "CFStreamEndpointImpl::Write, this: " << this;
310 
311   write_event_.NotifyOn(new PosixEngineClosure(
312       [that = Ref(), on_writable = std::move(on_writable),
313        data](absl::Status status) mutable {
314         if (status.ok()) {
315           that->DoWrite(std::move(on_writable), data);
316         } else {
317           on_writable(status);
318         }
319       },
320       false /* is_permanent*/));
321 
322   return false;
323 }
324 
DoWrite(absl::AnyInvocable<void (absl::Status)> on_writable,SliceBuffer * data)325 void CFStreamEndpointImpl::DoWrite(
326     absl::AnyInvocable<void(absl::Status)> on_writable, SliceBuffer* data) {
327   GRPC_TRACE_LOG(event_engine_endpoint, INFO)
328       << "CFStreamEndpointImpl::DoWrite, this: " << this;
329 
330   size_t total_written_size = 0;
331   for (size_t i = 0; i < data->Count(); i++) {
332     auto slice = data->RefSlice(i);
333     if (slice.size() == 0) {
334       continue;
335     }
336 
337     CFIndex written_size =
338         CFWriteStreamWrite(cf_write_stream_, slice.begin(), slice.size());
339 
340     if (written_size < 0) {
341       auto status = CFErrorToStatus(CFWriteStreamCopyError(cf_write_stream_));
342       GRPC_TRACE_LOG(event_engine_endpoint, INFO)
343           << "CFStream write error: " << status
344           << ", written_size: " << written_size;
345       on_writable(status);
346       return;
347     }
348 
349     total_written_size += written_size;
350     if (written_size < slice.size()) {
351       SliceBuffer written;
352       data->MoveFirstNBytesIntoSliceBuffer(total_written_size, written);
353 
354       write_event_.NotifyOn(new PosixEngineClosure(
355           [that = Ref(), on_writable = std::move(on_writable),
356            data](absl::Status status) mutable {
357             if (status.ok()) {
358               that->DoWrite(std::move(on_writable), data);
359             } else {
360               on_writable(status);
361             }
362           },
363           false /* is_permanent*/));
364       return;
365     }
366   }
367   on_writable(absl::OkStatus());
368 }
369 
370 }  // namespace experimental
371 }  // namespace grpc_event_engine
372 
373 #endif  // AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER
374 #endif  // GPR_APPLE
375