/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#import "GRPCCall.h"

#import "GRPCCall+OAuth2.h"

#import <RxLibrary/GRXConcurrentWriteable.h>
#import <RxLibrary/GRXImmediateSingleWriter.h>
#include <grpc/grpc.h>
#include <grpc/support/time.h>

#import "private/GRPCConnectivityMonitor.h"
#import "private/GRPCHost.h"
#import "private/GRPCRequestHeaders.h"
#import "private/GRPCWrappedCall.h"
#import "private/NSData+GRPC.h"
#import "private/NSDictionary+GRPC.h"
#import "private/NSError+GRPC.h"

// At most 6 ops can be in an op batch for a client: SEND_INITIAL_METADATA,
// SEND_MESSAGE, SEND_CLOSE_FROM_CLIENT, RECV_INITIAL_METADATA, RECV_MESSAGE,
// and RECV_STATUS_ON_CLIENT.
NSInteger kMaxClientBatch = 6;

NSString *const kGRPCHeadersKey = @"io.grpc.HeadersKey";
NSString *const kGRPCTrailersKey = @"io.grpc.TrailersKey";

// Maps "<host>/<path>" to an NSNumber holding the initial-metadata flags for that method.
// Created in +initialize; every access is guarded by @synchronized(callFlags) because the
// dictionary is mutable and shared by all calls.
static NSMutableDictionary *callFlags;

static NSString *const kAuthorizationHeader = @"authorization";
static NSString *const kBearerPrefix = @"Bearer ";

const char *kCFStreamVarName = "grpc_cfstream";

// Returns YES when the environment variable named by kCFStreamVarName is set and its first
// character is '1'. The connectivity monitor is only registered/unregistered when this is NO.
static BOOL GRPCIsCFStreamEnabled(void) {
  const char *enableCFStream = getenv(kCFStreamVarName);
  return enableCFStream != NULL && enableCFStream[0] == '1';
}

@interface GRPCCall ()<GRXWriteable>
// Make them read-write.
@property(atomic, strong) NSDictionary *responseHeaders;
@property(atomic, strong) NSDictionary *responseTrailers;
// YES between the moment a token is requested from the token provider and the moment the token
// handler starts the call (or the call is cancelled).
@property(atomic) BOOL isWaitingForToken;
@end

// The following methods of a C gRPC call object aren't reentrant, and thus
// calls to them must be serialized:
// - start_batch
// - destroy
//
// start_batch with a SEND_MESSAGE argument can only be called after the
// OP_COMPLETE event for any previous write is received. This is achieved by
// pausing the requests writer immediately every time it writes a value, and
// resuming it again when OP_COMPLETE is received.
//
// Similarly, start_batch with a RECV_MESSAGE argument can only be called after
// the OP_COMPLETE event for any previous read is received. This is easier to
// enforce, as we're writing the received messages into the writeable:
// start_batch is enqueued once upon receiving the OP_COMPLETE event for the
// RECV_METADATA batch, and then once after receiving each OP_COMPLETE event for
// each RECV_MESSAGE batch.
@implementation GRPCCall {
  dispatch_queue_t _callQueue;

  NSString *_host;
  NSString *_path;
  GRPCWrappedCall *_wrappedCall;
  GRPCConnectivityMonitor *_connectivityMonitor;

  // The C gRPC library has less guarantees on the ordering of events than we
  // do. Particularly, in the face of errors, there's no ordering guarantee at
  // all. This wrapper over our actual writeable ensures thread-safety and
  // correct ordering.
  GRXConcurrentWriteable *_responseWriteable;

  // The network thread wants the requestWriter to resume (when the server is ready for more input),
  // or to stop (on errors), concurrently with user threads that want to start it, pause it or stop
  // it. Because a writer isn't thread-safe, we'll synchronize those operations on it.
  // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or
  // writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to
  // pause the writer immediately on writeValue:, so we need our locking to be recursive.
  GRXWriter *_requestWriter;

  // To create a retain cycle when a call is started, up until it finishes. See
  // |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a
  // reference to the call object if all they're interested in is the handler being executed when
  // the response arrives.
  GRPCCall *_retainSelf;

  GRPCRequestHeaders *_requestHeaders;

  // In the case that the call is a unary call (i.e. the writer to GRPCCall is of type
  // GRXImmediateSingleWriter), GRPCCall will delay sending ops (not send them to C core
  // immediately) and buffer them into a batch _unaryOpBatch. The batch is sent to C core when
  // the SendClose op is added.
  BOOL _unaryCall;
  NSMutableArray *_unaryOpBatch;

  // The dispatch queue to be used for enqueuing responses to user. Defaulted to the main dispatch
  // queue
  dispatch_queue_t _responseQueue;

  // Whether the call is finished. If it is, should not call finishWithError again.
  BOOL _finished;
}

