• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_CFSTREAM_ENDPOINT
24 
25 #import <CoreFoundation/CoreFoundation.h>
26 
27 #include <grpc/slice_buffer.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/string_util.h>
30 
31 #include "src/core/lib/address_utils/sockaddr_utils.h"
32 #include "src/core/lib/gpr/string.h"
33 #include "src/core/lib/iomgr/cfstream_handle.h"
34 #include "src/core/lib/iomgr/closure.h"
35 #include "src/core/lib/iomgr/endpoint.h"
36 #import "src/core/lib/iomgr/endpoint_cfstream.h"
37 #include "src/core/lib/iomgr/error_cfstream.h"
38 #include "src/core/lib/iomgr/sockaddr.h"
39 #include "src/core/lib/slice/slice.h"
40 #include "src/core/lib/slice/slice_internal.h"
41 #include "src/core/lib/slice/slice_string_helpers.h"
42 
43 extern grpc_core::TraceFlag grpc_tcp_trace;
44 
45 struct CFStreamEndpoint {
46   grpc_endpoint base;
47   gpr_refcount refcount;
48 
49   CFReadStreamRef read_stream;
50   CFWriteStreamRef write_stream;
51   CFStreamHandle* stream_sync;
52 
53   grpc_closure* read_cb;
54   grpc_closure* write_cb;
55   grpc_slice_buffer* read_slices;
56   grpc_slice_buffer* write_slices;
57 
58   grpc_closure read_action;
59   grpc_closure write_action;
60 
61   std::string peer_string;
62   std::string local_address;
63 };
CFStreamFree(CFStreamEndpoint * ep)64 static void CFStreamFree(CFStreamEndpoint* ep) {
65   CFRelease(ep->read_stream);
66   CFRelease(ep->write_stream);
67   CFSTREAM_HANDLE_UNREF(ep->stream_sync, "free");
68   delete ep;
69 }
70 
71 #ifndef NDEBUG
72 #define EP_REF(ep, reason) CFStreamRef((ep), (reason), __FILE__, __LINE__)
73 #define EP_UNREF(ep, reason) CFStreamUnref((ep), (reason), __FILE__, __LINE__)
CFStreamUnref(CFStreamEndpoint * ep,const char * reason,const char * file,int line)74 static void CFStreamUnref(CFStreamEndpoint* ep, const char* reason,
75                           const char* file, int line) {
76   if (grpc_tcp_trace.enabled()) {
77     gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count);
78     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
79             "CFStream endpoint unref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep,
80             reason, val, val - 1);
81   }
82   if (gpr_unref(&ep->refcount)) {
83     CFStreamFree(ep);
84   }
85 }
CFStreamRef(CFStreamEndpoint * ep,const char * reason,const char * file,int line)86 static void CFStreamRef(CFStreamEndpoint* ep, const char* reason,
87                         const char* file, int line) {
88   if (grpc_tcp_trace.enabled()) {
89     gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count);
90     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
91             "CFStream endpoint ref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep,
92             reason, val, val + 1);
93   }
94   gpr_ref(&ep->refcount);
95 }
96 #else
97 #define EP_REF(ep, reason) CFStreamRef((ep))
98 #define EP_UNREF(ep, reason) CFStreamUnref((ep))
CFStreamUnref(CFStreamEndpoint * ep)99 static void CFStreamUnref(CFStreamEndpoint* ep) {
100   if (gpr_unref(&ep->refcount)) {
101     CFStreamFree(ep);
102   }
103 }
CFStreamRef(CFStreamEndpoint * ep)104 static void CFStreamRef(CFStreamEndpoint* ep) { gpr_ref(&ep->refcount); }
105 #endif
106 
CFStreamAnnotateError(grpc_error_handle src_error,CFStreamEndpoint * ep)107 static grpc_error_handle CFStreamAnnotateError(grpc_error_handle src_error,
108                                                CFStreamEndpoint* ep) {
109   return grpc_error_set_str(
110       grpc_error_set_int(src_error, grpc_core::StatusIntProperty::kRpcStatus,
111                          GRPC_STATUS_UNAVAILABLE),
112       grpc_core::StatusStrProperty::kTargetAddress, ep->peer_string);
113 }
114 
CallReadCb(CFStreamEndpoint * ep,grpc_error_handle error)115 static void CallReadCb(CFStreamEndpoint* ep, grpc_error_handle error) {
116   if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace) &&
117       gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
118     gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_read_cb %p %p:%p", ep,
119             ep->read_cb, ep->read_cb->cb, ep->read_cb->cb_arg);
120     size_t i;
121     gpr_log(GPR_DEBUG, "read: error=%s",
122             grpc_core::StatusToString(error).c_str());
123 
124     for (i = 0; i < ep->read_slices->count; i++) {
125       char* dump = grpc_dump_slice(ep->read_slices->slices[i],
126                                    GPR_DUMP_HEX | GPR_DUMP_ASCII);
127       gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", ep, ep->peer_string.c_str(),
128               dump);
129       gpr_free(dump);
130     }
131   }
132   grpc_closure* cb = ep->read_cb;
133   ep->read_cb = nullptr;
134   ep->read_slices = nullptr;
135   grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
136 }
137 
CallWriteCb(CFStreamEndpoint * ep,grpc_error_handle error)138 static void CallWriteCb(CFStreamEndpoint* ep, grpc_error_handle error) {
139   if (grpc_tcp_trace.enabled()) {
140     gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_write_cb %p %p:%p", ep,
141             ep->write_cb, ep->write_cb->cb, ep->write_cb->cb_arg);
142     gpr_log(GPR_DEBUG, "write: error=%s",
143             grpc_core::StatusToString(error).c_str());
144   }
145   grpc_closure* cb = ep->write_cb;
146   ep->write_cb = nullptr;
147   ep->write_slices = nullptr;
148   grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
149 }
150 
ReadAction(void * arg,grpc_error_handle error)151 static void ReadAction(void* arg, grpc_error_handle error) {
152   CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
153   GPR_ASSERT(ep->read_cb != nullptr);
154   if (!error.ok()) {
155     grpc_slice_buffer_reset_and_unref(ep->read_slices);
156     CallReadCb(ep, error);
157     EP_UNREF(ep, "read");
158     return;
159   }
160 
161   GPR_ASSERT(ep->read_slices->count == 1);
162   grpc_slice slice = ep->read_slices->slices[0];
163   size_t len = GRPC_SLICE_LENGTH(slice);
164   CFIndex read_size =
165       CFReadStreamRead(ep->read_stream, GRPC_SLICE_START_PTR(slice), len);
166   if (read_size == -1) {
167     grpc_slice_buffer_reset_and_unref(ep->read_slices);
168     CFErrorRef stream_error = CFReadStreamCopyError(ep->read_stream);
169     if (stream_error != nullptr) {
170       error = CFStreamAnnotateError(
171           GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"), ep);
172       CFRelease(stream_error);
173     } else {
174       error = GRPC_ERROR_CREATE("Read error");
175     }
176     CallReadCb(ep, error);
177     EP_UNREF(ep, "read");
178   } else if (read_size == 0) {
179     grpc_slice_buffer_reset_and_unref(ep->read_slices);
180     CallReadCb(ep,
181                CFStreamAnnotateError(GRPC_ERROR_CREATE("Socket closed"), ep));
182     EP_UNREF(ep, "read");
183   } else {
184     if (read_size < static_cast<CFIndex>(len)) {
185       grpc_slice_buffer_trim_end(ep->read_slices, len - read_size, nullptr);
186     }
187     CallReadCb(ep, absl::OkStatus());
188     EP_UNREF(ep, "read");
189   }
190 }
191 
WriteAction(void * arg,grpc_error_handle error)192 static void WriteAction(void* arg, grpc_error_handle error) {
193   CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
194   GPR_ASSERT(ep->write_cb != nullptr);
195   if (!error.ok()) {
196     grpc_slice_buffer_reset_and_unref(ep->write_slices);
197     CallWriteCb(ep, error);
198     EP_UNREF(ep, "write");
199     return;
200   }
201   grpc_slice slice = grpc_slice_buffer_take_first(ep->write_slices);
202   size_t slice_len = GRPC_SLICE_LENGTH(slice);
203   CFIndex write_size = CFWriteStreamWrite(
204       ep->write_stream, GRPC_SLICE_START_PTR(slice), slice_len);
205   if (write_size == -1) {
206     grpc_slice_buffer_reset_and_unref(ep->write_slices);
207     CFErrorRef stream_error = CFWriteStreamCopyError(ep->write_stream);
208     if (stream_error != nullptr) {
209       error = CFStreamAnnotateError(
210           GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write failed."), ep);
211       CFRelease(stream_error);
212     } else {
213       error = GRPC_ERROR_CREATE("write failed.");
214     }
215     CallWriteCb(ep, error);
216     EP_UNREF(ep, "write");
217   } else {
218     if (write_size < static_cast<CFIndex>(GRPC_SLICE_LENGTH(slice))) {
219       grpc_slice_buffer_undo_take_first(
220           ep->write_slices, grpc_slice_sub(slice, write_size, slice_len));
221     }
222     if (ep->write_slices->length > 0) {
223       ep->stream_sync->NotifyOnWrite(&ep->write_action);
224     } else {
225       CallWriteCb(ep, absl::OkStatus());
226       EP_UNREF(ep, "write");
227     }
228 
229     if (GRPC_TRACE_FLAG_ENABLED(grpc_tcp_trace) &&
230         gpr_should_log(GPR_LOG_SEVERITY_DEBUG)) {
231       grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size);
232       char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
233       gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", ep, ep->peer_string.c_str(),
234               dump);
235       gpr_free(dump);
236       grpc_core::CSliceUnref(trace_slice);
237     }
238   }
239   grpc_core::CSliceUnref(slice);
240 }
241 
CFStreamRead(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,bool,int)242 static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
243                          grpc_closure* cb, bool /*urgent*/,
244                          int /*min_progress_size*/) {
245   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
246   if (grpc_tcp_trace.enabled()) {
247     gpr_log(GPR_DEBUG, "CFStream endpoint:%p read (%p, %p) length:%zu", ep_impl,
248             slices, cb, slices->length);
249   }
250   GPR_ASSERT(ep_impl->read_cb == nullptr);
251   ep_impl->read_cb = cb;
252   ep_impl->read_slices = slices;
253   grpc_slice_buffer_reset_and_unref(slices);
254   grpc_slice_buffer_add_indexed(
255       slices, GRPC_SLICE_MALLOC(GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
256   EP_REF(ep_impl, "read");
257   ep_impl->stream_sync->NotifyOnRead(&ep_impl->read_action);
258 }
259 
CFStreamWrite(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,void *,int)260 static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
261                           grpc_closure* cb, void* /*arg*/,
262                           int /*max_frame_size*/) {
263   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
264   if (grpc_tcp_trace.enabled()) {
265     gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu",
266             ep_impl, slices, cb, slices->length);
267   }
268   GPR_ASSERT(ep_impl->write_cb == nullptr);
269   ep_impl->write_cb = cb;
270   ep_impl->write_slices = slices;
271   EP_REF(ep_impl, "write");
272   ep_impl->stream_sync->NotifyOnWrite(&ep_impl->write_action);
273 }
274 
CFStreamShutdown(grpc_endpoint * ep,grpc_error_handle why)275 void CFStreamShutdown(grpc_endpoint* ep, grpc_error_handle why) {
276   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
277   if (grpc_tcp_trace.enabled()) {
278     gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown (%s)", ep_impl,
279             grpc_core::StatusToString(why).c_str());
280   }
281   CFReadStreamClose(ep_impl->read_stream);
282   CFWriteStreamClose(ep_impl->write_stream);
283   ep_impl->stream_sync->Shutdown(why);
284   if (grpc_tcp_trace.enabled()) {
285     gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown DONE (%s)", ep_impl,
286             grpc_core::StatusToString(why).c_str());
287   }
288 }
289 
CFStreamDestroy(grpc_endpoint * ep)290 void CFStreamDestroy(grpc_endpoint* ep) {
291   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
292   if (grpc_tcp_trace.enabled()) {
293     gpr_log(GPR_DEBUG, "CFStream endpoint:%p destroy", ep_impl);
294   }
295   EP_UNREF(ep_impl, "destroy");
296 }
297 
CFStreamGetPeer(grpc_endpoint * ep)298 absl::string_view CFStreamGetPeer(grpc_endpoint* ep) {
299   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
300   return ep_impl->peer_string;
301 }
302 
CFStreamGetLocalAddress(grpc_endpoint * ep)303 absl::string_view CFStreamGetLocalAddress(grpc_endpoint* ep) {
304   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
305   return ep_impl->local_address;
306 }
307 
CFStreamGetFD(grpc_endpoint *)308 int CFStreamGetFD(grpc_endpoint* /*ep*/) { return 0; }
309 
CFStreamCanTrackErr(grpc_endpoint *)310 bool CFStreamCanTrackErr(grpc_endpoint* /*ep*/) { return false; }
311 
CFStreamAddToPollset(grpc_endpoint *,grpc_pollset *)312 void CFStreamAddToPollset(grpc_endpoint* /*ep*/, grpc_pollset* /*pollset*/) {}
CFStreamAddToPollsetSet(grpc_endpoint *,grpc_pollset_set *)313 void CFStreamAddToPollsetSet(grpc_endpoint* /*ep*/,
314                              grpc_pollset_set* /*pollset*/) {}
CFStreamDeleteFromPollsetSet(grpc_endpoint *,grpc_pollset_set *)315 void CFStreamDeleteFromPollsetSet(grpc_endpoint* /*ep*/,
316                                   grpc_pollset_set* /*pollset*/) {}
317 
318 static const grpc_endpoint_vtable vtable = {CFStreamRead,
319                                             CFStreamWrite,
320                                             CFStreamAddToPollset,
321                                             CFStreamAddToPollsetSet,
322                                             CFStreamDeleteFromPollsetSet,
323                                             CFStreamShutdown,
324                                             CFStreamDestroy,
325                                             CFStreamGetPeer,
326                                             CFStreamGetLocalAddress,
327                                             CFStreamGetFD,
328                                             CFStreamCanTrackErr};
329 
grpc_cfstream_endpoint_create(CFReadStreamRef read_stream,CFWriteStreamRef write_stream,const char * peer_string,CFStreamHandle * stream_sync)330 grpc_endpoint* grpc_cfstream_endpoint_create(CFReadStreamRef read_stream,
331                                              CFWriteStreamRef write_stream,
332                                              const char* peer_string,
333                                              CFStreamHandle* stream_sync) {
334   CFStreamEndpoint* ep_impl = new CFStreamEndpoint;
335   if (grpc_tcp_trace.enabled()) {
336     gpr_log(GPR_DEBUG,
337             "CFStream endpoint:%p create readStream:%p writeStream: %p",
338             ep_impl, read_stream, write_stream);
339   }
340   ep_impl->base.vtable = &vtable;
341   gpr_ref_init(&ep_impl->refcount, 1);
342   ep_impl->read_stream = read_stream;
343   ep_impl->write_stream = write_stream;
344   CFRetain(read_stream);
345   CFRetain(write_stream);
346   ep_impl->stream_sync = stream_sync;
347   CFSTREAM_HANDLE_REF(ep_impl->stream_sync, "endpoint create");
348 
349   ep_impl->peer_string = peer_string;
350   grpc_resolved_address resolved_local_addr;
351   resolved_local_addr.len = sizeof(resolved_local_addr.addr);
352   CFDataRef native_handle = static_cast<CFDataRef>(CFReadStreamCopyProperty(
353       ep_impl->read_stream, kCFStreamPropertySocketNativeHandle));
354   CFSocketNativeHandle sockfd;
355   CFDataGetBytes(native_handle, CFRangeMake(0, sizeof(CFSocketNativeHandle)),
356                  (UInt8*)&sockfd);
357   if (native_handle) {
358     CFRelease(native_handle);
359   }
360   absl::StatusOr<std::string> addr_uri;
361   if (getsockname(sockfd, reinterpret_cast<sockaddr*>(resolved_local_addr.addr),
362                   &resolved_local_addr.len) < 0 ||
363       !(addr_uri = grpc_sockaddr_to_uri(&resolved_local_addr)).ok()) {
364     ep_impl->local_address = "";
365   } else {
366     ep_impl->local_address = addr_uri.value();
367   }
368   ep_impl->read_cb = nil;
369   ep_impl->write_cb = nil;
370   ep_impl->read_slices = nil;
371   ep_impl->write_slices = nil;
372   GRPC_CLOSURE_INIT(&ep_impl->read_action, ReadAction,
373                     static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx);
374   GRPC_CLOSURE_INIT(&ep_impl->write_action, WriteAction,
375                     static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx);
376 
377   return &ep_impl->base;
378 }
379 
380 #endif  // GRPC_CFSTREAM_ENDPOINT
381