• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18
19#import "GRXConcurrentWriteable.h"
20
21#import <RxLibrary/GRXWriteable.h>
22
@interface GRXConcurrentWriteable ()
// The wrapped writeable. Declared atomic because cancellation may set it to nil from any thread
// while other code is reading it.
@property(strong, atomic) id<GRXWriteable> writeable;
@end
27
@implementation GRXConcurrentWriteable {
  // Queue on which every message to the wrapped writeable is dispatched.
  dispatch_queue_t _writeableQueue;
  // Latch ensuring that the wrapped writeable is finished (or silently dropped) only once.
  // Only read and written inside -tryMarkFinished, under @synchronized(self).
  BOOL _alreadyFinished;
}

// Creates an instance with no wrapped writeable, delivering on the main queue.
- (instancetype)init {
  return [self initWithWriteable:nil];
}

// Designated initializer.
// writeable: the object that will receive the values and the final completion.
// queue: the dispatch queue on which all messages to |writeable| are dispatched.
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable
                    dispatchQueue:(dispatch_queue_t)queue {
  if (self = [super init]) {
    _writeableQueue = queue;
    _writeable = writeable;
  }
  return self;
}

// Convenience initializer: wraps |writeable| and delivers on the main queue.
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable {
  return [self initWithWriteable:writeable dispatchQueue:dispatch_get_main_queue()];
}

#pragma mark - Private

// Flips the finished latch. Returns YES to the single caller that flipped it, and NO to every
// caller after that. The check and the write happen under the same lock, so concurrent callers
// cannot both win.
- (BOOL)tryMarkFinished {
  BOOL wonRace = NO;
  @synchronized(self) {
    if (!_alreadyFinished) {
      _alreadyFinished = YES;
      wonRace = YES;
    }
  }
  return wonRace;
}

#pragma mark - Enqueueing

// Asynchronously writes |value| to the wrapped writeable on the writeable queue and then invokes
// |handler|. If the writeable has already been nillified (by a cancellation or a completion) when
// the block runs, the block is a no-op: the value is dropped and |handler| is NOT invoked.
// A nil |handler| is tolerated and simply skipped.
- (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler {
  dispatch_async(_writeableQueue, ^{
    // We're racing a possible cancellation performed by another thread. To turn all already-
    // enqueued messages into noops, cancellation nillifies the writeable property. If we get it
    // before it's nil, we won the race.
    id<GRXWriteable> writeable = self.writeable;
    if (!writeable) {
      return;
    }
    [writeable writeValue:value];
    // Invoking a nil block crashes, so guard the call.
    if (handler) {
      handler();
    }
  });
}

// Asynchronously tells the wrapped writeable that writes finished without error. Does nothing if
// this object was deallocated before the block runs, or if it was already finished or cancelled.
- (void)enqueueSuccessfulCompletion {
  __weak typeof(self) weakSelf = self;
  dispatch_async(_writeableQueue, ^{
    __strong typeof(weakSelf) strongSelf = weakSelf;
    if (!strongSelf) {
      return;
    }
    if (![strongSelf tryMarkFinished]) {
      // A cancellation or an earlier completion already finished the writeable.
      return;
    }
    // The latch is now set, so any later cancel call returns without touching the writeable.
    // NOTE(review): this relies on _writeableQueue being serial, so that no other enqueued block
    // runs concurrently with this one - confirm for caller-supplied queues.
    [strongSelf.writeable writesFinishedWithError:nil];
    // Skip any possible message to the wrapped writeable enqueued after this one.
    strongSelf.writeable = nil;
  });
}

#pragma mark - Cancellation

// Drops every still-enqueued value and asynchronously finishes the wrapped writeable with |error|.
// |error| must be non-nil (asserted). Callable from any thread; does nothing if the writeable was
// already finished or cancelled.
- (void)cancelWithError:(NSError *)error {
  NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion.");
  if (![self tryMarkFinished]) {
    return;
  }
  // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
  // nillify writeable because we might be running concurrently with the blocks in
  // _writeableQueue, and assignment with ARC isn't atomic.
  id<GRXWriteable> writeable = self.writeable;
  self.writeable = nil;

  // The error is still delivered on the writeable queue, to the writeable captured above.
  dispatch_async(_writeableQueue, ^{
    [writeable writesFinishedWithError:error];
  });
}

// Drops every still-enqueued value without sending anything to the wrapped writeable. Callable
// from any thread; does nothing if the writeable was already finished or cancelled.
- (void)cancelSilently {
  if (![self tryMarkFinished]) {
    return;
  }
  // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
  // nillify writeable because we might be running concurrently with the blocks in
  // _writeableQueue, and assignment with ARC isn't atomic.
  self.writeable = nil;
}
@end
129