1/* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19#import "GRXBufferedPipe.h" 20 21@interface GRXBufferedPipe () 22@property(atomic) id<GRXWriteable> writeable; 23@end 24 25@implementation GRXBufferedPipe { 26 NSError *_errorOrNil; 27 dispatch_queue_t _writeQueue; 28} 29 30@synthesize state = _state; 31 32+ (instancetype)pipe { 33 return [[self alloc] init]; 34} 35 36- (instancetype)init { 37 if (self = [super init]) { 38 _state = GRXWriterStateNotStarted; 39 _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL); 40 dispatch_suspend(_writeQueue); 41 } 42 return self; 43} 44 45#pragma mark GRXWriteable implementation 46 47- (void)writeValue:(id)value { 48 if ([value respondsToSelector:@selector(copy)]) { 49 // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer. 50 // So just buffer the new value. 51 // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe. 52 value = [value copy]; 53 } 54 __weak GRXBufferedPipe *weakSelf = self; 55 dispatch_async(_writeQueue, ^(void) { 56 [weakSelf.writeable writeValue:value]; 57 }); 58} 59 60- (void)writesFinishedWithError:(NSError *)errorOrNil { 61 __weak GRXBufferedPipe *weakSelf = self; 62 dispatch_async(_writeQueue, ^{ 63 [weakSelf finishWithError:errorOrNil]; 64 }); 65} 66 67#pragma mark GRXWriter implementation 68 69- (void)setState:(GRXWriterState)newState { 70 @synchronized(self) { 71 // Manual transitions are only allowed from the started or paused states. 72 if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) { 73 return; 74 } 75 76 switch (newState) { 77 case GRXWriterStateFinished: 78 self.writeable = nil; 79 if (_state == GRXWriterStatePaused) { 80 dispatch_resume(_writeQueue); 81 } 82 _state = newState; 83 return; 84 case GRXWriterStatePaused: 85 if (_state == GRXWriterStateStarted) { 86 _state = newState; 87 dispatch_suspend(_writeQueue); 88 } 89 return; 90 case GRXWriterStateStarted: 91 if (_state == GRXWriterStatePaused) { 92 _state = newState; 93 dispatch_resume(_writeQueue); 94 } 95 return; 96 case GRXWriterStateNotStarted: 97 return; 98 } 99 } 100} 101 102- (void)startWithWriteable:(id<GRXWriteable>)writeable { 103 self.writeable = writeable; 104 _state = GRXWriterStateStarted; 105 dispatch_resume(_writeQueue); 106} 107 108- (void)finishWithError:(NSError *)errorOrNil { 109 [self.writeable writesFinishedWithError:errorOrNil]; 110 self.state = GRXWriterStateFinished; 111} 112 113- (void)dealloc { 114 GRXWriterState state = self.state; 115 if (state == GRXWriterStateNotStarted || state == GRXWriterStatePaused) { 116 dispatch_resume(_writeQueue); 117 } 118} 119 120@end 121