@synthesize state = _state;

+ (void)initialize {
  // Guarantees the code in {} block is invoked only once. See ref at:
  // https://developer.apple.com/documentation/objectivec/nsobject/1418639-initialize?language=objc
  if (self == [GRPCCall self]) {
    grpc_init();
    callFlags = [NSMutableDictionary dictionary];
  }
}

// Records the initial-metadata flags to use for every call to |path| on |host|. Unknown
// |callSafety| values leave the table untouched.
+ (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path {
  NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
  // callFlags is shared by every call and may be read from another thread at the same time.
  @synchronized(callFlags) {
    switch (callSafety) {
      case GRPCCallSafetyDefault:
        callFlags[hostAndPath] = @0;
        break;
      case GRPCCallSafetyIdempotentRequest:
        callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
        break;
      case GRPCCallSafetyCacheableRequest:
        callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
        break;
      default:
        break;
    }
  }
}

// Returns the flags previously recorded with +setCallSafety:host:path: for this host and path, or
// 0 when nothing was recorded (messaging the missing entry, nil, yields 0).
+ (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path {
  NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
  @synchronized(callFlags) {
    // The flags are unsigned bit masks, so read them back as unsigned.
    return [callFlags[hostAndPath] unsignedIntValue];
  }
}

// Always raises: it forwards nil host and path to the designated initializer, which rejects them.
- (instancetype)init {
  return [self initWithHost:nil path:nil requestsWriter:nil];
}

// Designated initializer.
// Raises NSInvalidArgumentException if |host| or |path| is nil, or if |requestWriter| is not in
// the GRXWriterStateNotStarted state.
- (instancetype)initWithHost:(NSString *)host
                        path:(NSString *)path
              requestsWriter:(GRXWriter *)requestWriter {
  if (!host || !path) {
    [NSException raise:NSInvalidArgumentException format:@"Neither host nor path can be nil."];
  }
  if (requestWriter.state != GRXWriterStateNotStarted) {
    [NSException raise:NSInvalidArgumentException
                format:@"The requests writer can't be already started."];
  }
  if ((self = [super init])) {
    _host = [host copy];
    _path = [path copy];

    // Serial queue to invoke the non-reentrant methods of the grpc_call object.
    _callQueue = dispatch_queue_create("io.grpc.call", DISPATCH_QUEUE_SERIAL);

    _requestWriter = requestWriter;

    _requestHeaders = [[GRPCRequestHeaders alloc] initWithCall:self];

    if ([requestWriter isKindOfClass:[GRXImmediateSingleWriter class]]) {
      _unaryCall = YES;
      _unaryOpBatch = [NSMutableArray arrayWithCapacity:kMaxClientBatch];
    }

    _responseQueue = dispatch_get_main_queue();
  }
  return self;
}

// Replaces the queue on which responses are enqueued. Ignored once the call has left the
// not-started state.
- (void)setResponseDispatchQueue:(dispatch_queue_t)queue {
  // _state is written under @synchronized(self) everywhere else in this file; read it the same
  // way so the check cannot race with startWithWriteable:.
  @synchronized(self) {
    if (_state != GRXWriterStateNotStarted) {
      return;
    }
    _responseQueue = queue;
  }
}

#pragma mark Finish

// Marks the call and the request writer as finished, notifies the response writeable (with the
// error, or with successful completion when |errorOrNil| is nil), unregisters from connectivity
// notifications when CFStream isn't enabled, and breaks the self-retain cycle.
// Callers go through -maybeFinishWithError: so this runs at most once per call.
- (void)finishWithError:(NSError *)errorOrNil {
  @synchronized(self) {
    _state = GRXWriterStateFinished;
  }

  // If there were still request messages coming, stop them.
  @synchronized(_requestWriter) {
    _requestWriter.state = GRXWriterStateFinished;
  }

  if (errorOrNil) {
    [_responseWriteable cancelWithError:errorOrNil];
  } else {
    [_responseWriteable enqueueSuccessfulCompletion];
  }

  // Connectivity monitor is not required for CFStream
  if (!GRPCIsCFStreamEnabled()) {
    [GRPCConnectivityMonitor unregisterObserver:self];
  }

  // If the call isn't retained anywhere else, it can be deallocated now.
  _retainSelf = nil;
}

- (void)cancelCall {
  // Can be called from any thread, any number of times.
  [_wrappedCall cancel];
}

// Cancels the RPC and finishes it with a GRPCErrorCodeCancelled error. While the call is still
// waiting for a token the wrapped call has not been created yet, so only the waiting flag is
// cleared; the token handler checks that flag and will then not start the call.
- (void)cancel {
  if (!self.isWaitingForToken) {
    [self cancelCall];
  } else {
    self.isWaitingForToken = NO;
  }
  [self
      maybeFinishWithError:[NSError
                               errorWithDomain:kGRPCErrorDomain
                                          code:GRPCErrorCodeCancelled
                                      userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]];
}

// Calls -finishWithError: the first time it is invoked and does nothing on later invocations.
- (void)maybeFinishWithError:(NSError *)errorOrNil {
  BOOL toFinish = NO;
  @synchronized(self) {
    if (!_finished) {
      _finished = YES;
      toFinish = YES;
    }
  }
  // Finish outside the lock; only the caller that flipped _finished gets here.
  if (toFinish) {
    [self finishWithError:errorOrNil];
  }
}

- (void)dealloc {
  // Move a strong reference to the wrapped call into a block that clears it on the call queue, so
  // that reference is dropped on _callQueue rather than on the deallocating thread.
  // NOTE(review): presumably this serializes destruction of the C call with start_batch, per the
  // note above @implementation -- confirm against GRPCWrappedCall.
  __block GRPCWrappedCall *wrappedCall = _wrappedCall;
  dispatch_async(_callQueue, ^{
    wrappedCall = nil;
  });
}

#pragma mark Read messages

// Only called from the call queue.
// The handler will be called from the network queue.
- (void)startReadWithHandler:(void (^)(grpc_byte_buffer *))handler {
  // TODO(jcanizales): Add error handlers for async failures
  [_wrappedCall startBatchWithOperations:@[ [[GRPCOpRecvMessage alloc] initWithHandler:handler] ]];
}

// Called initially from the network queue once response headers are received,
// then "recursively" from the responseWriteable queue after each response from the
// server has been written.
// If the call is currently paused, this is a noop. Restarting the call will invoke this
// method.
// TODO(jcanizales): Rename to readResponseIfNotPaused.
- (void)startNextRead {
  @synchronized(self) {
    if (self.state == GRXWriterStatePaused) {
      return;
    }
  }

  dispatch_async(_callQueue, ^{
    // The read handler is held by the pending operation, so it only captures weak references.
    __weak GRPCCall *weakSelf = self;
    __weak GRXConcurrentWriteable *weakWriteable = self->_responseWriteable;
    [self startReadWithHandler:^(grpc_byte_buffer *message) {
      __strong GRPCCall *strongSelf = weakSelf;
      __strong GRXConcurrentWriteable *strongWriteable = weakWriteable;
      if (message == NULL) {
        // No more messages from the server
        return;
      }
      NSData *data = [NSData grpc_dataWithByteBuffer:message];
      grpc_byte_buffer_destroy(message);
      if (!data) {
        // The app doesn't have enough memory to hold the server response. We
        // don't want to throw, because the app shouldn't crash for a behavior
        // that's on the hands of any server to have. Instead we finish and ask
        // the server to cancel.
        [strongSelf cancelCall];
        [strongSelf
            maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
                                                     code:GRPCErrorCodeResourceExhausted
                                                 userInfo:@{
                                                   NSLocalizedDescriptionKey :
                                                       @"Client does not have enough memory to "
                                                       @"hold the server response."
                                                 }]];
        return;
      }
      // Request the next message only after this one has been handed to the writeable.
      [strongWriteable enqueueValue:data
                  completionHandler:^{
                    [strongSelf startNextRead];
                  }];
    }];
  });
}

#pragma mark Send headers

// Builds the SEND_INITIAL_METADATA op for |headers| with the flags recorded for this host and
// path. Streaming calls start it immediately; unary calls buffer it in _unaryOpBatch.
- (void)sendHeaders:(NSDictionary *)headers {
  // TODO(jcanizales): Add error handlers for async failures
  GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc]
      initWithMetadata:headers
                 flags:[GRPCCall callFlagsForHost:_host path:_path]
               handler:nil];  // No clean-up needed after SEND_INITIAL_METADATA
  if (!_unaryCall) {
    [_wrappedCall startBatchWithOperations:@[ op ]];
  } else {
    [_unaryOpBatch addObject:op];
  }
}

