• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19#import "GRXBufferedPipe.h"
20
// Private state of the pipe.
@interface GRXBufferedPipe ()

// Destination that buffered values and the final finish notification are forwarded to. It is
// assigned in -startWithWriteable: and reset to nil when the pipe transitions to the finished
// state.
@property(atomic, strong) id<GRXWriteable> writeable;

@end
24
/**
 * A pipe that buffers what is written to it. Values received through the GRXWriteable methods are
 * enqueued on a private serial dispatch queue and forwarded, in order, to the writeable the pipe
 * was started with. The queue is kept suspended while the pipe is not started or is paused, which
 * is what makes the enqueued values accumulate instead of being delivered.
 */
@implementation GRXBufferedPipe {
  // Serial queue holding the pending writes. Invariant maintained by this class: the queue is
  // suspended (exactly once) while _state is NotStarted or Paused, and running otherwise.
  dispatch_queue_t _writeQueue;
}

@synthesize state = _state;

/** Convenience constructor: returns a new pipe in the not-started state. */
+ (instancetype)pipe {
  return [[self alloc] init];
}

/** Creates the write queue and suspends it, so that writes are buffered until the pipe starts. */
- (instancetype)init {
  if (self = [super init]) {
    _state = GRXWriterStateNotStarted;
    _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
    dispatch_suspend(_writeQueue);
  }
  return self;
}

#pragma mark GRXWriteable implementation

/**
 * Enqueues a value for delivery to the writeable. Values that can be copied are copied first;
 * other values are forwarded by reference.
 *
 * @param value The value to forward.
 */
- (void)writeValue:(id)value {
  // Every NSObject responds to -copy, but -copy raises unless the receiver also implements
  // -copyWithZone:, so that is the selector that has to be checked before copying.
  if ([value respondsToSelector:@selector(copyWithZone:)]) {
    // Even if we're paused and with enqueued values, we can't exert back-pressure to our writer.
    // So just buffer the new value.
    // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
    value = [value copy];
  }
  // Captured weakly: if the pipe is gone by the time the block runs, the value is dropped.
  __weak GRXBufferedPipe *weakSelf = self;
  dispatch_async(_writeQueue, ^{
    [weakSelf.writeable writeValue:value];
  });
}

/**
 * Enqueues the end-of-writes notification behind all previously written values, so the writeable
 * is told about it only after those values have been delivered.
 *
 * @param errorOrNil The error that terminated the writes, or nil.
 */
- (void)writesFinishedWithError:(NSError *)errorOrNil {
  __weak GRXBufferedPipe *weakSelf = self;
  dispatch_async(_writeQueue, ^{
    [weakSelf finishWithError:errorOrNil];
  });
}

#pragma mark GRXWriter implementation

/**
 * Performs a manual state transition. Requests are ignored unless the pipe is currently started
 * or paused; requests that don't correspond to an actual change (e.g. pausing while already
 * paused) are ignored as well.
 *
 * @param newState The state to move to.
 */
- (void)setState:(GRXWriterState)newState {
  @synchronized(self) {
    // Manual transitions are only allowed from the started or paused states.
    if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
      return;
    }

    switch (newState) {
      case GRXWriterStateFinished:
        // With the writeable gone, blocks still sitting in the queue become no-ops.
        self.writeable = nil;
        if (_state == GRXWriterStatePaused) {
          // Leave the queue running so that it drains and can be released.
          dispatch_resume(_writeQueue);
        }
        _state = newState;
        return;
      case GRXWriterStatePaused:
        if (_state == GRXWriterStateStarted) {
          _state = newState;
          dispatch_suspend(_writeQueue);
        }
        return;
      case GRXWriterStateStarted:
        if (_state == GRXWriterStatePaused) {
          _state = newState;
          dispatch_resume(_writeQueue);
        }
        return;
      case GRXWriterStateNotStarted:
        return;
    }
  }
}

/**
 * Starts (or, while paused, restarts) delivery of the buffered values to the given writeable.
 * Calls made while the pipe is already started or finished are ignored.
 *
 * @param writeable The destination for the values written to this pipe.
 */
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
  // Same lock as -setState:, so the state check and the queue resume happen atomically with
  // respect to manual transitions.
  @synchronized(self) {
    // The queue is suspended only while not started or paused. Resuming it in any other state
    // would over-resume it, which libdispatch treats as a fatal error.
    if (_state != GRXWriterStateNotStarted && _state != GRXWriterStatePaused) {
      return;
    }
    self.writeable = writeable;
    _state = GRXWriterStateStarted;
    dispatch_resume(_writeQueue);
  }
}

/**
 * Notifies the current writeable, if any, that no more values will be written, and then requests
 * the transition to the finished state.
 *
 * @param errorOrNil The error to report to the writeable, or nil.
 */
- (void)finishWithError:(NSError *)errorOrNil {
  [self.writeable writesFinishedWithError:errorOrNil];
  self.state = GRXWriterStateFinished;
}

- (void)dealloc {
  // In these two states the queue is still suspended; resume it so that it isn't released while
  // suspended. The ivar is read directly because accessors shouldn't be messaged from -dealloc.
  if (_state == GRXWriterStateNotStarted || _state == GRXWriterStatePaused) {
    dispatch_resume(_writeQueue);
  }
}

@end
121