/*
 *
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#import "GRPCCallInternal.h"

#import <GRPCClient/GRPCCall.h>
#import <GRPCClient/GRPCInterceptor.h>
#import <RxLibrary/GRXBufferedPipe.h>

#import "../GRPCTransport+Private.h"
#import "GRPCCall+V2API.h"

/**
 * Call implementation that drives a legacy GRPCCall and forwards its events (initial metadata,
 * messages, write notifications and close) to a GRPCTransportManager.
 *
 * All mutable state below is read and written inside @synchronized(self) blocks. Calls into the
 * legacy GRPCCall and into the request pipe are made after leaving the critical section, using
 * local copies of the references taken while it was held.
 */
@implementation GRPCCall2Internal {
  /** Request for the call. */
  GRPCRequestOptions *_requestOptions;
  /** Options for the call. */
  GRPCCallOptions *_callOptions;
  /** The transport manager that responses and write notifications are forwarded to. */
  GRPCTransportManager *_transportManager;

  /**
   * Make use of legacy GRPCCall to make calls. Nullified when call is finished.
   */
  GRPCCall *_call;
  /** Flags whether initial metadata has been published to response handler. */
  BOOL _initialMetadataPublished;
  /** Streaming call writeable to the underlying call. */
  GRXBufferedPipe *_pipe;
  /** Serial dispatch queue for tasks inside the call. */
  dispatch_queue_t _dispatchQueue;
  /** Flags whether call has started. */
  BOOL _started;
  /** Flags whether call has been canceled. */
  BOOL _canceled;
  /** Flags whether call has been finished. */
  BOOL _finished;
  /**
   * The number of messages requested through -receiveNextMessages: while no underlying call
   * existed. Handed to the underlying call when it is created in
   * -startWithRequestOptions:callOptions:.
   */
  NSUInteger _pendingReceiveNextMessages;
}

/**
 * Creates the call object together with its request pipe and its serial dispatch queue.
 *
 * @param transportManager The manager that receives the events produced by this call.
 */
- (instancetype)initWithTransportManager:(GRPCTransportManager *)transportManager {
  dispatch_queue_t dispatchQueue;
  // Set queue QoS only when iOS version is 8.0 or above and Xcode version is 9.0 or above
  // NOTE: the preprocessor branches below share the trailing block; when the SDK check fails, the
  // bare `{` opens the scope that the common `}` closes. Keep the brace structure intact.
#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
  if (@available(iOS 8.0, macOS 10.10, *)) {
    dispatchQueue = dispatch_queue_create(
        NULL, dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0));
  } else {
#else
  {
#endif
    dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
  }
  if ((self = [super init])) {
    _pipe = [GRXBufferedPipe pipe];
    _transportManager = transportManager;
    _dispatchQueue = dispatchQueue;
  }
  return self;
}

/** Returns the serial dispatch queue created for this call in the initializer. */
- (dispatch_queue_t)dispatchQueue {
  return _dispatchQueue;
}

/**
 * Starts the call by creating the underlying legacy GRPCCall and attaching a response writeable
 * that forwards responses to the transport manager.
 *
 * The method is a no-op (after asserting in builds where NSAssert is enabled) when the host or
 * path is empty, or when the call has already been started or canceled.
 *
 * @param requestOptions Host, path and safety of the call; host and path must be non-empty.
 * @param callOptions Options of the call; copied. When nil, a default-initialized
 *                    GRPCCallOptions is used instead.
 */
