• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/gprpp/memory.h"
22 #include "src/core/lib/iomgr/port.h"
23 
24 #ifdef GRPC_CFSTREAM
25 #import <CoreFoundation/CoreFoundation.h>
26 #import "src/core/lib/iomgr/cfstream_handle.h"
27 
28 #include <grpc/grpc.h>
29 #include <grpc/support/atm.h>
30 #include <grpc/support/sync.h>
31 
32 #include "src/core/lib/debug/trace.h"
33 #include "src/core/lib/iomgr/closure.h"
34 #include "src/core/lib/iomgr/error_cfstream.h"
35 #include "src/core/lib/iomgr/ev_apple.h"
36 #include "src/core/lib/iomgr/exec_ctx.h"
37 
38 extern grpc_core::TraceFlag grpc_tcp_trace;
39 
GrpcLibraryInitHolder()40 GrpcLibraryInitHolder::GrpcLibraryInitHolder() { grpc_init(); }
41 
~GrpcLibraryInitHolder()42 GrpcLibraryInitHolder::~GrpcLibraryInitHolder() { grpc_shutdown(); }
43 
Retain(void * info)44 void* CFStreamHandle::Retain(void* info) {
45   CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
46   CFSTREAM_HANDLE_REF(handle, "retain");
47   return info;
48 }
49 
Release(void * info)50 void CFStreamHandle::Release(void* info) {
51   CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
52   CFSTREAM_HANDLE_UNREF(handle, "release");
53 }
54 
CreateStreamHandle(CFReadStreamRef read_stream,CFWriteStreamRef write_stream)55 CFStreamHandle* CFStreamHandle::CreateStreamHandle(
56     CFReadStreamRef read_stream, CFWriteStreamRef write_stream) {
57   return new CFStreamHandle(read_stream, write_stream);
58 }
59 
ReadCallback(CFReadStreamRef stream,CFStreamEventType type,void * client_callback_info)60 void CFStreamHandle::ReadCallback(CFReadStreamRef stream,
61                                   CFStreamEventType type,
62                                   void* client_callback_info) {
63   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
64   grpc_core::ExecCtx exec_ctx;
65   grpc_error* error;
66   CFErrorRef stream_error;
67   CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info);
68   if (grpc_tcp_trace.enabled()) {
69     gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle,
70             stream, type, client_callback_info);
71   }
72   switch (type) {
73     case kCFStreamEventOpenCompleted:
74       handle->open_event_.SetReady();
75       break;
76     case kCFStreamEventHasBytesAvailable:
77     case kCFStreamEventEndEncountered:
78       handle->read_event_.SetReady();
79       break;
80     case kCFStreamEventErrorOccurred:
81       stream_error = CFReadStreamCopyError(stream);
82       error = grpc_error_set_int(
83           GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "read error"),
84           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
85       CFRelease(stream_error);
86       handle->open_event_.SetShutdown(GRPC_ERROR_REF(error));
87       handle->write_event_.SetShutdown(GRPC_ERROR_REF(error));
88       handle->read_event_.SetShutdown(GRPC_ERROR_REF(error));
89       GRPC_ERROR_UNREF(error);
90       break;
91     default:
92       GPR_UNREACHABLE_CODE(return );
93   }
94 }
WriteCallback(CFWriteStreamRef stream,CFStreamEventType type,void * clientCallBackInfo)95 void CFStreamHandle::WriteCallback(CFWriteStreamRef stream,
96                                    CFStreamEventType type,
97                                    void* clientCallBackInfo) {
98   grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
99   grpc_core::ExecCtx exec_ctx;
100   grpc_error* error;
101   CFErrorRef stream_error;
102   CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo);
103   if (grpc_tcp_trace.enabled()) {
104     gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle,
105             stream, type, clientCallBackInfo);
106   }
107   switch (type) {
108     case kCFStreamEventOpenCompleted:
109       handle->open_event_.SetReady();
110       break;
111     case kCFStreamEventCanAcceptBytes:
112     case kCFStreamEventEndEncountered:
113       handle->write_event_.SetReady();
114       break;
115     case kCFStreamEventErrorOccurred:
116       stream_error = CFWriteStreamCopyError(stream);
117       error = grpc_error_set_int(
118           GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write error"),
119           GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
120       CFRelease(stream_error);
121       handle->open_event_.SetShutdown(GRPC_ERROR_REF(error));
122       handle->write_event_.SetShutdown(GRPC_ERROR_REF(error));
123       handle->read_event_.SetShutdown(GRPC_ERROR_REF(error));
124       GRPC_ERROR_UNREF(error);
125       break;
126     default:
127       GPR_UNREACHABLE_CODE(return );
128   }
129 }
130 
CFStreamHandle(CFReadStreamRef read_stream,CFWriteStreamRef write_stream)131 CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream,
132                                CFWriteStreamRef write_stream) {
133   gpr_ref_init(&refcount_, 1);
134   open_event_.InitEvent();
135   read_event_.InitEvent();
136   write_event_.InitEvent();
137   dispatch_queue_ = dispatch_queue_create(nullptr, DISPATCH_QUEUE_SERIAL);
138   CFStreamClientContext ctx = {0, static_cast<void*>(this),
139                                CFStreamHandle::Retain, CFStreamHandle::Release,
140                                nil};
141   CFReadStreamSetClient(
142       read_stream,
143       kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
144           kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
145       CFStreamHandle::ReadCallback, &ctx);
146   CFWriteStreamSetClient(
147       write_stream,
148       kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
149           kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
150       CFStreamHandle::WriteCallback, &ctx);
151   grpc_apple_register_read_stream(read_stream, dispatch_queue_);
152   grpc_apple_register_write_stream(write_stream, dispatch_queue_);
153 }
154 
~CFStreamHandle()155 CFStreamHandle::~CFStreamHandle() {
156   open_event_.DestroyEvent();
157   read_event_.DestroyEvent();
158   write_event_.DestroyEvent();
159   dispatch_release(dispatch_queue_);
160 }
161 
NotifyOnOpen(grpc_closure * closure)162 void CFStreamHandle::NotifyOnOpen(grpc_closure* closure) {
163   open_event_.NotifyOn(closure);
164 }
165 
NotifyOnRead(grpc_closure * closure)166 void CFStreamHandle::NotifyOnRead(grpc_closure* closure) {
167   read_event_.NotifyOn(closure);
168 }
169 
NotifyOnWrite(grpc_closure * closure)170 void CFStreamHandle::NotifyOnWrite(grpc_closure* closure) {
171   write_event_.NotifyOn(closure);
172 }
173 
Shutdown(grpc_error * error)174 void CFStreamHandle::Shutdown(grpc_error* error) {
175   open_event_.SetShutdown(GRPC_ERROR_REF(error));
176   read_event_.SetShutdown(GRPC_ERROR_REF(error));
177   write_event_.SetShutdown(GRPC_ERROR_REF(error));
178   GRPC_ERROR_UNREF(error);
179 }
180 
Ref(const char * file,int line,const char * reason)181 void CFStreamHandle::Ref(const char* file, int line, const char* reason) {
182   if (grpc_tcp_trace.enabled()) {
183     gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
184     gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
185             "CFStream Handle ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
186             reason, val, val + 1);
187   }
188   gpr_ref(&refcount_);
189 }
190 
Unref(const char * file,int line,const char * reason)191 void CFStreamHandle::Unref(const char* file, int line, const char* reason) {
192   if (grpc_tcp_trace.enabled()) {
193     gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
194     gpr_log(GPR_DEBUG,
195             "CFStream Handle unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
196             reason, val, val - 1);
197   }
198   if (gpr_unref(&refcount_)) {
199     delete this;
200   }
201 }
202 
203 #else
204 
/* Placeholder symbol: when GRPC_CFSTREAM is not defined, this no-op function
 * keeps the grpc_cfstream library from being empty.
 */
void CFStreamDummy() {
}
209 
210 #endif
211