#pragma mark GRXWriteable implementation

// Only called from the call queue. The error handler will be called from the
// network queue if the write didn't succeed.
// If the call is a unary call, parameter \a errorHandler will be ignored and
// the error handler of GRPCOpSendClose will be executed in case of error.
- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)(void))errorHandler {
  __weak GRPCCall *weakSelf = self;
  void (^resumingHandler)(void) = ^{
    // Resume the request writer.
    GRPCCall *strongSelf = weakSelf;
    if (strongSelf) {
      @synchronized(strongSelf->_requestWriter) {
        strongSelf->_requestWriter.state = GRXWriterStateStarted;
      }
    }
  };

  GRPCOpSendMessage *op =
      [[GRPCOpSendMessage alloc] initWithMessage:message handler:resumingHandler];
  if (!_unaryCall) {
    [_wrappedCall startBatchWithOperations:@[ op ] errorHandler:errorHandler];
  } else {
    // Ignored errorHandler since it is the same as the one for GRPCOpSendClose.
    // TODO (mxyan): unify the error handlers of all Ops into a single closure.
    [_unaryOpBatch addObject:op];
  }
}

// GRXWriteable: receives one request message from the request writer.
- (void)writeValue:(id)value {
  // TODO(jcanizales): Throw/assert if value isn't NSData.

  // Pause the input and only resume it when the C layer notifies us that writes
  // can proceed.
  @synchronized(_requestWriter) {
    _requestWriter.state = GRXWriterStatePaused;
  }

  dispatch_async(_callQueue, ^{
    // Write error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT
    [self writeMessage:value withErrorHandler:nil];
  });
}

