1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_CFSTREAM_ENDPOINT
24
25 #import <CoreFoundation/CoreFoundation.h>
26 #include <grpc/slice_buffer.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/string_util.h>
29
30 #include "absl/log/check.h"
31 #include "absl/log/log.h"
32 #include "src/core/lib/address_utils/sockaddr_utils.h"
33 #include "src/core/lib/iomgr/cfstream_handle.h"
34 #include "src/core/lib/iomgr/closure.h"
35 #include "src/core/lib/iomgr/endpoint.h"
36 #import "src/core/lib/iomgr/endpoint_cfstream.h"
37 #include "src/core/lib/iomgr/error_cfstream.h"
38 #include "src/core/lib/iomgr/sockaddr.h"
39 #include "src/core/lib/slice/slice.h"
40 #include "src/core/lib/slice/slice_internal.h"
41 #include "src/core/lib/slice/slice_string_helpers.h"
42 #include "src/core/util/string.h"
43
44 struct CFStreamEndpoint {
45 grpc_endpoint base;
46 gpr_refcount refcount;
47
48 CFReadStreamRef read_stream;
49 CFWriteStreamRef write_stream;
50 CFStreamHandle* stream_sync;
51
52 grpc_closure* read_cb;
53 grpc_closure* write_cb;
54 grpc_slice_buffer* read_slices;
55 grpc_slice_buffer* write_slices;
56
57 grpc_closure read_action;
58 grpc_closure write_action;
59
60 std::string peer_string;
61 std::string local_address;
62 };
CFStreamFree(CFStreamEndpoint * ep)63 static void CFStreamFree(CFStreamEndpoint* ep) {
64 CFRelease(ep->read_stream);
65 CFRelease(ep->write_stream);
66 CFSTREAM_HANDLE_UNREF(ep->stream_sync, "free");
67 delete ep;
68 }
69
70 #ifndef NDEBUG
71 #define EP_REF(ep, reason) CFStreamRef((ep), (reason), __FILE__, __LINE__)
72 #define EP_UNREF(ep, reason) CFStreamUnref((ep), (reason), __FILE__, __LINE__)
CFStreamUnref(CFStreamEndpoint * ep,const char * reason,const char * file,int line)73 static void CFStreamUnref(CFStreamEndpoint* ep, const char* reason,
74 const char* file, int line) {
75 if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
76 gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count);
77 VLOG(2).AtLocation(file, line) << "CFStream endpoint unref " << ep << " : "
78 << reason << " " << val << " -> " << val - 1;
79 }
80 if (gpr_unref(&ep->refcount)) {
81 CFStreamFree(ep);
82 }
83 }
CFStreamRef(CFStreamEndpoint * ep,const char * reason,const char * file,int line)84 static void CFStreamRef(CFStreamEndpoint* ep, const char* reason,
85 const char* file, int line) {
86 if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
87 gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count);
88 VLOG(2).AtLocation(file, line) << "CFStream endpoint ref " << ep << " : "
89 << reason << " " << val << " -> " << val + 1;
90 }
91 gpr_ref(&ep->refcount);
92 }
93 #else
94 #define EP_REF(ep, reason) CFStreamRef((ep))
95 #define EP_UNREF(ep, reason) CFStreamUnref((ep))
CFStreamUnref(CFStreamEndpoint * ep)96 static void CFStreamUnref(CFStreamEndpoint* ep) {
97 if (gpr_unref(&ep->refcount)) {
98 CFStreamFree(ep);
99 }
100 }
CFStreamRef(CFStreamEndpoint * ep)101 static void CFStreamRef(CFStreamEndpoint* ep) { gpr_ref(&ep->refcount); }
102 #endif
103
CFStreamAnnotateError(grpc_error_handle src_error)104 static grpc_error_handle CFStreamAnnotateError(grpc_error_handle src_error) {
105 return grpc_error_set_int(src_error, grpc_core::StatusIntProperty::kRpcStatus,
106 GRPC_STATUS_UNAVAILABLE);
107 }
108
CallReadCb(CFStreamEndpoint * ep,grpc_error_handle error)109 static void CallReadCb(CFStreamEndpoint* ep, grpc_error_handle error) {
110 if (GRPC_TRACE_FLAG_ENABLED(tcp) && ABSL_VLOG_IS_ON(2)) {
111 VLOG(2) << "CFStream endpoint:" << ep << " call_read_cb " << ep->read_cb
112 << " " << ep->read_cb->cb << ":" << ep->read_cb->cb_arg;
113 size_t i;
114 VLOG(2) << "read: error=" << grpc_core::StatusToString(error);
115
116 for (i = 0; i < ep->read_slices->count; i++) {
117 char* dump = grpc_dump_slice(ep->read_slices->slices[i],
118 GPR_DUMP_HEX | GPR_DUMP_ASCII);
119 VLOG(2) << "READ " << ep << " (peer=" << ep->peer_string << "): " << dump;
120 gpr_free(dump);
121 }
122 }
123 grpc_closure* cb = ep->read_cb;
124 ep->read_cb = nullptr;
125 ep->read_slices = nullptr;
126 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
127 }
128
CallWriteCb(CFStreamEndpoint * ep,grpc_error_handle error)129 static void CallWriteCb(CFStreamEndpoint* ep, grpc_error_handle error) {
130 GRPC_TRACE_VLOG(tcp, 2) << "CFStream endpoint:" << ep << " call_write_cb "
131 << ep->write_cb << " " << ep->write_cb->cb << ":"
132 << ep->write_cb->cb_arg << "write: error="
133 << grpc_core::StatusToString(error);
134 grpc_closure* cb = ep->write_cb;
135 ep->write_cb = nullptr;
136 ep->write_slices = nullptr;
137 grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error);
138 }
139
ReadAction(void * arg,grpc_error_handle error)140 static void ReadAction(void* arg, grpc_error_handle error) {
141 CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
142 CHECK_NE(ep->read_cb, nullptr);
143 if (!error.ok()) {
144 grpc_slice_buffer_reset_and_unref(ep->read_slices);
145 CallReadCb(ep, error);
146 EP_UNREF(ep, "read");
147 return;
148 }
149
150 CHECK_EQ(ep->read_slices->count, 1);
151 grpc_slice slice = ep->read_slices->slices[0];
152 size_t len = GRPC_SLICE_LENGTH(slice);
153 CFIndex read_size =
154 CFReadStreamRead(ep->read_stream, GRPC_SLICE_START_PTR(slice), len);
155 if (read_size == -1) {
156 grpc_slice_buffer_reset_and_unref(ep->read_slices);
157 CFErrorRef stream_error = CFReadStreamCopyError(ep->read_stream);
158 if (stream_error != nullptr) {
159 error = CFStreamAnnotateError(
160 GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"));
161 CFRelease(stream_error);
162 } else {
163 error = GRPC_ERROR_CREATE("Read error");
164 }
165 CallReadCb(ep, error);
166 EP_UNREF(ep, "read");
167 } else if (read_size == 0) {
168 grpc_slice_buffer_reset_and_unref(ep->read_slices);
169 CallReadCb(ep, CFStreamAnnotateError(GRPC_ERROR_CREATE("Socket closed")));
170 EP_UNREF(ep, "read");
171 } else {
172 if (read_size < static_cast<CFIndex>(len)) {
173 grpc_slice_buffer_trim_end(ep->read_slices, len - read_size, nullptr);
174 }
175 CallReadCb(ep, absl::OkStatus());
176 EP_UNREF(ep, "read");
177 }
178 }
179
WriteAction(void * arg,grpc_error_handle error)180 static void WriteAction(void* arg, grpc_error_handle error) {
181 CFStreamEndpoint* ep = static_cast<CFStreamEndpoint*>(arg);
182 CHECK_NE(ep->write_cb, nullptr);
183 if (!error.ok()) {
184 grpc_slice_buffer_reset_and_unref(ep->write_slices);
185 CallWriteCb(ep, error);
186 EP_UNREF(ep, "write");
187 return;
188 }
189 grpc_slice slice = grpc_slice_buffer_take_first(ep->write_slices);
190 size_t slice_len = GRPC_SLICE_LENGTH(slice);
191 CFIndex write_size = CFWriteStreamWrite(
192 ep->write_stream, GRPC_SLICE_START_PTR(slice), slice_len);
193 if (write_size == -1) {
194 grpc_slice_buffer_reset_and_unref(ep->write_slices);
195 CFErrorRef stream_error = CFWriteStreamCopyError(ep->write_stream);
196 if (stream_error != nullptr) {
197 error = CFStreamAnnotateError(
198 GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Write failed"));
199 CFRelease(stream_error);
200 } else {
201 error = GRPC_ERROR_CREATE("write failed.");
202 }
203 CallWriteCb(ep, error);
204 EP_UNREF(ep, "write");
205 } else {
206 if (write_size < static_cast<CFIndex>(GRPC_SLICE_LENGTH(slice))) {
207 grpc_slice_buffer_undo_take_first(
208 ep->write_slices, grpc_slice_sub(slice, write_size, slice_len));
209 }
210 if (ep->write_slices->length > 0) {
211 ep->stream_sync->NotifyOnWrite(&ep->write_action);
212 } else {
213 CallWriteCb(ep, absl::OkStatus());
214 EP_UNREF(ep, "write");
215 }
216
217 if (GRPC_TRACE_FLAG_ENABLED(tcp) && ABSL_VLOG_IS_ON(2)) {
218 grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size);
219 char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
220 VLOG(2) << "WRITE " << ep << " (peer=" << ep->peer_string
221 << "): " << dump;
222 gpr_free(dump);
223 grpc_core::CSliceUnref(trace_slice);
224 }
225 }
226 grpc_core::CSliceUnref(slice);
227 }
228
CFStreamRead(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,bool,int)229 static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices,
230 grpc_closure* cb, bool /*urgent*/,
231 int /*min_progress_size*/) {
232 CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
233 GRPC_TRACE_VLOG(tcp, 2) << "CFStream endpoint:" << ep_impl << " read ("
234 << slices << ", " << cb
235 << ") length:" << slices->length;
236 CHECK_EQ(ep_impl->read_cb, nullptr);
237 ep_impl->read_cb = cb;
238 ep_impl->read_slices = slices;
239 grpc_slice_buffer_reset_and_unref(slices);
240 grpc_slice_buffer_add_indexed(
241 slices, GRPC_SLICE_MALLOC(GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
242 EP_REF(ep_impl, "read");
243 ep_impl->stream_sync->NotifyOnRead(&ep_impl->read_action);
244 }
245
CFStreamWrite(grpc_endpoint * ep,grpc_slice_buffer * slices,grpc_closure * cb,void *,int)246 static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices,
247 grpc_closure* cb, void* /*arg*/,
248 int /*max_frame_size*/) {
249 CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
250 GRPC_TRACE_VLOG(tcp, 2) << "CFStream endpoint:" << ep_impl << " write ("
251 << slices << ", " << cb
252 << ") length:" << slices->length;
253 CHECK_EQ(ep_impl->write_cb, nullptr);
254 ep_impl->write_cb = cb;
255 ep_impl->write_slices = slices;
256 EP_REF(ep_impl, "write");
257 ep_impl->stream_sync->NotifyOnWrite(&ep_impl->write_action);
258 }
259
CFStreamDestroy(grpc_endpoint * ep)260 void CFStreamDestroy(grpc_endpoint* ep) {
261 CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
262 GRPC_TRACE_VLOG(tcp, 2) << "CFStream endpoint:" << ep_impl << " destroy";
263 CFReadStreamClose(ep_impl->read_stream);
264 CFWriteStreamClose(ep_impl->write_stream);
265 ep_impl->stream_sync->Shutdown(absl::UnavailableError("endpoint shutdown"));
266 GRPC_TRACE_VLOG(tcp, 2) << "CFStream endpoint:" << ep_impl << " destroy DONE";
267 EP_UNREF(ep_impl, "destroy");
268 }
269
CFStreamGetPeer(grpc_endpoint * ep)270 absl::string_view CFStreamGetPeer(grpc_endpoint* ep) {
271 CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
272 return ep_impl->peer_string;
273 }
274
CFStreamGetLocalAddress(grpc_endpoint * ep)275 absl::string_view CFStreamGetLocalAddress(grpc_endpoint* ep) {
276 CFStreamEndpoint* ep_impl = reinterpret_cast<CFStreamEndpoint*>(ep);
277 return ep_impl->local_address;
278 }
279
CFStreamGetFD(grpc_endpoint *)280 int CFStreamGetFD(grpc_endpoint* /*ep*/) { return 0; }
281
CFStreamCanTrackErr(grpc_endpoint *)282 bool CFStreamCanTrackErr(grpc_endpoint* /*ep*/) { return false; }
283
CFStreamAddToPollset(grpc_endpoint *,grpc_pollset *)284 void CFStreamAddToPollset(grpc_endpoint* /*ep*/, grpc_pollset* /*pollset*/) {}
CFStreamAddToPollsetSet(grpc_endpoint *,grpc_pollset_set *)285 void CFStreamAddToPollsetSet(grpc_endpoint* /*ep*/,
286 grpc_pollset_set* /*pollset*/) {}
CFStreamDeleteFromPollsetSet(grpc_endpoint *,grpc_pollset_set *)287 void CFStreamDeleteFromPollsetSet(grpc_endpoint* /*ep*/,
288 grpc_pollset_set* /*pollset*/) {}
289
290 static const grpc_endpoint_vtable vtable = {CFStreamRead,
291 CFStreamWrite,
292 CFStreamAddToPollset,
293 CFStreamAddToPollsetSet,
294 CFStreamDeleteFromPollsetSet,
295 CFStreamDestroy,
296 CFStreamGetPeer,
297 CFStreamGetLocalAddress,
298 CFStreamGetFD,
299 CFStreamCanTrackErr};
300
grpc_cfstream_endpoint_create(CFReadStreamRef read_stream,CFWriteStreamRef write_stream,const char * peer_string,CFStreamHandle * stream_sync)301 grpc_endpoint* grpc_cfstream_endpoint_create(CFReadStreamRef read_stream,
302 CFWriteStreamRef write_stream,
303 const char* peer_string,
304 CFStreamHandle* stream_sync) {
305 CFStreamEndpoint* ep_impl = new CFStreamEndpoint;
306 GRPC_TRACE_VLOG(tcp, 2) << "CFStream endpoint:" << ep_impl
307 << " create readStream:" << read_stream
308 << " writeStream: " << write_stream;
309 ep_impl->base.vtable = &vtable;
310 gpr_ref_init(&ep_impl->refcount, 1);
311 ep_impl->read_stream = read_stream;
312 ep_impl->write_stream = write_stream;
313 CFRetain(read_stream);
314 CFRetain(write_stream);
315 ep_impl->stream_sync = stream_sync;
316 CFSTREAM_HANDLE_REF(ep_impl->stream_sync, "endpoint create");
317
318 ep_impl->peer_string = peer_string;
319 grpc_resolved_address resolved_local_addr;
320 resolved_local_addr.len = sizeof(resolved_local_addr.addr);
321 CFDataRef native_handle = static_cast<CFDataRef>(CFReadStreamCopyProperty(
322 ep_impl->read_stream, kCFStreamPropertySocketNativeHandle));
323 CFSocketNativeHandle sockfd;
324 CFDataGetBytes(native_handle, CFRangeMake(0, sizeof(CFSocketNativeHandle)),
325 (UInt8*)&sockfd);
326 if (native_handle) {
327 CFRelease(native_handle);
328 }
329 absl::StatusOr<std::string> addr_uri;
330 if (getsockname(sockfd, reinterpret_cast<sockaddr*>(resolved_local_addr.addr),
331 &resolved_local_addr.len) < 0 ||
332 !(addr_uri = grpc_sockaddr_to_uri(&resolved_local_addr)).ok()) {
333 ep_impl->local_address = "";
334 } else {
335 ep_impl->local_address = addr_uri.value();
336 }
337 ep_impl->read_cb = nil;
338 ep_impl->write_cb = nil;
339 ep_impl->read_slices = nil;
340 ep_impl->write_slices = nil;
341 GRPC_CLOSURE_INIT(&ep_impl->read_action, ReadAction,
342 static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx);
343 GRPC_CLOSURE_INIT(&ep_impl->write_action, WriteAction,
344 static_cast<void*>(ep_impl), grpc_schedule_on_exec_ctx);
345
346 return &ep_impl->base;
347 }
348
349 #endif // GRPC_CFSTREAM_ENDPOINT
350