• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_CFSTREAM_ENDPOINT
24 
25 #import <CoreFoundation/CoreFoundation.h>
26 #include <grpc/slice_buffer.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/string_util.h>
29 
30 #include "absl/log/check.h"
31 #include "absl/log/log.h"
32 #include "src/core/lib/address_utils/sockaddr_utils.h"
33 #include "src/core/lib/iomgr/cfstream_handle.h"
34 #include "src/core/lib/iomgr/closure.h"
35 #include "src/core/lib/iomgr/endpoint.h"
36 #import "src/core/lib/iomgr/endpoint_cfstream.h"
37 #include "src/core/lib/iomgr/error_cfstream.h"
38 #include "src/core/lib/iomgr/sockaddr.h"
39 #include "src/core/lib/slice/slice.h"
40 #include "src/core/lib/slice/slice_internal.h"
41 #include "src/core/lib/slice/slice_string_helpers.h"
42 #include "src/core/util/string.h"
43 
44 struct CFStreamEndpoint {
45   grpc_endpoint base;
46   gpr_refcount refcount;
47 
48   CFReadStreamRef read_stream;
49   CFWriteStreamRef write_stream;
50   CFStreamHandle* stream_sync;
51 
52   grpc_closure* read_cb;
53   grpc_closure* write_cb;
54   grpc_slice_buffer* read_slices;
55   grpc_slice_buffer* write_slices;
56 
57   grpc_closure read_action;
58   grpc_closure write_action;
59 
60   std::string peer_string;
61   std::string local_address;
62 };
CFStreamFree(CFStreamEndpoint * ep)63 static void CFStreamFree(CFStreamEndpoint* ep) {
64   CFRelease(ep->read_stream);
65   CFRelease(ep->write_stream);
66   CFSTREAM_HANDLE_UNREF(ep->stream_sync, "free");
67   delete ep;
68 }
69 
70 #ifndef NDEBUG
71 #define EP_REF(ep, reason) CFStreamRef((ep), (reason), __FILE__, __LINE__)
72 #define EP_UNREF(ep, reason) CFStreamUnref((ep), (reason), __FILE__, __LINE__)
CFStreamUnref(CFStreamEndpoint * ep,const char * reason,const char * file,int line)73 static void CFStreamUnref(CFStreamEndpoint* ep, const char* reason,
74                           const char* file, int line) {
75   if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
76     gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count);
77     VLOG(2).AtLocation(file, line) << "CFStream endpoint unref " << ep << " : "
78                                    << reason << " " << val << " -> " << val - 1;
79   }
80   if (gpr_unref(&ep->refcount)) {
81     CFStreamFree(ep);
82   }
83 }
CFStreamRef(CFStreamEndpoint * ep,const char * reason,const char * file,int line)84 static void CFStreamRef(CFStreamEndpoint* ep, const char* reason,
85                         const char* file, int line) {
86   if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
87     gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count);
88     VLOG(2).AtLocation(file, line) << "CFStream endpoint ref " << ep << " : "
89                                    << reason << " " << val << " -> " << val + 1;
90   }
91   gpr_ref(&ep->refcount);
92 }
93 #else
94 #define EP_REF(ep, reason) CFStreamRef((ep))
95 #define EP_UNREF(ep, reason) CFStreamUnref((ep))
CFStreamUnref(CFStreamEndpoint * ep)96 static void CFStreamUnref(CFStreamEndpoint* ep) {
97   if (gpr_unref(&ep->refcount)) {
98     CFStreamFree(ep);
99   }
100 }
CFStreamRef(CFStreamEndpoint * ep)101 static void CFStreamRef(CFStreamEndpoint* ep) { gpr_ref(&ep->refcount); }
102 #endif
103 
CFStreamAnnotateError(grpc_error_handle src_error)104 static grpc_error_handle CFStreamAnnotateError(grpc_error_handle src_error) {
105   return grpc_error_set_int(src_error, grpc_core::StatusIntProperty::kRpcStatus,
106                             GRPC_STATUS_UNAVAILABLE);
107 }
108 
CallReadCb(CFStreamEndpoint * ep,grpc_error_handle error)109 static void CallReadCb(CFStreamEndpoint* ep, grpc_error_handle error) {
110   if (GRPC_TRACE_FLAG_ENABLED(tcp) && ABSL_VLOG_IS_ON(2)) {
111     VLOG(2) << "CFStream endpoint:" << ep << " call_read_cb " << ep->read_cb
112             << " " << ep->read_cb->cb << ":" << ep->read_cb->cb_arg;
113     size_t i;
114     VLOG(2) << "read: error=" << grpc_core::StatusToString(error);
115 
116     for (i = 0; i < ep->read_slices->count; i++) {
117       char* dump = grpc_dump_slice(ep->read_slices->slices[i],
118                                    GPR_DUMP_HEX | GPR_DUMP_ASCII);
119       VLOG(2) << "READ " << ep << " (peer=" << ep->peer_string << "): " << dump;
120       gpr_free(dump);
121     }
122   }
123   grpc_closure* cb = ep->read_cb;
124   ep->read_cb = nullptr;
125   ep->read_slices = nullptr;
126   grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
127 }
128 
CallWriteCb(CFStreamEndpoint * ep,grpc_error_handle error)129 static void CallWriteCb(CFStreamEndpoint* ep, grpc_error_handle error) {
130   GRPC_TRACE_VLOG(tcp, 2) << "CFStream endpoint:" << ep << " call_write_cb "
131                           << ep->write_cb << " " << ep->write_cb->cb << ":"
132                           << ep->write_cb->cb_arg << "write: error="
133                           << grpc_core::StatusToString(error);
134   grpc_closure* cb = ep->write_cb;
135   ep->write_cb = nullptr;
136   ep->write_slices = nullptr;
137   grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
138 }
139 
ReadAction(void * arg,grpc_error_handle error)140 static void ReadAction(void* arg, grpc_error_handle error) {
141   CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
142   CHECK_NE(ep->read_cb, nullptr);
143   if (!error.ok()) {
144     grpc_slice_buffer_reset_and_unref(ep->read_slices);
145     CallReadCb(ep, error);
146     EP_UNREF(ep, "read");
147     return;
148   }
149 
150   CHECK_EQ(ep->read_slices->count, 1);
151   grpc_slice slice = ep->read_slices->slices[0];
152   size_t len = GRPC_SLICE_LENGTH(slice);
153   CFIndex read_size =
154       CFReadStreamRead(ep->read_stream, GRPC_SLICE_START_PTR(slice), len);
155   if (read_size == -1) {
156     grpc_slice_buffer_reset_and_unref(ep->read_slices);
157     CFErrorRef stream_error = CFReadStreamCopyError(ep->read_stream);
158     if (stream_error != nullptr) {
159       error = CFStreamAnnotateError(
160           GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"));
161       CFRelease(stream_error);
162     } else {
163       error = GRPC_ERROR_CREATE("Read error");
164     }
165     CallReadCb(ep, error);
166     EP_UNREF(ep, "read");
167   } else if (read_size == 0) {
168     grpc_slice_buffer_reset_and_unref(ep->read_slices);
169     CallReadCb(ep, CFStreamAnnotateError(GRPC_ERROR_CREATE("Socket closed")));
170     EP_UNREF(ep, "read");
171   } else {
172     if (read_size < static_cast<CFIndex>(len)) {
173       grpc_slice_buffer_trim_end(ep->read_slices, len - read_size, nullptr);
174     }
175     CallReadCb(ep, absl::OkStatus());
176     EP_UNREF(ep, "read");
177   }
178 }
179 
WriteAction(void * arg,grpc_error_handle error)180 static void WriteAction(void* arg, grpc_error_handle error) {
181   CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
182   CHECK_NE(ep->write_cb, nullptr);
183   if (!error.ok()) {
184     grpc_slice_buffer_reset_and_unref(ep->write_slices);
185     CallWriteCb(ep, error);
186     EP_UNREF(ep, "write");
187     return;
188   }
189   grpc_slice slice = grpc_slice_buffer_take_first(ep->write_slices);
190   size_t slice_len = GRPC_SLICE_LENGTH(slice);
191   CFIndex write_size = CFWriteStreamWrite(
192       ep->write_stream, GRPC_SLICE_START_PTR(slice), slice_len);
193   if (write_size == -1) {
194     grpc_slice_buffer_reset_and_unref(ep->write_slices);
195     CFErrorRef stream_error = CFWriteStreamCopyError(ep->write_stream);
196     if (stream_error != nullptr) {
197       error = CFStreamAnnotateError(
198           GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Write failed"));
199       CFRelease(stream_error);
200     } else {
201       error = GRPC_ERROR_CREATE("write failed.");
202     }
203     CallWriteCb(ep, error);
204     EP_UNREF(ep, "write");
205   } else {
206     if (write_size < static_cast<CFIndex>(GRPC_SLICE_LENGTH(slice))) {
207       grpc_slice_buffer_undo_take_first(
208           ep->write_slices, grpc_slice_sub(slice, write_size, slice_len));
209     }
210     if (ep->write_slices->length > 0) {
211       ep->stream_sync->NotifyOnWrite(&ep->write_action);
212     } else {
213       CallWriteCb(ep, absl::OkStatus());
214       EP_UNREF(ep, "write");
215     }
216 
217     if (GRPC_TRACE_FLAG_ENABLED(tcp) && ABSL_VLOG_IS_ON(2)) {
218       grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size);
219       char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
220       VLOG(2) << "WRITE " << ep << " (peer=" << ep->peer_string
221               << "): " << dump;
222       gpr_free(dump);
223       grpc_core::CSliceUnref(trace_slice);
224     }
225   }
226   grpc_core::CSliceUnref(slice);
227 }
228 
CFStreamRead(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,bool,int)229 static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
230                          grpc_closure* cb, bool /*urgent*/,
231                          int /*min_progress_size*/) {
232   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
233   GRPC_TRACE_VLOG(tcp, 2) << "CFStream endpoint:" << ep_impl << " read ("
234                           << slices << ", " << cb
235                           << ") length:" << slices->length;
236   CHECK_EQ(ep_impl->read_cb, nullptr);
237   ep_impl->read_cb = cb;
238   ep_impl->read_slices = slices;
239   grpc_slice_buffer_reset_and_unref(slices);
240   grpc_slice_buffer_add_indexed(
241       slices, GRPC_SLICE_MALLOC(GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
242   EP_REF(ep_impl, "read");
243   ep_impl->stream_sync->NotifyOnRead(&ep_impl->read_action);
244 }
245 
CFStreamWrite(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,void *,int)246 static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
247                           grpc_closure* cb, void* /*arg*/,
248                           int /*max_frame_size*/) {
249   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
250   GRPC_TRACE_VLOG(tcp, 2) << "CFStream endpoint:" << ep_impl << " write ("
251                           << slices << ", " << cb
252                           << ") length:" << slices->length;
253   CHECK_EQ(ep_impl->write_cb, nullptr);
254   ep_impl->write_cb = cb;
255   ep_impl->write_slices = slices;
256   EP_REF(ep_impl, "write");
257   ep_impl->stream_sync->NotifyOnWrite(&ep_impl->write_action);
258 }
259 
CFStreamDestroy(grpc_endpoint * ep)260 void CFStreamDestroy(grpc_endpoint* ep) {
261   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
262   GRPC_TRACE_VLOG(tcp, 2) << "CFStream endpoint:" << ep_impl << " destroy";
263   CFReadStreamClose(ep_impl->read_stream);
264   CFWriteStreamClose(ep_impl->write_stream);
265   ep_impl->stream_sync->Shutdown(absl::UnavailableError("endpoint shutdown"));
266   GRPC_TRACE_VLOG(tcp, 2) << "CFStream endpoint:" << ep_impl << " destroy DONE";
267   EP_UNREF(ep_impl, "destroy");
268 }
269 
CFStreamGetPeer(grpc_endpoint * ep)270 absl::string_view CFStreamGetPeer(grpc_endpoint* ep) {
271   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
272   return ep_impl->peer_string;
273 }
274 
CFStreamGetLocalAddress(grpc_endpoint * ep)275 absl::string_view CFStreamGetLocalAddress(grpc_endpoint* ep) {
276   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
277   return ep_impl->local_address;
278 }
279 
CFStreamGetFD(grpc_endpoint *)280 int CFStreamGetFD(grpc_endpoint* /*ep*/) { return 0; }
281 
CFStreamCanTrackErr(grpc_endpoint *)282 bool CFStreamCanTrackErr(grpc_endpoint* /*ep*/) { return false; }
283 
CFStreamAddToPollset(grpc_endpoint *,grpc_pollset *)284 void CFStreamAddToPollset(grpc_endpoint* /*ep*/, grpc_pollset* /*pollset*/) {}
CFStreamAddToPollsetSet(grpc_endpoint *,grpc_pollset_set *)285 void CFStreamAddToPollsetSet(grpc_endpoint* /*ep*/,
286                              grpc_pollset_set* /*pollset*/) {}
CFStreamDeleteFromPollsetSet(grpc_endpoint *,grpc_pollset_set *)287 void CFStreamDeleteFromPollsetSet(grpc_endpoint* /*ep*/,
288                                   grpc_pollset_set* /*pollset*/) {}
289 
290 static const grpc_endpoint_vtable vtable = {CFStreamRead,
291                                             CFStreamWrite,
292                                             CFStreamAddToPollset,
293                                             CFStreamAddToPollsetSet,
294                                             CFStreamDeleteFromPollsetSet,
295                                             CFStreamDestroy,
296                                             CFStreamGetPeer,
297                                             CFStreamGetLocalAddress,
298                                             CFStreamGetFD,
299                                             CFStreamCanTrackErr};
300 
grpc_cfstream_endpoint_create(CFReadStreamRef read_stream,CFWriteStreamRef write_stream,const char * peer_string,CFStreamHandle * stream_sync)301 grpc_endpoint* grpc_cfstream_endpoint_create(CFReadStreamRef read_stream,
302                                              CFWriteStreamRef write_stream,
303                                              const char* peer_string,
304                                              CFStreamHandle* stream_sync) {
305   CFStreamEndpoint* ep_impl = new CFStreamEndpoint;
306   GRPC_TRACE_VLOG(tcp, 2) << "CFStream endpoint:" << ep_impl
307                           << " create readStream:" << read_stream
308                           << " writeStream: " << write_stream;
309   ep_impl->base.vtable = &vtable;
310   gpr_ref_init(&ep_impl->refcount, 1);
311   ep_impl->read_stream = read_stream;
312   ep_impl->write_stream = write_stream;
313   CFRetain(read_stream);
314   CFRetain(write_stream);
315   ep_impl->stream_sync = stream_sync;
316   CFSTREAM_HANDLE_REF(ep_impl->stream_sync, "endpoint create");
317 
318   ep_impl->peer_string = peer_string;
319   grpc_resolved_address resolved_local_addr;
320   resolved_local_addr.len = sizeof(resolved_local_addr.addr);
321   CFDataRef native_handle = static_cast<CFDataRef>(CFReadStreamCopyProperty(
322       ep_impl->read_stream, kCFStreamPropertySocketNativeHandle));
323   CFSocketNativeHandle sockfd;
324   CFDataGetBytes(native_handle, CFRangeMake(0, sizeof(CFSocketNativeHandle)),
325                  (UInt8*)&sockfd);
326   if (native_handle) {
327     CFRelease(native_handle);
328   }
329   absl::StatusOr<std::string> addr_uri;
330   if (getsockname(sockfd, reinterpret_cast<sockaddr*>(resolved_local_addr.addr),
331                   &resolved_local_addr.len) < 0 ||
332       !(addr_uri = grpc_sockaddr_to_uri(&resolved_local_addr)).ok()) {
333     ep_impl->local_address = "";
334   } else {
335     ep_impl->local_address = addr_uri.value();
336   }
337   ep_impl->read_cb = nil;
338   ep_impl->write_cb = nil;
339   ep_impl->read_slices = nil;
340   ep_impl->write_slices = nil;
341   GRPC_CLOSURE_INIT(&ep_impl->read_action, ReadAction,
342                     static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx);
343   GRPC_CLOSURE_INIT(&ep_impl->write_action, WriteAction,
344                     static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx);
345 
346   return &ep_impl->base;
347 }
348 
349 #endif  // GRPC_CFSTREAM_ENDPOINT
350