// Only called from the call queue. The error handler will be called from the
// network queue if the requests stream couldn't be closed successfully.
// For unary calls this appends the SendClose op to the buffered batch and starts the whole batch.
- (void)finishRequestWithErrorHandler:(void (^)(void))errorHandler {
  if (!_unaryCall) {
    [_wrappedCall startBatchWithOperations:@[ [[GRPCOpSendClose alloc] init] ]
                              errorHandler:errorHandler];
  } else {
    [_unaryOpBatch addObject:[[GRPCOpSendClose alloc] init]];
    [_wrappedCall startBatchWithOperations:_unaryOpBatch errorHandler:errorHandler];
  }
}

// GRXWriteable: the request writer is done. An error cancels the call; otherwise the request
// stream is closed on the call queue.
- (void)writesFinishedWithError:(NSError *)errorOrNil {
  if (errorOrNil) {
    [self cancel];
  } else {
    dispatch_async(_callQueue, ^{
      // EOS error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT
      [self finishRequestWithErrorHandler:nil];
    });
  }
}

#pragma mark Invoke

// Both handlers will eventually be called, from the network queue. Writes can start immediately
// after this.
// The first one (headersHandler), when the response headers are received.
// The second one (completionHandler), whenever the RPC finishes for any reason.
- (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler
                   completionHandler:(void (^)(NSError *, NSDictionary *))completionHandler {
  // TODO(jcanizales): Add error handlers for async failures
  [_wrappedCall
      startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]];
  [_wrappedCall
      startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]];
}

// Starts the receive-metadata and receive-status batches, then starts the request writer with
// this call as its writeable.
- (void)invokeCall {
  __weak GRPCCall *weakSelf = self;
  [self invokeCallWithHeadersHandler:^(NSDictionary *headers) {
    // Response headers received.
    __strong GRPCCall *strongSelf = weakSelf;
    if (strongSelf) {
      strongSelf.responseHeaders = headers;
      [strongSelf startNextRead];
    }
  }
      completionHandler:^(NSError *error, NSDictionary *trailers) {
        __strong GRPCCall *strongSelf = weakSelf;
        if (strongSelf) {
          strongSelf.responseTrailers = trailers;

          if (error) {
            // Rebuild the error so its userInfo also carries the trailers and, when already
            // available, the headers.
            NSMutableDictionary *userInfo = [NSMutableDictionary dictionary];
            if (error.userInfo) {
              [userInfo addEntriesFromDictionary:error.userInfo];
            }
            userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers;
            // TODO(jcanizales): The C gRPC library doesn't guarantee that the headers block will be
            // called before this one, so an error might end up with trailers but no headers. We
            // shouldn't call finishWithError until after both blocks are called. It is also when
            // this is done that we can provide a merged view of response headers and trailers in a
            // thread-safe way.
            if (strongSelf.responseHeaders) {
              userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
            }
            error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
          }
          [strongSelf maybeFinishWithError:error];
        }
      }];
  // Now that the RPC has been initiated, request writes can start.
  @synchronized(_requestWriter) {
    [_requestWriter startWithWriteable:self];
  }
}

