• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19#import "GRPCCallInternal.h"
20
21#import <GRPCClient/GRPCCall.h>
22#import <GRPCClient/GRPCInterceptor.h>
23#import <RxLibrary/GRXBufferedPipe.h>
24
25#import "../GRPCTransport+Private.h"
26#import "GRPCCall+V2API.h"
27
/**
 * Call object that drives a legacy GRPCCall and forwards its events (initial metadata, messages,
 * write acknowledgements and close) to a GRPCTransportManager.
 *
 * All mutable state is guarded by @synchronized(self). Events that originate from the underlying
 * call are re-dispatched onto the transport manager's dispatch queue before being forwarded.
 */
@implementation GRPCCall2Internal {
  /** Request options (host, path, safety) for the call. */
  GRPCRequestOptions *_requestOptions;
  /** Options for the call. Stored as a private copy of what the caller passed to start. */
  GRPCCallOptions *_callOptions;
  /** The transport manager that response events are forwarded to. */
  GRPCTransportManager *_transportManager;

  /**
   * Make use of legacy GRPCCall to make calls. Nullified when call is finished.
   */
  GRPCCall *_call;
  /** Flags whether initial metadata has been published to response handler. */
  BOOL _initialMetadataPublished;
  /** Streaming call writeable to the underlying call. */
  GRXBufferedPipe *_pipe;
  /** Serial dispatch queue for tasks inside the call. */
  dispatch_queue_t _dispatchQueue;
  /** Flags whether call has started. */
  BOOL _started;
  /** Flags whether call has been canceled. */
  BOOL _canceled;
  /** Flags whether call has been finished (half-closed by the writer). */
  BOOL _finished;
  /** The number of messages requested via receiveNextMessages: before the call existed. */
  NSUInteger _pendingReceiveNextMessages;
}

/**
 * Creates a call bound to the given transport manager.
 *
 * @param transportManager The manager that receives response events of this call.
 * @return An initialized call with its own serial dispatch queue and request pipe.
 */
- (instancetype)initWithTransportManager:(GRPCTransportManager *)transportManager {
  if ((self = [super init])) {
    // Plain serial queue by default; upgraded to an explicit default-QoS serial queue when both
    // the SDK (compile time) and the OS (run time) support QoS attributes.
    dispatch_queue_attr_t queueAttributes = DISPATCH_QUEUE_SERIAL;
#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
    if (@available(iOS 8.0, macOS 10.10, *)) {
      queueAttributes =
          dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0);
    }
#endif
    _dispatchQueue = dispatch_queue_create(NULL, queueAttributes);
    _pipe = [GRXBufferedPipe pipe];
    _transportManager = transportManager;
  }
  return self;
}

/** The serial dispatch queue owned by this call. */
- (dispatch_queue_t)dispatchQueue {
  return _dispatchQueue;
}

/**
 * Starts the call.
 *
 * Does nothing (besides asserting in debug builds) when host or path is empty, or when the call
 * was already started or canceled.
 *
 * @param requestOptions Host, path and safety of the call. Host and path must be non-empty.
 * @param callOptions Options of the call; copied. Default options are used when nil.
 */
