• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19#import "GRXConcurrentWriteable.h"
20
21#import <RxLibrary/GRXWriteable.h>
22
23@interface GRXConcurrentWriteable ()
24// This is atomic so that cancellation can nillify it from any thread.
25@property(atomic, strong) id<GRXWriteable> writeable;
26@end
27
28@implementation GRXConcurrentWriteable {
29  dispatch_queue_t _writeableQueue;
30
31  // This ivar ensures that writesFinishedWithError: is only sent once to the writeable. Protected
32  // by _writeableQueue.
33  BOOL _alreadyFinished;
34
35  // This ivar ensures that a cancelWithError: call prevents further values to be sent to
36  // self.writeable. It must support manipulation outside of _writeableQueue and thus needs to be
37  // protected by self lock.
38  BOOL _cancelled;
39}
40
41- (instancetype)init {
42  return [self initWithWriteable:nil];
43}
44
45// Designated initializer
46- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable
47                    dispatchQueue:(dispatch_queue_t)queue {
48  if (self = [super init]) {
49    _writeableQueue = queue;
50    _writeable = writeable;
51    _alreadyFinished = NO;
52    _cancelled = NO;
53  }
54  return self;
55}
56
57- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable {
58  return [self initWithWriteable:writeable dispatchQueue:dispatch_get_main_queue()];
59}
60
61- (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler {
62  dispatch_async(_writeableQueue, ^{
63    if (self->_alreadyFinished) {
64      return;
65    }
66
67    @synchronized(self) {
68      if (self->_cancelled) {
69        return;
70      }
71    }
72
73    [self.writeable writeValue:value];
74    handler();
75  });
76}
77
78- (void)enqueueSuccessfulCompletion {
79  dispatch_async(_writeableQueue, ^{
80    if (self->_alreadyFinished) {
81      return;
82    }
83    @synchronized(self) {
84      if (self->_cancelled) {
85        return;
86      }
87    }
88    [self.writeable writesFinishedWithError:nil];
89
90    // Skip any possible message to the wrapped writeable enqueued after this one.
91    self->_alreadyFinished = YES;
92    self.writeable = nil;
93  });
94}
95
96- (void)cancelWithError:(NSError *)error {
97  NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion.");
98  @synchronized(self) {
99    self->_cancelled = YES;
100  }
101  dispatch_async(_writeableQueue, ^{
102    if (self->_alreadyFinished) {
103      // a cancel or a successful completion is already issued
104      return;
105    }
106    [self.writeable writesFinishedWithError:error];
107
108    // Skip any possible message to the wrapped writeable enqueued after this one.
109    self->_alreadyFinished = YES;
110    self.writeable = nil;
111  });
112}
113
114- (void)cancelSilently {
115  dispatch_async(_writeableQueue, ^{
116    if (self->_alreadyFinished) {
117      return;
118    }
119    self.writeable = nil;
120  });
121}
122@end
123