#pragma mark GRXWriter implementation

// Creates the response writeable and the wrapped call, sends the request headers, starts the
// receive batches and the request writer, and registers for connectivity notifications when
// CFStream isn't enabled.
// If the wrapped call can't be created, the call is finished with a GRPCErrorCodeUnavailable
// error instead of being started.
- (void)startCallWithWriteable:(id<GRXWriteable>)writeable {
  _responseWriteable =
      [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue];

  _wrappedCall = [[GRPCWrappedCall alloc] initWithHost:_host
                                            serverName:_serverName
                                                  path:_path
                                               timeout:_timeout];
  NSAssert(_wrappedCall, @"Error allocating RPC objects. Low memory?");
  if (_wrappedCall == nil) {
    // NSAssert is compiled out when assertions are disabled. Without this branch every message to
    // the nil wrapped call would be a no-op: the RPC would never complete, the writeable would
    // never be notified, and the _retainSelf cycle would never be broken.
    [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
                                                   code:GRPCErrorCodeUnavailable
                                               userInfo:@{
                                                 NSLocalizedDescriptionKey :
                                                     @"Failed to create call or channel."
                                               }]];
    return;
  }

  [self sendHeaders:_requestHeaders];
  [self invokeCall];

  // Connectivity monitor is not required for CFStream
  if (!GRPCIsCFStreamEnabled()) {
    [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)];
  }
}

// GRXWriter: starts the RPC, delivering responses to |writeable|. When a token provider is set,
// the call is only started once the provider's handler runs; a non-nil token is added as a
// "Bearer" authorization header first.
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
  @synchronized(self) {
    _state = GRXWriterStateStarted;
  }

  // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
  // This makes RPCs in which the call isn't externally retained possible (as long as it is started
  // before being autoreleased).
  // Care is taken not to retain self strongly in any of the blocks used in this implementation, so
  // that the life of the instance is determined by this retain cycle.
  _retainSelf = self;

  if (self.tokenProvider != nil) {
    self.isWaitingForToken = YES;
    __weak typeof(self) weakSelf = self;
    [self.tokenProvider getTokenWithHandler:^(NSString *token) {
      typeof(self) strongSelf = weakSelf;
      // -cancel clears isWaitingForToken, in which case the call must not be started.
      if (strongSelf && strongSelf.isWaitingForToken) {
        if (token) {
          NSString *t = [kBearerPrefix stringByAppendingString:token];
          strongSelf.requestHeaders[kAuthorizationHeader] = t;
        }
        [strongSelf startCallWithWriteable:writeable];
        strongSelf.isWaitingForToken = NO;
      }
    }];
  } else {
    [self startCallWithWriteable:writeable];
  }
}

// GRXWriter: manual state transitions requested by the user of the call.
- (void)setState:(GRXWriterState)newState {
  @synchronized(self) {
    // Manual transitions are only allowed from the started or paused states.
    if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
      return;
    }

    switch (newState) {
      case GRXWriterStateFinished:
        _state = newState;
        // Per GRXWriter's contract, setting the state to Finished manually
        // means one doesn't wish the writeable to be messaged anymore.
        [_responseWriteable cancelSilently];
        _responseWriteable = nil;
        return;
      case GRXWriterStatePaused:
        _state = newState;
        return;
      case GRXWriterStateStarted:
        // Resuming from pause restarts the read loop that -startNextRead skipped while paused.
        if (_state == GRXWriterStatePaused) {
          _state = newState;
          [self startNextRead];
        }
        return;
      case GRXWriterStateNotStarted:
        return;
    }
  }
}

// Selector registered with GRPCConnectivityMonitor in -startCallWithWriteable:. Cancels the
// wrapped call and finishes with a GRPCErrorCodeUnavailable error.
- (void)connectivityChanged:(NSNotification *)note {
  // Cancel underlying call upon this notification.
  // Finishing clears _retainSelf, which may be the last owner of this object, so hold a strong
  // reference and message through it for the duration of the method.
  __strong GRPCCall *strongSelf = self;
  if (strongSelf) {
    [strongSelf cancelCall];
    [strongSelf
        maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
                                                 code:GRPCErrorCodeUnavailable
                                             userInfo:@{
                                               NSLocalizedDescriptionKey : @"Connectivity lost."
                                             }]];
  }
}

@end