1/* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19#import "GRXConcurrentWriteable.h" 20 21#import <RxLibrary/GRXWriteable.h> 22 23@interface GRXConcurrentWriteable () 24// This is atomic so that cancellation can nillify it from any thread. 25@property(atomic, strong) id<GRXWriteable> writeable; 26@end 27 28@implementation GRXConcurrentWriteable { 29 dispatch_queue_t _writeableQueue; 30 // This ensures that writesFinishedWithError: is only sent once to the writeable. 31 BOOL _alreadyFinished; 32} 33 34- (instancetype)init { 35 return [self initWithWriteable:nil]; 36} 37 38// Designated initializer 39- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable 40 dispatchQueue:(dispatch_queue_t)queue { 41 if (self = [super init]) { 42 _writeableQueue = queue; 43 _writeable = writeable; 44 } 45 return self; 46} 47 48- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable { 49 return [self initWithWriteable:writeable dispatchQueue:dispatch_get_main_queue()]; 50} 51 52- (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler { 53 dispatch_async(_writeableQueue, ^{ 54 // We're racing a possible cancellation performed by another thread. To turn all already- 55 // enqueued messages into noops, cancellation nillifies the writeable property. If we get it 56 // before it's nil, we won the race. 57 id<GRXWriteable> writeable = self.writeable; 58 if (writeable) { 59 [writeable writeValue:value]; 60 handler(); 61 } 62 }); 63} 64 65- (void)enqueueSuccessfulCompletion { 66 __weak typeof(self) weakSelf = self; 67 dispatch_async(_writeableQueue, ^{ 68 typeof(self) strongSelf = weakSelf; 69 if (strongSelf) { 70 BOOL finished = NO; 71 @synchronized(strongSelf) { 72 if (!strongSelf->_alreadyFinished) { 73 strongSelf->_alreadyFinished = YES; 74 } else { 75 finished = YES; 76 } 77 } 78 if (!finished) { 79 // Cancellation is now impossible. None of the other three blocks can run concurrently with 80 // this one. 81 [strongSelf.writeable writesFinishedWithError:nil]; 82 // Skip any possible message to the wrapped writeable enqueued after this one. 83 strongSelf.writeable = nil; 84 } 85 } 86 }); 87} 88 89- (void)cancelWithError:(NSError *)error { 90 NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion."); 91 BOOL finished = NO; 92 @synchronized(self) { 93 if (!_alreadyFinished) { 94 _alreadyFinished = YES; 95 } else { 96 finished = YES; 97 } 98 } 99 if (!finished) { 100 // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to 101 // nillify writeable because we might be running concurrently with the blocks in 102 // _writeableQueue, and assignment with ARC isn't atomic. 103 id<GRXWriteable> writeable = self.writeable; 104 self.writeable = nil; 105 106 dispatch_async(_writeableQueue, ^{ 107 [writeable writesFinishedWithError:error]; 108 }); 109 } 110} 111 112- (void)cancelSilently { 113 BOOL finished = NO; 114 @synchronized(self) { 115 if (!_alreadyFinished) { 116 _alreadyFinished = YES; 117 } else { 118 finished = YES; 119 } 120 } 121 if (!finished) { 122 // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to 123 // nillify writeable because we might be running concurrently with the blocks in 124 // _writeableQueue, and assignment with ARC isn't atomic. 125 self.writeable = nil; 126 } 127} 128@end 129