1/* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19#import "GRPCCallInternal.h" 20 21#import <GRPCClient/GRPCCall.h> 22#import <GRPCClient/GRPCInterceptor.h> 23#import <RxLibrary/GRXBufferedPipe.h> 24 25#import "../GRPCTransport+Private.h" 26#import "GRPCCall+V2API.h" 27 28@implementation GRPCCall2Internal { 29 /** Request for the call. */ 30 GRPCRequestOptions *_requestOptions; 31 /** Options for the call. */ 32 GRPCCallOptions *_callOptions; 33 /** The interceptor manager to process responses. */ 34 GRPCTransportManager *_transportManager; 35 36 /** 37 * Make use of legacy GRPCCall to make calls. Nullified when call is finished. 38 */ 39 GRPCCall *_call; 40 /** Flags whether initial metadata has been published to response handler. */ 41 BOOL _initialMetadataPublished; 42 /** Streaming call writeable to the underlying call. */ 43 GRXBufferedPipe *_pipe; 44 /** Serial dispatch queue for tasks inside the call. */ 45 dispatch_queue_t _dispatchQueue; 46 /** Flags whether call has started. */ 47 BOOL _started; 48 /** Flags whether call has been canceled. */ 49 BOOL _canceled; 50 /** Flags whether call has been finished. */ 51 BOOL _finished; 52 /** The number of pending messages receiving requests. */ 53 NSUInteger _pendingReceiveNextMessages; 54} 55 56- (instancetype)initWithTransportManager:(GRPCTransportManager *)transportManager { 57 dispatch_queue_t dispatchQueue; 58 // Set queue QoS only when iOS version is 8.0 or above and Xcode version is 9.0 or above 59#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300 60 if (@available(iOS 8.0, macOS 10.10, *)) { 61 dispatchQueue = dispatch_queue_create( 62 NULL, dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0)); 63 } else { 64#else 65 { 66#endif 67 dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); 68 } 69 if ((self = [super init])) { 70 _pipe = [GRXBufferedPipe pipe]; 71 _transportManager = transportManager; 72 _dispatchQueue = dispatchQueue; 73 } 74 return self; 75} 76 77- (dispatch_queue_t)dispatchQueue { 78 return _dispatchQueue; 79} 80 81- (void)startWithRequestOptions:(GRPCRequestOptions *)requestOptions 82 callOptions:(GRPCCallOptions *)callOptions { 83 NSAssert(requestOptions.host.length != 0 && requestOptions.path.length != 0, 84 @"Neither host nor path can be nil."); 85 NSAssert(requestOptions.safety <= GRPCCallSafetyCacheableRequest, @"Invalid call safety value."); 86 if (requestOptions.host.length == 0 || requestOptions.path.length == 0) { 87 NSLog(@"Invalid host and path."); 88 return; 89 } 90 if (requestOptions.safety > GRPCCallSafetyCacheableRequest) { 91 NSLog(@"Invalid call safety."); 92 return; 93 } 94 95 GRPCCall *copiedCall = nil; 96 @synchronized(self) { 97 _requestOptions = requestOptions; 98 if (callOptions == nil) { 99 _callOptions = [[GRPCCallOptions alloc] init]; 100 } else { 101 _callOptions = [callOptions copy]; 102 } 103 104 NSAssert(!_started, @"Call already started."); 105 NSAssert(!_canceled, @"Call already canceled."); 106 if (_started) { 107 return; 108 } 109 if (_canceled) { 110 return; 111 } 112 113 _started = YES; 114 115 _call = [[GRPCCall alloc] initWithHost:_requestOptions.host 116 path:_requestOptions.path 117 callSafety:_requestOptions.safety 118 requestsWriter:_pipe 119 callOptions:_callOptions 120 writeDone:^{ 121 @synchronized(self) { 122 if (self->_transportManager) { 123 [self issueDidWriteData]; 124 } 125 } 126 }]; 127 [_call setResponseDispatchQueue:_dispatchQueue]; 128 if (_callOptions.initialMetadata) { 129 [_call.requestHeaders addEntriesFromDictionary:_callOptions.initialMetadata]; 130 } 131 if (_pendingReceiveNextMessages > 0) { 132 [_call receiveNextMessages:_pendingReceiveNextMessages]; 133 _pendingReceiveNextMessages = 0; 134 } 135 copiedCall = _call; 136 } 137 138 void (^valueHandler)(id value) = ^(id value) { 139 @synchronized(self) { 140 if (self->_transportManager) { 141 if (!self->_initialMetadataPublished) { 142 self->_initialMetadataPublished = YES; 143 [self issueInitialMetadata:self->_call.responseHeaders]; 144 } 145 if (value) { 146 [self issueMessage:value]; 147 } 148 } 149 } 150 }; 151 void (^completionHandler)(NSError *errorOrNil) = ^(NSError *errorOrNil) { 152 @synchronized(self) { 153 if (self->_transportManager) { 154 if (!self->_initialMetadataPublished) { 155 self->_initialMetadataPublished = YES; 156 [self issueInitialMetadata:self->_call.responseHeaders]; 157 } 158 [self issueCloseWithTrailingMetadata:self->_call.responseTrailers error:errorOrNil]; 159 } 160 // Clearing _call must happen *after* dispatching close in order to get trailing 161 // metadata from _call. 162 if (self->_call) { 163 // Clean up the request writers. This should have no effect to _call since its 164 // response writeable is already nullified. 165 [self->_pipe writesFinishedWithError:nil]; 166 self->_call = nil; 167 self->_pipe = nil; 168 } 169 } 170 }; 171 id<GRXWriteable> responseWriteable = 172 [[GRXWriteable alloc] initWithValueHandler:valueHandler completionHandler:completionHandler]; 173 [copiedCall startWithWriteable:responseWriteable]; 174} 175 176- (void)cancel { 177 GRPCCall *copiedCall = nil; 178 @synchronized(self) { 179 if (_canceled) { 180 return; 181 } 182 183 _canceled = YES; 184 185 copiedCall = _call; 186 _call = nil; 187 _pipe = nil; 188 189 if (_transportManager != nil) { 190 [_transportManager 191 forwardPreviousInterceptorCloseWithTrailingMetadata:nil 192 error: 193 [NSError 194 errorWithDomain:kGRPCErrorDomain 195 code: 196 GRPCErrorCodeCancelled 197 userInfo:@{ 198 NSLocalizedDescriptionKey : 199 @"Canceled by app" 200 }]]; 201 [_transportManager shutDown]; 202 } 203 } 204 [copiedCall cancel]; 205} 206 207- (void)writeData:(id)data { 208 GRXBufferedPipe *copiedPipe = nil; 209 @synchronized(self) { 210 NSAssert(!_canceled, @"Call already canceled."); 211 NSAssert(!_finished, @"Call is half-closed before sending data."); 212 if (_canceled) { 213 return; 214 } 215 if (_finished) { 216 return; 217 } 218 219 if (_pipe) { 220 copiedPipe = _pipe; 221 } 222 } 223 [copiedPipe writeValue:data]; 224} 225 226- (void)finish { 227 GRXBufferedPipe *copiedPipe = nil; 228 @synchronized(self) { 229 NSAssert(_started, @"Call not started."); 230 NSAssert(!_canceled, @"Call already canceled."); 231 NSAssert(!_finished, @"Call already half-closed."); 232 if (!_started) { 233 return; 234 } 235 if (_canceled) { 236 return; 237 } 238 if (_finished) { 239 return; 240 } 241 242 if (_pipe) { 243 copiedPipe = _pipe; 244 _pipe = nil; 245 } 246 _finished = YES; 247 } 248 [copiedPipe writesFinishedWithError:nil]; 249} 250 251- (void)issueInitialMetadata:(NSDictionary *)initialMetadata { 252 if (initialMetadata != nil) { 253 // cannot directly call callback because this may not be running on manager's dispatch queue 254 GRPCTransportManager *copiedManager = _transportManager; 255 dispatch_async(copiedManager.dispatchQueue, ^{ 256 [copiedManager forwardPreviousInterceptorWithInitialMetadata:initialMetadata]; 257 }); 258 } 259} 260 261- (void)issueMessage:(id)message { 262 if (message != nil) { 263 // cannot directly call callback because this may not be running on manager's dispatch queue 264 GRPCTransportManager *copiedManager = _transportManager; 265 dispatch_async(copiedManager.dispatchQueue, ^{ 266 [copiedManager forwardPreviousInterceptorWithData:message]; 267 }); 268 } 269} 270 271- (void)issueCloseWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error { 272 // cannot directly call callback because this may not be running on manager's dispatch queue 273 GRPCTransportManager *copiedManager = _transportManager; 274 dispatch_async(copiedManager.dispatchQueue, ^{ 275 [copiedManager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata 276 error:error]; 277 [copiedManager shutDown]; 278 }); 279} 280 281- (void)issueDidWriteData { 282 // cannot directly call callback because this may not be running on manager's dispatch queue 283 GRPCTransportManager *copiedManager = _transportManager; 284 dispatch_async(copiedManager.dispatchQueue, ^{ 285 [copiedManager forwardPreviousInterceptorDidWriteData]; 286 }); 287} 288 289- (void)receiveNextMessages:(NSUInteger)numberOfMessages { 290 // branching based on _callOptions.flowControlEnabled is handled inside _call 291 GRPCCall *copiedCall = nil; 292 @synchronized(self) { 293 copiedCall = _call; 294 if (copiedCall == nil) { 295 _pendingReceiveNextMessages += numberOfMessages; 296 return; 297 } 298 } 299 [copiedCall receiveNextMessages:numberOfMessages]; 300} 301 302@end 303