• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19#import "GRXBufferedPipe.h"
20
21@interface GRXBufferedPipe ()
22@property(atomic) id<GRXWriteable> writeable;
23@end
24
25@implementation GRXBufferedPipe {
26  NSError *_errorOrNil;
27  dispatch_queue_t _writeQueue;
28}
29
30@synthesize state = _state;
31
32+ (instancetype)pipe {
33  return [[self alloc] init];
34}
35
36- (instancetype)init {
37  if (self = [super init]) {
38    _state = GRXWriterStateNotStarted;
39    _writeQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
40    dispatch_suspend(_writeQueue);
41  }
42  return self;
43}
44
45#pragma mark GRXWriteable implementation
46
47- (void)writeValue:(id)value {
48  if ([value respondsToSelector:@selector(copy)]) {
49    // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer.
50    // So just buffer the new value.
51    // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
52    value = [value copy];
53  }
54  dispatch_async(_writeQueue, ^(void) {
55    @synchronized(self) {
56      if (self->_state == GRXWriterStateFinished) {
57        return;
58      }
59      [self.writeable writeValue:value];
60    }
61  });
62}
63
64- (void)writesFinishedWithError:(NSError *)errorOrNil {
65  dispatch_async(_writeQueue, ^{
66    if (self->_state == GRXWriterStateFinished) {
67      return;
68    }
69    [self finishWithError:errorOrNil];
70  });
71}
72
73#pragma mark GRXWriter implementation
74
75- (void)setState:(GRXWriterState)newState {
76  @synchronized(self) {
77    // Manual transitions are only allowed from the started or paused states.
78    if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
79      return;
80    }
81
82    switch (newState) {
83      case GRXWriterStateFinished:
84        self.writeable = nil;
85        if (_state == GRXWriterStatePaused) {
86          dispatch_resume(_writeQueue);
87        }
88        _state = newState;
89        return;
90      case GRXWriterStatePaused:
91        if (_state == GRXWriterStateStarted) {
92          _state = newState;
93          dispatch_suspend(_writeQueue);
94        }
95        return;
96      case GRXWriterStateStarted:
97        if (_state == GRXWriterStatePaused) {
98          _state = newState;
99          dispatch_resume(_writeQueue);
100        }
101        return;
102      case GRXWriterStateNotStarted:
103        return;
104    }
105  }
106}
107
108- (void)startWithWriteable:(id<GRXWriteable>)writeable {
109  @synchronized(self) {
110    self.writeable = writeable;
111    _state = GRXWriterStateStarted;
112  }
113  dispatch_resume(_writeQueue);
114}
115
116- (void)finishWithError:(NSError *)errorOrNil {
117  [self.writeable writesFinishedWithError:errorOrNil];
118}
119
120- (void)dealloc {
121  GRXWriterState state = self.state;
122  if (state == GRXWriterStateNotStarted || state == GRXWriterStatePaused) {
123    dispatch_resume(_writeQueue);
124  }
125}
126
127@end
128