• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 
23 #ifdef GRPC_CFSTREAM_ENDPOINT
24 
25 #import <CoreFoundation/CoreFoundation.h>
26 #import "src/core/lib/iomgr/endpoint_cfstream.h"
27 
28 #include <grpc/slice_buffer.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/string_util.h>
31 
32 #include "src/core/lib/gpr/string.h"
33 #include "src/core/lib/iomgr/cfstream_handle.h"
34 #include "src/core/lib/iomgr/closure.h"
35 #include "src/core/lib/iomgr/endpoint.h"
36 #include "src/core/lib/iomgr/error_cfstream.h"
37 #include "src/core/lib/slice/slice_internal.h"
38 #include "src/core/lib/slice/slice_string_helpers.h"
39 
40 extern grpc_core::TraceFlag grpc_tcp_trace;
41 
// State for one gRPC endpoint backed by a CFReadStream/CFWriteStream pair.
struct CFStreamEndpoint {
  // Must stay the first member: the vtable entry points in this file
  // reinterpret_cast a grpc_endpoint* back to CFStreamEndpoint*.
  grpc_endpoint base;
  // Manipulated through EP_REF/EP_UNREF; CFStreamFree runs when it hits zero.
  gpr_refcount refcount;

  // Retained in grpc_cfstream_endpoint_create, released in CFStreamFree.
  CFReadStreamRef read_stream;
  CFWriteStreamRef write_stream;
  // Used to request read/write notifications and to propagate shutdown.
  CFStreamHandle* stream_sync;

  // Pending user callbacks and their slice buffers. Set by CFStreamRead /
  // CFStreamWrite and reset to nullptr by CallReadCb / CallWriteCb.
  grpc_closure* read_cb;
  grpc_closure* write_cb;
  grpc_slice_buffer* read_slices;
  grpc_slice_buffer* write_slices;

  // Closures bound to ReadAction / WriteAction with this endpoint as argument.
  grpc_closure read_action;
  grpc_closure write_action;

