/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#import "GRXConcurrentWriteable.h"

#import <RxLibrary/GRXWriteable.h>

@interface GRXConcurrentWriteable ()
// This is atomic so that cancellation can nillify it from any thread.
@property(atomic, strong) id<GRXWriteable> writeable;
@end

// Forwards values and completion events to a wrapped GRXWriteable by dispatching every message
// asynchronously onto a single dispatch queue.
@implementation GRXConcurrentWriteable {
  // Queue on which every message to self.writeable is sent.
  dispatch_queue_t _writeableQueue;

  // This ivar ensures that writesFinishedWithError: is only sent once to the writeable. Protected
  // by _writeableQueue.
  BOOL _alreadyFinished;

  // This ivar ensures that a cancelWithError: call prevents further values to be sent to
  // self.writeable. It must support manipulation outside of _writeableQueue and thus needs to be
  // protected by self lock.
  BOOL _cancelled;
}

// Convenience initializer: no wrapped writeable; the queue defaults to the main queue (see
// initWithWriteable:).
- (instancetype)init {
  return [self initWithWriteable:nil];
}

// Designated initializer
// Stores the writeable to wrap and the queue on which all messages to it will be dispatched.
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable
                    dispatchQueue:(dispatch_queue_t)queue {
  if (self = [super init]) {
    _writeableQueue = queue;
    _writeable = writeable;
    // _alreadyFinished and _cancelled need no explicit setup: ivars are zero-filled on
    // allocation, so both start out as NO.
  }
  return self;
}

// Convenience initializer that delivers all messages on the main queue.
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable {
  return [self initWithWriteable:writeable dispatchQueue:dispatch_get_main_queue()];
}

// Asynchronously sends writeValue: with the given value to the wrapped writeable on
// _writeableQueue, then invokes the handler (if one was supplied) on that same queue. Neither
// happens if the writeable has already been finished or cancelWithError: has been called.
- (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler {
  dispatch_async(_writeableQueue, ^{
    if (self->_alreadyFinished) {
      return;
    }

    // _cancelled may be set from any thread, so it is read under the same lock that
    // cancelWithError: uses to write it.
    @synchronized(self) {
      if (self->_cancelled) {
        return;
      }
    }

    [self.writeable writeValue:value];
    // Invoking a nil block crashes, so tolerate callers that pass no completion handler.
    if (handler) {
      handler();
    }
  });
}

// Asynchronously sends writesFinishedWithError:nil to the wrapped writeable and releases it.
// Does nothing if the writeable has already been finished or cancelWithError: has been called.
- (void)enqueueSuccessfulCompletion {
  dispatch_async(_writeableQueue, ^{
    if (self->_alreadyFinished) {
      return;
    }
    @synchronized(self) {
      if (self->_cancelled) {
        return;
      }
    }
    [self.writeable writesFinishedWithError:nil];

    // Skip any possible message to the wrapped writeable enqueued after this one.
    self->_alreadyFinished = YES;
    self.writeable = nil;
  });
}

// Marks this object as cancelled right away (so blocks that are already enqueued skip their
// work once they run), then asynchronously sends writesFinishedWithError: with the given error
// to the wrapped writeable and releases it.
// The error must not be nil. That is only enforced by NSAssert, so when assertions are compiled
// out a nil error is forwarded to the writeable unchanged.
- (void)cancelWithError:(NSError *)error {
  NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion.");
  @synchronized(self) {
    self->_cancelled = YES;
  }
  dispatch_async(_writeableQueue, ^{
    if (self->_alreadyFinished) {
      // a cancel or a successful completion is already issued
      return;
    }
    [self.writeable writesFinishedWithError:error];

    // Skip any possible message to the wrapped writeable enqueued after this one.
    self->_alreadyFinished = YES;
    self.writeable = nil;
  });
}

// Asynchronously drops the wrapped writeable without sending it writesFinishedWithError:.
// Neither _cancelled nor _alreadyFinished is set here, so blocks enqueued afterwards still run;
// their messages go to a nil writeable, and enqueueValue:completionHandler: still calls its
// handler.
- (void)cancelSilently {
  dispatch_async(_writeableQueue, ^{
    if (self->_alreadyFinished) {
      return;
    }
    self.writeable = nil;
  });
}
@end