• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19#import "GRPCCallInternal.h"
20
21#import <GRPCClient/GRPCCall.h>
22#import <GRPCClient/GRPCInterceptor.h>
23#import <RxLibrary/GRXBufferedPipe.h>
24
25#import "../GRPCTransport+Private.h"
26#import "GRPCCall+V2API.h"
27
28@implementation GRPCCall2Internal {
29  /** Request for the call. */
30  GRPCRequestOptions *_requestOptions;
31  /** Options for the call. */
32  GRPCCallOptions *_callOptions;
33  /** The interceptor manager to process responses. */
34  GRPCTransportManager *_transportManager;
35
36  /**
37   * Make use of legacy GRPCCall to make calls. Nullified when call is finished.
38   */
39  GRPCCall *_call;
40  /** Flags whether initial metadata has been published to response handler. */
41  BOOL _initialMetadataPublished;
42  /** Streaming call writeable to the underlying call. */
43  GRXBufferedPipe *_pipe;
44  /** Serial dispatch queue for tasks inside the call. */
45  dispatch_queue_t _dispatchQueue;
46  /** Flags whether call has started. */
47  BOOL _started;
48  /** Flags whether call has been canceled. */
49  BOOL _canceled;
50  /** Flags whether call has been finished. */
51  BOOL _finished;
52  /** The number of pending messages receiving requests. */
53  NSUInteger _pendingReceiveNextMessages;
54}
55
56- (instancetype)initWithTransportManager:(GRPCTransportManager *)transportManager {
57  dispatch_queue_t dispatchQueue;
58  // Set queue QoS only when iOS version is 8.0 or above and Xcode version is 9.0 or above
59#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
60  if (@available(iOS 8.0, macOS 10.10, *)) {
61    dispatchQueue = dispatch_queue_create(
62        NULL, dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0));
63  } else {
64#else
65  {
66#endif
67    dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
68  }
69  if ((self = [super init])) {
70    _pipe = [GRXBufferedPipe pipe];
71    _transportManager = transportManager;
72    _dispatchQueue = dispatchQueue;
73  }
74  return self;
75}
76
77- (dispatch_queue_t)dispatchQueue {
78  return _dispatchQueue;
79}
80
81- (void)startWithRequestOptions:(GRPCRequestOptions *)requestOptions
82                    callOptions:(GRPCCallOptions *)callOptions {
83  NSAssert(requestOptions.host.length != 0 && requestOptions.path.length != 0,
84           @"Neither host nor path can be nil.");
85  NSAssert(requestOptions.safety <= GRPCCallSafetyCacheableRequest, @"Invalid call safety value.");
86  if (requestOptions.host.length == 0 || requestOptions.path.length == 0) {
87    NSLog(@"Invalid host and path.");
88    return;
89  }
90  if (requestOptions.safety > GRPCCallSafetyCacheableRequest) {
91    NSLog(@"Invalid call safety.");
92    return;
93  }
94
95  GRPCCall *copiedCall = nil;
96  @synchronized(self) {
97    _requestOptions = requestOptions;
98    if (callOptions == nil) {
99      _callOptions = [[GRPCCallOptions alloc] init];
100    } else {
101      _callOptions = [callOptions copy];
102    }
103
104    NSAssert(!_started, @"Call already started.");
105    NSAssert(!_canceled, @"Call already canceled.");
106    if (_started) {
107      return;
108    }
109    if (_canceled) {
110      return;
111    }
112
113    _started = YES;
114
115    _call = [[GRPCCall alloc] initWithHost:_requestOptions.host
116                                      path:_requestOptions.path
117                                callSafety:_requestOptions.safety
118                            requestsWriter:_pipe
119                               callOptions:_callOptions
120                                 writeDone:^{
121                                   @synchronized(self) {
122                                     if (self->_transportManager) {
123                                       [self issueDidWriteData];
124                                     }
125                                   }
126                                 }];
127    [_call setResponseDispatchQueue:_dispatchQueue];
128    if (_callOptions.initialMetadata) {
129      [_call.requestHeaders addEntriesFromDictionary:_callOptions.initialMetadata];
130    }
131    if (_pendingReceiveNextMessages > 0) {
132      [_call receiveNextMessages:_pendingReceiveNextMessages];
133      _pendingReceiveNextMessages = 0;
134    }
135    copiedCall = _call;
136  }
137
138  void (^valueHandler)(id value) = ^(id value) {
139    @synchronized(self) {
140      if (self->_transportManager) {
141        if (!self->_initialMetadataPublished) {
142          self->_initialMetadataPublished = YES;
143          [self issueInitialMetadata:self->_call.responseHeaders];
144        }
145        if (value) {
146          [self issueMessage:value];
147        }
148      }
149    }
150  };
151  void (^completionHandler)(NSError *errorOrNil) = ^(NSError *errorOrNil) {
152    @synchronized(self) {
153      if (self->_transportManager) {
154        if (!self->_initialMetadataPublished) {
155          self->_initialMetadataPublished = YES;
156          [self issueInitialMetadata:self->_call.responseHeaders];
157        }
158        [self issueCloseWithTrailingMetadata:self->_call.responseTrailers error:errorOrNil];
159      }
160      // Clearing _call must happen *after* dispatching close in order to get trailing
161      // metadata from _call.
162      if (self->_call) {
163        // Clean up the request writers. This should have no effect to _call since its
164        // response writeable is already nullified.
165        [self->_pipe writesFinishedWithError:nil];
166        self->_call = nil;
167        self->_pipe = nil;
168      }
169    }
170  };
171  id<GRXWriteable> responseWriteable =
172      [[GRXWriteable alloc] initWithValueHandler:valueHandler completionHandler:completionHandler];
173  [copiedCall startWithWriteable:responseWriteable];
174}
175
176- (void)cancel {
177  GRPCCall *copiedCall = nil;
178  @synchronized(self) {
179    if (_canceled) {
180      return;
181    }
182
183    _canceled = YES;
184
185    copiedCall = _call;
186    _call = nil;
187    _pipe = nil;
188
189    if (_transportManager != nil) {
190      [_transportManager
191          forwardPreviousInterceptorCloseWithTrailingMetadata:nil
192                                                        error:
193                                                            [NSError
194                                                                errorWithDomain:kGRPCErrorDomain
195                                                                           code:
196                                                                               GRPCErrorCodeCancelled
197                                                                       userInfo:@{
198                                                                         NSLocalizedDescriptionKey :
199                                                                             @"Canceled by app"
200                                                                       }]];
201      [_transportManager shutDown];
202    }
203  }
204  [copiedCall cancel];
205}
206
207- (void)writeData:(id)data {
208  GRXBufferedPipe *copiedPipe = nil;
209  @synchronized(self) {
210    NSAssert(!_canceled, @"Call already canceled.");
211    NSAssert(!_finished, @"Call is half-closed before sending data.");
212    if (_canceled) {
213      return;
214    }
215    if (_finished) {
216      return;
217    }
218
219    if (_pipe) {
220      copiedPipe = _pipe;
221    }
222  }
223  [copiedPipe writeValue:data];
224}
225
226- (void)finish {
227  GRXBufferedPipe *copiedPipe = nil;
228  @synchronized(self) {
229    NSAssert(_started, @"Call not started.");
230    NSAssert(!_canceled, @"Call already canceled.");
231    NSAssert(!_finished, @"Call already half-closed.");
232    if (!_started) {
233      return;
234    }
235    if (_canceled) {
236      return;
237    }
238    if (_finished) {
239      return;
240    }
241
242    if (_pipe) {
243      copiedPipe = _pipe;
244      _pipe = nil;
245    }
246    _finished = YES;
247  }
248  [copiedPipe writesFinishedWithError:nil];
249}
250
251- (void)issueInitialMetadata:(NSDictionary *)initialMetadata {
252  if (initialMetadata != nil) {
253    // cannot directly call callback because this may not be running on manager's dispatch queue
254    GRPCTransportManager *copiedManager = _transportManager;
255    dispatch_async(copiedManager.dispatchQueue, ^{
256      [copiedManager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
257    });
258  }
259}
260
261- (void)issueMessage:(id)message {
262  if (message != nil) {
263    // cannot directly call callback because this may not be running on manager's dispatch queue
264    GRPCTransportManager *copiedManager = _transportManager;
265    dispatch_async(copiedManager.dispatchQueue, ^{
266      [copiedManager forwardPreviousInterceptorWithData:message];
267    });
268  }
269}
270
271- (void)issueCloseWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error {
272  // cannot directly call callback because this may not be running on manager's dispatch queue
273  GRPCTransportManager *copiedManager = _transportManager;
274  dispatch_async(copiedManager.dispatchQueue, ^{
275    [copiedManager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata
276                                                                 error:error];
277    [copiedManager shutDown];
278  });
279}
280
281- (void)issueDidWriteData {
282  // cannot directly call callback because this may not be running on manager's dispatch queue
283  GRPCTransportManager *copiedManager = _transportManager;
284  dispatch_async(copiedManager.dispatchQueue, ^{
285    [copiedManager forwardPreviousInterceptorDidWriteData];
286  });
287}
288
289- (void)receiveNextMessages:(NSUInteger)numberOfMessages {
290  // branching based on _callOptions.flowControlEnabled is handled inside _call
291  GRPCCall *copiedCall = nil;
292  @synchronized(self) {
293    copiedCall = _call;
294    if (copiedCall == nil) {
295      _pendingReceiveNextMessages += numberOfMessages;
296      return;
297    }
298  }
299  [copiedCall receiveNextMessages:numberOfMessages];
300}
301
302@end
303