1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_CFSTREAM_ENDPOINT
24
25 #import <CoreFoundation/CoreFoundation.h>
26 #import "src/core/lib/iomgr/endpoint_cfstream.h"
27
28 #include <grpc/slice_buffer.h>
29 #include <grpc/support/alloc.h>
30 #include <grpc/support/string_util.h>
31
32 #include "src/core/lib/gpr/string.h"
33 #include "src/core/lib/iomgr/cfstream_handle.h"
34 #include "src/core/lib/iomgr/closure.h"
35 #include "src/core/lib/iomgr/endpoint.h"
36 #include "src/core/lib/iomgr/error_cfstream.h"
37 #include "src/core/lib/slice/slice_internal.h"
38 #include "src/core/lib/slice/slice_string_helpers.h"
39
40 extern grpc_core::TraceFlag grpc_tcp_trace;
41
42 typedef struct {
43 grpc_endpoint base;
44 gpr_refcount refcount;
45
46 CFReadStreamRef read_stream;
47 CFWriteStreamRef write_stream;
48 CFStreamHandle* stream_sync;
49
50 grpc_closure* read_cb;
51 grpc_closure* write_cb;
52 grpc_slice_buffer* read_slices;
53 grpc_slice_buffer* write_slices;
54
55 grpc_closure read_action;
56 grpc_closure write_action;
57
58 char* peer_string;
59 grpc_resource_user* resource_user;
60 grpc_resource_user_slice_allocator slice_allocator;
61 } CFStreamEndpoint;
62
CFStreamFree(CFStreamEndpoint * ep)63 static void CFStreamFree(CFStreamEndpoint* ep) {
64 grpc_resource_user_unref(ep->resource_user);
65 CFRelease(ep->read_stream);
66 CFRelease(ep->write_stream);
67 CFSTREAM_HANDLE_UNREF(ep->stream_sync, "free");
68 gpr_free(ep->peer_string);
69 gpr_free(ep);
70 }
71
72 #ifndef NDEBUG
73 #define EP_REF(ep, reason) CFStreamRef((ep), (reason), __FILE__, __LINE__)
74 #define EP_UNREF(ep, reason) CFStreamUnref((ep), (reason), __FILE__, __LINE__)
CFStreamUnref(CFStreamEndpoint * ep,const char * reason,const char * file,int line)75 static void CFStreamUnref(CFStreamEndpoint* ep, const char* reason,
76 const char* file, int line) {
77 if (grpc_tcp_trace.enabled()) {
78 gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count);
79 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
80 "CFStream endpoint unref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep,
81 reason, val, val - 1);
82 }
83 if (gpr_unref(&ep->refcount)) {
84 CFStreamFree(ep);
85 }
86 }
CFStreamRef(CFStreamEndpoint * ep,const char * reason,const char * file,int line)87 static void CFStreamRef(CFStreamEndpoint* ep, const char* reason,
88 const char* file, int line) {
89 if (grpc_tcp_trace.enabled()) {
90 gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count);
91 gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
92 "CFStream endpoint ref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep,
93 reason, val, val + 1);
94 }
95 gpr_ref(&ep->refcount);
96 }
97 #else
98 #define EP_REF(ep, reason) CFStreamRef((ep))
99 #define EP_UNREF(ep, reason) CFStreamUnref((ep))
CFStreamUnref(CFStreamEndpoint * ep)100 static void CFStreamUnref(CFStreamEndpoint* ep) {
101 if (gpr_unref(&ep->refcount)) {
102 CFStreamFree(ep);
103 }
104 }
CFStreamRef(CFStreamEndpoint * ep)105 static void CFStreamRef(CFStreamEndpoint* ep) { gpr_ref(&ep->refcount); }
106 #endif
107
CFStreamAnnotateError(grpc_error * src_error,CFStreamEndpoint * ep)108 static grpc_error* CFStreamAnnotateError(grpc_error* src_error,
109 CFStreamEndpoint* ep) {
110 return grpc_error_set_str(
111 grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS,
112 GRPC_STATUS_UNAVAILABLE),
113 GRPC_ERROR_STR_TARGET_ADDRESS,
114 grpc_slice_from_copied_string(ep->peer_string));
115 }
116
CallReadCb(CFStreamEndpoint * ep,grpc_error * error)117 static void CallReadCb(CFStreamEndpoint* ep, grpc_error* error) {
118 if (grpc_tcp_trace.enabled()) {
119 gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_read_cb %p %p:%p", ep,
120 ep->read_cb, ep->read_cb->cb, ep->read_cb->cb_arg);
121 size_t i;
122 const char* str = grpc_error_string(error);
123 gpr_log(GPR_DEBUG, "read: error=%s", str);
124
125 for (i = 0; i < ep->read_slices->count; i++) {
126 char* dump = grpc_dump_slice(ep->read_slices->slices[i],
127 GPR_DUMP_HEX | GPR_DUMP_ASCII);
128 gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", ep, ep->peer_string, dump);
129 gpr_free(dump);
130 }
131 }
132 grpc_closure* cb = ep->read_cb;
133 ep->read_cb = nullptr;
134 ep->read_slices = nullptr;
135 GRPC_CLOSURE_SCHED(cb, error);
136 }
137
CallWriteCb(CFStreamEndpoint * ep,grpc_error * error)138 static void CallWriteCb(CFStreamEndpoint* ep, grpc_error* error) {
139 if (grpc_tcp_trace.enabled()) {
140 gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_write_cb %p %p:%p", ep,
141 ep->write_cb, ep->write_cb->cb, ep->write_cb->cb_arg);
142 const char* str = grpc_error_string(error);
143 gpr_log(GPR_DEBUG, "write: error=%s", str);
144 }
145 grpc_closure* cb = ep->write_cb;
146 ep->write_cb = nullptr;
147 ep->write_slices = nullptr;
148 GRPC_CLOSURE_SCHED(cb, error);
149 }
150
ReadAction(void * arg,grpc_error * error)151 static void ReadAction(void* arg, grpc_error* error) {
152 CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
153 GPR_ASSERT(ep->read_cb != nullptr);
154 if (error) {
155 grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
156 CallReadCb(ep, GRPC_ERROR_REF(error));
157 EP_UNREF(ep, "read");
158 return;
159 }
160
161 GPR_ASSERT(ep->read_slices->count == 1);
162 grpc_slice slice = ep->read_slices->slices[0];
163 size_t len = GRPC_SLICE_LENGTH(slice);
164 CFIndex read_size =
165 CFReadStreamRead(ep->read_stream, GRPC_SLICE_START_PTR(slice), len);
166 if (read_size == -1) {
167 grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
168 CFErrorRef stream_error = CFReadStreamCopyError(ep->read_stream);
169 if (stream_error != nullptr) {
170 error = CFStreamAnnotateError(
171 GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"), ep);
172 CFRelease(stream_error);
173 } else {
174 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Read error");
175 }
176 CallReadCb(ep, error);
177 EP_UNREF(ep, "read");
178 } else if (read_size == 0) {
179 grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
180 CallReadCb(ep,
181 CFStreamAnnotateError(
182 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), ep));
183 EP_UNREF(ep, "read");
184 } else {
185 if (read_size < len) {
186 grpc_slice_buffer_trim_end(ep->read_slices, len - read_size, nullptr);
187 }
188 CallReadCb(ep, GRPC_ERROR_NONE);
189 EP_UNREF(ep, "read");
190 }
191 }
192
WriteAction(void * arg,grpc_error * error)193 static void WriteAction(void* arg, grpc_error* error) {
194 CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
195 GPR_ASSERT(ep->write_cb != nullptr);
196 if (error) {
197 grpc_slice_buffer_reset_and_unref_internal(ep->write_slices);
198 CallWriteCb(ep, GRPC_ERROR_REF(error));
199 EP_UNREF(ep, "write");
200 return;
201 }
202
203 grpc_slice slice = grpc_slice_buffer_take_first(ep->write_slices);
204 size_t slice_len = GRPC_SLICE_LENGTH(slice);
205 CFIndex write_size = CFWriteStreamWrite(
206 ep->write_stream, GRPC_SLICE_START_PTR(slice), slice_len);
207 if (write_size == -1) {
208 grpc_slice_buffer_reset_and_unref_internal(ep->write_slices);
209 CFErrorRef stream_error = CFWriteStreamCopyError(ep->write_stream);
210 if (stream_error != nullptr) {
211 error = CFStreamAnnotateError(
212 GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write failed."), ep);
213 CFRelease(stream_error);
214 } else {
215 error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("write failed.");
216 }
217 CallWriteCb(ep, error);
218 EP_UNREF(ep, "write");
219 } else {
220 if (write_size < GRPC_SLICE_LENGTH(slice)) {
221 grpc_slice_buffer_undo_take_first(
222 ep->write_slices, grpc_slice_sub(slice, write_size, slice_len));
223 }
224 if (ep->write_slices->length > 0) {
225 ep->stream_sync->NotifyOnWrite(&ep->write_action);
226 } else {
227 CallWriteCb(ep, GRPC_ERROR_NONE);
228 EP_UNREF(ep, "write");
229 }
230
231 if (grpc_tcp_trace.enabled()) {
232 grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size);
233 char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
234 gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", ep, ep->peer_string, dump);
235 gpr_free(dump);
236 grpc_slice_unref_internal(trace_slice);
237 }
238 }
239 grpc_slice_unref_internal(slice);
240 }
241
CFStreamReadAllocationDone(void * arg,grpc_error * error)242 static void CFStreamReadAllocationDone(void* arg, grpc_error* error) {
243 CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
244 if (error == GRPC_ERROR_NONE) {
245 ep->stream_sync->NotifyOnRead(&ep->read_action);
246 } else {
247 grpc_slice_buffer_reset_and_unref_internal(ep->read_slices);
248 CallReadCb(ep, error);
249 EP_UNREF(ep, "read");
250 }
251 }
252
CFStreamRead(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb)253 static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
254 grpc_closure* cb) {
255 CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
256 if (grpc_tcp_trace.enabled()) {
257 gpr_log(GPR_DEBUG, "CFStream endpoint:%p read (%p, %p) length:%zu", ep_impl,
258 slices, cb, slices->length);
259 }
260 GPR_ASSERT(ep_impl->read_cb == nullptr);
261 ep_impl->read_cb = cb;
262 ep_impl->read_slices = slices;
263 grpc_slice_buffer_reset_and_unref_internal(slices);
264 grpc_resource_user_alloc_slices(&ep_impl->slice_allocator,
265 GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1,
266 ep_impl->read_slices);
267 EP_REF(ep_impl, "read");
268 }
269
CFStreamWrite(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,void * arg)270 static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
271 grpc_closure* cb, void* arg) {
272 CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
273 if (grpc_tcp_trace.enabled()) {
274 gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu",
275 ep_impl, slices, cb, slices->length);
276 }
277 GPR_ASSERT(ep_impl->write_cb == nullptr);
278 ep_impl->write_cb = cb;
279 ep_impl->write_slices = slices;
280 EP_REF(ep_impl, "write");
281 ep_impl->stream_sync->NotifyOnWrite(&ep_impl->write_action);
282 }
283
CFStreamShutdown(grpc_endpoint * ep,grpc_error * why)284 void CFStreamShutdown(grpc_endpoint* ep, grpc_error* why) {
285 CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
286 if (grpc_tcp_trace.enabled()) {
287 gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown (%p)", ep_impl, why);
288 }
289 CFReadStreamClose(ep_impl->read_stream);
290 CFWriteStreamClose(ep_impl->write_stream);
291 ep_impl->stream_sync->Shutdown(why);
292 grpc_resource_user_shutdown(ep_impl->resource_user);
293 if (grpc_tcp_trace.enabled()) {
294 gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown DONE (%p)", ep_impl, why);
295 }
296 }
297
CFStreamDestroy(grpc_endpoint * ep)298 void CFStreamDestroy(grpc_endpoint* ep) {
299 CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
300 if (grpc_tcp_trace.enabled()) {
301 gpr_log(GPR_DEBUG, "CFStream endpoint:%p destroy", ep_impl);
302 }
303 EP_UNREF(ep_impl, "destroy");
304 }
305
CFStreamGetResourceUser(grpc_endpoint * ep)306 grpc_resource_user* CFStreamGetResourceUser(grpc_endpoint* ep) {
307 CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
308 return ep_impl->resource_user;
309 }
310
CFStreamGetPeer(grpc_endpoint * ep)311 char* CFStreamGetPeer(grpc_endpoint* ep) {
312 CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
313 return gpr_strdup(ep_impl->peer_string);
314 }
315
CFStreamGetFD(grpc_endpoint * ep)316 int CFStreamGetFD(grpc_endpoint* ep) { return 0; }
317
CFStreamAddToPollset(grpc_endpoint * ep,grpc_pollset * pollset)318 void CFStreamAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {}
CFStreamAddToPollsetSet(grpc_endpoint * ep,grpc_pollset_set * pollset)319 void CFStreamAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {}
CFStreamDeleteFromPollsetSet(grpc_endpoint * ep,grpc_pollset_set * pollset)320 void CFStreamDeleteFromPollsetSet(grpc_endpoint* ep,
321 grpc_pollset_set* pollset) {}
322
323 static const grpc_endpoint_vtable vtable = {CFStreamRead,
324 CFStreamWrite,
325 CFStreamAddToPollset,
326 CFStreamAddToPollsetSet,
327 CFStreamDeleteFromPollsetSet,
328 CFStreamShutdown,
329 CFStreamDestroy,
330 CFStreamGetResourceUser,
331 CFStreamGetPeer,
332 CFStreamGetFD};
333
grpc_cfstream_endpoint_create(CFReadStreamRef read_stream,CFWriteStreamRef write_stream,const char * peer_string,grpc_resource_quota * resource_quota,CFStreamHandle * stream_sync)334 grpc_endpoint* grpc_cfstream_endpoint_create(
335 CFReadStreamRef read_stream, CFWriteStreamRef write_stream,
336 const char* peer_string, grpc_resource_quota* resource_quota,
337 CFStreamHandle* stream_sync) {
338 CFStreamEndpoint* ep_impl =
339 static_cast<CFStreamEndpoint*>(gpr_malloc(sizeof(CFStreamEndpoint)));
340 if (grpc_tcp_trace.enabled()) {
341 gpr_log(GPR_DEBUG,
342 "CFStream endpoint:%p create readStream:%p writeStream: %p",
343 ep_impl, read_stream, write_stream);
344 }
345 ep_impl->base.vtable = &vtable;
346 gpr_ref_init(&ep_impl->refcount, 1);
347 ep_impl->read_stream = read_stream;
348 ep_impl->write_stream = write_stream;
349 CFRetain(read_stream);
350 CFRetain(write_stream);
351 ep_impl->stream_sync = stream_sync;
352 CFSTREAM_HANDLE_REF(ep_impl->stream_sync, "endpoint create");
353
354 ep_impl->peer_string = gpr_strdup(peer_string);
355 ep_impl->read_cb = nil;
356 ep_impl->write_cb = nil;
357 ep_impl->read_slices = nil;
358 ep_impl->write_slices = nil;
359 GRPC_CLOSURE_INIT(&ep_impl->read_action, ReadAction,
360 static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx);
361 GRPC_CLOSURE_INIT(&ep_impl->write_action, WriteAction,
362 static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx);
363 ep_impl->resource_user =
364 grpc_resource_user_create(resource_quota, peer_string);
365 grpc_resource_user_slice_allocator_init(&ep_impl->slice_allocator,
366 ep_impl->resource_user,
367 CFStreamReadAllocationDone, ep_impl);
368
369 return &ep_impl->base;
370 }
371
372 #endif /* GRPC_CFSTREAM_ENDPOINT */
373