- (void)startWithRequestOptions:(GRPCRequestOptions *)requestOptions
                    callOptions:(GRPCCallOptions *)callOptions {
  NSAssert(requestOptions.host.length != 0 && requestOptions.path.length != 0,
           @"Neither host nor path can be nil.");
  NSAssert(requestOptions.safety <= GRPCCallSafetyDefault, @"Invalid call safety value.");
  if (requestOptions.host.length == 0 || requestOptions.path.length == 0) {
    NSLog(@"Invalid host and path.");
    return;
  }

  GRPCCall *copiedCall = nil;
  @synchronized(self) {
    // Validate the state before touching any ivar, so that a duplicate or late start cannot
    // replace the stored options of a call that is already running or already canceled.
    NSAssert(!_started, @"Call already started.");
    NSAssert(!_canceled, @"Call already canceled.");
    if (_started || _canceled) {
      return;
    }

    _started = YES;
    _requestOptions = requestOptions;
    _callOptions = (callOptions != nil) ? [callOptions copy] : [[GRPCCallOptions alloc] init];

    _call = [[GRPCCall alloc] initWithHost:_requestOptions.host
                                      path:_requestOptions.path
                                callSafety:_requestOptions.safety
                            requestsWriter:_pipe
                               callOptions:_callOptions
                                 writeDone:^{
                                   @synchronized(self) {
                                     if (self->_transportManager) {
                                       [self issueDidWriteData];
                                     }
                                   }
                                 }];
    [_call setResponseDispatchQueue:_dispatchQueue];
    if (_callOptions.initialMetadata) {
      [_call.requestHeaders addEntriesFromDictionary:_callOptions.initialMetadata];
    }
    // Hand over the receive requests that were recorded before the underlying call existed.
    if (_pendingReceiveNextMessages > 0) {
      [_call receiveNextMessages:_pendingReceiveNextMessages];
      _pendingReceiveNextMessages = 0;
    }
    copiedCall = _call;
  }

  // Invoked for every response value: publishes the initial metadata once, then the message.
  void (^valueHandler)(id value) = ^(id value) {
    @synchronized(self) {
      if (self->_transportManager) {
        if (!self->_initialMetadataPublished) {
          self->_initialMetadataPublished = YES;
          [self issueInitialMetadata:self->_call.responseHeaders];
        }
        if (value) {
          [self issueMessage:value];
        }
      }
    }
  };
  // Invoked when the response stream ends: publishes the initial metadata if that has not
  // happened yet, issues the close event and releases the underlying call and the pipe.
  void (^completionHandler)(NSError *errorOrNil) = ^(NSError *errorOrNil) {
    @synchronized(self) {
      if (self->_transportManager) {
        if (!self->_initialMetadataPublished) {
          self->_initialMetadataPublished = YES;
          [self issueInitialMetadata:self->_call.responseHeaders];
        }
        [self issueCloseWithTrailingMetadata:self->_call.responseTrailers error:errorOrNil];
      }
      // Clearing _call must happen *after* dispatching close in order to get trailing
      // metadata from _call.
      if (self->_call) {
        // Clean up the request writers. This should have no effect to _call since its
        // response writeable is already nullified.
        [self->_pipe writesFinishedWithError:nil];
        self->_call = nil;
        self->_pipe = nil;
      }
    }
  };
  id<GRXWriteable> responseWriteable =
      [[GRXWriteable alloc] initWithValueHandler:valueHandler completionHandler:completionHandler];
  // Started outside of the critical section, on the reference copied while it was held.
  [copiedCall startWithWriteable:responseWriteable];
}

/**
 * Cancels the call. Only the first invocation has an effect.
 *
 * Drops the references to the underlying call and the pipe, forwards a close event carrying a
 * GRPCErrorCodeCancelled error to the transport manager (when there is one) and shuts the manager
 * down, then cancels the underlying call outside of the critical section.
 */
- (void)cancel {
  GRPCCall *copiedCall = nil;
  @synchronized(self) {
    if (_canceled) {
      return;
    }

    _canceled = YES;

    copiedCall = _call;
    _call = nil;
    _pipe = nil;

    if (_transportManager != nil) {
      NSError *cancelError =
          [NSError errorWithDomain:kGRPCErrorDomain
                              code:GRPCErrorCodeCancelled
                          userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}];
      [_transportManager forwardPreviousInterceptorCloseWithTrailingMetadata:nil
                                                                       error:cancelError];
      [_transportManager shutDown];
    }
  }
  // Messaging nil is a no-op, which covers a cancel issued before the call was started.
  [copiedCall cancel];
}

/**
 * Writes one request value to the request pipe.
 *
 * Ignored (after asserting in builds where NSAssert is enabled) when the call has been canceled
 * or half-closed, and when the pipe has already been released.
 *
 * @param data The value handed to the pipe's -writeValue:.
 */