- (void)startWithRequestOptions:(GRPCRequestOptions *)requestOptions
                    callOptions:(GRPCCallOptions *)callOptions {
  NSAssert(requestOptions.host.length != 0 && requestOptions.path.length != 0,
           @"Neither host nor path can be nil.");
  NSAssert(requestOptions.safety <= GRPCCallSafetyDefault, @"Invalid call safety value.");
  if (requestOptions.host.length == 0 || requestOptions.path.length == 0) {
    NSLog(@"Invalid host and path.");
    return;
  }

  GRPCCall *copiedCall = nil;
  @synchronized(self) {
    // Validate state BEFORE touching the stored options, so that a redundant start cannot
    // replace the options of a call that is already running (assertions are compiled out in
    // release builds, hence the explicit checks).
    NSAssert(!_started, @"Call already started.");
    NSAssert(!_canceled, @"Call already canceled.");
    if (_started || _canceled) {
      return;
    }
    _started = YES;

    _requestOptions = requestOptions;
    _callOptions = (callOptions != nil) ? [callOptions copy] : [[GRPCCallOptions alloc] init];

    // NOTE(review): the writeDone block captures self strongly; presumably that reference goes
    // away once _call is cleared on completion or cancel - confirm against GRPCCall.
    _call = [[GRPCCall alloc] initWithHost:_requestOptions.host
                                      path:_requestOptions.path
                                callSafety:_requestOptions.safety
                            requestsWriter:_pipe
                               callOptions:_callOptions
                                 writeDone:^{
                                   @synchronized(self) {
                                     if (self->_transportManager) {
                                       [self issueDidWriteData];
                                     }
                                   }
                                 }];
    [_call setResponseDispatchQueue:_dispatchQueue];
    if (_callOptions.initialMetadata) {
      [_call.requestHeaders addEntriesFromDictionary:_callOptions.initialMetadata];
    }
    // Replay receive requests that arrived before the underlying call existed.
    if (_pendingReceiveNextMessages > 0) {
      [_call receiveNextMessages:_pendingReceiveNextMessages];
      _pendingReceiveNextMessages = 0;
    }
    copiedCall = _call;
  }

  void (^valueHandler)(id value) = ^(id value) {
    @synchronized(self) {
      if (self->_transportManager) {
        [self issueInitialMetadataIfNeeded];
        if (value) {
          [self issueMessage:value];
        }
      }
    }
  };
  void (^completionHandler)(NSError *errorOrNil) = ^(NSError *errorOrNil) {
    @synchronized(self) {
      if (self->_transportManager) {
        [self issueInitialMetadataIfNeeded];
        [self issueCloseWithTrailingMetadata:self->_call.responseTrailers error:errorOrNil];
      }
      // Clearing _call must happen *after* dispatching close in order to get trailing
      // metadata from _call.
      if (self->_call) {
        // Clean up the request writers. This should have no effect to _call since its
        // response writeable is already nullified.
        [self->_pipe writesFinishedWithError:nil];
        self->_call = nil;
        self->_pipe = nil;
      }
    }
  };
  id<GRXWriteable> responseWriteable =
      [[GRXWriteable alloc] initWithValueHandler:valueHandler completionHandler:completionHandler];
  // Started outside the lock, on the local strong reference taken above.
  [copiedCall startWithWriteable:responseWriteable];
}

/**
 * Cancels the call. Only the first invocation has an effect.
 *
 * Forwards a close carrying a GRPCErrorCodeCancelled error to the transport manager, shuts the
 * manager down, and then cancels the underlying call (if one was created) outside the lock.
 */
- (void)cancel {
  GRPCCall *copiedCall = nil;
  @synchronized(self) {
    if (_canceled) {
      return;
    }
    _canceled = YES;

    copiedCall = _call;
    _call = nil;
    _pipe = nil;

    if (_transportManager != nil) {
      NSError *cancelError =
          [NSError errorWithDomain:kGRPCErrorDomain
                              code:GRPCErrorCodeCancelled
                          userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}];
      [_transportManager forwardPreviousInterceptorCloseWithTrailingMetadata:nil
                                                                       error:cancelError];
      [_transportManager shutDown];
    }
  }
  [copiedCall cancel];
}

/**
 * Writes one value to the request pipe.
 *
 * Ignored (after asserting in debug builds) when the call was canceled or already half-closed.
 *
 * @param data The value handed to the pipe.
 */
- (void)writeData:(id)data {
  GRXBufferedPipe *copiedPipe = nil;
  @synchronized(self) {
    NSAssert(!_canceled, @"Call already canceled.");
    NSAssert(!_finished, @"Call is half-closed before sending data.");
    if (_canceled || _finished) {
      return;
    }
    copiedPipe = _pipe;
  }
  // Written outside the lock; a nil pipe makes this a no-op.
  [copiedPipe writeValue:data];
}

