• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_CFSTREAM_ENDPOINT
24 
25 #import <CoreFoundation/CoreFoundation.h>
26 #import "src/core/lib/iomgr/endpoint_cfstream.h"
27 
28 #include <grpc/slice_buffer.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/string_util.h>
31 
32 #include "src/core/lib/gpr/string.h"
33 #include "src/core/lib/iomgr/cfstream_handle.h"
34 #include "src/core/lib/iomgr/closure.h"
35 #include "src/core/lib/iomgr/endpoint.h"
36 #include "src/core/lib/iomgr/error_cfstream.h"
37 #include "src/core/lib/slice/slice_internal.h"
38 #include "src/core/lib/slice/slice_string_helpers.h"
39 
40 extern grpc_core::TraceFlag grpc_tcp_trace;
41 
42 typedef struct {
43   grpc_endpoint base;
44   gpr_refcount refcount;
45 
46   CFReadStreamRef read_stream;
47   CFWriteStreamRef write_stream;
48   CFStreamHandle* stream_sync;
49 
50   grpc_closure* read_cb;
51   grpc_closure* write_cb;
52   grpc_slice_buffer* read_slices;
53   grpc_slice_buffer* write_slices;
54 
55   grpc_closure read_action;
56   grpc_closure write_action;
57 
58   char* peer_string;
59   grpc_resource_user* resource_user;
60   grpc_resource_user_slice_allocator slice_allocator;
61 } CFStreamEndpoint;
62 
CFStreamFree(CFStreamEndpoint * ep)63 static void CFStreamFree(CFStreamEndpoint* ep) {
64   grpc_resource_user_unref(ep->resource_user);
65   CFRelease(ep->read_stream);
66   CFRelease(ep->write_stream);
67   CFSTREAM_HANDLE_UNREF(ep->stream_sync, "free");
68   gpr_free(ep->peer_string);
69   gpr_free(ep);
70 }
71 
72 #ifndef NDEBUG
73 #define EP_REF(ep, reason) CFStreamRef((ep), (reason), __FILE__, __LINE__)
74 #define EP_UNREF(ep, reason) CFStreamUnref((ep), (reason), __FILE__, __LINE__)
CFStreamUnref(CFStreamEndpoint * ep,const char * reason,const char * file,int line)75 static void CFStreamUnref(CFStreamEndpoint* ep, const char* reason,
76                           const char* file, int line) {
77   if (grpc_tcp_trace.enabled()) {
78     gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count);
79     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
80             "CFStream endpoint unref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep,
81             reason, val, val - 1);
82   }
83   if (gpr_unref(&ep->refcount)) {
84     CFStreamFree(ep);
85   }
86 }
CFStreamRef(CFStreamEndpoint * ep,const char * reason,const char * file,int line)87 static void CFStreamRef(CFStreamEndpoint* ep, const char* reason,
88                         const char* file, int line) {
89   if (grpc_tcp_trace.enabled()) {
90     gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count);
91     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
92             "CFStream endpoint ref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep,
93             reason, val, val + 1);
94   }
95   gpr_ref(&ep->refcount);
96 }
97 #else
98 #define EP_REF(ep, reason) CFStreamRef((ep))
99 #define EP_UNREF(ep, reason) CFStreamUnref((ep))
CFStreamUnref(CFStreamEndpoint * ep)100 static void CFStreamUnref(CFStreamEndpoint* ep) {
101   if (gpr_unref(&ep->refcount)) {
102     CFStreamFree(ep);
103   }
104 }
CFStreamRef(CFStreamEndpoint * ep)105 static void CFStreamRef(CFStreamEndpoint* ep) { gpr_ref(&ep->refcount); }
106 #endif
107 
CFStreamAnnotateError(grpc_error * src_error,CFStreamEndpoint * ep)108 static grpc_error* CFStreamAnnotateError(grpc_error* src_error,
109                                          CFStreamEndpoint* ep) {
110   return grpc_error_set_str(
111       grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS,
112                          GRPC_STATUS_UNAVAILABLE),
113       GRPC_ERROR_STR_TARGET_ADDRESS,
114       grpc_slice_from_copied_string(ep->peer_string));
115 }
116 
CallReadCb(CFStreamEndpoint * ep,grpc_error * error)117 static void CallReadCb(CFStreamEndpoint* ep, grpc_error* error) {
118   if (grpc_tcp_trace.enabled()) {
119     gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_read_cb %p %p:%p", ep,
120             ep->read_cb, ep->read_cb->cb, ep->read_cb->cb_arg);
121     size_t i;
122     const char* str = grpc_error_string(error);
123     gpr_log(GPR_DEBUG, "read: error=%s", str);
124 
125     for (i = 0; i < ep->read_slices->count; i++) {
126       char* dump = grpc_dump_slice(ep->read_slices->slices[i],
127                                    GPR_DUMP_HEX | GPR_DUMP_ASCII);
128       gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", ep, ep->peer_string, dump);
129       gpr_free(dump);
130     }
131   }
132   grpc_closure* cb = ep->read_cb;
133   ep->read_cb = nullptr;
134   ep->read_slices = nullptr;
135   GRPC_CLOSURE_SCHED(cb, error);
136 }
137 
CallWriteCb(CFStreamEndpoint * ep,grpc_error * error)138 static void CallWriteCb(CFStreamEndpoint* ep, grpc_error* error) {
139   if (grpc_tcp_trace.enabled()) {
140     gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_write_cb %p %p:%p", ep,
141             ep->write_cb, ep->write_cb->cb, ep->write_cb->cb_arg);
142     const char* str = grpc_error_string(error);
143     gpr_log(GPR_DEBUG, "write: error=%s", str);
144   }
145   grpc_closure* cb = ep->write_cb;
146   ep->write_cb = nullptr;
147   ep->write_slices = nullptr;
148   GRPC_CLOSURE_SCHED(cb, error);
149 }
150 
ReadAction(void * arg,grpc_error * error)151 static void ReadAction(void* arg, grpc_error* error) {
152   CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
153   GPR_ASSERT(ep->read_cb != nullptr);
154   if (error) {
155     grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
156     CallReadCb(ep, GRPC_ERROR_REF(error));
157     EP_UNREF(ep, "read");
158     return;
159   }
160 
161   GPR_ASSERT(ep->read_slices->count == 1);
162   grpc_slice slice = ep->read_slices->slices[0];
163   size_t len = GRPC_SLICE_LENGTH(slice);
164   CFIndex read_size =
165       CFReadStreamRead(ep->read_stream, GRPC_SLICE_START_PTR(slice), len);
166   if (read_size == -1) {
167     grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
168     CFErrorRef stream_error = CFReadStreamCopyError(ep->read_stream);
169     if (stream_error != nullptr) {
170       error = CFStreamAnnotateError(
171           GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"), ep);
172       CFRelease(stream_error);
173     } else {
174       error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Read error");
175     }
176     CallReadCb(ep, error);
177     EP_UNREF(ep, "read");
178   } else if (read_size == 0) {
179     grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
180     CallReadCb(ep,
181                CFStreamAnnotateError(
182                    GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), ep));
183     EP_UNREF(ep, "read");
184   } else {
185     if (read_size < len) {
186       grpc_slice_buffer_trim_end(ep->read_slices, len - read_size, nullptr);
187     }
188     CallReadCb(ep, GRPC_ERROR_NONE);
189     EP_UNREF(ep, "read");
190   }
191 }
192 
WriteAction(void * arg,grpc_error * error)193 static void WriteAction(void* arg, grpc_error* error) {
194   CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
195   GPR_ASSERT(ep->write_cb != nullptr);
196   if (error) {
197     grpc_slice_buffer_reset_and_unref_internal(ep->write_slices);
198     CallWriteCb(ep, GRPC_ERROR_REF(error));
199     EP_UNREF(ep, "write");
200     return;
201   }
202 
203   grpc_slice slice = grpc_slice_buffer_take_first(ep->write_slices);
204   size_t slice_len = GRPC_SLICE_LENGTH(slice);
205   CFIndex write_size = CFWriteStreamWrite(
206       ep->write_stream, GRPC_SLICE_START_PTR(slice), slice_len);
207   if (write_size == -1) {
208     grpc_slice_buffer_reset_and_unref_internal(ep->write_slices);
209     CFErrorRef stream_error = CFWriteStreamCopyError(ep->write_stream);
210     if (stream_error != nullptr) {
211       error = CFStreamAnnotateError(
212           GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write failed."), ep);
213       CFRelease(stream_error);
214     } else {
215       error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("write failed.");
216     }
217     CallWriteCb(ep, error);
218     EP_UNREF(ep, "write");
219   } else {
220     if (write_size < GRPC_SLICE_LENGTH(slice)) {
221       grpc_slice_buffer_undo_take_first(
222           ep->write_slices, grpc_slice_sub(slice, write_size, slice_len));
223     }
224     if (ep->write_slices->length > 0) {
225       ep->stream_sync->NotifyOnWrite(&ep->write_action);
226     } else {
227       CallWriteCb(ep, GRPC_ERROR_NONE);
228       EP_UNREF(ep, "write");
229     }
230 
231     if (grpc_tcp_trace.enabled()) {
232       grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size);
233       char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
234       gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", ep, ep->peer_string, dump);
235       gpr_free(dump);
236       grpc_slice_unref_internal(trace_slice);
237     }
238   }
239   grpc_slice_unref_internal(slice);
240 }
241 
CFStreamReadAllocationDone(void * arg,grpc_error * error)242 static void CFStreamReadAllocationDone(void* arg, grpc_error* error) {
243   CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
244   if (error == GRPC_ERROR_NONE) {
245     ep->stream_sync->NotifyOnRead(&ep->read_action);
246   } else {
247     grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
248     CallReadCb(ep, error);
249     EP_UNREF(ep, "read");
250   }
251 }
252 
CFStreamRead(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb)253 static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
254                          grpc_closure* cb) {
255   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
256   if (grpc_tcp_trace.enabled()) {
257     gpr_log(GPR_DEBUG, "CFStream endpoint:%p read (%p, %p) length:%zu", ep_impl,
258             slices, cb, slices->length);
259   }
260   GPR_ASSERT(ep_impl->read_cb == nullptr);
261   ep_impl->read_cb = cb;
262   ep_impl->read_slices = slices;
263   grpc_slice_buffer_reset_and_unref_internal(slices);
264   grpc_resource_user_alloc_slices(&ep_impl->slice_allocator,
265                                   GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1,
266                                   ep_impl->read_slices);
267   EP_REF(ep_impl, "read");
268 }
269 
CFStreamWrite(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,void * arg)270 static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
271                           grpc_closure* cb, void* arg) {
272   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
273   if (grpc_tcp_trace.enabled()) {
274     gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu",
275             ep_impl, slices, cb, slices->length);
276   }
277   GPR_ASSERT(ep_impl->write_cb == nullptr);
278   ep_impl->write_cb = cb;
279   ep_impl->write_slices = slices;
280   EP_REF(ep_impl, "write");
281   ep_impl->stream_sync->NotifyOnWrite(&ep_impl->write_action);
282 }
283 
CFStreamShutdown(grpc_endpoint * ep,grpc_error * why)284 void CFStreamShutdown(grpc_endpoint* ep, grpc_error* why) {
285   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
286   if (grpc_tcp_trace.enabled()) {
287     gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown (%p)", ep_impl, why);
288   }
289   CFReadStreamClose(ep_impl->read_stream);
290   CFWriteStreamClose(ep_impl->write_stream);
291   ep_impl->stream_sync->Shutdown(why);
292   grpc_resource_user_shutdown(ep_impl->resource_user);
293   if (grpc_tcp_trace.enabled()) {
294     gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown DONE (%p)", ep_impl, why);
295   }
296 }
297 
CFStreamDestroy(grpc_endpoint * ep)298 void CFStreamDestroy(grpc_endpoint* ep) {
299   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
300   if (grpc_tcp_trace.enabled()) {
301     gpr_log(GPR_DEBUG, "CFStream endpoint:%p destroy", ep_impl);
302   }
303   EP_UNREF(ep_impl, "destroy");
304 }
305 
CFStreamGetResourceUser(grpc_endpoint * ep)306 grpc_resource_user* CFStreamGetResourceUser(grpc_endpoint* ep) {
307   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
308   return ep_impl->resource_user;
309 }
310 
CFStreamGetPeer(grpc_endpoint * ep)311 char* CFStreamGetPeer(grpc_endpoint* ep) {
312   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
313   return gpr_strdup(ep_impl->peer_string);
314 }
315 
CFStreamGetFD(grpc_endpoint * ep)316 int CFStreamGetFD(grpc_endpoint* ep) { return 0; }
317 
CFStreamAddToPollset(grpc_endpoint * ep,grpc_pollset * pollset)318 void CFStreamAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {}
CFStreamAddToPollsetSet(grpc_endpoint * ep,grpc_pollset_set * pollset)319 void CFStreamAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {}
CFStreamDeleteFromPollsetSet(grpc_endpoint * ep,grpc_pollset_set * pollset)320 void CFStreamDeleteFromPollsetSet(grpc_endpoint* ep,
321                                   grpc_pollset_set* pollset) {}
322 
323 static const grpc_endpoint_vtable vtable = {CFStreamRead,
324                                             CFStreamWrite,
325                                             CFStreamAddToPollset,
326                                             CFStreamAddToPollsetSet,
327                                             CFStreamDeleteFromPollsetSet,
328                                             CFStreamShutdown,
329                                             CFStreamDestroy,
330                                             CFStreamGetResourceUser,
331                                             CFStreamGetPeer,
332                                             CFStreamGetFD};
333 
grpc_cfstream_endpoint_create(CFReadStreamRef read_stream,CFWriteStreamRef write_stream,const char * peer_string,grpc_resource_quota * resource_quota,CFStreamHandle * stream_sync)334 grpc_endpoint* grpc_cfstream_endpoint_create(
335     CFReadStreamRef read_stream, CFWriteStreamRef write_stream,
336     const char* peer_string, grpc_resource_quota* resource_quota,
337     CFStreamHandle* stream_sync) {
338   CFStreamEndpoint* ep_impl =
339       static_cast<CFStreamEndpoint*>(gpr_malloc(sizeof(CFStreamEndpoint)));
340   if (grpc_tcp_trace.enabled()) {
341     gpr_log(GPR_DEBUG,
342             "CFStream endpoint:%p create readStream:%p writeStream: %p",
343             ep_impl, read_stream, write_stream);
344   }
345   ep_impl->base.vtable = &vtable;
346   gpr_ref_init(&ep_impl->refcount, 1);
347   ep_impl->read_stream = read_stream;
348   ep_impl->write_stream = write_stream;
349   CFRetain(read_stream);
350   CFRetain(write_stream);
351   ep_impl->stream_sync = stream_sync;
352   CFSTREAM_HANDLE_REF(ep_impl->stream_sync, "endpoint create");
353 
354   ep_impl->peer_string = gpr_strdup(peer_string);
355   ep_impl->read_cb = nil;
356   ep_impl->write_cb = nil;
357   ep_impl->read_slices = nil;
358   ep_impl->write_slices = nil;
359   GRPC_CLOSURE_INIT(&ep_impl->read_action, ReadAction,
360                     static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx);
361   GRPC_CLOSURE_INIT(&ep_impl->write_action, WriteAction,
362                     static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx);
363   ep_impl->resource_user =
364       grpc_resource_user_create(resource_quota, peer_string);
365   grpc_resource_user_slice_allocator_init(&ep_impl->slice_allocator,
366                                           ep_impl->resource_user,
367                                           CFStreamReadAllocationDone, ep_impl);
368 
369   return &ep_impl->base;
370 }
371 
372 #endif /* GRPC_CFSTREAM_ENDPOINT */
373