- (void)writeData:(id)data {
  GRXBufferedPipe *copiedPipe = nil;
  @synchronized(self) {
    NSAssert(!_canceled, @"Call already canceled.");
    NSAssert(!_finished, @"Call is half-closed before sending data.");
    if (_canceled || _finished) {
      return;
    }

    copiedPipe = _pipe;
  }
  // copiedPipe is nil when the pipe was already released; the message is then a no-op.
  [copiedPipe writeValue:data];
}

/**
 * Half-closes the call: marks it finished, detaches the request pipe and signals the end of the
 * writes on it.
 *
 * Ignored (after asserting in builds where NSAssert is enabled) when the call has not been
 * started, has been canceled, or has already been half-closed.
 */
- (void)finish {
  GRXBufferedPipe *copiedPipe = nil;
  @synchronized(self) {
    NSAssert(_started, @"Call not started.");
    NSAssert(!_canceled, @"Call already canceled.");
    NSAssert(!_finished, @"Call already half-closed.");
    if (!_started || _canceled || _finished) {
      return;
    }

    copiedPipe = _pipe;
    _pipe = nil;
    _finished = YES;
  }
  [copiedPipe writesFinishedWithError:nil];
}

/**
 * Asynchronously forwards the initial metadata to the transport manager on the manager's
 * dispatch queue. Does nothing when initialMetadata is nil.
 *
 * Every call site in this file checks _transportManager for nil inside @synchronized(self)
 * before invoking this method.
 */
- (void)issueInitialMetadata:(NSDictionary *)initialMetadata {
  if (initialMetadata != nil) {
    // cannot directly call callback because this may not be running on manager's dispatch queue
    GRPCTransportManager *copiedManager = _transportManager;
    dispatch_async(copiedManager.dispatchQueue, ^{
      [copiedManager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
    });
  }
}

/**
 * Asynchronously forwards one response message to the transport manager on the manager's
 * dispatch queue. Does nothing when message is nil.
 *
 * Every call site in this file checks _transportManager for nil inside @synchronized(self)
 * before invoking this method.
 */
- (void)issueMessage:(id)message {
  if (message != nil) {
    // cannot directly call callback because this may not be running on manager's dispatch queue
    GRPCTransportManager *copiedManager = _transportManager;
    dispatch_async(copiedManager.dispatchQueue, ^{
      [copiedManager forwardPreviousInterceptorWithData:message];
    });
  }
}

/**
 * Asynchronously forwards the close event to the transport manager on the manager's dispatch
 * queue and shuts the manager down afterwards.
 *
 * Every call site in this file checks _transportManager for nil inside @synchronized(self)
 * before invoking this method.
 */
- (void)issueCloseWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error {
  // cannot directly call callback because this may not be running on manager's dispatch queue
  GRPCTransportManager *copiedManager = _transportManager;
  dispatch_async(copiedManager.dispatchQueue, ^{
    [copiedManager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata
                                                                 error:error];
    [copiedManager shutDown];
  });
}

/**
 * Asynchronously notifies the transport manager, on the manager's dispatch queue, that data has
 * been written.
 *
 * Every call site in this file checks _transportManager for nil inside @synchronized(self)
 * before invoking this method.
 */
- (void)issueDidWriteData {
  // cannot directly call callback because this may not be running on manager's dispatch queue
  GRPCTransportManager *copiedManager = _transportManager;
  dispatch_async(copiedManager.dispatchQueue, ^{
    [copiedManager forwardPreviousInterceptorDidWriteData];
  });
}

/**
 * Requests delivery of more response messages.
 *
 * When no underlying call exists, the request is accumulated in _pendingReceiveNextMessages,
 * which -startWithRequestOptions:callOptions: hands to the underlying call on creation.
 *
 * @param numberOfMessages The number of additional messages requested.
 */
- (void)receiveNextMessages:(NSUInteger)numberOfMessages {
  // branching based on _callOptions.flowControlEnabled is handled inside _call
  GRPCCall *copiedCall = nil;
  @synchronized(self) {
    copiedCall = _call;
    if (copiedCall == nil) {
      _pendingReceiveNextMessages += numberOfMessages;
      return;
    }
  }
  [copiedCall receiveNextMessages:numberOfMessages];
}

@end