/**
 * Half-closes the call by finishing the request pipe.
 *
 * Ignored (after asserting in debug builds) when the call has not been started, was canceled, or
 * was already half-closed.
 */
- (void)finish {
  GRXBufferedPipe *copiedPipe = nil;
  @synchronized(self) {
    NSAssert(_started, @"Call not started.");
    NSAssert(!_canceled, @"Call already canceled.");
    NSAssert(!_finished, @"Call already half-closed.");
    if (!_started || _canceled || _finished) {
      return;
    }

    copiedPipe = _pipe;
    _pipe = nil;
    _finished = YES;
  }
  [copiedPipe writesFinishedWithError:nil];
}

/**
 * Publishes the response headers of the underlying call exactly once.
 * Callers in this file invoke it while holding @synchronized(self).
 */
- (void)issueInitialMetadataIfNeeded {
  if (_initialMetadataPublished) {
    return;
  }
  _initialMetadataPublished = YES;
  [self issueInitialMetadata:_call.responseHeaders];
}

/**
 * Forwards initial metadata to the transport manager on the manager's dispatch queue.
 * Does nothing when the metadata or the manager is nil.
 */
- (void)issueInitialMetadata:(NSDictionary *)initialMetadata {
  GRPCTransportManager *copiedManager = _transportManager;
  if (initialMetadata == nil || copiedManager == nil) {
    return;
  }
  // cannot directly call callback because this may not be running on manager's dispatch queue
  dispatch_async(copiedManager.dispatchQueue, ^{
    [copiedManager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
  });
}

/**
 * Forwards a received message to the transport manager on the manager's dispatch queue.
 * Does nothing when the message or the manager is nil.
 */
- (void)issueMessage:(id)message {
  GRPCTransportManager *copiedManager = _transportManager;
  if (message == nil || copiedManager == nil) {
    return;
  }
  // cannot directly call callback because this may not be running on manager's dispatch queue
  dispatch_async(copiedManager.dispatchQueue, ^{
    [copiedManager forwardPreviousInterceptorWithData:message];
  });
}

/**
 * Forwards the close event to the transport manager on the manager's dispatch queue and then
 * shuts the manager down. Does nothing when the manager is nil.
 */
- (void)issueCloseWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error {
  GRPCTransportManager *copiedManager = _transportManager;
  if (copiedManager == nil) {
    return;
  }
  // cannot directly call callback because this may not be running on manager's dispatch queue
  dispatch_async(copiedManager.dispatchQueue, ^{
    [copiedManager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata
                                                                 error:error];
    [copiedManager shutDown];
  });
}

/**
 * Forwards a write acknowledgement to the transport manager on the manager's dispatch queue.
 * Does nothing when the manager is nil.
 */
- (void)issueDidWriteData {
  GRPCTransportManager *copiedManager = _transportManager;
  if (copiedManager == nil) {
    return;
  }
  // cannot directly call callback because this may not be running on manager's dispatch queue
  dispatch_async(copiedManager.dispatchQueue, ^{
    [copiedManager forwardPreviousInterceptorDidWriteData];
  });
}

/**
 * Requests delivery of more messages from the underlying call.
 *
 * While no underlying call exists the request is accumulated in _pendingReceiveNextMessages, which
 * startWithRequestOptions:callOptions: replays once it creates the call.
 *
 * @param numberOfMessages The number of additional messages requested.
 */
- (void)receiveNextMessages:(NSUInteger)numberOfMessages {
  // branching based on _callOptions.flowControlEnabled is handled inside _call
  GRPCCall *copiedCall = nil;
  @synchronized(self) {
    copiedCall = _call;
    if (copiedCall == nil) {
      _pendingReceiveNextMessages += numberOfMessages;
      return;
    }
  }
  [copiedCall receiveNextMessages:numberOfMessages];
}

@end
299