1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_CFSTREAM
24 #import <CoreFoundation/CoreFoundation.h>
25 #import "src/core/lib/iomgr/cfstream_handle.h"
26
27 #include <grpc/support/atm.h>
28 #include <grpc/support/sync.h>
29
30 #include "src/core/lib/debug/trace.h"
31 #include "src/core/lib/iomgr/closure.h"
32 #include "src/core/lib/iomgr/exec_ctx.h"
33
34 extern grpc_core::TraceFlag grpc_tcp_trace;
35
// Retain hook installed in the CFStreamClientContext built by the constructor.
// |info| is the CFStreamHandle stored in that context; a reference tagged
// "retain" is taken on it and the same pointer is returned unchanged.
void* CFStreamHandle::Retain(void* info) {
  CFStreamHandle* retained = static_cast<CFStreamHandle*>(info);
  CFSTREAM_HANDLE_REF(retained, "retain");
  return info;
}
41
// Release hook installed in the CFStreamClientContext built by the
// constructor. |info| is the CFStreamHandle stored in that context; the
// reference tagged "release" is dropped on it.
void CFStreamHandle::Release(void* info) {
  CFStreamHandle* released = static_cast<CFStreamHandle*>(info);
  CFSTREAM_HANDLE_UNREF(released, "release");
}
46
// Factory: heap-allocates a CFStreamHandle bound to the given read/write
// stream pair and returns it. The handle is reference counted (see
// Ref()/Unref()) and deletes itself when the last reference is dropped.
CFStreamHandle* CFStreamHandle::CreateStreamHandle(
    CFReadStreamRef read_stream, CFWriteStreamRef write_stream) {
  CFStreamHandle* created = new CFStreamHandle(read_stream, write_stream);
  return created;
}
51
// Client callback registered on the read stream by the constructor.
//
// The event is not handled inline: a reference is taken on the handle and the
// actual processing is pushed onto the default-priority global dispatch queue.
// The dispatched block marks the matching event(s) ready and then drops the
// reference taken here.
//
//   stream                - read stream that produced the event.
//   type                  - the stream event being reported.
//   client_callback_info  - the CFStreamHandle registered as context info.
void CFStreamHandle::ReadCallback(CFReadStreamRef stream,
                                  CFStreamEventType type,
                                  void* client_callback_info) {
  CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info);
  // Keep the handle alive until the asynchronously dispatched block has run.
  CFSTREAM_HANDLE_REF(handle, "read callback");
  dispatch_queue_t work_queue =
      dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
  dispatch_async(work_queue, ^{
    grpc_core::ExecCtx exec_ctx;
    if (grpc_tcp_trace.enabled()) {
      gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle,
              stream, type, client_callback_info);
    }
    switch (type) {
      case kCFStreamEventOpenCompleted:
        handle->open_event_.SetReady();
        break;
      case kCFStreamEventHasBytesAvailable:
      case kCFStreamEventEndEncountered:
        // Both data and end-of-stream are surfaced through the read event.
        handle->read_event_.SetReady();
        break;
      case kCFStreamEventErrorOccurred:
        // An error marks both the open event and the read event ready.
        handle->open_event_.SetReady();
        handle->read_event_.SetReady();
        break;
      default:
        // Only the four event types above are registered for this stream.
        GPR_UNREACHABLE_CODE(return );
    }
    // Balances the reference taken before dispatching.
    CFSTREAM_HANDLE_UNREF(handle, "read callback");
  });
}
// Client callback registered on the write stream by the constructor.
//
// The event is not handled inline: a reference is taken on the handle and the
// actual processing is pushed onto the default-priority global dispatch queue.
// The dispatched block marks the matching event(s) ready and then drops the
// reference taken here.
//
//   stream              - write stream that produced the event.
//   type                - the stream event being reported.
//   clientCallBackInfo  - the CFStreamHandle registered as context info.
void CFStreamHandle::WriteCallback(CFWriteStreamRef stream,
                                   CFStreamEventType type,
                                   void* clientCallBackInfo) {
  CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo);
  // Keep the handle alive until the asynchronously dispatched block has run.
  CFSTREAM_HANDLE_REF(handle, "write callback");
  dispatch_queue_t work_queue =
      dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
  dispatch_async(work_queue, ^{
    grpc_core::ExecCtx exec_ctx;
    if (grpc_tcp_trace.enabled()) {
      gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle,
              stream, type, clientCallBackInfo);
    }
    switch (type) {
      case kCFStreamEventOpenCompleted:
        handle->open_event_.SetReady();
        break;
      case kCFStreamEventCanAcceptBytes:
      case kCFStreamEventEndEncountered:
        // Both writability and end-of-stream are surfaced through the write
        // event.
        handle->write_event_.SetReady();
        break;
      case kCFStreamEventErrorOccurred:
        // An error marks both the open event and the write event ready.
        handle->open_event_.SetReady();
        handle->write_event_.SetReady();
        break;
      default:
        // Only the four event types above are registered for this stream.
        GPR_UNREACHABLE_CODE(return );
    }
    // Balances the reference taken before dispatching.
    CFSTREAM_HANDLE_UNREF(handle, "write callback");
  });
}
112
// Builds a handle for a read/write stream pair.
//
// Starts with a reference count of 1, initialises the open/read/write events,
// installs ReadCallback/WriteCallback as the streams' clients (with this
// object as context info and Retain/Release as the context's retain/release
// hooks), and schedules both streams on the main run loop in the common modes.
CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream,
                               CFWriteStreamRef write_stream) {
  gpr_ref_init(&refcount_, 1);
  open_event_.InitEvent();
  read_event_.InitEvent();
  write_event_.InitEvent();

  // One context is shared by both SetClient calls below.
  CFStreamClientContext client_ctx = {0, static_cast<void*>(this),
                                      CFStreamHandle::Retain,
                                      CFStreamHandle::Release, nil};

  // Events each stream should report to its callback.
  const CFOptionFlags read_events =
      kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
      kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered;
  const CFOptionFlags write_events =
      kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
      kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered;

  CFReadStreamSetClient(read_stream, read_events, CFStreamHandle::ReadCallback,
                        &client_ctx);
  CFWriteStreamSetClient(write_stream, write_events,
                         CFStreamHandle::WriteCallback, &client_ctx);

  CFReadStreamScheduleWithRunLoop(read_stream, CFRunLoopGetMain(),
                                  kCFRunLoopCommonModes);
  CFWriteStreamScheduleWithRunLoop(write_stream, CFRunLoopGetMain(),
                                   kCFRunLoopCommonModes);
}
137
// Tears down the three events that the constructor initialised, in the same
// order they were set up (open, read, write).
CFStreamHandle::~CFStreamHandle() {
  open_event_.DestroyEvent();
  read_event_.DestroyEvent();
  write_event_.DestroyEvent();
}
143
// Registers |closure| with the open event. The open event is marked ready by
// the stream callbacks on kCFStreamEventOpenCompleted or
// kCFStreamEventErrorOccurred.
void CFStreamHandle::NotifyOnOpen(grpc_closure* closure) {
  open_event_.NotifyOn(closure);
}
147
// Registers |closure| with the read event. The read event is marked ready by
// ReadCallback when bytes are available, the stream ends, or an error occurs.
void CFStreamHandle::NotifyOnRead(grpc_closure* closure) {
  read_event_.NotifyOn(closure);
}
151
// Registers |closure| with the write event. The write event is marked ready by
// WriteCallback when the stream can accept bytes, ends, or reports an error.
void CFStreamHandle::NotifyOnWrite(grpc_closure* closure) {
  write_event_.NotifyOn(closure);
}
155
// Puts the open, read and write events into the shutdown state with |error|.
//
// Each SetShutdown call is passed its own freshly taken reference to |error|;
// the reference the caller passed in is dropped once all three events have
// been shut down.
void CFStreamHandle::Shutdown(grpc_error* error) {
  open_event_.SetShutdown(GRPC_ERROR_REF(error));
  read_event_.SetShutdown(GRPC_ERROR_REF(error));
  write_event_.SetShutdown(GRPC_ERROR_REF(error));

  // Release the incoming reference now that every event holds its own.
  GRPC_ERROR_UNREF(error);
}
162
// Takes one reference on the handle.
//
// When TCP tracing is enabled, first logs the transition (old count -> new
// count) at DEBUG severity, attributed to the caller's |file|:|line| and
// tagged with |reason|. The logged count is read without a barrier, so it is
// informational only.
void CFStreamHandle::Ref(const char* file, int line, const char* reason) {
  if (grpc_tcp_trace.enabled()) {
    const gpr_atm before = gpr_atm_no_barrier_load(&refcount_.count);
    const gpr_atm after = before + 1;
    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
            "CFStream Handle ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
            reason, before, after);
  }
  gpr_ref(&refcount_);
}
172
// Drops one reference on the handle and deletes it when that was the last one.
//
// When TCP tracing is enabled, first logs the transition (old count -> new
// count) at DEBUG severity, attributed to the caller's |file|:|line| and
// tagged with |reason|, mirroring Ref(). The logged count is read without a
// barrier, so it is informational only.
//
//   file, line - call site to attribute the trace line to.
//   reason     - short tag identifying who is dropping the reference.
void CFStreamHandle::Unref(const char* file, int line, const char* reason) {
  if (grpc_tcp_trace.enabled()) {
    gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
    // Use the caller-supplied location and DEBUG severity: this is a trace
    // message, not an error, and should point at the unref call site.
    gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
            "CFStream Handle unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
            reason, val, val - 1);
  }
  // gpr_unref() reports true when the count reaches zero.
  if (gpr_unref(&refcount_)) {
    delete this;
  }
}
184
185 #endif
186