/*
 *
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#import "GRPCCallLegacy.h"

#import "GRPCCall+OAuth2.h"
#import "GRPCCallOptions.h"
#import "GRPCTypes.h"

#import "private/GRPCCore/GRPCChannelPool.h"
#import "private/GRPCCore/GRPCCompletionQueue.h"
#import "private/GRPCCore/GRPCHost.h"
#import "private/GRPCCore/GRPCWrappedCall.h"
#import "private/GRPCCore/NSData+GRPC.h"

#import <RxLibrary/GRXBufferedPipe.h>
#import <RxLibrary/GRXConcurrentWriteable.h>
#import <RxLibrary/GRXImmediateSingleWriter.h>
#import <RxLibrary/GRXWriter+Immediate.h>

#include <grpc/grpc.h>

const char *kCFStreamVarName = "grpc_cfstream";

// Maps "host/path" to the initial-metadata flags registered through
// +setCallSafety:host:path:. Created in +initialize; every access is guarded by
// @synchronized(callFlags).
static NSMutableDictionary *callFlags;

// At most 6 ops can be in an op batch for a client: SEND_INITIAL_METADATA,
// SEND_MESSAGE, SEND_CLOSE_FROM_CLIENT, RECV_INITIAL_METADATA, RECV_MESSAGE,
// and RECV_STATUS_ON_CLIENT.
NSInteger kMaxClientBatch = 6;

static NSString *const kAuthorizationHeader = @"authorization";
static NSString *const kBearerPrefix = @"Bearer ";

@interface GRPCCall () <GRXWriteable>
// Make them read-write.
@property(atomic, strong) NSDictionary *responseHeaders;
@property(atomic, strong) NSDictionary *responseTrailers;

- (void)receiveNextMessages:(NSUInteger)numberOfMessages;

@end

// The following methods of a C gRPC call object aren't reentrant, and thus
// calls to them must be serialized:
// - start_batch
// - destroy
//
// start_batch with a SEND_MESSAGE argument can only be called after the
// OP_COMPLETE event for any previous write is received. This is achieved by
// pausing the requests writer immediately every time it writes a value, and
// resuming it again when OP_COMPLETE is received.
//
// Similarly, start_batch with a RECV_MESSAGE argument can only be called after
// the OP_COMPLETE event for any previous read is received. This is easier to
// enforce, as we're writing the received messages into the writeable:
// start_batch is enqueued once upon receiving the OP_COMPLETE event for the
// RECV_METADATA batch, and then once after receiving each OP_COMPLETE event for
// each RECV_MESSAGE batch.
@implementation GRPCCall {
  dispatch_queue_t _callQueue;

  NSString *_host;
  NSString *_path;
  GRPCCallSafety _callSafety;
  GRPCCallOptions *_callOptions;
  GRPCWrappedCall *_wrappedCall;

  // The C gRPC library has less guarantees on the ordering of events than we
  // do. Particularly, in the face of errors, there's no ordering guarantee at
  // all. This wrapper over our actual writeable ensures thread-safety and
  // correct ordering.
  GRXConcurrentWriteable *_responseWriteable;

  // The network thread wants the requestWriter to resume (when the server is ready for more input),
  // or to stop (on errors), concurrently with user threads that want to start it, pause it or stop
  // it. Because a writer isn't thread-safe, we'll synchronize those operations on it.
  // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or
  // writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to
  // pause the writer immediately on writeValue:, so we need our locking to be recursive.
  GRXWriter *_requestWriter;

  // To create a retain cycle when a call is started, up until it finishes. See
  // |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a
  // reference to the call object if all they're interested in is the handler being executed when
  // the response arrives.
  GRPCCall *_retainSelf;

  GRPCRequestHeaders *_requestHeaders;

  // In the case that the call is a unary call (i.e. the writer to GRPCCall is of type
  // GRXImmediateSingleWriter), GRPCCall will delay sending ops (not send them to C core
  // immediately) and buffer them into a batch _unaryOpBatch. The batch is sent to C core when
  // the SendClose op is added.
  BOOL _unaryCall;
  NSMutableArray *_unaryOpBatch;

  // The dispatch queue to be used for enqueuing responses to user. Defaulted to the main dispatch
  // queue
  dispatch_queue_t _responseQueue;

  // The OAuth2 token fetched from a token provider.
  NSString *_fetchedOauth2AccessToken;

  // The callback to be called when a write message op is done.
  void (^_writeDone)(void);

  // Indicate a read request to core is pending.
  BOOL _pendingCoreRead;

  // Indicate pending read message request from user.
  NSUInteger _pendingReceiveNextMessages;
}

@synthesize state = _state;

+ (void)initialize {
  // Guarantees the code in {} block is invoked only once. See ref at:
  // https://developer.apple.com/documentation/objectivec/nsobject/1418639-initialize?language=objc
  if (self == [GRPCCall self]) {
    grpc_init();
    callFlags = [NSMutableDictionary dictionary];
  }
}

// Registers the initial-metadata flag matching |callSafety| for the given host and path.
// Does nothing when either string is empty or |callSafety| is not a known value.
+ (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path {
  if (host.length == 0 || path.length == 0) {
    return;
  }
  NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
  @synchronized(callFlags) {
    switch (callSafety) {
      case GRPCCallSafetyDefault:
        callFlags[hostAndPath] = @0;
        break;
      case GRPCCallSafetyIdempotentRequest:
        callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
        break;
      case GRPCCallSafetyCacheableRequest:
        callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
        break;
      default:
        break;
    }
  }
}

// Returns the flags registered for the given host and path, or 0 when none were registered
// (messaging the nil lookup result yields 0).
+ (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path {
  NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
  @synchronized(callFlags) {
    // The stored values are unsigned flag constants, so read them back unsigned.
    return [callFlags[hostAndPath] unsignedIntValue];
  }
}

// Convenience initializer: default call safety, no call options and no write-done callback.
- (instancetype)initWithHost:(NSString *)host
                        path:(NSString *)path
              requestsWriter:(GRXWriter *)requestWriter {
  return [self initWithHost:host
                       path:path
                 callSafety:GRPCCallSafetyDefault
             requestsWriter:requestWriter
                callOptions:nil
                  writeDone:nil];
}

// Designated initializer. Returns nil (after asserting, when assertions are enabled) if |host| or
// |path| is nil, if |safety| is out of range, or if |requestsWriter| is not in the not-started
// state. Those early returns happen before any ivar is set, so -dealloc must tolerate a
// half-built instance.
- (instancetype)initWithHost:(NSString *)host
                        path:(NSString *)path
                  callSafety:(GRPCCallSafety)safety
              requestsWriter:(GRXWriter *)requestsWriter
                 callOptions:(GRPCCallOptions *)callOptions
                   writeDone:(void (^)(void))writeDone {
  // Purposely using pointer rather than length (host.length == 0) for backwards compatibility.
  NSAssert(host != nil && path != nil, @"Neither host nor path can be nil.");
  NSAssert(safety <= GRPCCallSafetyCacheableRequest, @"Invalid call safety value.");
  NSAssert(requestsWriter.state == GRXWriterStateNotStarted,
           @"The requests writer can't be already started.");
  if (!host || !path) {
    return nil;
  }
  if (safety > GRPCCallSafetyCacheableRequest) {
    return nil;
  }
  if (requestsWriter.state != GRXWriterStateNotStarted) {
    return nil;
  }

  if ((self = [super init])) {
    _host = [host copy];
    _path = [path copy];
    _callSafety = safety;
    _callOptions = [callOptions copy];

    // Serial queue to invoke the non-reentrant methods of the grpc_call object.
    _callQueue = dispatch_queue_create("io.grpc.call", DISPATCH_QUEUE_SERIAL);

    _requestWriter = requestsWriter;
    _requestHeaders = [[GRPCRequestHeaders alloc] initWithCall:self];
    _writeDone = writeDone;

    if ([requestsWriter isKindOfClass:[GRXImmediateSingleWriter class]]) {
      _unaryCall = YES;
      _unaryOpBatch = [NSMutableArray arrayWithCapacity:kMaxClientBatch];
    }

    _responseQueue = dispatch_get_main_queue();

    // do not start a read until initial metadata is received
    _pendingReceiveNextMessages = 0;
    _pendingCoreRead = YES;
  }
  return self;
}

// Replaces the queue used to deliver responses. Ignored once the call has left the not-started
// state.
- (void)setResponseDispatchQueue:(dispatch_queue_t)queue {
  @synchronized(self) {
    if (_state != GRXWriterStateNotStarted) {
      return;
    }
    _responseQueue = queue;
  }
}

#pragma mark Finish

// This function should support being called within a @synchronized(self) block in another function
// Should not manipulate _requestWriter for deadlock prevention.
- (void)finishWithError:(NSError *)errorOrNil {
  @synchronized(self) {
    if (_state == GRXWriterStateFinished) {
      return;
    }
    _state = GRXWriterStateFinished;

    if (errorOrNil) {
      [_responseWriteable cancelWithError:errorOrNil];
    } else {
      [_responseWriteable enqueueSuccessfulCompletion];
    }

    // If the call isn't retained anywhere else, it can be deallocated now.
    _retainSelf = nil;
  }
}

// Finishes the call with a GRPCErrorCodeCancelled error and cancels the wrapped call. No-op for
// the response side when the call is already finished; the request writer is marked finished
// either way, outside the lock.
- (void)cancel {
  @synchronized(self) {
    if (_state == GRXWriterStateFinished) {
      return;
    }
    [self finishWithError:[NSError
                              errorWithDomain:kGRPCErrorDomain
                                         code:GRPCErrorCodeCancelled
                                     userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]];
    [_wrappedCall cancel];
  }
  _requestWriter.state = GRXWriterStateFinished;
}

- (void)dealloc {
  // The designated initializer can return nil before _callQueue is created; the half-built
  // instance is still deallocated, and dispatching onto a NULL queue would crash. In that case
  // no wrapped call exists either, so there is nothing to hand off.
  if (_callQueue == nil) {
    return;
  }
  // Hand the last reference to the wrapped call over to the call queue so that it is dropped
  // there, serialized with the other operations issued on that queue.
  __block GRPCWrappedCall *wrappedCall = _wrappedCall;
  dispatch_async(_callQueue, ^{
    wrappedCall = nil;
  });
}

#pragma mark Read messages

// Only called from the call queue.
// The handler will be called from the network queue.
- (void)startReadWithHandler:(void (^)(grpc_byte_buffer *))handler {
  // TODO(jcanizales): Add error handlers for async failures
  [_wrappedCall startBatchWithOperations:@[ [[GRPCOpRecvMessage alloc] initWithHandler:handler] ]];
}

// Called initially from the network queue once response headers are received,
// then "recursively" from the responseWriteable queue after each response from the
// server has been written.
// If the call is currently paused, this is a noop. Restarting the call will invoke this
// method.
// TODO(jcanizales): Rename to readResponseIfNotPaused.
- (void)maybeStartNextRead {
  @synchronized(self) {
    if (_state != GRXWriterStateStarted) {
      return;
    }
    if (_callOptions.flowControlEnabled && (_pendingCoreRead || _pendingReceiveNextMessages == 0)) {
      return;
    }
    _pendingCoreRead = YES;
    // With flow control disabled the counter may legitimately be 0 here; don't let the unsigned
    // value wrap around.
    if (_pendingReceiveNextMessages > 0) {
      _pendingReceiveNextMessages--;
    }
  }

  dispatch_async(_callQueue, ^{
    __weak GRPCCall *weakSelf = self;
    [self startReadWithHandler:^(grpc_byte_buffer *message) {
      if (message == NULL) {
        // No more messages from the server
        return;
      }
      __strong GRPCCall *strongSelf = weakSelf;
      if (strongSelf == nil) {
        grpc_byte_buffer_destroy(message);
        return;
      }
      NSData *data = [NSData grpc_dataWithByteBuffer:message];
      grpc_byte_buffer_destroy(message);
      if (!data) {
        // The app doesn't have enough memory to hold the server response. We
        // don't want to throw, because the app shouldn't crash for a behavior
        // that's on the hands of any server to have. Instead we finish and ask
        // the server to cancel.
        @synchronized(strongSelf) {
          strongSelf->_pendingCoreRead = NO;
          [strongSelf
              finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
                                                  code:GRPCErrorCodeResourceExhausted
                                              userInfo:@{
                                                NSLocalizedDescriptionKey :
                                                    @"Client does not have enough memory to "
                                                    @"hold the server response."
                                              }]];
          [strongSelf->_wrappedCall cancel];
        }
        strongSelf->_requestWriter.state = GRXWriterStateFinished;
      } else {
        @synchronized(strongSelf) {
          [strongSelf->_responseWriteable enqueueValue:data
                                     completionHandler:^{
                                       // Re-resolve the weak reference: the call may be gone by
                                       // the time the value has been delivered.
                                       __strong GRPCCall *deliveredSelf = weakSelf;
                                       if (deliveredSelf) {
                                         @synchronized(deliveredSelf) {
                                           deliveredSelf->_pendingCoreRead = NO;
                                           [deliveredSelf maybeStartNextRead];
                                         }
                                       }
                                     }];
        }
      }
    }];
  });
}

#pragma mark Send headers

// Builds the initial metadata (request headers plus an optional OAuth2 bearer token) and either
// sends it on the call queue or, for unary calls, appends it to the pending unary batch.
- (void)sendHeaders {
  // TODO (mxyan): Remove after deprecated methods are removed
  uint32_t callSafetyFlags = 0;
  switch (_callSafety) {
    case GRPCCallSafetyDefault:
      callSafetyFlags = 0;
      break;
    case GRPCCallSafetyIdempotentRequest:
      callSafetyFlags = GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
      break;
    case GRPCCallSafetyCacheableRequest:
      callSafetyFlags = GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
      break;
  }

  NSMutableDictionary *headers = [_requestHeaders mutableCopy];
  NSString *fetchedOauth2AccessToken;
  @synchronized(self) {
    fetchedOauth2AccessToken = _fetchedOauth2AccessToken;
  }
  // A token fetched from the token provider takes precedence over the one in the call options.
  NSString *accessToken = fetchedOauth2AccessToken ?: _callOptions.oauth2AccessToken;
  if (accessToken != nil) {
    headers[kAuthorizationHeader] = [kBearerPrefix stringByAppendingString:accessToken];
  }

  // TODO(jcanizales): Add error handlers for async failures
  GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc]
      initWithMetadata:headers
                 flags:callSafetyFlags
               handler:nil];  // No clean-up needed after SEND_INITIAL_METADATA
  dispatch_async(_callQueue, ^{
    if (!self->_unaryCall) {
      [self->_wrappedCall startBatchWithOperations:@[ op ]];
    } else {
      [self->_unaryOpBatch addObject:op];
    }
  });
}

// Records that the user is ready for |numberOfMessages| more messages and, when the call is
// started with flow control enabled, tries to start the next read.
- (void)receiveNextMessages:(NSUInteger)numberOfMessages {
  if (numberOfMessages == 0) {
    return;
  }
  @synchronized(self) {
    _pendingReceiveNextMessages += numberOfMessages;

    if (_state != GRXWriterStateStarted || !_callOptions.flowControlEnabled) {
      return;
    }
    [self maybeStartNextRead];
  }
}

#pragma mark GRXWriteable implementation

// Only called from the call queue. The error handler will be called from the
// network queue if the write didn't succeed.
// If the call is a unary call, parameter \a errorHandler will be ignored and
// the error handler of GRPCOpSendClose will be executed in case of error.
- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)(void))errorHandler {
  __weak GRPCCall *weakSelf = self;
  void (^resumingHandler)(void) = ^{
    // Resume the request writer.
    GRPCCall *strongSelf = weakSelf;
    if (strongSelf) {
      strongSelf->_requestWriter.state = GRXWriterStateStarted;
      if (strongSelf->_writeDone) {
        strongSelf->_writeDone();
      }
    }
  };
  GRPCOpSendMessage *op = [[GRPCOpSendMessage alloc] initWithMessage:message
                                                             handler:resumingHandler];
  if (!_unaryCall) {
    [_wrappedCall startBatchWithOperations:@[ op ] errorHandler:errorHandler];
  } else {
    // Ignored errorHandler since it is the same as the one for GRPCOpSendClose.
    // TODO (mxyan): unify the error handlers of all Ops into a single closure.
    [_unaryOpBatch addObject:op];
  }
}

// GRXWriteable entry point for request messages. Drops the value when the call is already
// finished; otherwise pauses the request writer and schedules the write on the call queue.
- (void)writeValue:(id)value {
  NSAssert([value isKindOfClass:[NSData class]], @"value must be of type NSData");

  @synchronized(self) {
    if (_state == GRXWriterStateFinished) {
      return;
    }
  }

  // Pause the input and only resume it when the C layer notifies us that writes
  // can proceed.
  _requestWriter.state = GRXWriterStatePaused;

  dispatch_async(_callQueue, ^{
    // Write error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT
    [self writeMessage:value withErrorHandler:nil];
  });
}

// Only called from the call queue. The error handler will be called from the
// network queue if the requests stream couldn't be closed successfully.
- (void)finishRequestWithErrorHandler:(void (^)(void))errorHandler {
  if (!_unaryCall) {
    [_wrappedCall startBatchWithOperations:@[ [[GRPCOpSendClose alloc] init] ]
                              errorHandler:errorHandler];
  } else {
    // Unary call: the close op completes the buffered batch, which is sent in one go.
    [_unaryOpBatch addObject:[[GRPCOpSendClose alloc] init]];
    [_wrappedCall startBatchWithOperations:_unaryOpBatch errorHandler:errorHandler];
  }
}

// GRXWriteable entry point for the end of the request stream: cancels the call on error,
// otherwise schedules the half-close on the call queue.
- (void)writesFinishedWithError:(NSError *)errorOrNil {
  if (errorOrNil) {
    [self cancel];
  } else {
    dispatch_async(_callQueue, ^{
      // EOS error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT
      [self finishRequestWithErrorHandler:nil];
    });
  }
}

#pragma mark Invoke

// Both handlers will eventually be called, from the network queue. Writes can start immediately
// after this.
// The first one (headersHandler), when the response headers are received.
// The second one (completionHandler), whenever the RPC finishes for any reason.
- (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler
                   completionHandler:(void (^)(NSError *, NSDictionary *))completionHandler {
  dispatch_async(_callQueue, ^{
    // TODO(jcanizales): Add error handlers for async failures
    [self->_wrappedCall
        startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]];
    [self->_wrappedCall
        startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]];
  });
}

// Installs the headers and completion handlers: the former stores a copy of the response headers
// and kicks off the first read, the latter stores the trailers, decorates any error with
// headers/trailers and finishes the call.
- (void)invokeCall {
  __weak GRPCCall *weakSelf = self;
  [self
      invokeCallWithHeadersHandler:^(NSDictionary *headers) {
        // Response headers received.
        __strong GRPCCall *strongSelf = weakSelf;
        if (strongSelf) {
          @synchronized(strongSelf) {
            // it is ok to set nil because headers are only received once
            strongSelf.responseHeaders = nil;
            // copy the header so that the GRPCOpRecvMetadata object may be dealloc'ed
            NSDictionary *copiedHeaders = [[NSDictionary alloc] initWithDictionary:headers
                                                                         copyItems:YES];
            strongSelf.responseHeaders = copiedHeaders;
            strongSelf->_pendingCoreRead = NO;
            [strongSelf maybeStartNextRead];
          }
        }
      }
      completionHandler:^(NSError *error, NSDictionary *trailers) {
        __strong GRPCCall *strongSelf = weakSelf;
        if (strongSelf) {
          strongSelf.responseTrailers = trailers;

          if (error) {
            NSMutableDictionary *userInfo = [NSMutableDictionary dictionary];
            if (error.userInfo) {
              [userInfo addEntriesFromDictionary:error.userInfo];
            }
            userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers;
            // Since gRPC core does not guarantee the headers block being called before this block,
            // responseHeaders might be nil.
            userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
            error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
          }
          [strongSelf finishWithError:error];
          strongSelf->_requestWriter.state = GRXWriterStateFinished;
        }
      }];
}

#pragma mark GRXWriter implementation

// Lock acquired inside startWithWriteable:
- (void)startCallWithWriteable:(id<GRXWriteable>)writeable {
  @synchronized(self) {
    if (_state == GRXWriterStateFinished) {
      return;
    }

    _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable
                                                             dispatchQueue:_responseQueue];

    GRPCPooledChannel *channel = [[GRPCChannelPool sharedInstance] channelWithHost:_host
                                                                       callOptions:_callOptions];
    _wrappedCall = [channel wrappedCallWithPath:_path
                                completionQueue:[GRPCCompletionQueue completionQueue]
                                    callOptions:_callOptions];

    if (_wrappedCall == nil) {
      [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
                                                code:GRPCErrorCodeUnavailable
                                            userInfo:@{
                                              NSLocalizedDescriptionKey :
                                                  @"Failed to create call or channel."
                                            }]];
      return;
    }

    [self sendHeaders];
    [self invokeCall];
  }

  // Now that the RPC has been initiated, request writes can start.
  [_requestWriter startWithWriteable:self];
}

// Starts the call, delivering responses to |writeable|. Builds call options from the legacy
// per-host configuration when none were supplied, then starts the RPC either immediately or once
// the auth token provider has called back.
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
  id<GRPCAuthorizationProtocol> tokenProvider = nil;
  @synchronized(self) {
    _state = GRXWriterStateStarted;

    // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
    // This makes RPCs in which the call isn't externally retained possible (as long as it is
    // started before being autoreleased). Care is taken not to retain self strongly in any of the
    // blocks used in this implementation, so that the life of the instance is determined by this
    // retain cycle.
    _retainSelf = self;

    // If _callOptions is nil, people must be using the deprecated v1 interface. In this case,
    // generate the call options from the corresponding GRPCHost configs and apply other options
    // that are not covered by GRPCHost.
    if (_callOptions == nil) {
      GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy];
      if (_serverName.length != 0) {
        callOptions.serverAuthority = _serverName;
      }
      if (_timeout > 0) {
        callOptions.timeout = _timeout;
      }
      // Named so as not to shadow the file-level |callFlags| dictionary.
      uint32_t legacyCallFlags = [GRPCCall callFlagsForHost:_host path:_path];
      if (legacyCallFlags == GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
        _callSafety = GRPCCallSafetyIdempotentRequest;
      } else if (legacyCallFlags == GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) {
        _callSafety = GRPCCallSafetyCacheableRequest;
      }

      id<GRPCAuthorizationProtocol> legacyTokenProvider = self.tokenProvider;
      if (legacyTokenProvider != nil) {
        callOptions.authTokenProvider = legacyTokenProvider;
      }
      _callOptions = callOptions;
    }

    NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil,
             @"authTokenProvider and oauth2AccessToken cannot be set at the same time");

    tokenProvider = _callOptions.authTokenProvider;
  }

  if (tokenProvider != nil) {
    __weak typeof(self) weakSelf = self;
    [tokenProvider getTokenWithHandler:^(NSString *token) {
      __strong typeof(self) strongSelf = weakSelf;
      if (strongSelf) {
        BOOL startCall = NO;
        @synchronized(strongSelf) {
          // The call may have been cancelled while the token was being fetched.
          if (strongSelf->_state != GRXWriterStateFinished) {
            startCall = YES;
            if (token) {
              strongSelf->_fetchedOauth2AccessToken = [token copy];
            }
          }
        }
        if (startCall) {
          [strongSelf startCallWithWriteable:writeable];
        }
      }
    }];
  } else {
    [self startCallWithWriteable:writeable];
  }
}

// Manual state transitions requested by the user of this writer.
- (void)setState:(GRXWriterState)newState {
  @synchronized(self) {
    // Manual transitions are only allowed from the started or paused states.
    if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
      return;
    }

    switch (newState) {
      case GRXWriterStateFinished:
        _state = newState;
        // Per GRXWriter's contract, setting the state to Finished manually
        // means one doesn't wish the writeable to be messaged anymore.
        [_responseWriteable cancelSilently];
        _responseWriteable = nil;
        return;
      case GRXWriterStatePaused:
        _state = newState;
        return;
      case GRXWriterStateStarted:
        // Only a paused call can be resumed; resuming restarts reading.
        if (_state == GRXWriterStatePaused) {
          _state = newState;
          [self maybeStartNextRead];
        }
        return;
      case GRXWriterStateNotStarted:
        return;
    }
  }
}

@end