/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#import "GRXBufferedPipe.h"

// Private state: the writeable that buffered values and the completion are forwarded to.
@interface GRXBufferedPipe ()
@property(atomic) id<GRXWriteable> writeable;
@end

@implementation GRXBufferedPipe {
  // Serial queue that buffers the forwarding blocks. It is suspended while the pipe has not been
  // started or is paused, which holds back every block enqueued on it.
  dispatch_queue_t _writeQueue;
}

@synthesize state = _state;

// Convenience constructor; equivalent to [[self alloc] init].
+ (instancetype)pipe {
  return [[self alloc] init];
}

// Creates a pipe in the not-started state. The write queue starts out suspended, so values
// written before -startWithWriteable: accumulate on the queue instead of being forwarded.
- (instancetype)init {
  if (self = [super init]) {
    _state = GRXWriterStateNotStarted;
    _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
    dispatch_suspend(_writeQueue);
  }
  return self;
}

#pragma mark GRXWriteable implementation

// Enqueues |value| to be forwarded to the writeable once the write queue runs. Values that are
// still enqueued when the pipe reaches the finished state are dropped.
- (void)writeValue:(id)value {
  // Even if we're paused and with enqueued values, we can't exert back-pressure to our writer.
  // So just buffer the new value.
  // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
  // Test for -copyWithZone: rather than -copy: every NSObject responds to -copy, but -copy raises
  // for objects that do not implement -copyWithZone:. Such values are buffered as they are.
  if ([value respondsToSelector:@selector(copyWithZone:)]) {
    value = [value copy];
  }
  dispatch_async(_writeQueue, ^(void) {
    @synchronized(self) {
      if (self->_state == GRXWriterStateFinished) {
        return;
      }
      [self.writeable writeValue:value];
    }
  });
}

// Enqueues the end-of-stream notification behind any buffered values. It is forwarded through
// -finishWithError: unless the pipe has already reached the finished state by then.
- (void)writesFinishedWithError:(NSError *)errorOrNil {
  dispatch_async(_writeQueue, ^{
    // Read the state under the same lock that -setState: and -writeValue: use. The lock is
    // released before forwarding, so it is not held across the call into the writeable.
    GRXWriterState currentState;
    @synchronized(self) {
      currentState = self->_state;
    }
    if (currentState == GRXWriterStateFinished) {
      return;
    }
    [self finishWithError:errorOrNil];
  });
}

#pragma mark GRXWriter implementation

// Applies a manual state transition. Requests are ignored while the pipe is not started or
// already finished, and a request for the not-started state is always ignored. Every suspend of
// the write queue made here is paired with exactly one resume.
- (void)setState:(GRXWriterState)newState {
  @synchronized(self) {
    // Manual transitions are only allowed from the started or paused states.
    if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
      return;
    }

    switch (newState) {
      case GRXWriterStateFinished:
        // Drop the writeable first so nothing else is forwarded to it.
        self.writeable = nil;
        if (_state == GRXWriterStatePaused) {
          // Undo the suspension made when pausing; the remaining blocks then run and return
          // early because the state is finished.
          dispatch_resume(_writeQueue);
        }
        _state = newState;
        return;
      case GRXWriterStatePaused:
        if (_state == GRXWriterStateStarted) {
          _state = newState;
          dispatch_suspend(_writeQueue);
        }
        return;
      case GRXWriterStateStarted:
        if (_state == GRXWriterStatePaused) {
          _state = newState;
          dispatch_resume(_writeQueue);
        }
        return;
      case GRXWriterStateNotStarted:
        return;
    }
  }
}

// Attaches |writeable|, moves the pipe to the started state and lets the buffered blocks run.
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
  BOOL queueWasSuspended;
  @synchronized(self) {
    // The queue is suspended exactly when the pipe is not started or paused. Resuming a queue
    // that is not suspended is an over-resume, which libdispatch treats as a client error, so
    // remember whether a resume is actually owed.
    queueWasSuspended =
        (_state == GRXWriterStateNotStarted || _state == GRXWriterStatePaused);
    self.writeable = writeable;
    _state = GRXWriterStateStarted;
  }
  if (queueWasSuspended) {
    dispatch_resume(_writeQueue);
  }
}

// Forwards the end-of-stream notification, with its optional error, to the current writeable.
// Messaging a nil writeable is a no-op.
- (void)finishWithError:(NSError *)errorOrNil {
  [self.writeable writesFinishedWithError:errorOrNil];
}

// Balances the outstanding suspension of the write queue before the queue is released.
- (void)dealloc {
  // Read the ivar directly: property accessors should not be invoked from -dealloc.
  if (_state == GRXWriterStateNotStarted || _state == GRXWriterStatePaused) {
    dispatch_resume(_writeQueue);
  }
}

@end