• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/port.h"
22 #include "src/core/util/memory.h"
23 
24 #ifdef GRPC_CFSTREAM
25 #import <CoreFoundation/CoreFoundation.h>
26 #include <grpc/grpc.h>
27 #include <grpc/support/atm.h>
28 #include <grpc/support/sync.h>
29 
30 #include "absl/log/log.h"
31 #include "src/core/lib/debug/trace.h"
32 #import "src/core/lib/iomgr/cfstream_handle.h"
33 #include "src/core/lib/iomgr/closure.h"
34 #include "src/core/lib/iomgr/error_cfstream.h"
35 #include "src/core/lib/iomgr/ev_apple.h"
36 #include "src/core/lib/iomgr/exec_ctx.h"
37 
// Constructing a holder calls grpc_init().
GrpcLibraryInitHolder::GrpcLibraryInitHolder() {
  grpc_init();
}
39 
// Destroying a holder calls grpc_shutdown().
GrpcLibraryInitHolder::~GrpcLibraryInitHolder() {
  grpc_shutdown();
}
41 
// Retain callback placed in the CFStreamClientContext built by the
// constructor. `info` is the CFStreamHandle pointer stored in that context;
// one reference is added to it and the same pointer is handed back.
void* CFStreamHandle::Retain(void* info) {
  auto* stream_handle = static_cast<CFStreamHandle*>(info);
  CFSTREAM_HANDLE_REF(stream_handle, "retain");
  return info;
}
47 
// Release callback placed in the CFStreamClientContext built by the
// constructor. `info` is the CFStreamHandle pointer stored in that context;
// one reference is dropped from it.
void CFStreamHandle::Release(void* info) {
  auto* stream_handle = static_cast<CFStreamHandle*>(info);
  CFSTREAM_HANDLE_UNREF(stream_handle, "release");
}
52 
// Factory: heap-allocates a CFStreamHandle for the given stream pair and
// returns the raw pointer. The constructor starts the refcount at 1.
CFStreamHandle* CFStreamHandle::CreateStreamHandle(
    CFReadStreamRef read_stream, CFWriteStreamRef write_stream) {
  CFStreamHandle* created = new CFStreamHandle(read_stream, write_stream);
  return created;
}
57 
// Client callback installed on the read stream by the constructor.
//
// stream:               the read stream that raised the event.
// type:                 which CFStream event occurred.
// client_callback_info: the CFStreamHandle registered in the client context.
//
// Open-completed marks the open event ready; bytes-available and
// end-encountered mark the read event ready; an error shuts down all three
// events with an UNAVAILABLE status built from the stream's CFError.
void CFStreamHandle::ReadCallback(CFReadStreamRef stream,
                                  CFStreamEventType type,
                                  void* client_callback_info) {
  // Both exec contexts live on the stack for the whole callback.
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  auto* handle = static_cast<CFStreamHandle*>(client_callback_info);
  GRPC_TRACE_VLOG(tcp, 2) << "CFStream ReadCallback (" << handle << ", "
                          << stream << ", " << type << ", "
                          << client_callback_info << ")";
  switch (type) {
    case kCFStreamEventOpenCompleted: {
      handle->open_event_.SetReady();
      break;
    }
    case kCFStreamEventHasBytesAvailable:
    case kCFStreamEventEndEncountered: {
      handle->read_event_.SetReady();
      break;
    }
    case kCFStreamEventErrorOccurred: {
      // The copied CFError is owned here, so it is released once the gRPC
      // error has been created from it.
      CFErrorRef cf_error = CFReadStreamCopyError(stream);
      grpc_error_handle status = grpc_error_set_int(
          GRPC_ERROR_CREATE_FROM_CFERROR(cf_error, "read error"),
          grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE);
      CFRelease(cf_error);
      handle->open_event_.SetShutdown(status);
      handle->write_event_.SetShutdown(status);
      handle->read_event_.SetShutdown(status);
      break;
    }
    default:
      // Only the event types handled above are requested in the constructor.
      GPR_UNREACHABLE_CODE(return);
  }
}
// Client callback installed on the write stream by the constructor.
//
// stream:             the write stream that raised the event.
// type:               which CFStream event occurred.
// clientCallBackInfo: the CFStreamHandle registered in the client context.
//
// Open-completed marks the open event ready; can-accept-bytes and
// end-encountered mark the write event ready; an error shuts down all three
// events with an UNAVAILABLE status built from the stream's CFError.
void CFStreamHandle::WriteCallback(CFWriteStreamRef stream,
                                   CFStreamEventType type,
                                   void* clientCallBackInfo) {
  // Both exec contexts live on the stack for the whole callback.
  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
  grpc_core::ExecCtx exec_ctx;
  auto* handle = static_cast<CFStreamHandle*>(clientCallBackInfo);
  GRPC_TRACE_VLOG(tcp, 2) << "CFStream WriteCallback (" << handle << ", "
                          << stream << ", " << type << ", "
                          << clientCallBackInfo << ")";
  switch (type) {
    case kCFStreamEventOpenCompleted: {
      handle->open_event_.SetReady();
      break;
    }
    case kCFStreamEventCanAcceptBytes:
    case kCFStreamEventEndEncountered: {
      handle->write_event_.SetReady();
      break;
    }
    case kCFStreamEventErrorOccurred: {
      // The copied CFError is owned here, so it is released once the gRPC
      // error has been created from it.
      CFErrorRef cf_error = CFWriteStreamCopyError(stream);
      grpc_error_handle status = grpc_error_set_int(
          GRPC_ERROR_CREATE_FROM_CFERROR(cf_error, "write error"),
          grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE);
      CFRelease(cf_error);
      handle->open_event_.SetShutdown(status);
      handle->write_event_.SetShutdown(status);
      handle->read_event_.SetShutdown(status);
      break;
    }
    default:
      // Only the event types handled above are requested in the constructor.
      GPR_UNREACHABLE_CODE(return);
  }
}
124 
// Builds a handle around an existing read/write stream pair.
//
// Starts the refcount at 1, initialises the open/read/write events, creates a
// private serial dispatch queue, installs ReadCallback/WriteCallback as the
// stream clients (with this handle as their info pointer), and finally
// registers both streams together with the queue via grpc_apple_register_*.
CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream,
                               CFWriteStreamRef write_stream) {
  gpr_ref_init(&refcount_, 1);
  open_event_.InitEvent();
  read_event_.InitEvent();
  write_event_.InitEvent();
  dispatch_queue_ = dispatch_queue_create(nullptr, DISPATCH_QUEUE_SERIAL);

  // The same client context is used for both streams.
  CFStreamClientContext ctx = {
      0,                         // version
      static_cast<void*>(this),  // info handed back to the callbacks
      CFStreamHandle::Retain,    // retain
      CFStreamHandle::Release,   // release
      nullptr                    // copyDescription (unused)
  };

  // Events the read side is interested in.
  const auto read_events =
      kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
      kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered;
  CFReadStreamSetClient(read_stream, read_events, CFStreamHandle::ReadCallback,
                        &ctx);

  // Events the write side is interested in.
  const auto write_events =
      kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
      kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered;
  CFWriteStreamSetClient(write_stream, write_events,
                         CFStreamHandle::WriteCallback, &ctx);

  grpc_apple_register_read_stream(read_stream, dispatch_queue_);
  grpc_apple_register_write_stream(write_stream, dispatch_queue_);
}
148 
// Tears down the three events and releases the dispatch queue created by the
// constructor.
CFStreamHandle::~CFStreamHandle() {
  this->open_event_.DestroyEvent();
  this->read_event_.DestroyEvent();
  this->write_event_.DestroyEvent();
  dispatch_release(this->dispatch_queue_);
}
155 
// Forwards `closure` to the open event's NotifyOn().
void CFStreamHandle::NotifyOnOpen(grpc_closure* closure) {
  this->open_event_.NotifyOn(closure);
}
159 
// Forwards `closure` to the read event's NotifyOn().
void CFStreamHandle::NotifyOnRead(grpc_closure* closure) {
  this->read_event_.NotifyOn(closure);
}
163 
// Forwards `closure` to the write event's NotifyOn().
void CFStreamHandle::NotifyOnWrite(grpc_closure* closure) {
  this->write_event_.NotifyOn(closure);
}
167 
// Shuts down the open, read and write events (in that order), passing the
// same `error` to each.
void CFStreamHandle::Shutdown(grpc_error_handle error) {
  this->open_event_.SetShutdown(error);
  this->read_event_.SetShutdown(error);
  this->write_event_.SetShutdown(error);
}
173 
// Adds one reference to the handle.
//
// file/line: call site, used only as the location of the trace log entry.
// reason:    free-form tag included in the trace log entry.
//
// When the tcp trace flag is on, the count is read without a barrier purely
// for logging, before the actual increment happens.
void CFStreamHandle::Ref(const char* file, int line, const char* reason) {
  if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
    const gpr_atm before = gpr_atm_no_barrier_load(&refcount_.count);
    const gpr_atm after = before + 1;
    VLOG(2).AtLocation(file, line) << "CFStream Handle ref " << this << " : "
                                   << reason << " " << before << " -> "
                                   << after;
  }
  gpr_ref(&refcount_);
}
182 
// Drops one reference from the handle and deletes it when gpr_unref()
// reports that the last reference is gone.
//
// file/line: call site, used only as the location of the trace log entry.
// reason:    free-form tag included in the trace log entry.
//
// When the tcp trace flag is on, the count is read without a barrier purely
// for logging, before the actual decrement happens.
void CFStreamHandle::Unref(const char* file, int line, const char* reason) {
  if (GRPC_TRACE_FLAG_ENABLED(tcp)) {
    const gpr_atm before = gpr_atm_no_barrier_load(&refcount_.count);
    const gpr_atm after = before - 1;
    VLOG(2).AtLocation(file, line) << "CFStream Handle unref " << this << " : "
                                   << reason << " " << before << " -> "
                                   << after;
  }
  if (!gpr_unref(&refcount_)) {
    return;
  }
  // Nothing may touch members past this point.
  delete this;
}
193 
194 #else
195 
// Placeholder symbol compiled when GRPC_CFSTREAM is not defined, so that the
// grpc_cfstream library is never empty.
//
void CFStreamPhony() {
}
200 
201 #endif
202