  // Copy of the peer string made with gpr_strdup; freed in CFStreamFree.
  char* peer_string;
  grpc_resource_user* resource_user;
  // Initialised with CFStreamReadAllocationDone as its completion callback.
  grpc_resource_user_slice_allocator slice_allocator;
};
CFStreamFree(CFStreamEndpoint * ep)62 static void CFStreamFree(CFStreamEndpoint* ep) {
63   grpc_resource_user_unref(ep->resource_user);
64   CFRelease(ep->read_stream);
65   CFRelease(ep->write_stream);
66   CFSTREAM_HANDLE_UNREF(ep->stream_sync, "free");
67   gpr_free(ep->peer_string);
68   gpr_free(ep);
69 }
70 
71 #ifndef NDEBUG
72 #define EP_REF(ep, reason) CFStreamRef((ep), (reason), __FILE__, __LINE__)
73 #define EP_UNREF(ep, reason) CFStreamUnref((ep), (reason), __FILE__, __LINE__)
CFStreamUnref(CFStreamEndpoint * ep,const char * reason,const char * file,int line)74 static void CFStreamUnref(CFStreamEndpoint* ep, const char* reason,
75                           const char* file, int line) {
76   if (grpc_tcp_trace.enabled()) {
77     gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count);
78     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
79             "CFStream endpoint unref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep,
80             reason, val, val - 1);
81   }
82   if (gpr_unref(&ep->refcount)) {
83     CFStreamFree(ep);
84   }
85 }
CFStreamRef(CFStreamEndpoint * ep,const char * reason,const char * file,int line)86 static void CFStreamRef(CFStreamEndpoint* ep, const char* reason,
87                         const char* file, int line) {
88   if (grpc_tcp_trace.enabled()) {
89     gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count);
90     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
91             "CFStream endpoint ref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep,
92             reason, val, val + 1);
93   }
94   gpr_ref(&ep->refcount);
95 }
96 #else
97 #define EP_REF(ep, reason) CFStreamRef((ep))
98 #define EP_UNREF(ep, reason) CFStreamUnref((ep))
CFStreamUnref(CFStreamEndpoint * ep)99 static void CFStreamUnref(CFStreamEndpoint* ep) {
100   if (gpr_unref(&ep->refcount)) {
101     CFStreamFree(ep);
102   }
103 }
CFStreamRef(CFStreamEndpoint * ep)104 static void CFStreamRef(CFStreamEndpoint* ep) { gpr_ref(&ep->refcount); }
105 #endif
106 
CFStreamAnnotateError(grpc_error * src_error,CFStreamEndpoint * ep)107 static grpc_error* CFStreamAnnotateError(grpc_error* src_error,
108                                          CFStreamEndpoint* ep) {
109   return grpc_error_set_str(
110       grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS,
111                          GRPC_STATUS_UNAVAILABLE),
112       GRPC_ERROR_STR_TARGET_ADDRESS,
113       grpc_slice_from_copied_string(ep->peer_string));
114 }
115 
CallReadCb(CFStreamEndpoint * ep,grpc_error * error)116 static void CallReadCb(CFStreamEndpoint* ep, grpc_error* error) {
117   if (grpc_tcp_trace.enabled()) {
118     gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_read_cb %p %p:%p", ep,
119             ep->read_cb, ep->read_cb->cb, ep->read_cb->cb_arg);
120     size_t i;
121     const char* str = grpc_error_string(error);
122     gpr_log(GPR_DEBUG, "read: error=%s", str);
123 
124     for (i = 0; i < ep->read_slices->count; i++) {
125       char* dump = grpc_dump_slice(ep->read_slices->slices[i],
126                                    GPR_DUMP_HEX | GPR_DUMP_ASCII);
127       gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", ep, ep->peer_string, dump);
128       gpr_free(dump);
129     }
130   }
131   grpc_closure* cb = ep->read_cb;
132   ep->read_cb = nullptr;
133   ep->read_slices = nullptr;
134   grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
135 }
136 
CallWriteCb(CFStreamEndpoint * ep,grpc_error * error)137 static void CallWriteCb(CFStreamEndpoint* ep, grpc_error* error) {
138   if (grpc_tcp_trace.enabled()) {
139     gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_write_cb %p %p:%p", ep,
140             ep->write_cb, ep->write_cb->cb, ep->write_cb->cb_arg);
141     const char* str = grpc_error_string(error);
142     gpr_log(GPR_DEBUG, "write: error=%s", str);
143   }
144   grpc_closure* cb = ep->write_cb;
145   ep->write_cb = nullptr;
146   ep->write_slices = nullptr;
147   grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
148 }
149 
ReadAction(void * arg,grpc_error * error)150 static void ReadAction(void* arg, grpc_error* error) {
151   CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
152   GPR_ASSERT(ep->read_cb != nullptr);
153   if (error) {
154     grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
155     CallReadCb(ep, GRPC_ERROR_REF(error));
156     EP_UNREF(ep, "read");
157     return;
158   }
159 
160   GPR_ASSERT(ep->read_slices->count == 1);
161   grpc_slice slice = ep->read_slices->slices[0];
162   size_t len = GRPC_SLICE_LENGTH(slice);
163   CFIndex read_size =
164       CFReadStreamRead(ep->read_stream, GRPC_SLICE_START_PTR(slice), len);
165   if (read_size == -1) {
166     grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
167     CFErrorRef stream_error = CFReadStreamCopyError(ep->read_stream);
168     if (stream_error != nullptr) {
169       error = CFStreamAnnotateError(
170           GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"), ep);
171       CFRelease(stream_error);
172     } else {
173       error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Read error");
174     }
175     CallReadCb(ep, error);
176     EP_UNREF(ep, "read");
177   } else if (read_size == 0) {
178     grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
179     CallReadCb(ep,
180                CFStreamAnnotateError(
181                    GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), ep));
182     EP_UNREF(ep, "read");
183   } else {
184     if (read_size < static_cast<CFIndex>(len)) {
185       grpc_slice_buffer_trim_end(ep->read_slices, len - read_size, nullptr);
186     }
187     CallReadCb(ep, GRPC_ERROR_NONE);
188     EP_UNREF(ep, "read");
189   }
190 }
191 
WriteAction(void * arg,grpc_error * error)192 static void WriteAction(void* arg, grpc_error* error) {
193   CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
194   GPR_ASSERT(ep->write_cb != nullptr);
195   if (error) {
196     grpc_slice_buffer_reset_and_unref_internal(ep->write_slices);
197     CallWriteCb(ep, GRPC_ERROR_REF(error));
198     EP_UNREF(ep, "write");
199     return;
200   }
201 
202   grpc_slice slice = grpc_slice_buffer_take_first(ep->write_slices);
203   size_t slice_len = GRPC_SLICE_LENGTH(slice);
204   CFIndex write_size = CFWriteStreamWrite(
205       ep->write_stream, GRPC_SLICE_START_PTR(slice), slice_len);
206   if (write_size == -1) {
207     grpc_slice_buffer_reset_and_unref_internal(ep->write_slices);
208     CFErrorRef stream_error = CFWriteStreamCopyError(ep->write_stream);
209     if (stream_error != nullptr) {
210       error = CFStreamAnnotateError(
211           GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write failed."), ep);
212       CFRelease(stream_error);
213     } else {
214       error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("write failed.");
215     }
216     CallWriteCb(ep, error);
217     EP_UNREF(ep, "write");
218   } else {
219     if (write_size < static_cast<CFIndex>(GRPC_SLICE_LENGTH(slice))) {
220       grpc_slice_buffer_undo_take_first(
221           ep->write_slices, grpc_slice_sub(slice, write_size, slice_len));
222     }
223     if (ep->write_slices->length > 0) {
224       ep->stream_sync->NotifyOnWrite(&ep->write_action);
225     } else {
226       CallWriteCb(ep, GRPC_ERROR_NONE);
227       EP_UNREF(ep, "write");
228     }
229 
230     if (grpc_tcp_trace.enabled()) {
231       grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size);
232       char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
233       gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", ep, ep->peer_string, dump);
234       gpr_free(dump);
235       grpc_slice_unref_internal(trace_slice);
236     }
237   }
238   grpc_slice_unref_internal(slice);
239 }
240 
CFStreamReadAllocationDone(void * arg,grpc_error * error)241 static void CFStreamReadAllocationDone(void* arg, grpc_error* error) {
242   CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
243   if (error == GRPC_ERROR_NONE) {
244     ep->stream_sync->NotifyOnRead(&ep->read_action);
245   } else {
246     grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
247     CallReadCb(ep, error);
248     EP_UNREF(ep, "read");
249   }
250 }
251 
CFStreamRead(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,bool urgent)252 static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
253                          grpc_closure* cb, bool urgent) {
254   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
255   if (grpc_tcp_trace.enabled()) {
256     gpr_log(GPR_DEBUG, "CFStream endpoint:%p read (%p, %p) length:%zu", ep_impl,
257             slices, cb, slices->length);
258   }
259   GPR_ASSERT(ep_impl->read_cb == nullptr);
260   ep_impl->read_cb = cb;
261   ep_impl->read_slices = slices;
262   grpc_slice_buffer_reset_and_unref_internal(slices);
263   EP_REF(ep_impl, "read");
264   if (grpc_resource_user_alloc_slices(&ep_impl->slice_allocator,
265                                       GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1,
266                                       ep_impl->read_slices)) {
267     ep_impl->stream_sync->NotifyOnRead(&ep_impl->read_action);
268   }
269 }
270 
CFStreamWrite(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,void * arg)271 static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
272                           grpc_closure* cb, void* arg) {
273   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
274   if (grpc_tcp_trace.enabled()) {
275     gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu",
276             ep_impl, slices, cb, slices->length);
277   }
278   GPR_ASSERT(ep_impl->write_cb == nullptr);
279   ep_impl->write_cb = cb;
280   ep_impl->write_slices = slices;
281   EP_REF(ep_impl, "write");
282   ep_impl->stream_sync->NotifyOnWrite(&ep_impl->write_action);
283 }
284 
CFStreamShutdown(grpc_endpoint * ep,grpc_error * why)285 void CFStreamShutdown(grpc_endpoint* ep, grpc_error* why) {
286   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
287   if (grpc_tcp_trace.enabled()) {
288     gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown (%p)", ep_impl, why);
289   }
290   CFReadStreamClose(ep_impl->read_stream);
291   CFWriteStreamClose(ep_impl->write_stream);
292   ep_impl->stream_sync->Shutdown(why);
293   grpc_resource_user_shutdown(ep_impl->resource_user);
294   if (grpc_tcp_trace.enabled()) {
295     gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown DONE (%p)", ep_impl, why);
296   }
297 }
298 
CFStreamDestroy(grpc_endpoint * ep)299 void CFStreamDestroy(grpc_endpoint* ep) {
300   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
301   if (grpc_tcp_trace.enabled()) {
302     gpr_log(GPR_DEBUG, "CFStream endpoint:%p destroy", ep_impl);
303   }
304   EP_UNREF(ep_impl, "destroy");
305 }
306 
CFStreamGetResourceUser(grpc_endpoint * ep)307 grpc_resource_user* CFStreamGetResourceUser(grpc_endpoint* ep) {
308   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
309   return ep_impl->resource_user;
310 }
311 
CFStreamGetPeer(grpc_endpoint * ep)312 char* CFStreamGetPeer(grpc_endpoint* ep) {
313   CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
314   return gpr_strdup(ep_impl->peer_string);
315 }
316 
CFStreamGetFD(grpc_endpoint * ep)317 int CFStreamGetFD(grpc_endpoint* ep) { return 0; }
318 
CFStreamCanTrackErr(grpc_endpoint * ep)319 bool CFStreamCanTrackErr(grpc_endpoint* ep) { return false; }
320 
CFStreamAddToPollset(grpc_endpoint * ep,grpc_pollset * pollset)321 void CFStreamAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {}
CFStreamAddToPollsetSet(grpc_endpoint * ep,grpc_pollset_set * pollset)322 void CFStreamAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {}
CFStreamDeleteFromPollsetSet(grpc_endpoint * ep,grpc_pollset_set * pollset)323 void CFStreamDeleteFromPollsetSet(grpc_endpoint* ep,
324                                   grpc_pollset_set* pollset) {}
325 
326 static const grpc_endpoint_vtable vtable = {CFStreamRead,
327                                             CFStreamWrite,
328                                             CFStreamAddToPollset,
329                                             CFStreamAddToPollsetSet,
330                                             CFStreamDeleteFromPollsetSet,
331                                             CFStreamShutdown,
332                                             CFStreamDestroy,
333                                             CFStreamGetResourceUser,
334                                             CFStreamGetPeer,
335                                             CFStreamGetFD,
336                                             CFStreamCanTrackErr};
337 
grpc_cfstream_endpoint_create(CFReadStreamRef read_stream,CFWriteStreamRef write_stream,const char * peer_string,grpc_resource_quota * resource_quota,CFStreamHandle * stream_sync)338 grpc_endpoint* grpc_cfstream_endpoint_create(
339     CFReadStreamRef read_stream, CFWriteStreamRef write_stream,
340     const char* peer_string, grpc_resource_quota* resource_quota,
341     CFStreamHandle* stream_sync) {
342   CFStreamEndpoint* ep_impl =
343       static_cast<CFStreamEndpoint*>(gpr_malloc(sizeof(CFStreamEndpoint)));
344   if (grpc_tcp_trace.enabled()) {
345     gpr_log(GPR_DEBUG,
346             "CFStream endpoint:%p create readStream:%p writeStream: %p",
347             ep_impl, read_stream, write_stream);
348   }
349   ep_impl->base.vtable = &vtable;
350   gpr_ref_init(&ep_impl->refcount, 1);
351   ep_impl->read_stream = read_stream;
352   ep_impl->write_stream = write_stream;
353   CFRetain(read_stream);
354   CFRetain(write_stream);
355   ep_impl->stream_sync = stream_sync;
356   CFSTREAM_HANDLE_REF(ep_impl->stream_sync, "endpoint create");
357 
358   ep_impl->peer_string = gpr_strdup(peer_string);
359   ep_impl->read_cb = nil;
360   ep_impl->write_cb = nil;
361   ep_impl->read_slices = nil;
362   ep_impl->write_slices = nil;
363   GRPC_CLOSURE_INIT(&ep_impl->read_action, ReadAction,
364                     static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx);
365   GRPC_CLOSURE_INIT(&ep_impl->write_action, WriteAction,
366                     static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx);
367   ep_impl->resource_user =
368       grpc_resource_user_create(resource_quota, peer_string);
369   grpc_resource_user_slice_allocator_init(&ep_impl->slice_allocator,
370                                           ep_impl->resource_user,
371                                           CFStreamReadAllocationDone, ep_impl);
372 
373   return &ep_impl->base;
374 }
375 
376 #endif /* GRPC_CFSTREAM_ENDPOINT */
377